Back to Blog
FlinkBatchDataStreamBoundedExecution ModeUnified API

0x06. Flink 배치 처리와 DataStream 통합

Flink의 배치 처리 지원 방식, DataStream API에서의 배치/스트리밍 통합, 그리고 실행 모드 설정을 알아본다.

Flink는 스트리밍 우선(Streaming-first) 엔진이지만, 배치 처리도 강력하게 지원한다. 사실 Flink의 관점에서 배치는 "유한한 스트림(Bounded Stream)" 에 불과하다. 이 통합된 시각 덕분에, 같은 코드로 배치와 스트리밍 처리를 모두 수행할 수 있다.


배치와 스트리밍의 통합

Flink의 철학: Bounded vs Unbounded Stream

전통적으로 배치와 스트리밍은 별개의 기술로 취급되었다. Spark는 배치 엔진에 스트리밍을 얹었고(마이크로배치), Storm은 스트리밍만 지원했다.

Flink는 다른 접근을 택했다. 모든 데이터를 스트림으로 본다.

  • Unbounded Stream: 끝이 없는 스트림. 실시간 이벤트 처리. 일반적인 "스트리밍"
  • Bounded Stream: 시작과 끝이 있는 유한한 스트림. 파일이나 DB 스냅샷. 일반적인 "배치"
Unbounded:  ──event──event──event──event──event──→ ( 없음)
Bounded:    ──event──event──event──event──| ( 있음)

이 관점에서 배치 처리는 스트리밍 처리의 특수한 경우이다. 데이터에 끝이 있으므로 최적화를 더 적극적으로 적용할 수 있다.


DataSet API에서 DataStream API로

DataSet API (레거시)

초기 Flink는 배치 전용 DataSet API를 별도로 제공했다.

// 레거시 DataSet API (더 이상 권장되지 않음)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("input.txt");
DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new Tokenizer())
    .groupBy(0)
    .sum(1);
counts.writeAsCsv("output.csv");
env.execute();

DataSet API는 배치에 최적화되어 있었지만, 스트리밍과 코드를 공유할 수 없다는 근본적 한계가 있었다.

통합 DataStream API

Flink 1.12부터 DataStream API에 배치 실행 모드가 추가되면서, 하나의 API로 배치와 스트리밍을 모두 처리할 수 있게 되었다. DataSet API는 점진적으로 폐기(deprecated) 중이다.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 실행 모드만 바꾸면 동일한 코드가 배치/스트리밍으로 동작
env.setRuntimeMode(RuntimeExecutionMode.BATCH);  // 또는 STREAMING, AUTOMATIC

DataStream<String> text = env.readFile(
    new TextInputFormat(new Path("input.txt")), "input.txt"
);

DataStream<Tuple2<String, Integer>> counts = text
    .flatMap(new Tokenizer())
    .keyBy(value -> value.f0)
    .sum(1);

counts.print();
env.execute("Word Count");

RuntimeExecutionMode 설정:

모드동작
STREAMING스트리밍 모드. 레코드 단위 처리. 기본값
BATCH배치 모드. Bounded 소스에 대해 배치 최적화 적용
AUTOMATIC소스가 모두 Bounded이면 BATCH, 하나라도 Unbounded면 STREAMING

커맨드라인으로도 지정할 수 있다.

flink run -Dexecution.runtime-mode=BATCH my-job.jar

배치 모드의 최적화

BATCH 모드에서 Flink는 스트리밍과 다른 실행 전략을 적용한다.

정렬 기반 Shuffle

스트리밍 모드에서는 데이터가 네트워크를 통해 실시간으로 전달된다(Pipelined Shuffle). 배치 모드에서는 데이터를 디스크에 먼저 기록한 후 다음 스테이지에서 읽는다(Blocking Shuffle).

스트리밍: Task A ──파이프라인──→ Task B  (동시 실행)
배치:     Task A  [디스크]  Task B     (순차 실행 가능)

Blocking Shuffle은 메모리를 덜 사용하고, 장애 시 전체를 재실행하지 않아도 된다. 이미 디스크에 쓰인 중간 결과를 재사용할 수 있기 때문이다.

스테이지 기반 스케줄링

스트리밍에서는 모든 Task가 동시에 실행된다. 배치에서는 스테이지 단위로 실행하여 리소스를 절약한다.

스트리밍: [Stage 1 + Stage 2 + Stage 3] 모두 동시 실행
배치:     [Stage 1]  [Stage 2]  [Stage 3] 순차 실행

