Back to Blog
AirflowXComVariableConnectionHookTaskGroup

0x03. Airflow 데이터 공유와 관리

Airflow에서 Task 간 데이터를 공유하는 XCom, 설정을 관리하는 Variable, 외부 시스템 연결을 위한 Connection과 Hook, 그리고 TaskGroup을 알아본다.

Airflow DAG를 작성하다 보면, 단일 Task만으로 파이프라인을 완성하는 경우는 거의 없다. 데이터를 추출한 뒤 변환하고, 변환 결과를 적재하는 과정에서 Task 간에 데이터를 주고받아야 하고, 외부 시스템에 접속해야 하며, 공통 설정값을 관리해야 한다. 복잡한 파이프라인이라면 Task를 논리적으로 그룹핑할 필요도 생긴다.

이 글에서는 Airflow가 이런 요구를 해결하기 위해 제공하는 네 가지 핵심 메커니즘을 다룬다. XCom, Variable, Connection & Hook, 그리고 TaskGroup이다.


XCom: Task 간 데이터 전달

XCom이란?

XCom(Cross-Communication) 은 Airflow에서 Task 간에 소량의 데이터를 주고받기 위한 메커니즘이다. 이름 그대로 "교차 통신"을 의미한다.

각 XCom 값은 세 가지로 식별된다.

  • key: 데이터의 이름 (예: "user_count")
  • task_id: 데이터를 생성한 Task
  • dag_id: 해당 Task가 속한 DAG

택배 시스템에 비유하면, XCom은 Task 간의 사물함 같은 존재다. Task A가 사물함에 데이터를 넣어두면(push), Task B가 그 사물함에서 꺼내 쓰는(pull) 방식이다.

push와 pull

XCom의 기본 동작은 xcom_pushxcom_pull 메서드로 이루어진다.

def push_task(**context):
    # XCom에 데이터 저장
    context["ti"].xcom_push(key="user_count", value=42)

def pull_task(**context):
    # XCom에서 데이터 가져오기
    count = context["ti"].xcom_pull(task_ids="push_task", key="user_count")
    print(f"User count: {count}")

tiTaskInstance의 약자로, 현재 실행 중인 Task의 인스턴스를 가리킨다. xcom_push로 key-value 쌍을 저장하고, 다른 Task에서 xcom_pulltask_idskey를 지정하여 꺼내온다.

반환값 자동 XCom

매번 xcom_push를 호출하는 것은 번거롭다. Airflow는 Task의 반환값(return value) 을 자동으로 XCom에 저장하는 기능을 제공한다. 이때 key는 "return_value"가 된다.

