실시간 데이터 처리 파이프라인에서 가장 흔한 조합은 Kafka + Flink이다. Kafka가 데이터의 수집과 전달을 담당하고, Flink가 실시간 변환과 분석을 수행한다. 이 조합의 가장 강력한 특징은 End-to-End Exactly-once 처리를 보장할 수 있다는 점이다. Kafka에서 읽고, Flink에서 처리하고, 다시 Kafka에 쓰는 전체 과정에서 메시지가 유실되지도, 중복 처리되지도 않는다.
이 글에서는 Flink의 Kafka 커넥터 사용법과, Exactly-once를 가능하게 하는 내부 메커니즘을 다룬다.
Kafka Source: Kafka에서 데이터 읽기
KafkaSource API
Flink 1.14부터 도입된 새로운 KafkaSource API를 사용한다.
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("broker1:9092,broker2:9092")
.setTopics("input-topic")
.setGroupId("flink-consumer-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
"Kafka Source"
);
Offset 초기화 전략
| 전략 | 설명 |
|---|---|
OffsetsInitializer.earliest() | 가장 처음부터 읽기 |
OffsetsInitializer.latest() | 최신 메시지부터 읽기 |
OffsetsInitializer.committedOffsets() | 마지막 커밋 지점부터 |
OffsetsInitializer.timestamp(ts) | 특정 타임스탬프 이후부터 |
Flink는 Kafka의 Consumer Group Offset을 직접 관리하지 않는다. 대신 Flink Checkpoint에 Kafka Offset을 포함시켜, 장애 복구 시 정확한 위치에서 재개한다.
역직렬화
단순 문자열이 아닌 JSON이나 Avro 데이터를 읽으려면 커스텀 DeserializationSchema를 구현한다.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.fasterxml.jackson.databind.ObjectMapper;
public class OrderDeserializer implements DeserializationSchema<Order> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public Order deserialize(byte[] message) throws IOException {
return mapper.readValue(message, Order.class);
}
@Override
public boolean isEndOfStream(Order nextElement) {
return false;
}
@Override
public TypeInformation<Order> getProducedType() {
return TypeInformation.of(Order.class);
}
}
Kafka Sink: Kafka에 데이터 쓰기
KafkaSink API
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("broker1:9092,broker2:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-tx")
.build();
stream.sinkTo(sink);
DeliveryGuarantee 설정
| 레벨 | 동작 | 성능 |
|---|---|---|
NONE | 전송 보장 없음 | 최고 |
AT_LEAST_ONCE | 최소 1회 전달 (중복 가능) | 중간 |
EXACTLY_ONCE | 정확히 1회 전달 | 낮음 |
EXACTLY_ONCE를 선택하면, Flink는 Kafka의 트랜잭션 기능을 사용하여 원자적 쓰기를 수행한다. 이때 setTransactionalIdPrefix가 필수이다.
End-to-End Exactly-once의 원리
문제: 왜 어려운가?
"읽기 → 처리 → 쓰기"의 세 단계에서 각 단계의 Exactly-once를 개별적으로 보장해도, 전체 과정의 Exactly-once는 보장되지 않는다. 처리는 완료했지만 쓰기 전에 장애가 발생하면, 재시작 시 같은 데이터를 다시 처리하고 중복으로 쓰게 된다.
핵심은 "Offset 커밋"과 "결과 쓰기"를 원자적으로 수행하는 것이다. 하나만 성공하고 다른 하나가 실패하면 불일치가 발생한다.
해결: Two-Phase Commit
Flink는 Two-Phase Commit(2PC) 프로토콜을 사용하여 이 문제를 해결한다. Checkpoint와 Kafka 트랜잭션을 연동하는 방식이다.
Phase 1: Pre-commit (Checkpoint 진행 중)
- Checkpoint가 트리거된다
- Kafka Source: 현재 읽은 Offset을 Checkpoint 상태에 저장한다
- Flink 연산자: 내부 상태를 스냅샷한다
- Kafka Sink: 현재까지의 데이터를 Kafka 트랜잭션에 쓰되, 아직 커밋하지 않는다 (pre-commit)
Phase 2: Commit (Checkpoint 완료)
- 모든 Task의 Checkpoint가 성공하면, JobManager가 완료를 통보한다
- Kafka Sink: 트랜잭션을 커밋한다. 이제 Consumer(
read_committed)가 이 데이터를 읽을 수 있다
시간 →
Checkpoint 시작 Checkpoint 완료
| |
Source: [offset 저장] |
| |
Operator: [상태 스냅샷] |
| |
Sink: [트랜잭션 pre-commit] → [트랜잭션 commit]
장애 발생 시: Checkpoint 완료 전에 장애가 발생하면, 트랜잭션이 커밋되지 않으므로 Consumer는 해당 데이터를 볼 수 없다. Flink는 마지막 성공한 Checkpoint에서 복구하여, 해당 시점의 Kafka Offset부터 다시 읽는다. 결과적으로 처리되지 않은 데이터는 다시 처리되고, 이미 처리된 데이터는 중복 처리되지 않는다.
필수 설정
End-to-End Exactly-once를 위해 양쪽 모두 설정이 필요하다.
Flink 설정:
// Checkpoint 활성화 (필수)
env.enableCheckpointing(60000); // 60초 간격
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
Kafka Broker 설정:
# 트랜잭션 타임아웃 (Flink Checkpoint 간격보다 커야 함)
transaction.max.timeout.ms=900000 # 15분
Kafka Consumer 설정 (결과를 읽는 쪽):
isolation.level=read_committed
read_committed로 설정하지 않으면, 아직 커밋되지 않은 트랜잭션의 데이터도 읽게 되어 Exactly-once가 깨진다.
실전 파이프라인 예제
Kafka에서 주문 이벤트를 읽고, 실시간 집계 후 결과를 다시 Kafka에 쓰는 파이프라인이다.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
// Kafka Source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("broker:9092")
.setTopics("orders")
.setGroupId("order-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> orders = env.fromSource(
source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)),
"Order Source"
);
// 처리: 카테고리별 5분 윈도우 매출 집계
DataStream<String> aggregated = orders
.map(json -> parseOrder(json))
.keyBy(order -> order.getCategory())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount")
.map(result -> toJson(result));
// Kafka Sink (Exactly-once)
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("broker:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("order-aggregates")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("order-agg-tx")
.build();
aggregated.sinkTo(sink);
env.execute("Order Aggregation Pipeline");
주의사항과 트레이드오프
Exactly-once의 비용
- 지연 시간 증가: 트랜잭션 커밋은 Checkpoint 완료 시점에 이루어지므로, Checkpoint 간격만큼 결과 전달이 지연된다. Checkpoint가 60초 간격이면, 최대 60초의 출력 지연이 발생한다
- 처리량 감소: 트랜잭션 오버헤드로 인해 AT_LEAST_ONCE 대비 처리량이 낮아질 수 있다
- Kafka 설정 요구: 트랜잭션 관련 설정이 필요하고,
transaction.max.timeout.ms를 Checkpoint 간격보다 크게 설정해야 한다
AT_LEAST_ONCE로 충분한 경우
모든 시스템에 Exactly-once가 필요한 것은 아니다. Consumer 측에서 멱등성을 보장할 수 있다면, AT_LEAST_ONCE + 멱등 처리가 더 단순하고 성능도 좋다. 예를 들어 결과를 DB에 UPSERT하는 경우, 같은 키로 중복 쓰기가 발생해도 최종 결과는 동일하다.
정리
- KafkaSource/KafkaSink: Flink의 공식 Kafka 커넥터로, Source는 Checkpoint 기반 Offset 관리, Sink는 트랜잭션 기반 쓰기를 지원한다
- End-to-End Exactly-once: Two-Phase Commit 프로토콜로 Checkpoint와 Kafka 트랜잭션을 연동하여 달성한다
- 필수 조건: Checkpoint 활성화, Sink의
EXACTLY_ONCE설정, Consumer의read_committed격리 수준 - 트레이드오프: Exactly-once는 Checkpoint 간격만큼 출력 지연이 발생하므로, 요구 사항에 맞게 AT_LEAST_ONCE와 선택한다