이 방식은 더 적은 TaskManager로 대규모 배치 잡을 실행할 수 있게 해준다.

이벤트 타임 처리

배치 모드에서는 Watermark가 필요 없다. 모든 데이터가 이미 있으므로 완벽한 정렬이 가능하다.

  • 윈도우 연산에서 늦은 데이터(Late Data)가 발생하지 않는다
  • 타이머는 모든 데이터 처리 후 일괄 발화된다
  • 결과의 결정성(determinism)이 보장된다

Table API와 SQL

Flink의 Table API/SQL은 배치와 스트리밍에서 동일하게 사용할 수 있는 상위 레벨 API이다.

배치 SQL 예제

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build()
);

// CSV 파일을 테이블로 등록
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  user_id STRING," +
    "  amount DOUBLE," +
    "  order_date DATE" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/orders.csv'," +
    "  'format' = 'csv'" +
    ")"
);

// SQL로 배치 집계
Table result = tableEnv.sqlQuery(
    "SELECT user_id, SUM(amount) as total, COUNT(*) as cnt " +
    "FROM orders " +
    "GROUP BY user_id " +
    "HAVING SUM(amount) > 1000"
);

result.execute().print();

스트리밍 SQL 예제

같은 SQL이 스트리밍 모드에서도 동작한다. 다만 결과가 지속적으로 업데이트된다.

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build()
);

// Kafka 토픽을 테이블로 등록
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  user_id STRING," +
    "  amount DOUBLE," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'broker:9092'," +
    "  'format' = 'json'" +
    ")"
);

// 동일한 SQL이 스트리밍에서도 동작 (결과가 계속 업데이트됨)
tableEnv.sqlQuery(
    "SELECT user_id, SUM(amount) as total " +
    "FROM orders " +
    "GROUP BY user_id"
);

배치에서는 orders가 유한한 CSV 파일이고, 스트리밍에서는 무한한 Kafka 토픽이다. SQL은 동일하다.


특성Flink (배치 모드)Spark
기본 철학스트리밍 엔진에서 배치 지원배치 엔진에서 스트리밍 지원
실행 모델스트리밍 런타임 + 배치 최적화DAG 기반 배치 런타임
배치 성숙도빠르게 발전 중오랜 기간 검증됨
에코시스템상대적으로 작음매우 풍부 (MLlib, GraphX 등)
스트리밍 전환동일 코드, 모드만 변경API 변경 필요
커뮤니티/채용성장 중대규모, 풍부한 자료

순수 배치 처리만 필요하다면 Spark가 더 성숙하고 에코시스템이 풍부하다. 하지만 배치와 스트리밍을 하나의 코드베이스로 통합하고 싶다면, Flink의 통합 API가 강력한 이점이 된다.


실전: 배치-스트리밍 통합 파이프라인

같은 비즈니스 로직을 배치(백필)와 스트리밍(실시간)에서 재사용하는 패턴이다.

public class OrderAggregation {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // 커맨드라인으로 모드 결정: --mode batch 또는 --mode streaming
        ParameterTool params = ParameterTool.fromArgs(args);
        String mode = params.get("mode", "streaming");

        DataStream<Order> orders;

        if ("batch".equals(mode)) {
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            orders = env.readFile(new OrderInputFormat(), "/data/orders/");
        } else {
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            KafkaSource<Order> source = KafkaSource.<Order>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("orders")
                .setValueOnlyDeserializer(new OrderDeserializer())
                .build();
            orders = env.fromSource(source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
                "Kafka Source");
        }

        // 비즈니스 로직은 동일
        DataStream<CategoryTotal> result = orders
            .keyBy(Order::getCategory)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .sum("amount");

        result.print();
        env.execute("Order Aggregation - " + mode);
    }
}

소스만 다르고, 처리 로직은 완전히 동일하다. 배치 모드로 과거 데이터를 백필하고, 이후 스트리밍 모드로 실시간 처리를 이어갈 수 있다.


정리

  • Flink에서 배치는 유한한 스트림(Bounded Stream) 이며, 스트리밍의 특수한 경우이다
  • DataStream API 통합: RuntimeExecutionMode만 바꾸면 동일한 코드가 배치/스트리밍으로 동작한다
  • 배치 최적화: Blocking Shuffle, 스테이지 기반 스케줄링으로 리소스를 효율적으로 사용한다
  • Table API/SQL: 배치와 스트리밍에서 동일한 SQL을 사용할 수 있다
  • 실전 패턴: 소스만 다르고 비즈니스 로직을 공유하여 배치 백필과 실시간 처리를 통합한다