Back to Blog
AirflowDAGTaskWorkflowOrchestration

0x01. Apache Airflow - DAG와 Task의 이해

Apache Airflow의 핵심 개념인 DAG와 Task를 이해하고, 워크플로 오케스트레이션의 기본 구조를 알아본다.

데이터 파이프라인이 하나일 때는 크론탭(crontab)으로 충분하다. 하지만 파이프라인이 수십 개로 늘어나고, 작업 간 의존성이 복잡해지며, 실패 시 재시도와 알림까지 필요해지면 이야기가 달라진다. Apache Airflow는 바로 이 문제를 해결하기 위해 탄생한 워크플로 오케스트레이션(Workflow Orchestration) 도구다.


Airflow란?

Apache Airflow는 2014년 Airbnb에서 시작된 오픈소스 프로젝트로, 현재는 Apache Software Foundation의 Top-Level 프로젝트다. 데이터 파이프라인을 프로그래밍 방식으로 정의하고, 스케줄링하며, 모니터링할 수 있게 해준다.

핵심 철학은 "워크플로를 코드로 관리한다" 는 것이다. XML이나 YAML이 아닌, Python 코드로 파이프라인을 정의하기 때문에 버전 관리, 테스트, 협업이 자연스럽다.

Airflow는 데이터 파이프라인의 지휘자(Orchestrator) 역할을 한다. 직접 데이터를 처리하는 것이 아니라, "어떤 작업을 어떤 순서로 언제 실행할지"를 관리한다.

기본 아키텍처

Airflow는 네 가지 핵심 컴포넌트로 구성된다.

  • Scheduler(스케줄러): DAG 파일을 지속적으로 파싱하고, 실행할 Task를 큐에 넣는 핵심 엔진이다. 어떤 Task가 실행 가능한 상태인지 판단하고, 의존성을 확인하며, 상태 전이를 관리한다.
  • Executor(실행기): 실제로 Task를 실행하는 주체다. LocalExecutor, CeleryExecutor, KubernetesExecutor 등 다양한 실행기를 환경에 맞게 선택할 수 있다.
  • Web Server(웹 서버): DAG의 상태, 실행 이력, 로그 등을 시각화하는 UI를 제공한다. 파이프라인의 현재 상태를 한눈에 파악하고, 수동 트리거나 재실행도 가능하다.
  • Metadata Database(메타데이터 DB): DAG 정의, Task 상태, 실행 이력 등 모든 메타데이터를 저장하는 관계형 데이터베이스다. PostgreSQL이나 MySQL을 주로 사용한다.

이 네 컴포넌트의 관계를 정리하면 다음과 같다. Scheduler가 DAG 파일을 파싱하여 실행 계획을 세우고, 그 정보를 Metadata DB에 기록한다. Executor는 DB에서 실행할 Task를 가져와 실행하며, 결과를 다시 DB에 저장한다. Web Server는 DB의 정보를 읽어 사용자에게 시각화하여 보여준다.


DAG: 워크플로의 설계도

DAG란?

DAG(Directed Acyclic Graph, 유향 비순환 그래프) 는 Airflow의 가장 핵심적인 개념이다. Task들을 모아놓고, 이들 사이의 의존 관계와 실행 순서를 정의하는 워크플로의 설계도라고 할 수 있다.

이름을 분해하면 다음과 같다.

  • Directed(유향): 의존성에 방향이 있다. A -> B는 "A 다음에 B를 실행한다"는 뜻이다.
  • Acyclic(비순환): 순환(사이클)이 없다. A -> B -> C -> A 같은 구조는 허용되지 않는다. 순환이 있으면 실행이 끝나지 않기 때문이다.
  • Graph(그래프): Task(노드)와 의존성(간선)으로 구성된 그래프 구조다.

택배 물류 센터에 비유하면, DAG는 배송 계획표다. "포장이 끝나면 분류하고, 분류가 끝나면 배송한다"는 순서를 정의하되, "배송 후 다시 포장한다"는 순환은 존재하지 않는다.

DAG 파일 작성법

DAG는 Python 파일로 작성되며, Airflow의 dags/ 폴더에 위치한다. 세 가지 방식으로 DAG를 정의할 수 있다.

1. Context Manager 방식

가장 널리 사용되는 방식이다. with 문 안에서 정의된 Task는 자동으로 해당 DAG에 소속된다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id="my_first_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    def extract():
        print("데이터 추출 시작")

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

2. @dag 데코레이터 방식 (TaskFlow API)

