Back to Blog
FlinkKafkaConnectorExactly-onceTwo-Phase CommitCheckpoint

0x05. Flink Kafka 커넥터와 End-to-End Exactly-once

Flink의 Kafka Source/Sink 커넥터를 활용한 실시간 파이프라인 구축과 End-to-End Exactly-once 보장 메커니즘을 알아본다.

실시간 데이터 처리 파이프라인에서 가장 흔한 조합은 Kafka + Flink이다. Kafka가 데이터의 수집과 전달을 담당하고, Flink가 실시간 변환과 분석을 수행한다. 이 조합의 가장 강력한 특징은 End-to-End Exactly-once 처리를 보장할 수 있다는 점이다. Kafka에서 읽고, Flink에서 처리하고, 다시 Kafka에 쓰는 전체 과정에서 메시지가 유실되지도, 중복 처리되지도 않는다.

이 글에서는 Flink의 Kafka 커넥터 사용법과, Exactly-once를 가능하게 하는 내부 메커니즘을 다룬다.


Kafka Source: Kafka에서 데이터 읽기

KafkaSource API

Flink 1.14부터 도입된 새로운 KafkaSource API를 사용한다.

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker1:9092,broker2:9092")
    .setTopics("input-topic")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    source,
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
    "Kafka Source"
);

Offset 초기화 전략

전략설명
OffsetsInitializer.earliest()가장 처음부터 읽기
OffsetsInitializer.latest()최신 메시지부터 읽기
OffsetsInitializer.committedOffsets()마지막 커밋 지점부터
OffsetsInitializer.timestamp(ts)특정 타임스탬프 이후부터

Flink는 Kafka의 Consumer Group Offset을 직접 관리하지 않는다. 대신 Flink Checkpoint에 Kafka Offset을 포함시켜, 장애 복구 시 정확한 위치에서 재개한다.

역직렬화

단순 문자열이 아닌 JSON이나 Avro 데이터를 읽으려면 커스텀 DeserializationSchema를 구현한다.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderDeserializer implements DeserializationSchema<Order> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Order deserialize(byte[] message) throws IOException {
        return mapper.readValue(message, Order.class);
    }

    @Override
    public boolean isEndOfStream(Order nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Order> getProducedType() {
        return TypeInformation.of(Order.class);
    }
}

Kafka Sink: Kafka에 데이터 쓰기

KafkaSink API

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("broker1:9092,broker2:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-tx")
    .build();

stream.sinkTo(sink);

DeliveryGuarantee 설정

레벨동작성능
NONE전송 보장 없음최고
AT_LEAST_ONCE최소 1회 전달 (중복 가능)중간
EXACTLY_ONCE정확히 1회 전달낮음

EXACTLY_ONCE를 선택하면, Flink는 Kafka의 트랜잭션 기능을 사용하여 원자적 쓰기를 수행한다. 이때 setTransactionalIdPrefix가 필수이다.


End-to-End Exactly-once의 원리

문제: 왜 어려운가?

"읽기 → 처리 → 쓰기"의 세 단계에서 각 단계의 Exactly-once를 개별적으로 보장해도, 전체 과정의 Exactly-once는 보장되지 않는다. 처리는 완료했지만 쓰기 전에 장애가 발생하면, 재시작 시 같은 데이터를 다시 처리하고 중복으로 쓰게 된다.

핵심은 "Offset 커밋"과 "결과 쓰기"를 원자적으로 수행하는 것이다. 하나만 성공하고 다른 하나가 실패하면 불일치가 발생한다.

해결: Two-Phase Commit

Flink는 Two-Phase Commit(2PC) 프로토콜을 사용하여 이 문제를 해결한다. Checkpoint와 Kafka 트랜잭션을 연동하는 방식이다.

Phase 1: Pre-commit (Checkpoint 진행 중)

  1. Checkpoint가 트리거된다
  2. Kafka Source: 현재 읽은 Offset을 Checkpoint 상태에 저장한다
  3. Flink 연산자: 내부 상태를 스냅샷한다
  4. Kafka Sink: 현재까지의 데이터를 Kafka 트랜잭션에 쓰되, 아직 커밋하지 않는다 (pre-commit)

