DAG가 파이프라인의 구조를 정의한다면, Operator는 그 안에서 실제로 무엇을 할 것인지를 정의한다. 그리고 Scheduler는 이 작업들을 언제 실행할 것인지를 결정한다. Airflow를 실전에서 다루려면, Operator의 종류와 스케줄링의 동작 원리를 정확히 이해해야 한다.
Operator란 무엇인가
Operator(오퍼레이터) 는 DAG 내에서 하나의 태스크가 수행할 작업 단위를 정의하는 클래스다. 쉽게 말해, "이 태스크는 Bash 명령어를 실행해라", "이 태스크는 Python 함수를 호출해라"처럼 구체적인 행동을 지정하는 템플릿이다.
Airflow 공식 문서에서는 Operator를 크게 세 가지로 분류한다.
- Action Operator: 실제 작업을 수행한다. BashOperator, PythonOperator 등이 여기에 해당한다.
- Transfer Operator: 데이터를 한 시스템에서 다른 시스템으로 이동한다. S3에서 Redshift로 데이터를 옮기는 식이다.
- Sensor: 특정 조건이 만족될 때까지 대기한다. 파일이 생성될 때까지 기다리거나, 외부 API 응답을 폴링하는 용도로 사용한다.
이 글에서는 가장 기본이 되는 Action Operator들을 중심으로 살펴본다.
BashOperator
BashOperator는 Bash 셸 명령어를 실행하는 Operator다. 간단한 셸 스크립트 실행, 외부 프로그램 호출, 파일 조작 등에 사용한다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="bash_example",
start_date=datetime(2026, 1, 1),
schedule="@daily",
) as dag:
print_date = BashOperator(
task_id="print_date",
bash_command="date",
)
run_script = BashOperator(
task_id="run_script",
bash_command="/opt/scripts/etl_job.sh ",
)
print_date >> run_script
주의할 점이 하나 있다. bash_command에 스크립트 파일 경로를 지정할 때는 끝에 공백을 추가해야 한다. Airflow가 내부적으로 Jinja 템플릿을 처리하는 과정에서, 공백이 없으면 파일 경로를 템플릿 파일로 인식하여 오류가 발생할 수 있다.
Airflow 2.0 이후부터는 @task.bash 데코레이터 방식도 지원한다. TaskFlow API를 사용하는 경우 이 방식이 더 직관적이다.
PythonOperator
PythonOperator는 Python 함수를 호출하는 Operator다. ETL 로직, 데이터 변환, API 호출 등 Python으로 작성할 수 있는 거의 모든 작업에 사용한다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data(**kwargs):
print(f"Extracting data for {kwargs['ds']}")
return {"record_count": 1500}
def transform_data(**kwargs):
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="extract")
print(f"Transforming {data['record_count']} records")
with DAG(
dag_id="python_example",
start_date=datetime(2026, 1, 1),
schedule="@daily",
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_data,
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_data,
)
extract >> transform
python_callable에 전달되는 함수는 **kwargs를 통해 Airflow의 컨텍스트 변수에 접근할 수 있다. ds(논리적 실행 날짜), ti(태스크 인스턴스) 등이 자동으로 주입되며, XCom을 통해 태스크 간 데이터를 전달할 수도 있다.
op_args와 op_kwargs 파라미터를 사용하면 함수에 추가 인자를 넘길 수 있다.
greet = PythonOperator(
task_id="greet",
python_callable=lambda name, greeting: print(f"{greeting}, {name}!"),
op_kwargs={"name": "Airflow", "greeting": "Hello"},
)
마찬가지로 TaskFlow API의 @task 데코레이터를 사용하면 더 간결하게 작성할 수 있다.
EmailOperator
EmailOperator는 이메일을 발송하는 Operator다. 파이프라인의 완료 알림, 에러 보고, 리포트 전송 등에 활용한다.
from airflow.operators.email import EmailOperator
send_report = EmailOperator(
task_id="send_report",
to="team@example.com",
subject="Daily ETL Report - {{ ds }}",
html_content="<h3>ETL completed successfully.</h3><p>Date: {{ ds }}</p>",
)
EmailOperator를 사용하려면 Airflow의 SMTP 설정(airflow.cfg의 [smtp] 섹션)이 먼저 완료되어야 한다. subject와 html_content에는 Jinja 템플릿을 사용할 수 있어, 실행 날짜 등의 동적 값을 삽입하기 편리하다.
커스텀 Operator 만들기
기본 제공 Operator만으로는 부족한 경우, BaseOperator를 상속하여 직접 만들 수 있다. 핵심은 execute() 메서드를 구현하는 것이다.
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
def __init__(self, my_param: str, **kwargs):
super().__init__(**kwargs)
self.my_param = my_param
def execute(self, context):
print(f"Running custom logic with param: {self.my_param}")
ds = context["ds"]
print(f"Logical date: {ds}")
return "custom_result"
커스텀 Operator를 작성할 때 지켜야 할 원칙이 있다.
__init__에서 무거운 연산을 하지 않는다. 스케줄러가 매 사이클마다 Operator를 인스턴스화하기 때문에, 데이터베이스 연결이나 API 호출 같은 작업은execute()안에서 수행해야 한다.execute()는 멱등성(Idempotent)을 보장해야 한다. 재시도 시 동일한 결과를 반환하도록 설계해야, 실패 복구가 안전해진다.execute()의 반환값은 자동으로 XCom에 저장된다.
Scheduling: 언제 실행할 것인가
DAG를 정의했으면 이제 언제 실행할지를 정해야 한다. Airflow의 스케줄링 시스템은 단순한 타이머가 아니라, 데이터 구간(Data Interval) 이라는 개념을 기반으로 동작한다.
schedule 파라미터
DAG의 schedule 파라미터로 실행 주기를 설정한다. Cron 표현식, timedelta 객체, 또는 사전 정의된 프리셋을 사용할 수 있다.
from datetime import timedelta
# cron 표현식: 매일 자정
DAG(dag_id="daily_job", schedule="0 0 * * *", ...)
# timedelta: 6시간 간격
DAG(dag_id="every_6h", schedule=timedelta(hours=6), ...)
# 프리셋: 매주 일요일 자정
DAG(dag_id="weekly_job", schedule="@weekly", ...)
자주 사용하는 Cron 프리셋은 다음과 같다.
| 프리셋 | Cron 표현식 | 의미 |
|---|---|---|
@once | - | 한 번만 실행 |
@hourly | 0 * * * * | 매시 정각 |
@daily | 0 0 * * * | 매일 자정 |
@weekly | 0 0 * * 0 | 매주 일요일 자정 |
@monthly | 0 0 1 * * | 매월 1일 자정 |
@yearly | 0 0 1 1 * | 매년 1월 1일 자정 |
Cron 표현식 읽는 법
Cron 표현식은 다섯 개의 필드로 구성된다.
┌───────────── 분 (0-59)
│ ┌───────────── 시 (0-23)
│ │ ┌───────────── 일 (1-31)
│ │ │ ┌───────────── 월 (1-12)
│ │ │ │ ┌───────────── 요일 (0-6, 일=0)
│ │ │ │ │
* * * * *
몇 가지 예시를 보면 감이 온다.
30 2 * * *-- 매일 오전 2시 30분0 9 * * 1-5-- 평일 오전 9시*/15 * * * *-- 15분 간격0 0 1,15 * *-- 매월 1일과 15일 자정
start_date, end_date, 그리고 Data Interval
Airflow의 스케줄링을 처음 접하면 혼란스러운 부분이 있다. DAG Run이 실행되는 시점은 start_date가 아니라, 첫 번째 Data Interval이 끝나는 시점이라는 점이다.
DAG(
dag_id="my_dag",
start_date=datetime(2026, 1, 1),
schedule="@daily",
)
이 DAG의 첫 번째 실행은 2026-01-01이 아니라 2026-01-02에 발생한다. 왜냐하면 start_date는 첫 번째 Data Interval의 시작점이기 때문이다. Airflow는 2026-01-01 ~ 2026-01-02 구간의 데이터를 처리하기 위해, 이 구간이 **끝난 후인 2026-01-02**에 DAG Run을 트리거한다.
이 설계가 택배 배송에 비유하면 이해하기 쉽다. 1월 1일치 택배를 모으려면, 1월 1일이 끝난 후에야 그날 접수된 전체 택배를 알 수 있다. 그래서 실제 배송(=DAG Run)은 1월 2일에 시작한다.
end_date를 설정하면 해당 날짜 이후에는 DAG Run이 더 이상 생성되지 않는다.
Catchup과 Backfill
Catchup
Catchup은 start_date와 현재 시각 사이에 실행되지 않은 DAG Run을 자동으로 채워 넣는 기능이다.
DAG(
dag_id="catchup_example",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=True, # 기본값은 airflow.cfg 설정에 따라 다름
)
예를 들어, start_date가 1월 1일인 DAG를 2월 1일에 처음 활성화하면, Airflow는 1월 1일부터 1월 31일까지 31개의 DAG Run을 한꺼번에 생성하여 실행한다. 과거 데이터를 한 번에 처리해야 할 때 유용하지만, 의도치 않게 수십~수백 개의 DAG Run이 동시에 실행될 수 있으므로 주의가 필요하다.
과거 실행이 불필요한 경우 catchup=False로 설정하면, 가장 최근 Data Interval부터만 실행한다.
Backfill
Backfill은 특정 과거 기간에 대해 수동으로 DAG Run을 실행하는 것이다. CLI 명령어로 수행한다.
airflow dags backfill \
--start-date 2026-01-01 \
--end-date 2026-01-15 \
my_dag_id
Catchup이 "빠진 구간을 자동으로 채우는 것"이라면, Backfill은 "특정 구간을 명시적으로 재실행하는 것"이다. 데이터 파이프라인에 버그가 있어 과거 데이터를 재처리해야 할 때 주로 사용한다.
Branching: 조건에 따라 다른 경로 실행하기
데이터 파이프라인에서는 조건에 따라 다른 작업을 실행해야 하는 경우가 많다. "데이터가 일정 크기 이상이면 전체 처리, 아니면 경량 처리"와 같은 식이다. BranchPythonOperator가 이 역할을 한다.
BranchPythonOperator
BranchPythonOperator는 Python 함수의 반환값으로 다음에 실행할 태스크의 task_id를 지정하는 Operator다. 반환되지 않은 경로의 태스크들은 스킵(skip) 처리된다.
from airflow.operators.python import BranchPythonOperator
def choose_branch(**kwargs):
ds = kwargs["ds"]
day = int(ds[-2:]) # 날짜의 일(day) 부분
if day % 2 == 0:
return "even_day_task"
else:
return "odd_day_task"
branch = BranchPythonOperator(
task_id="branch",
python_callable=choose_branch,
)
even_task = BashOperator(task_id="even_day_task", bash_command="echo 'Even day'")
odd_task = BashOperator(task_id="odd_day_task", bash_command="echo 'Odd day'")
branch >> [even_task, odd_task]
이 예시에서 날짜가 짝수일이면 even_day_task만 실행되고, odd_day_task는 스킵된다.
분기 후 합류 문제
분기 처리에서 흔히 마주치는 문제는 분기 후 합류 지점이다. 다음과 같은 구조를 생각해 보자.
branch --> even_task --> join_task
\-> odd_task -/
join_task는 두 분기가 합류하는 지점이다. 그런데 기본 Trigger Rule인 all_success에서는, 스킵된 태스크도 "성공하지 않은 것"으로 간주한다. 즉, 어떤 분기를 타든 join_task는 항상 스킵된다.
이 문제를 해결하려면 join_task의 Trigger Rule을 변경해야 한다.
join = BashOperator(
task_id="join_task",
bash_command="echo 'Joined'",
trigger_rule="none_failed_min_one_success",
)
none_failed_min_one_success는 "실패한 상위 태스크가 없고, 최소 하나가 성공했으면 실행"이라는 의미다. 스킵은 실패로 간주하지 않으므로, 분기 후 합류에 적합하다.
Trigger Rules: 태스크 실행 조건 제어하기
Airflow의 기본 동작은 상위 태스크가 모두 성공해야 다음 태스크를 실행하는 것이다. 하지만 실제 파이프라인에서는 더 유연한 조건이 필요한 경우가 많다. Trigger Rule이 이를 가능하게 한다.
주요 Trigger Rule 목록
| Trigger Rule | 실행 조건 |
|---|---|
all_success | 모든 상위 태스크 성공 (기본값) |
all_failed | 모든 상위 태스크 실패 |
all_done | 모든 상위 태스크 완료 (성공/실패/스킵 무관) |
one_success | 하나 이상 성공 (나머지 완료 안 기다림) |
one_failed | 하나 이상 실패 (나머지 완료 안 기다림) |
one_done | 하나 이상 성공 또는 실패 |
none_failed | 실패한 태스크 없음 (스킵은 허용) |
none_skipped | 스킵된 태스크 없음 |
none_failed_min_one_success | 실패 없음 + 최소 하나 성공 |
always | 상위 태스크 상태 무관, 항상 실행 |
활용 시나리오
각 Trigger Rule이 언제 유용한지 구체적으로 살펴보자.
all_done -- 정리(cleanup) 작업
ETL 작업 후 임시 파일 삭제처럼, 성공이든 실패든 반드시 실행해야 하는 작업에 사용한다.
cleanup = BashOperator(
task_id="cleanup",
bash_command="rm -rf /tmp/etl_staging/*",
trigger_rule="all_done",
)
one_failed -- 즉시 알림
상위 태스크 중 하나라도 실패하면 즉시 알림을 보내야 할 때 사용한다. 나머지 태스크의 완료를 기다리지 않고 바로 실행된다.
alert = EmailOperator(
task_id="failure_alert",
to="oncall@example.com",
subject="Pipeline failure detected",
html_content="<p>At least one upstream task has failed.</p>",
trigger_rule="one_failed",
)
none_failed -- 분기 후 합류
앞서 살펴본 것처럼, 분기 구조에서 스킵을 허용하면서도 실패는 차단하고 싶을 때 사용한다.
스킵 전파(Skip Propagation)에 대한 이해
Trigger Rule을 다룰 때 반드시 알아야 할 개념이 스킵 전파다.
all_success와all_failed는 스킵을 전파한다. 상위 태스크가 스킵되면 하위 태스크도 연쇄적으로 스킵된다.one_success,none_failed,none_skipped등은 스킵을 전파하지 않는다.
BranchPythonOperator와 함께 사용할 때 이 차이가 중요하다. 분기 후 합류 태스크의 Trigger Rule을 all_success로 두면, 스킵이 전파되어 합류 태스크까지 스킵된다. none_failed_min_one_success나 none_failed로 변경해야 의도대로 동작한다.
정리
Airflow를 실전에서 활용하기 위한 핵심 개념들을 정리하면 다음과 같다.
- Operator는 태스크의 실행 내용을 정의하며, BashOperator, PythonOperator, EmailOperator 등 다양한 종류가 있다. 필요하면 BaseOperator를 상속하여 커스텀 Operator를 만들 수 있다.
- Scheduling은 Cron 표현식이나 timedelta로 설정하며, Data Interval 개념을 이해해야 실행 시점의 혼란을 피할 수 있다.
- Catchup은 과거 미실행 구간을 자동으로 채우고, Backfill은 특정 구간을 수동으로 재실행한다.
- BranchPythonOperator로 조건 분기를 처리하되, 합류 지점에서는 Trigger Rule 설정이 필수다.
- Trigger Rule은 태스크 실행 조건을 세밀하게 제어하며, 기본값
all_success외에도 상황에 맞는 규칙을 선택해야 파이프라인이 의도대로 동작한다.