Airflow 2.0 이후 도입된 방식으로, 더 Pythonic한 코드를 작성할 수 있다.

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="my_taskflow_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def my_pipeline():

    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(raw_data):
        return [x * 2 for x in raw_data["data"]]

    @task
    def load(transformed_data):
        print(f"저장할 데이터: {transformed_data}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

my_pipeline()

TaskFlow API에서는 함수의 반환값이 자동으로 다음 Task의 입력으로 전달되기 때문에, 데이터 흐름이 직관적이다.

3. 표준 생성자 방식

DAG 객체를 직접 생성하고, 각 Task에 dag 파라미터로 전달하는 방식이다. Context Manager 방식이 등장한 이후로는 잘 사용되지 않는다.

dag = DAG(
    dag_id="standard_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
)

task_a = PythonOperator(
    task_id="task_a",
    python_callable=my_function,
    dag=dag,  # 명시적으로 DAG 지정
)

주요 DAG 파라미터

파라미터설명예시
dag_idDAG의 고유 식별자"etl_pipeline"
start_dateDAG 스케줄링의 시작 기준일datetime(2026, 1, 1)
schedule실행 주기 (cron 표현식 또는 프리셋)"@daily", "0 9 * * *"
catchup과거 미실행 구간을 소급 실행할지 여부False
default_args모든 Task에 적용할 기본 파라미터{"retries": 3}
max_active_runs동시에 실행 가능한 DAG Run 수1
tagsWeb UI에서 DAG를 분류하기 위한 태그["etl", "production"]

여기서 흔히 혼동하는 것이 start_date다. 이 값은 "DAG이 처음 실행되는 날짜"가 아니다. 정확히는 DAG의 첫 번째 데이터 구간(data interval)의 시작점이다. start_date가 1월 1일이고 schedule@daily라면, 첫 번째 DAG Run은 1월 1일~2일의 데이터를 대상으로 1월 2일에 실행된다.

catchup 파라미터도 주의가 필요하다. True로 설정하면 start_date부터 현재까지 실행되지 않은 모든 구간에 대해 DAG Run을 생성한다. 과거 데이터를 소급 처리해야 할 때는 유용하지만, 의도치 않게 수백 개의 DAG Run이 한꺼번에 트리거될 수 있으므로 주의해야 한다.


Task: 실행의 최소 단위

Task란?

Task는 DAG 안에서 실행되는 개별 작업 단위다. 데이터를 추출하는 것, 변환하는 것, 적재하는 것 각각이 하나의 Task에 해당한다. DAG가 설계도라면, Task는 설계도 위의 각 공정이다.

Airflow에서 Task를 정의하는 방법은 크게 세 가지다.

  • Operator(오퍼레이터): 미리 정의된 작업 템플릿이다. PythonOperator, BashOperator, EmailOperator 등 다양한 Operator가 내장되어 있으며, 외부 시스템 연동을 위한 Provider 패키지도 풍부하다.
  • Sensor(센서): 특정 조건이 충족될 때까지 대기하는 특수한 Operator다. 파일이 생성될 때까지 기다리거나, 외부 API의 응답을 기다리는 등의 용도로 사용된다.
  • @task 데코레이터(TaskFlow): Python 함수를 그대로 Task로 변환한다. 반환값이 자동으로 XCom을 통해 다음 Task에 전달되어 편리하다.

Operator는 템플릿이고, 이를 DAG 안에서 인스턴스화하면 Task가 된다. 설계도면에서 "용접"이라는 공정 유형이 Operator라면, "A 부품 용접"이라는 구체적 작업이 Task인 셈이다.

Task Instance: 실행의 실체

Task Instance(태스크 인스턴스) 는 특정 DAG Run에서 특정 Task가 실제로 실행된 결과물이다.

DAG가 매일 실행된다면, extract라는 Task는 매일 하나씩 Task Instance를 생성한다. 1월 1일의 extract와 1월 2일의 extract는 같은 Task지만, 서로 다른 Task Instance다. 각 Instance는 독립적인 상태를 가진다.

Task 상태(State)

Task Instance는 라이프사이클 동안 다양한 상태(State) 를 거친다. 정상적인 흐름은 다음과 같다.

none -> scheduled -> queued -> running -> success

주요 상태를 정리하면 다음과 같다.

상태설명
noneTask가 아직 큐에 들어가지 않은 초기 상태
scheduledScheduler가 실행 대상으로 지정한 상태
queuedExecutor의 실행 큐에 들어간 상태
running현재 실행 중인 상태
success성공적으로 완료된 상태
failed실행 중 오류가 발생한 상태
skipped조건에 의해 건너뛴 상태 (BranchOperator 등)
upstream_failed상위 Task가 실패하여 실행되지 않은 상태
up_for_retry실패 후 재시도 대기 중인 상태

failedupstream_failed의 차이를 명확히 알아두자. failed는 해당 Task 자체에서 오류가 발생한 것이고, upstream_failed는 해당 Task는 문제없지만 앞선 Task가 실패하여 실행 자체가 불가능한 상태다.


Task Dependencies: 실행 순서 정의하기

DAG의 핵심은 Task 간 의존성(Dependency) 을 정의하는 것이다. 어떤 Task가 먼저 실행되어야 하는지, 어떤 Task는 병렬로 실행 가능한지를 명시해야 한다.

>><< 연산자

Airflow는 Python의 비트 시프트(bitshift) 연산자를 오버로딩하여 직관적인 의존성 표현을 지원한다.

# A 다음에 B를 실행
task_a >> task_b

# 위와 동일 (방향만 반대)
task_b << task_a

# 체이닝: A -> B -> C 순서로 실행
task_a >> task_b >> task_c

>>downstream(하류) 관계를, <<upstream(상류) 관계를 설정한다. 강물의 흐름을 떠올리면 직관적이다. 상류(upstream)에서 하류(downstream)로 데이터가 흘러간다.

set_upstream / set_downstream

메서드 호출 방식으로도 의존성을 설정할 수 있다.

# task_a 다음에 task_b 실행 (A -> B)
task_a.set_downstream(task_b)

# 위와 동일
task_b.set_upstream(task_a)

기능적으로 >>, << 연산자와 완전히 동일하다. 하지만 >> 연산자가 더 간결하고 가독성이 좋기 때문에 대부분의 경우 연산자 방식이 권장된다.

의존성 패턴

실제 파이프라인에서 자주 사용되는 의존성 패턴은 크게 네 가지다.

1. 직렬(Sequential)

Task를 하나씩 순서대로 실행한다.

extract >> transform >> load

2. 병렬(Parallel)

서로 독립적인 Task를 동시에 실행한다.

start >> [task_a, task_b, task_c] >> end

task_a, task_b, task_cstart 이후 동시에 실행되며, 세 Task가 모두 완료된 후 end가 실행된다.

3. 팬아웃(Fan-Out)

하나의 Task 이후에 여러 Task가 분기되는 패턴이다.

extract >> [transform_users, transform_orders, transform_products]

데이터를 추출한 뒤, 사용자/주문/상품 데이터를 각각 독립적으로 변환하는 경우에 해당한다.

4. 팬인(Fan-In)

여러 Task의 결과를 하나의 Task로 합치는 패턴이다.

[transform_users, transform_orders, transform_products] >> generate_report

모든 변환 작업이 완료된 후에 통합 리포트를 생성하는 경우다.

이 패턴들을 조합하면 복잡한 파이프라인도 표현할 수 있다. 실제 ETL 파이프라인의 전체 모습은 대체로 다음과 같은 형태가 된다.

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="etl_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 2},
    tags=["etl", "production"],
)
def etl_pipeline():

    @task
    def extract_users():
        return {"users": ["Alice", "Bob"]}

    @task
    def extract_orders():
        return {"orders": [101, 102]}

    @task
    def transform(data):
        return {k: len(v) for k, v in data.items()}

    @task
    def load(report):
        print(f"리포트 저장: {report}")

    users = extract_users()
    orders = extract_orders()
    user_stats = transform(users)
    order_stats = transform(orders)
    load(user_stats)
    load(order_stats)

