Back to Blog
AirflowTaskFlow API@taskBest Practices성능 최적화모니터링

0x04. Airflow 심화 - TaskFlow API와 Best Practices

Airflow 2.0에서 도입된 TaskFlow API의 사용법과 프로덕션 환경에서의 DAG 작성 모범 사례, 성능 최적화 전략을 알아본다.

앞선 글들에서 DAG, Operator, XCom, Variable 등 Airflow의 핵심 구성 요소를 살펴보았다. 이 도구들로 대부분의 파이프라인을 구축할 수 있지만, PythonOperator와 XCom을 직접 다루는 코드는 꽤 장황하다. "그냥 Python 함수처럼 쓸 수 없을까?" 라는 질문에 대한 답이 바로 Airflow 2.0에서 도입된 TaskFlow API이다.

이 글에서는 TaskFlow API의 사용법, 프로덕션 수준의 DAG 작성 모범 사례, 그리고 성능 최적화 전략을 다룬다.


TaskFlow API: 함수형 DAG 작성

기존 방식의 불편함

PythonOperator로 ETL 파이프라인을 작성하면, XCom을 통한 데이터 전달이 명시적이고 반복적이다.

def extract(**context):
    data = {"users": 100, "events": 5000}
    context["ti"].xcom_push(key="raw_data", value=data)

def transform(**context):
    raw = context["ti"].xcom_pull(task_ids="extract", key="raw_data")
    result = {"total": raw["users"] + raw["events"]}
    context["ti"].xcom_push(key="transformed", value=result)

def load(**context):
    data = context["ti"].xcom_pull(task_ids="transform", key="transformed")
    print(f"Loading {data}")

with DAG("etl_old_style", ...):
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t3 = PythonOperator(task_id="load", python_callable=load)
    t1 >> t2 >> t3

xcom_push/xcom_pull을 반복적으로 호출해야 하고, Task 간의 데이터 흐름이 코드에서 직관적으로 드러나지 않는다.

@task 데코레이터

