데이터 파이프라인을 구축할 때 가장 핵심적인 작업은 데이터 변환(Transformation) 이다. 원천 데이터를 필터링하고, 구조를 바꾸고, 집계하여 의미 있는 결과를 만들어내는 과정이 파이프라인의 본질이기 때문이다.
Apache Flink는 이러한 변환을 위해 다양한 연산자를 제공한다. 단순한 map부터 여러 스트림을 결합하는 connect까지, 각 연산자가 어떤 역할을 하고 언제 사용하는지 알아본다.
기본 Transformations
Flink의 DataStream API에서 가장 자주 사용하는 네 가지 기본 연산자부터 살펴본다. 이 연산자들은 함수형 프로그래밍의 기본 연산과 직접 대응되므로, 개념 자체는 직관적이다.
map
map은 입력 요소 하나를 받아 정확히 하나의 출력 요소를 반환하는 1:1 변환이다.
DataStream<Integer> doubled = input.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * 2;
}
});
람다 표현식을 사용하면 더 간결하게 작성할 수 있다.
DataStream<Integer> doubled = input.map(value -> value * 2);
택배로 비유하면, 모든 상자를 열어서 내용물을 다른 것으로 교체한 뒤 다시 포장하는 것과 같다. 상자의 수는 변하지 않는다.
flatMap
flatMap은 입력 요소 하나를 받아 0개, 1개, 또는 여러 개의 출력 요소를 생성하는 1:N 변환이다. Collector를 통해 원하는 만큼 결과를 내보낼 수 있다.
DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word);
}
}
});
map과의 핵심적인 차이는 출력 개수가 고정되지 않는다는 점이다. 문장을 단어로 분리하거나, 특정 조건에 맞지 않는 요소를 아예 출력하지 않을 수도 있다. 워드카운트(Word Count) 예제에서 문장을 단어로 쪼개는 것이 대표적인 flatMap 사용 사례다.
filter
filter는 조건에 맞는 요소만 통과시키는 연산자다. 불리언 함수가 true를 반환하는 요소만 다음 단계로 전달된다.
DataStream<Integer> positive = input.filter(value -> value > 0);
데이터를 변형하지 않고 걸러내기만 한다는 점에서 map, flatMap과 구분된다. 물론 flatMap에서 조건 분기를 통해 동일한 동작을 구현할 수도 있지만, 의도를 명확히 드러내려면 filter를 사용하는 것이 좋다.
keyBy
keyBy는 스트림의 요소를 특정 키(Key) 기준으로 논리적으로 분할하는 연산자다. 같은 키를 가진 모든 요소가 동일한 파티션으로 모이도록 보장한다.
KeyedStream<Event, String> keyed = events.keyBy(event -> event.getUserId());
데이터베이스의 GROUP BY와 유사하지만, 중요한 차이가 있다. keyBy는 데이터를 실제로 집계하지 않는다. 대신 후속 연산(reduce, sum, window 등)이 키별로 독립적으로 실행될 수 있도록 스트림을 재배치한다. 내부적으로는 해시 파티셔닝(Hash Partitioning) 을 사용하여, 동일한 키를 가진 레코드가 같은 태스크 슬롯에서 처리되도록 한다.
keyBy의 결과는 DataStream이 아닌 KeyedStream이며, 이 KeyedStream 위에서만 reduce, aggregate 같은 키 기반 연산을 수행할 수 있다.
고급 Transformations
기본 연산자로 개별 요소를 변환했다면, 고급 연산자는 여러 요소를 결합하거나 저수준 제어를 가능하게 한다.
reduce
reduce는 KeyedStream에서 동일한 키를 가진 요소들을 누적 결합하는 연산자다. 두 개의 입력을 받아 하나의 출력을 반환하는 함수를 반복 적용하여, 스트림의 요소를 점진적으로 하나의 값으로 줄여나간다.
DataStream<Tuple2<String, Integer>> summed = wordCounts
.keyBy(tuple -> tuple.f0)
.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
위 예시에서는 같은 단어(키)에 대해 카운트 값을 계속 더해나간다. 새로운 요소가 들어올 때마다 이전 결과와 결합하는 롤링 집계(Rolling Aggregation) 방식으로 동작하며, 매 입력마다 최신 결합 결과를 출력한다.
reduce의 제약 사항은 입력과 출력의 타입이 동일해야 한다는 것이다. 이 제약이 불편하다면, 더 유연한 aggregate나 process 함수를 고려해야 한다.
Aggregate (집계 단축 연산)
Flink는 자주 쓰이는 집계 패턴을 위해 sum, min, max, minBy, maxBy 같은 단축 연산을 제공한다. 이들은 KeyedStream에서 바로 호출할 수 있다.
// 특정 필드 기준 합계
keyedStream.sum("amount");
// 특정 필드의 최솟값을 가진 요소 반환
keyedStream.minBy("timestamp");
min과 minBy의 차이에 주의해야 한다. min("price")은 price 필드의 최솟값만 추적하고 나머지 필드는 임의의 값일 수 있지만, minBy("price")는 price가 최소인 전체 레코드를 반환한다.
Process Function
ProcessFunction은 Flink의 가장 저수준(Low-Level) 변환 연산자다. map이나 flatMap처럼 요소를 변환할 수 있으면서도, 추가로 다음에 대한 접근 권한을 제공한다.
- 이벤트의 타임스탬프: 이벤트 시간(Event Time) 정보에 접근 가능
- 타이머 서비스(TimerService): 특정 시간에 콜백을 등록하여 시간 기반 로직 구현
- 상태(State): KeyedProcessFunction에서 키별 상태를 읽고 쓸 수 있음
- 사이드 출력(Side Output): 하나의 연산에서 여러 출력 스트림 생성
DataStream<Result> result = input
.keyBy(event -> event.getKey())
.process(new KeyedProcessFunction<String, Event, Result>() {
private ValueState<Long> countState;
@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class)
);
}
@Override
public void processElement(Event event, Context ctx, Collector<Result> out)
throws Exception {
Long count = countState.value();
if (count == null) count = 0L;
count++;
countState.update(count);
// 타이머 등록 (이벤트 시간 기준 10초 후)
ctx.timerService().registerEventTimeTimer(
event.getTimestamp() + 10000
);
out.collect(new Result(event.getKey(), count));
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out)
throws Exception {
// 타이머 만료 시 실행되는 로직
}
});
ProcessFunction이 강력한 이유는, 시간과 상태를 직접 다룰 수 있다는 점이다. Window 연산으로 해결하기 어려운 복잡한 이벤트 처리 패턴이 있을 때 ProcessFunction으로 직접 로직을 구현한다.
ProcessFunction에는 여러 변형이 있다.
| 변형 | 적용 대상 | 특징 |
|---|---|---|
ProcessFunction | DataStream | 기본 형태, 상태/타이머 없음 |
KeyedProcessFunction | KeyedStream | 키별 상태와 타이머 사용 가능 |
CoProcessFunction | ConnectedStreams | 두 스트림을 결합 처리 |
ProcessWindowFunction | WindowedStream | 윈도우 내 전체 요소에 접근 |
Side Output (사이드 출력)
일반적인 Transformation은 하나의 출력 스트림만 생성한다. 하지만 실무에서는 하나의 연산에서 여러 종류의 출력을 만들어야 하는 경우가 자주 있다. 예를 들어, 정상 데이터와 오류 데이터를 분리하거나, 특정 조건에 맞는 데이터를 별도 스트림으로 빼내야 할 때다.
Side Output은 OutputTag를 사용하여 메인 출력 외에 추가 출력 스트림을 생성하는 메커니즘이다.
// 1. OutputTag 정의
final OutputTag<String> errorTag = new OutputTag<String>("error-output") {};
// 2. ProcessFunction에서 사이드 출력으로 내보내기
SingleOutputStreamOperator<Integer> mainStream = input
.process(new ProcessFunction<String, Integer>() {
@Override
public void processElement(String value, Context ctx, Collector<Integer> out)
throws Exception {
try {
int parsed = Integer.parseInt(value);
out.collect(parsed); // 메인 출력
} catch (NumberFormatException e) {
ctx.output(errorTag, value); // 사이드 출력 (파싱 실패)
}
}
});
// 3. 사이드 출력 스트림 가져오기
DataStream<String> errorStream = mainStream.getSideOutput(errorTag);
Side Output의 장점은 타입이 다른 여러 출력 스트림을 하나의 연산에서 생성할 수 있다는 것이다. 메인 출력이 Integer이고 사이드 출력이 String인 것처럼, 각 출력의 타입이 서로 달라도 문제없다. filter를 여러 번 적용하는 것보다 데이터를 한 번만 순회하므로 효율적이다.
연결 연산: 여러 스트림 결합하기
실제 데이터 파이프라인에서는 하나의 소스만 사용하는 경우가 드물다. 클릭 스트림과 구매 스트림을 결합하거나, 실시간 데이터에 설정 정보를 반영해야 하는 경우가 많다. Flink는 이를 위해 union과 connect 두 가지 결합 연산을 제공한다.
union
union은 동일한 타입의 두 개 이상의 스트림을 하나로 합치는 연산자다.
DataStream<Event> stream1 = ...;
DataStream<Event> stream2 = ...;
DataStream<Event> stream3 = ...;
DataStream<Event> merged = stream1.union(stream2, stream3);
모든 스트림의 요소가 하나의 스트림에 섞여 들어간다. 단순하고 직관적이지만, 제약 조건이 있다. 합치려는 스트림들의 데이터 타입이 반드시 동일해야 한다. DataStream<String>과 DataStream<Integer>는 union할 수 없다.
connect
connect는 서로 다른 타입의 두 스트림을 결합할 수 있는 연산자다. union과 달리, 두 스트림은 합쳐지지 않고 내부적으로 별도로 유지된다.
DataStream<Integer> numbers = ...;
DataStream<String> labels = ...;
ConnectedStreams<Integer, String> connected = numbers.connect(labels);
connect의 결과는 ConnectedStreams이며, 이 위에서 CoMapFunction 또는 CoFlatMapFunction을 적용하여 두 스트림의 요소를 처리한다.
CoMap / CoFlatMap
CoMapFunction은 ConnectedStreams의 각 스트림에 대해 별도의 map 함수를 정의한다. 첫 번째 스트림의 요소는 map1으로, 두 번째 스트림의 요소는 map2로 처리되며, 두 함수 모두 동일한 타입의 출력을 반환한다.
DataStream<String> result = connected.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return "Number: " + value;
}
@Override
public String map2(String value) throws Exception {
return "Label: " + value;
}
});
CoFlatMapFunction도 같은 원리지만, flatMap처럼 Collector를 통해 0개 이상의 요소를 출력할 수 있다.
DataStream<String> result = connected.flatMap(
new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) throws Exception {
out.collect("Number: " + value);
}
@Override
public void flatMap2(String value, Collector<String> out) throws Exception {
for (String word : value.split(" ")) {
out.collect(word);
}
}
}
);
connect가 진정으로 빛나는 순간은 상태(State)와 함께 사용할 때다. 예를 들어, 한쪽 스트림으로 들어오는 규칙(Rule)을 상태에 저장해두고, 다른 스트림의 데이터를 그 규칙에 따라 처리하는 패턴이 대표적이다. 이 경우 CoProcessFunction을 사용하면 키별 상태에 접근할 수 있어, 동적 규칙 엔진 같은 고급 패턴을 구현할 수 있다.
union vs connect 비교
| 특성 | union | connect |
|---|---|---|
| 스트림 타입 | 동일해야 함 | 달라도 됨 |
| 결합 가능 스트림 수 | 2개 이상 | 정확히 2개 |
| 결과 타입 | DataStream | ConnectedStreams |
| 내부 처리 | 하나로 합침 | 별도 유지 |
| 상태 공유 | 불가 | CoProcessFunction으로 가능 |
단순히 같은 타입의 스트림을 합치는 것이라면 union이 적합하고, 서로 다른 타입이거나 두 스트림 간 상태 공유가 필요하면 connect를 사용한다.
Batch Processing: 배치와 스트림의 통합
Flink는 원래 스트림 처리 엔진으로 시작했지만, 배치 처리를 위해 별도의 DataSet API를 제공했다. 그러나 Flink의 철학은 점차 하나의 명확한 방향으로 수렴했다.
배치는 유한한(bounded) 스트림의 특수한 경우일 뿐이다.
이 철학에 따라 Flink 1.12부터 DataStream API에 BATCH 실행 모드(Execution Mode) 가 도입되었고, DataSet API는 점진적으로 폐지(deprecated)되어 Flink 2.0에서 완전히 제거되었다.
Bounded Stream으로서의 배치
Flink에서 데이터 소스는 두 가지로 구분된다.
- 유한 스트림(Bounded Stream): 시작과 끝이 있는 데이터. 파일, 데이터베이스 테이블 등
- 무한 스트림(Unbounded Stream): 끝이 없는 데이터. Kafka 토픽, 소켓 스트림 등
배치 처리는 본질적으로 유한 스트림을 처리하는 것이다. 같은 DataStream API를 사용하되, 실행 모드만 바꾸면 된다.
RuntimeExecutionMode
DataStream API에서 배치 처리를 활성화하려면 RuntimeExecutionMode를 설정한다.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 방법 1: 코드에서 설정
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 방법 2: 실행 시 커맨드라인으로 설정
// flink run -Dexecution.runtime-mode=BATCH ...
세 가지 모드가 있다.
| 모드 | 설명 | 사용 시점 |
|---|---|---|
STREAMING | 기본값. 연속적 증분 처리 | 무한 스트림 |
BATCH | 배치 스타일 실행 | 유한 스트림 |
AUTOMATIC | 소스의 유한성에 따라 자동 결정 | 소스 타입에 맞춰 자동 선택 |
BATCH 모드는 모든 소스가 유한(bounded)할 때만 사용할 수 있다. 하나라도 무한 소스가 있으면 BATCH 모드로 실행할 수 없다. 반면 STREAMING 모드는 유한/무한 소스 모두에서 동작한다.
STREAMING 모드와 BATCH 모드의 차이
같은 코드라도 실행 모드에 따라 내부 동작이 달라진다.
STREAMING 모드에서는 각 레코드가 도착하는 즉시 처리되고, 상태가 지속적으로 유지된다. 결과는 증분적으로 출력된다.
BATCH 모드에서는 Flink가 전통적인 배치 프레임워크처럼 동작한다. 데이터를 단계별로 처리하고, 중간 결과를 셔플(shuffle)하며, 최종 결과만 한꺼번에 출력한다. 장애 복구도 체크포인트 대신 태스크 재실행 방식을 사용한다.
유한 스트림이라면 BATCH 모드가 더 효율적인 경우가 많다. 상태를 지속적으로 유지할 필요가 없고, 데이터 정렬과 셔플을 최적화할 수 있기 때문이다.
DataSet API에서 DataStream으로
기존에 DataSet API를 사용하던 코드를 마이그레이션할 때 알아야 할 핵심 변화는 다음과 같다.
- DataSet의
groupBy는 DataStream의keyBy로 대체된다 - DataSet의
reduceGroup은reduce또는process로 대체된다 - 배치 전용이었던
sortPartition,join등의 연산자는 DataStream의 윈도우 연산이나 ProcessFunction으로 구현한다 - 실행 환경이
ExecutionEnvironment에서StreamExecutionEnvironment로 바뀐다
Flink 공식 문서에서는 DataSet API의 연산자를 네 가지 범주로 분류하여 마이그레이션 가이드를 제공한다. DataStream에 정확히 동일한 대응 API가 있는 것부터, 의미는 다르지만 같은 동작을 구현할 수 있는 것까지 정리되어 있다.
정리
Flink의 Transformation 연산자는 단순한 요소 변환부터 복잡한 스트림 결합, 그리고 저수준 상태 관리까지 폭넓은 스펙트럼을 갖는다. 핵심을 다시 정리하면 다음과 같다.
- map, flatMap, filter: 개별 요소를 변환하거나 걸러내는 기본 연산
- keyBy: 키 기준으로 스트림을 논리적으로 분할하여 키별 연산의 기반을 마련
- reduce, aggregate: 동일 키의 요소들을 누적 결합하는 집계 연산
- ProcessFunction: 상태, 타이머, 사이드 출력 등 저수준 제어가 가능한 범용 연산자
- Side Output: 하나의 연산에서 타입이 다른 여러 출력 스트림을 생성
- union / connect: 여러 스트림을 결합하되, 동일 타입은 union, 다른 타입이나 상태 공유가 필요하면 connect
- Batch = Bounded Stream: DataStream API의 BATCH 실행 모드로 배치와 스트림 처리가 통합됨
어떤 연산자를 선택할지는 결국 "입력과 출력의 관계가 어떤가", "상태가 필요한가", "여러 스트림을 다루는가"라는 세 가지 질문으로 귀결된다. 이 기준을 잡아두면 복잡한 파이프라인에서도 적절한 연산자를 선택할 수 있다.