etl_pipeline()

TaskFlow API에서는 함수 호출 관계가 곧 의존성이 되므로, >> 연산자를 명시적으로 쓰지 않아도 된다. 반환값을 다음 함수의 인자로 전달하면 Airflow가 자동으로 의존성을 추론한다.


정리

Apache Airflow의 핵심 개념을 다시 정리하면 다음과 같다.

  • Airflow: 워크플로를 Python 코드로 정의하고 관리하는 오케스트레이션 도구. Scheduler, Executor, Web Server, Metadata DB로 구성된다.
  • DAG: Task들의 의존 관계를 정의하는 유향 비순환 그래프. 워크플로의 설계도 역할을 한다.
  • Task: DAG 내의 개별 작업 단위. Operator, Sensor, @task 데코레이터로 정의한다.
  • Task Instance: 특정 DAG Run에서 Task가 실행된 구체적인 인스턴스. 고유한 상태를 가진다.
  • Task Dependencies: >>, << 연산자 또는 set_upstream/set_downstream 메서드로 실행 순서를 정의한다.

이 다섯 가지 개념만 확실히 이해하면, Airflow로 어떤 데이터 파이프라인이든 설계할 수 있는 기초가 갖춰진다. 다음 글에서는 Airflow의 다양한 Operator와 실제 데이터 파이프라인 구축 사례를 다룰 예정이다.