Flink는 스트리밍 우선(Streaming-first) 엔진이지만, 배치 처리도 강력하게 지원한다. 사실 Flink의 관점에서 배치는 "유한한 스트림(Bounded Stream)" 에 불과하다. 이 통합된 시각 덕분에, 같은 코드로 배치와 스트리밍 처리를 모두 수행할 수 있다.
배치와 스트리밍의 통합
Flink의 철학: Bounded vs Unbounded Stream
전통적으로 배치와 스트리밍은 별개의 기술로 취급되었다. Spark는 배치 엔진에 스트리밍을 얹었고(마이크로배치), Storm은 스트리밍만 지원했다.
Flink는 다른 접근을 택했다. 모든 데이터를 스트림으로 본다.
- Unbounded Stream: 끝이 없는 스트림. 실시간 이벤트 처리. 일반적인 "스트리밍"
- Bounded Stream: 시작과 끝이 있는 유한한 스트림. 파일이나 DB 스냅샷. 일반적인 "배치"
Unbounded: ──event──event──event──event──event──→ (끝 없음)
Bounded: ──event──event──event──event──| (끝 있음)
이 관점에서 배치 처리는 스트리밍 처리의 특수한 경우이다. 데이터에 끝이 있으므로 최적화를 더 적극적으로 적용할 수 있다.
DataSet API에서 DataStream API로
DataSet API (레거시)
초기 Flink는 배치 전용 DataSet API를 별도로 제공했다.
// 레거시 DataSet API (더 이상 권장되지 않음)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("input.txt");
DataSet<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.groupBy(0)
.sum(1);
counts.writeAsCsv("output.csv");
env.execute();
DataSet API는 배치에 최적화되어 있었지만, 스트리밍과 코드를 공유할 수 없다는 근본적 한계가 있었다.
통합 DataStream API
Flink 1.12부터 DataStream API에 배치 실행 모드가 추가되면서, 하나의 API로 배치와 스트리밍을 모두 처리할 수 있게 되었다. DataSet API는 점진적으로 폐기(deprecated) 중이다.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 실행 모드만 바꾸면 동일한 코드가 배치/스트리밍으로 동작
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // 또는 STREAMING, AUTOMATIC
DataStream<String> text = env.readFile(
new TextInputFormat(new Path("input.txt")), "input.txt"
);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);
counts.print();
env.execute("Word Count");
RuntimeExecutionMode 설정:
| 모드 | 동작 |
|---|---|
STREAMING | 스트리밍 모드. 레코드 단위 처리. 기본값 |
BATCH | 배치 모드. Bounded 소스에 대해 배치 최적화 적용 |
AUTOMATIC | 소스가 모두 Bounded이면 BATCH, 하나라도 Unbounded면 STREAMING |
커맨드라인으로도 지정할 수 있다.
flink run -Dexecution.runtime-mode=BATCH my-job.jar
배치 모드의 최적화
BATCH 모드에서 Flink는 스트리밍과 다른 실행 전략을 적용한다.
정렬 기반 Shuffle
스트리밍 모드에서는 데이터가 네트워크를 통해 실시간으로 전달된다(Pipelined Shuffle). 배치 모드에서는 데이터를 디스크에 먼저 기록한 후 다음 스테이지에서 읽는다(Blocking Shuffle).
스트리밍: Task A ──파이프라인──→ Task B (동시 실행)
배치: Task A → [디스크] → Task B (순차 실행 가능)
Blocking Shuffle은 메모리를 덜 사용하고, 장애 시 전체를 재실행하지 않아도 된다. 이미 디스크에 쓰인 중간 결과를 재사용할 수 있기 때문이다.
스테이지 기반 스케줄링
스트리밍에서는 모든 Task가 동시에 실행된다. 배치에서는 스테이지 단위로 실행하여 리소스를 절약한다.
스트리밍: [Stage 1 + Stage 2 + Stage 3] 모두 동시 실행
배치: [Stage 1] → [Stage 2] → [Stage 3] 순차 실행
이 방식은 더 적은 TaskManager로 대규모 배치 잡을 실행할 수 있게 해준다.
이벤트 타임 처리
배치 모드에서는 Watermark가 필요 없다. 모든 데이터가 이미 있으므로 완벽한 정렬이 가능하다.
- 윈도우 연산에서 늦은 데이터(Late Data)가 발생하지 않는다
- 타이머는 모든 데이터 처리 후 일괄 발화된다
- 결과의 결정성(determinism)이 보장된다
Table API와 SQL
Flink의 Table API/SQL은 배치와 스트리밍에서 동일하게 사용할 수 있는 상위 레벨 API이다.
배치 SQL 예제
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().inBatchMode().build()
);
// CSV 파일을 테이블로 등록
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id STRING," +
" user_id STRING," +
" amount DOUBLE," +
" order_date DATE" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '/data/orders.csv'," +
" 'format' = 'csv'" +
")"
);
// SQL로 배치 집계
Table result = tableEnv.sqlQuery(
"SELECT user_id, SUM(amount) as total, COUNT(*) as cnt " +
"FROM orders " +
"GROUP BY user_id " +
"HAVING SUM(amount) > 1000"
);
result.execute().print();
스트리밍 SQL 예제
같은 SQL이 스트리밍 모드에서도 동작한다. 다만 결과가 지속적으로 업데이트된다.
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().inStreamingMode().build()
);
// Kafka 토픽을 테이블로 등록
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id STRING," +
" user_id STRING," +
" amount DOUBLE," +
" event_time TIMESTAMP(3)," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'orders'," +
" 'properties.bootstrap.servers' = 'broker:9092'," +
" 'format' = 'json'" +
")"
);
// 동일한 SQL이 스트리밍에서도 동작 (결과가 계속 업데이트됨)
tableEnv.sqlQuery(
"SELECT user_id, SUM(amount) as total " +
"FROM orders " +
"GROUP BY user_id"
);
배치에서는 orders가 유한한 CSV 파일이고, 스트리밍에서는 무한한 Kafka 토픽이다. SQL은 동일하다.
Flink 배치 vs Spark 배치
| 특성 | Flink (배치 모드) | Spark |
|---|---|---|
| 기본 철학 | 스트리밍 엔진에서 배치 지원 | 배치 엔진에서 스트리밍 지원 |
| 실행 모델 | 스트리밍 런타임 + 배치 최적화 | DAG 기반 배치 런타임 |
| 배치 성숙도 | 빠르게 발전 중 | 오랜 기간 검증됨 |
| 에코시스템 | 상대적으로 작음 | 매우 풍부 (MLlib, GraphX 등) |
| 스트리밍 전환 | 동일 코드, 모드만 변경 | API 변경 필요 |
| 커뮤니티/채용 | 성장 중 | 대규모, 풍부한 자료 |
순수 배치 처리만 필요하다면 Spark가 더 성숙하고 에코시스템이 풍부하다. 하지만 배치와 스트리밍을 하나의 코드베이스로 통합하고 싶다면, Flink의 통합 API가 강력한 이점이 된다.
실전: 배치-스트리밍 통합 파이프라인
같은 비즈니스 로직을 배치(백필)와 스트리밍(실시간)에서 재사용하는 패턴이다.
public class OrderAggregation {
public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 커맨드라인으로 모드 결정: --mode batch 또는 --mode streaming
ParameterTool params = ParameterTool.fromArgs(args);
String mode = params.get("mode", "streaming");
DataStream<Order> orders;
if ("batch".equals(mode)) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
orders = env.readFile(new OrderInputFormat(), "/data/orders/");
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
KafkaSource<Order> source = KafkaSource.<Order>builder()
.setBootstrapServers("broker:9092")
.setTopics("orders")
.setValueOnlyDeserializer(new OrderDeserializer())
.build();
orders = env.fromSource(source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
"Kafka Source");
}
// 비즈니스 로직은 동일
DataStream<CategoryTotal> result = orders
.keyBy(Order::getCategory)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sum("amount");
result.print();
env.execute("Order Aggregation - " + mode);
}
}
소스만 다르고, 처리 로직은 완전히 동일하다. 배치 모드로 과거 데이터를 백필하고, 이후 스트리밍 모드로 실시간 처리를 이어갈 수 있다.
정리
- Flink에서 배치는 유한한 스트림(Bounded Stream) 이며, 스트리밍의 특수한 경우이다
- DataStream API 통합:
RuntimeExecutionMode만 바꾸면 동일한 코드가 배치/스트리밍으로 동작한다 - 배치 최적화: Blocking Shuffle, 스테이지 기반 스케줄링으로 리소스를 효율적으로 사용한다
- Table API/SQL: 배치와 스트리밍에서 동일한 SQL을 사용할 수 있다
- 실전 패턴: 소스만 다르고 비즈니스 로직을 공유하여 배치 백필과 실시간 처리를 통합한다