TaskFlow API는 @task 데코레이터 하나로 이 모든 보일러플레이트를 제거한다.

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def etl_taskflow():

    @task
    def extract():
        return {"users": 100, "events": 5000}

    @task
    def transform(raw_data: dict):
        return {"total": raw_data["users"] + raw_data["events"]}

    @task
    def load(result: dict):
        print(f"Loading {result}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

etl_taskflow()

코드량이 절반으로 줄었다. 핵심적인 변화는 세 가지이다.

  1. 자동 XCom: 함수의 return 값이 자동으로 XCom에 저장되고, 다음 Task의 인자로 전달된다
  2. 암묵적 의존성: transform(raw)처럼 함수 호출 형태로 연결하면, Airflow가 자동으로 Task 의존성을 생성한다
  3. @dag 데코레이터: DAG 정의도 데코레이터로 깔끔하게 처리한다

Multiple Outputs

하나의 Task에서 여러 값을 반환하고, 다음 Task들이 각각 필요한 값만 가져가게 하려면 multiple_outputs=True를 사용한다.

@task(multiple_outputs=True)
def extract():
    return {
        "users": [{"id": 1}, {"id": 2}],
        "metadata": {"source": "api", "timestamp": "2026-01-15"}
    }

@task
def process_users(users: list):
    return [u["id"] for u in users]

@task
def log_metadata(metadata: dict):
    print(f"Source: {metadata['source']}")

data = extract()
process_users(data["users"])
log_metadata(data["metadata"])

multiple_outputs=True이면 반환된 dict의 각 키가 별도의 XCom 키로 저장된다. 타입 힌트로 dict를 지정하면 자동으로 multiple_outputs=True가 적용된다.

기존 Operator와 혼합 사용

TaskFlow API는 기존 Operator와 자연스럽게 혼합할 수 있다. @task로 작성하기 어려운 작업(Bash 실행, SQL 쿼리, 외부 서비스 연동 등)은 전용 Operator를 쓰고, Python 로직은 @task로 작성하면 된다.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def mixed_pipeline():

    query_result = SQLExecuteQueryOperator(
        task_id="fetch_data",
        conn_id="my_postgres",
        sql="SELECT count(*) FROM users WHERE created_at = '{{ ds }}'",
    )

    @task
    def process(query_output):
        count = query_output[0][0]
        return {"user_count": count, "date": "{{ ds }}"}

    @task
    def notify(result: dict):
        print(f"Date {result['date']}: {result['user_count']} users")

    data = process(query_result.output)
    notify(data)

mixed_pipeline()

.output 속성을 통해 기존 Operator의 결과를 TaskFlow 함수에 전달할 수 있다.


DAG 작성 Best Practices

Idempotent DAG 설계

멱등성(Idempotency) 은 같은 DAG를 같은 날짜로 여러 번 실행해도 결과가 동일해야 한다는 원칙이다. 이는 재실행(retry)과 백필(backfill)의 기반이 된다.

# Bad: 실행할 때마다 결과가 달라진다
@task
def process():
    df = read_all_new_records()  # "새로운" 레코드의 기준이 모호
    write_append(df)             # 재실행  중복 적재

# Good: 날짜 기준으로 범위가 고정된다
@task
def process(**context):
    ds = context["ds"]
    df = read_records_for_date(ds)    # 특정 날짜의 레코드만
    write_overwrite_partition(df, ds)  # 해당 파티션을 덮어쓰기

핵심 원칙은 두 가지이다.

  • 시간 범위를 명시적으로 지정: ds(execution date)를 기준으로 처리할 데이터 범위를 한정한다
  • 쓰기는 덮어쓰기(overwrite): INSERT보다 DELETE + INSERT 또는 파티션 덮어쓰기를 사용한다

적절한 Task 분리 기준

Task를 어떤 단위로 나눌지는 자주 고민되는 문제이다.

너무 잘게 나누면: Task 간 XCom 전달 오버헤드 증가, 스케줄러 부하 증가, DAG 그래프가 복잡해진다.

너무 크게 합치면: 실패 시 전체 Task를 재실행해야 하고, 병렬 처리 기회를 잃는다.

경험적 가이드라인:

  • 재시도 단위로 분리한다. API 호출과 DB 적재가 하나의 Task에 있으면, DB 적재 실패 시 API도 다시 호출해야 한다
  • 병렬 실행 가능성이 있으면 분리한다. 독립적인 데이터 소스는 각각 별도 Task로
  • 논리적 단계(Extract, Transform, Load) 를 기본 분리 단위로 삼는다

에러 핸들링과 재시도

@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
def call_external_api():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()
  • retries: 실패 시 최대 재시도 횟수
  • retry_delay: 재시도 간 대기 시간
  • retry_exponential_backoff: 지수 백오프 적용 (5분 → 10분 → 20분)
  • max_retry_delay: 백오프 최대 대기 시간

외부 API 호출, 네트워크 의존 작업에는 재시도 설정이 필수이다. 반면 데이터 검증 실패처럼 재시도해도 결과가 같은 경우에는 재시도를 비활성화하고 즉시 알림을 보내는 것이 낫다.

템플릿 변수(Jinja) 활용

Airflow는 Jinja 템플릿을 통해 실행 컨텍스트 정보를 동적으로 주입한다. Operator의 문자열 파라미터에서 사용할 수 있다.

BashOperator(
    task_id="process",
    bash_command="python etl.py --date {{ ds }} --env {{ var.value.environment }}",
)

자주 사용되는 템플릿 변수:

변수설명예시
{{ ds }}실행 날짜 (YYYY-MM-DD)2026-01-15
{{ ds_nodash }}실행 날짜 (대시 없음)20260115
{{ data_interval_start }}데이터 구간 시작2026-01-15T00:00:00+00:00
{{ data_interval_end }}데이터 구간 끝2026-01-16T00:00:00+00:00
{{ var.value.key }}Airflow Variable 값-
{{ conn.conn_id.host }}Connection 정보-

@task 데코레이터에서는 **context를 통해 이 값들에 접근할 수 있다.


성능 최적화

Executor 선택

Executor는 Task가 어디서, 어떻게 실행되는지를 결정한다.

Executor특징적합한 환경
SequentialExecutor한 번에 Task 1개만 실행개발/테스트
LocalExecutor로컬 프로세스 기반 병렬 실행소규모 (Task 수십 개)
CeleryExecutor분산 워커 기반중대규모
KubernetesExecutorTask마다 Pod 생성동적 리소스 관리 필요 시

LocalExecutor는 설정이 간단하고 별도 인프라가 불필요하여, 중소규모 환경에서 가장 많이 사용된다. DAG 수가 많아지고 Task 실행이 잦아지면 CeleryExecutor로 전환하여 워커를 수평 확장한다. 리소스 요구량이 Task마다 크게 다르거나 격리가 중요한 경우 KubernetesExecutor가 적합하다.

Pool과 Priority로 리소스 관리

Pool은 동시에 실행할 수 있는 Task 수를 제한하는 메커니즘이다. 외부 시스템의 동시 연결 수에 제한이 있을 때 유용하다.

@task(pool="db_connections", pool_slots=1)
def query_database():
    # DB 연결  제한이 있는 작업
    ...

@task(pool="db_connections", pool_slots=2)
def heavy_query():
    # 리소스를  많이 사용하는 쿼리
    ...

Airflow UI에서 Pool의 크기(예: db_connections = 5)를 설정하면, 해당 Pool에 할당된 Task들의 총 slot 사용량이 5를 넘지 않도록 스케줄러가 조절한다.

Priority Weight는 같은 Pool 내에서 어떤 Task를 먼저 실행할지 결정한다.

@task(priority_weight=10)
def critical_task():
    ...

@task(priority_weight=1)
def low_priority_task():
    ...

값이 높을수록 우선순위가 높다.

DAG 파싱 성능 개선

Airflow 스케줄러는 주기적으로 DAG 파일을 파싱한다. DAG 파일이 많거나 파싱이 느리면 스케줄링 지연이 발생한다.

파싱 시간을 줄이는 방법:

  • DAG 파일 수 최소화: 관련된 DAG는 하나의 파일에 모을 수 있다
  • Top-level 코드 최소화: DAG 파일의 최상위에서 무거운 연산(DB 조회, API 호출)을 하지 않는다. 이 코드는 매 파싱 주기마다 실행된다
  • .airflowignore 활용: DAG 디렉토리에서 파싱 대상에서 제외할 파일 패턴을 지정한다
# Bad:  파싱마다 DB 쿼리 실행
connection = get_db_connection()
tables = connection.execute("SHOW TABLES").fetchall()

for table in tables:
    @dag(...)
    def dynamic_dag():
        ...

# Good: DAG 정의는 가볍게, 실제 로직은 Task 내부에서
TABLES = ["users", "orders", "products"]  # 정적 목록 사용

for table in TABLES:
    @dag(dag_id=f"etl_{table}", ...)
    def dynamic_dag():
        ...

모니터링과 알림

SLA(Service Level Agreement)

SLA는 Task가 특정 시간 내에 완료되어야 한다는 기대치를 설정한다. SLA를 초과하면 알림이 발생한다.

@task(sla=timedelta(hours=2))
def must_finish_in_2_hours():
    ...

SLA miss는 Airflow UI의 SLA 탭에서 확인할 수 있고, 이메일 알림으로도 받을 수 있다.

콜백(Callback)

Task나 DAG의 상태 변화에 반응하는 콜백 함수를 등록할 수 있다.

def on_failure(context):
    task_id = context["task_instance"].task_id
    dag_id = context["dag"].dag_id
    execution_date = context["execution_date"]
    send_slack_alert(f"Task {dag_id}.{task_id} failed at {execution_date}")

def on_success(context):
    send_metric("task_success", 1)

@task(on_failure_callback=on_failure, on_success_callback=on_success)
def important_task():
    ...

주요 콜백 종류:

  • on_failure_callback: Task 실패 시
  • on_success_callback: Task 성공 시
  • on_retry_callback: Task 재시도 시
  • on_execute_callback: Task 실행 시작 시

DAG 레벨에서도 콜백을 설정할 수 있다. 특히 on_failure_callback은 Slack이나 PagerDuty와 연동하여 장애 알림 체계를 구축하는 데 필수적이다.


정리

  • TaskFlow API: @task 데코레이터로 XCom 보일러플레이트를 제거하고, Python 함수처럼 자연스러운 DAG 작성이 가능하다. 기존 Operator와 혼합 사용도 자유롭다
  • Idempotent 설계: 실행 날짜 기반의 데이터 범위 지정과 덮어쓰기 방식으로 멱등성을 보장한다
  • 에러 핸들링: 재시도, 지수 백오프, 콜백을 적절히 조합하여 장애에 대응한다
  • 성능 최적화: Executor 선택, Pool/Priority 설정, DAG 파싱 경량화로 스케줄링 효율을 높인다
  • 모니터링: SLA와 콜백을 통해 파이프라인 상태를 추적하고, 문제 발생 시 빠르게 대응한다

Airflow는 설정할 수 있는 옵션이 매우 많은 시스템이다. 처음부터 모든 것을 최적화하려 하기보다, 기본 설정으로 시작하여 실제 운영에서 발견되는 병목을 하나씩 해결해 나가는 접근이 효과적이다.