Phase 2: Commit (Checkpoint 완료)

  1. 모든 Task의 Checkpoint가 성공하면, JobManager가 완료를 통보한다
  2. Kafka Sink: 트랜잭션을 커밋한다. 이제 Consumer(read_committed)가 이 데이터를 읽을 수 있다
시간 

Checkpoint 시작          Checkpoint 완료
     |                        |
Source: [offset 저장]          |
     |                        |
Operator: [상태 스냅샷]         |
     |                        |
Sink: [트랜잭션 pre-commit]  [트랜잭션 commit]

장애 발생 시: Checkpoint 완료 전에 장애가 발생하면, 트랜잭션이 커밋되지 않으므로 Consumer는 해당 데이터를 볼 수 없다. Flink는 마지막 성공한 Checkpoint에서 복구하여, 해당 시점의 Kafka Offset부터 다시 읽는다. 결과적으로 처리되지 않은 데이터는 다시 처리되고, 이미 처리된 데이터는 중복 처리되지 않는다.

필수 설정

End-to-End Exactly-once를 위해 양쪽 모두 설정이 필요하다.

Flink 설정:

// Checkpoint 활성화 (필수)
env.enableCheckpointing(60000); // 60초 간격
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);

Kafka Broker 설정:

# 트랜잭션 타임아웃 (Flink Checkpoint 간격보다 커야 )
transaction.max.timeout.ms=900000  # 15분

Kafka Consumer 설정 (결과를 읽는 쪽):

isolation.level=read_committed

read_committed로 설정하지 않으면, 아직 커밋되지 않은 트랜잭션의 데이터도 읽게 되어 Exactly-once가 깨진다.


실전 파이프라인 예제

Kafka에서 주문 이벤트를 읽고, 실시간 집계 후 결과를 다시 Kafka에 쓰는 파이프라인이다.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);

// Kafka Source
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("orders")
    .setGroupId("order-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> orders = env.fromSource(
    source,
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)),
    "Order Source"
);

// 처리: 카테고리별 5분 윈도우 매출 집계
DataStream<String> aggregated = orders
    .map(json -> parseOrder(json))
    .keyBy(order -> order.getCategory())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount")
    .map(result -> toJson(result));

// Kafka Sink (Exactly-once)
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("broker:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("order-aggregates")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("order-agg-tx")
    .build();

aggregated.sinkTo(sink);
env.execute("Order Aggregation Pipeline");

주의사항과 트레이드오프

Exactly-once의 비용

  • 지연 시간 증가: 트랜잭션 커밋은 Checkpoint 완료 시점에 이루어지므로, Checkpoint 간격만큼 결과 전달이 지연된다. Checkpoint가 60초 간격이면, 최대 60초의 출력 지연이 발생한다
  • 처리량 감소: 트랜잭션 오버헤드로 인해 AT_LEAST_ONCE 대비 처리량이 낮아질 수 있다
  • Kafka 설정 요구: 트랜잭션 관련 설정이 필요하고, transaction.max.timeout.ms를 Checkpoint 간격보다 크게 설정해야 한다

AT_LEAST_ONCE로 충분한 경우

모든 시스템에 Exactly-once가 필요한 것은 아니다. Consumer 측에서 멱등성을 보장할 수 있다면, AT_LEAST_ONCE + 멱등 처리가 더 단순하고 성능도 좋다. 예를 들어 결과를 DB에 UPSERT하는 경우, 같은 키로 중복 쓰기가 발생해도 최종 결과는 동일하다.


정리

  • KafkaSource/KafkaSink: Flink의 공식 Kafka 커넥터로, Source는 Checkpoint 기반 Offset 관리, Sink는 트랜잭션 기반 쓰기를 지원한다
  • End-to-End Exactly-once: Two-Phase Commit 프로토콜로 Checkpoint와 Kafka 트랜잭션을 연동하여 달성한다
  • 필수 조건: Checkpoint 활성화, Sink의 EXACTLY_ONCE 설정, Consumer의 read_committed 격리 수준
  • 트레이드오프: Exactly-once는 Checkpoint 간격만큼 출력 지연이 발생하므로, 요구 사항에 맞게 AT_LEAST_ONCE와 선택한다