TaskFlow API(@task 데코레이터)를 사용하면 이 과정이 더 자연스럽다.

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def xcom_example():

    @task
    def extract():
        return {"users": 100, "orders": 250}

    @task
    def transform(data: dict):
        data["total"] = data["users"] + data["orders"]
        return data

    @task
    def load(data: dict):
        print(f"Loading data: {data}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

xcom_example()

extract()의 반환값이 자동으로 XCom에 저장되고, transform(raw)에서 자동으로 pull된다. TaskFlow API 덕분에 일반 Python 함수처럼 데이터를 넘길 수 있고, 내부적으로는 XCom이 동작한다.

multiple_outputs=True 옵션을 사용하면 딕셔너리의 각 key를 별도의 XCom으로 분리하여 저장할 수도 있다.

@task(multiple_outputs=True)
def extract():
    return {"users": 100, "orders": 250}

이 경우 "users""orders"가 각각 독립적인 XCom key로 저장되어, 하류 Task에서 필요한 값만 선택적으로 가져올 수 있다.

크기 제한과 주의사항

XCom은 소량의 메타데이터 전달을 위해 설계되었다. 기본적으로 Airflow의 메타데이터 DB에 직렬화하여 저장하기 때문에, 대용량 데이터를 XCom으로 전달하면 DB에 과부하가 걸린다.

실질적인 크기 제한은 사용하는 DB 백엔드에 따라 다르다.

DB 백엔드XCom 최대 크기
SQLite2 GB (BLOB)
PostgreSQL1 GB (BYTEA)
MySQL64 KB (기본 TEXT)

하지만 제한에 가까운 크기의 데이터를 XCom에 넣는 것 자체가 안티패턴이다. DataFrame이나 대용량 파일을 Task 간에 전달해야 한다면, XCom 대신 다음 방법을 사용한다.

  • S3, GCS 등 오브젝트 스토리지에 파일을 저장하고, XCom에는 파일 경로만 전달
  • Custom XCom Backend: xcom_backend 설정을 통해 XCom 저장소 자체를 오브젝트 스토리지로 변경
# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

핵심 원칙은 간단하다. XCom에는 경로, ID, 카운트 같은 작은 값만 넣고, 실제 데이터는 외부 스토리지에 보관하는 것이다.


Variable: 전역 설정값 관리

Variable이란?

Variable은 Airflow 환경 전체에서 사용할 수 있는 전역 key-value 저장소다. XCom이 특정 DAG Run 내에서 Task 간 데이터를 전달한다면, Variable은 DAG 실행과 무관하게 유지되는 설정값을 관리한다.

예를 들어, 데이터 파이프라인에서 공통으로 사용하는 S3 버킷 이름, API 엔드포인트, 배치 크기 같은 값을 Variable에 저장하면, 여러 DAG에서 하드코딩 없이 참조할 수 있다.

설정 방법

Variable은 세 가지 방법으로 설정한다.

1. Airflow UI

Admin > Variables 메뉴에서 key-value 쌍을 직접 입력할 수 있다. 가장 직관적인 방법이다.

2. CLI

airflow variables set my_key my_value
airflow variables get my_key

3. Python 코드

from airflow.models import Variable

#  저장
Variable.set("s3_bucket", "my-data-lake")

#  조회
bucket = Variable.get("s3_bucket")

JSON 값 저장

관련된 여러 설정값이 있다면, 각각을 별도 Variable로 만드는 대신 JSON으로 묶어서 저장하는 것이 효율적이다. Variable을 조회할 때마다 메타데이터 DB에 쿼리가 발생하므로, 호출 횟수를 줄이는 것이 성능에 유리하다.

import json

# JSON  저장
Variable.set("ml_config", json.dumps({
    "model_name": "xgboost",
    "learning_rate": 0.01,
    "batch_size": 256
}))

# JSON  조회 (자동 역직렬화)
config = Variable.get("ml_config", deserialize_json=True)
print(config["learning_rate"])  # 0.01

deserialize_json=True 옵션을 주면 문자열을 자동으로 Python 딕셔너리로 변환해준다.

환경 변수를 통해서도 Variable을 설정할 수 있다. AIRFLOW_VAR_ 접두어를 붙이면 Airflow가 자동으로 인식한다.

export AIRFLOW_VAR_ML_CONFIG='{"model_name": "xgboost", "learning_rate": 0.01}'

보안과 암호화

Variable 값은 Fernet 암호화를 통해 메타데이터 DB에 암호화 상태로 저장된다. Fernet key 없이는 DB에 직접 접근하더라도 값을 읽을 수 없다.

또한 Airflow는 Variable의 key 이름에 password, secret, token, api_key민감 키워드가 포함되어 있으면, UI와 로그에서 값을 자동으로 마스킹 처리한다. 예를 들어, slack_api_token이라는 Variable의 값은 UI에서 ***로 표시된다.

주의사항: Top-level 코드에서의 사용 금지

가장 흔한 실수 중 하나는 DAG 파일의 최상위 레벨(Task 외부)에서 Variable을 조회하는 것이다.

# [잘못된 ] DAG 파싱 시마다 DB 쿼리가 발생한다
bucket = Variable.get("s3_bucket")

@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def my_dag():

    @task
    def process():
        print(bucket)

Airflow Scheduler는 DAG 파일을 주기적으로 파싱한다. Top-level에 Variable.get()이 있으면, 파싱할 때마다 메타데이터 DB에 쿼리가 발생하여 성능 저하를 유발한다. Variable 조회는 반드시 Task 내부에서 수행해야 한다.

# [올바른 ] Task 실행 시에만 DB 쿼리가 발생한다
@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def my_dag():

    @task
    def process():
        bucket = Variable.get("s3_bucket")
        print(bucket)

Jinja 템플릿을 사용하는 것도 좋은 대안이다. 템플릿은 Task 실행 시점에 렌더링되므로 파싱 성능에 영향을 주지 않는다.

bash_task = BashOperator(
    task_id="echo_bucket",
    bash_command="echo {{ var.value.s3_bucket }}",
)

Connection & Hook: 외부 시스템 연결

Connection이란?

Connection은 외부 시스템(DB, 클라우드, API 등)에 접속하기 위한 인증 정보와 접속 설정의 묶음이다. 호스트, 포트, 로그인 정보, 비밀번호 등을 하나의 Connection으로 관리한다.

각 Connection은 고유한 conn_id로 식별되며, Operator나 Hook에 이 ID를 전달하면 해당 외부 시스템에 접속할 수 있다.

Connection이 없다면 어떻게 될까? DAG 코드 곳곳에 DB 비밀번호, API 키 같은 민감 정보가 하드코딩될 것이다. Connection은 인증 정보를 코드에서 분리하여, 보안성재사용성을 동시에 확보한다.

Connection의 구성 요소

하나의 Connection은 다음 필드로 구성된다.

필드설명예시
conn_id고유 식별자my_postgres
conn_type연결 유형postgres, aws, http
host호스트 주소db.example.com
port포트 번호5432
login사용자명airflow_user
password비밀번호****
schema스키마/DB 이름analytics
extra추가 설정 (JSON){"sslmode": "require"}

Connection 생성 방법

1. Airflow UI

Admin > Connections에서 폼을 통해 직관적으로 생성할 수 있다. Connection Type을 선택하면 해당 타입에 맞는 입력 필드가 표시된다.

2. CLI

airflow connections add my_postgres \
    --conn-type postgres \
    --conn-host db.example.com \
    --conn-port 5432 \
    --conn-login airflow_user \
    --conn-password secret123 \
    --conn-schema analytics

3. 환경 변수

AIRFLOW_CONN_ 접두어와 함께 URI 형식으로 설정할 수 있다.

export AIRFLOW_CONN_MY_POSTGRES='postgresql://airflow_user:secret123@db.example.com:5432/analytics'

환경 변수 방식은 Docker나 Kubernetes 환경에서 시크릿 관리 시스템과 연동하기 편리하다.

Hook이란?

Hook은 Connection을 기반으로 외부 시스템과 실제로 통신하는 고수준 인터페이스다. Connection이 "어디에 어떻게 접속할지"에 대한 정보라면, Hook은 그 정보를 사용하여 실제 연결을 수립하고 작업을 수행하는 도구다.

비유하자면, Connection은 전화번호부이고 Hook은 전화기다. 전화번호(Connection)를 알아야 전화기(Hook)로 통화할 수 있다.

Hook 사용 예시

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def hook_example():

    @task
    def get_user_count():
        hook = PostgresHook(postgres_conn_id="my_postgres")
        conn = hook.get_conn()
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM users")
        count = cursor.fetchone()[0]
        return count

    @task
    def upload_to_s3(count: int):
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        hook = S3Hook(aws_conn_id="aws_default")
        hook.load_string(
            string_data=f'{{"user_count": {count}}}',
            key="reports/user_count.json",
            bucket_name="my-data-lake",
            replace=True,
        )

    count = get_user_count()
    upload_to_s3(count)

hook_example()

PostgresHookmy_postgres라는 conn_id로 등록된 Connection 정보를 가져와 PostgreSQL에 연결한다. S3Hookaws_default Connection으로 AWS S3에 접근한다. 개발자는 Connection의 세부 인증 정보를 코드에서 다룰 필요가 없다.

주요 Hook 종류

Airflow는 Provider 패키지를 통해 다양한 외부 시스템용 Hook을 제공한다.

Hook대상 시스템Provider 패키지
PostgresHookPostgreSQLapache-airflow-providers-postgres
MySqlHookMySQLapache-airflow-providers-mysql
S3HookAWS S3apache-airflow-providers-amazon
GCSHookGoogle Cloud Storageapache-airflow-providers-google
HttpHookREST APIapache-airflow-providers-http
SlackHookSlackapache-airflow-providers-slack

모든 Hook은 BaseHook을 상속하며, get_conn() 메서드를 통해 실제 연결 객체를 반환하는 공통 인터페이스를 따른다. 필요하다면 BaseHook을 상속하여 커스텀 Hook을 만들 수도 있다.


TaskGroup: Task 논리적 그룹핑

TaskGroup이란?

DAG의 Task가 수십 개로 늘어나면, Graph View가 복잡해져서 전체 구조를 파악하기 어려워진다. TaskGroup은 관련 Task들을 논리적으로 묶어 UI에서 접고 펼 수 있게 해주는 기능이다.

중요한 점은 TaskGroup이 순수하게 UI 레벨의 그룹핑이라는 것이다. TaskGroup에 속한 Task들은 원래 DAG에 그대로 남아 있으며, 별도의 DAG가 생성되지 않는다.

TaskGroup 사용법

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def taskgroup_example():

    @task
    def start():
        print("Pipeline started")

    with TaskGroup("extract", tooltip="데이터 추출 단계") as extract_group:

        @task
        def extract_users():
            return "users_data"

        @task
        def extract_orders():
            return "orders_data"

        extract_users()
        extract_orders()

    with TaskGroup("transform", tooltip="데이터 변환 단계") as transform_group:

        @task
        def clean_data():
            print("Cleaning data")

        @task
        def enrich_data():
            print("Enriching data")

        clean_data() >> enrich_data()

    @task
    def load():
        print("Loading to warehouse")

    start() >> extract_group >> transform_group >> load()

taskgroup_example()

with TaskGroup(...) 블록 안에 Task를 정의하면, 해당 Task들이 하나의 그룹으로 묶인다. Airflow UI의 Graph View에서는 그룹이 하나의 박스로 표시되고, 클릭하면 내부 Task가 펼쳐진다.

중첩 TaskGroup

TaskGroup은 중첩(nesting) 이 가능하다. 복잡한 파이프라인에서 단계를 계층적으로 구성할 때 유용하다.

with TaskGroup("processing") as processing_group:
    with TaskGroup("validation") as validation_group:

        @task
        def check_schema():
            print("Schema validation")

        @task
        def check_nulls():
            print("Null check")

        check_schema() >> check_nulls()

    with TaskGroup("transformation") as transformation_group:

        @task
        def normalize():
            print("Normalizing")

        normalize()

    validation_group >> transformation_group

UI에서는 processing 그룹을 클릭하면 validationtransformation 하위 그룹이 나타나고, 각 하위 그룹을 다시 클릭하면 개별 Task가 보이는 트리 구조가 된다.

TaskGroup vs SubDAG

TaskGroup 이전에는 SubDAGOperator가 Task 그룹핑의 유일한 방법이었다. 하지만 SubDAG는 여러 심각한 문제가 있어 현재는 deprecated(사용 중단) 상태다.

비교 항목TaskGroupSubDAG
DAG 구조원래 DAG에 포함별도 DAG 생성
스케줄링메인 DAG과 동일독립적 스케줄링
워커 점유개별 Task별 할당/해제SubDAG 완료까지 슬롯 점유
UI접기/펼치기 지원별도 DAG 페이지로 이동
데드락 위험없음워커 슬롯 경쟁으로 가능
상태권장Deprecated

SubDAG의 가장 큰 문제는 워커 슬롯 점유다. SubDAG이 트리거되면 SubDAG 전체가 완료될 때까지 워커 슬롯을 점유하기 때문에, 다른 Task의 실행이 지연될 수 있고 심한 경우 데드락이 발생한다. TaskGroup은 이런 문제 없이 순수한 UI 그룹핑만 제공하므로, 스케줄링과 리소스 관리에 영향을 주지 않는다.


정리

Airflow 파이프라인을 구성할 때, 데이터 공유와 외부 시스템 연결은 피할 수 없는 과제다. 각 메커니즘의 역할을 다시 정리하면 다음과 같다.

  • XCom: Task 간 소량 데이터 전달. TaskFlow API와 함께 쓰면 자연스러운 Python 함수 호출처럼 데이터를 넘길 수 있다. 대용량 데이터는 외부 스토리지를 활용한다.
  • Variable: DAG 실행과 무관한 전역 설정값 관리. JSON 저장, Fernet 암호화, 민감 키워드 마스킹을 지원하며, Top-level 코드에서의 조회는 반드시 피해야 한다.
  • Connection & Hook: 외부 시스템의 인증 정보를 코드에서 분리하고, Hook을 통해 고수준 인터페이스로 접근한다. 보안과 재사용성을 동시에 확보하는 핵심 패턴이다.
  • TaskGroup: 관련 Task를 논리적으로 묶어 UI 가독성을 높인다. SubDAG의 리소스 문제 없이 순수 UI 레벨의 그룹핑을 제공한다.