Flink나 Spark 같은 별도의 클러스터를 구축하지 않고, 일반 Java/Kotlin 애플리케이션 안에서 스트림 처리를 수행할 수 있다면 어떨까? Kafka Streams는 Kafka에 내장된 클라이언트 라이브러리로, 별도의 분산 처리 클러스터 없이 스트림 처리를 구현할 수 있다.
Kafka Streams란?
Kafka Streams는 Kafka Topic의 데이터를 읽어 변환하고, 결과를 다시 Kafka Topic에 쓰는 스트림 처리 라이브러리이다.
핵심 특징:
- 라이브러리: 별도의 클러스터가 필요 없다. Maven/Gradle 의존성만 추가하면 된다
- Kafka만 필요: 입력과 출력 모두 Kafka Topic이다. 외부 시스템 의존성이 없다
- 수평 확장: 애플리케이션 인스턴스를 늘리면 자동으로 파티션이 재분배된다
- Exactly-once: Kafka의 트랜잭션 기능을 활용한 정확한 처리를 보장한다
- 상태 저장: 로컬 상태 저장소(RocksDB)로 상태 기반 연산을 지원한다
┌─────────────────────────────┐
│ 일반 Java 애플리케이션 │
│ ┌───────────────────────┐ │
│ │ Kafka Streams 라이브러리│ │
│ │ (토폴로지 실행) │ │
│ └───────────────────────┘ │
│ ↕ Kafka │
└─────────────────────────────┘
Flink/Spark과의 차이점은 클러스터 관리가 불필요하다는 것이다. 마이크로서비스에 바로 내장할 수 있어, 이벤트 기반 아키텍처에서 특히 유용하다.
KStream과 KTable
Kafka Streams의 두 가지 핵심 추상화이다.
KStream: 이벤트 스트림
KStream은 끝없이 흘러가는 이벤트 시퀀스이다. 각 레코드는 독립적인 사실(fact)을 나타낸다.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");
// 모든 레코드를 독립적으로 처리
stream.filter((key, value) -> value.contains("premium"))
.mapValues(value -> value.toUpperCase())
.to("premium-orders");
orders 토픽에서 같은 키로 3개의 메시지가 오면, KStream은 3개 모두를 개별 이벤트로 처리한다.
Key: "user-1" → {"item": "A", "amount": 100} → 이벤트 1
Key: "user-1" → {"item": "B", "amount": 200} → 이벤트 2
Key: "user-1" → {"item": "C", "amount": 300} → 이벤트 3
KTable: 변경 로그 테이블
KTable은 키 기준으로 최신 값만 유지하는 테이블이다. 같은 키의 새 메시지가 오면 기존 값을 덮어쓴다. 데이터베이스 테이블의 CDC 변경 로그와 동일한 개념이다.
KTable<String, String> table = builder.table("user-profiles");
같은 키 "user-1"로 3개의 메시지가 오면, KTable은 마지막 값만 유지한다.
Key: "user-1" → {"name": "Kim"} → 현재 값: {"name": "Kim"}
Key: "user-1" → {"name": "Kim Lee"} → 현재 값: {"name": "Kim Lee"} (덮어쓰기)
Key: "user-1" → null → 삭제 (tombstone)
KStream vs KTable
| 특성 | KStream | KTable |
|---|---|---|
| 의미 | 이벤트(사실)의 흐름 | 상태(최신 값)의 변화 |
| 같은 키 | 모두 유지 | 마지막 값만 유지 |
| null 값 | 일반 레코드 | 삭제(tombstone) |
| 비유 | 은행 거래 내역 | 계좌 잔액 |
| Kafka 토픽 | 일반 토픽 | Compacted 토픽 |
상태 저장소(State Store)
집계, 조인 같은 연산에는 이전 데이터를 기억하는 상태가 필요하다. Kafka Streams는 로컬 RocksDB를 상태 저장소로 사용한다.
KStream<String, Long> purchases = builder.stream("purchases");
KTable<String, Long> totalByUser = purchases
.groupByKey()
.reduce(Long::sum); // 상태 저장소에 누적 합계 유지
상태 저장소의 내용은 Changelog Topic에 자동 백업된다. 애플리케이션이 재시작되면 이 토픽에서 상태를 복원한다.
애플리케이션 인스턴스
├── RocksDB (로컬 상태)
└── Changelog Topic (Kafka에 백업)
→ 장애 시 다른 인스턴스가 복원 가능
이 설계 덕분에 별도의 외부 데이터베이스 없이도 상태 기반 처리가 가능하며, 장애 복구도 자동으로 이루어진다.
윈도우 연산
시간 기반 집계를 위한 윈도우 연산을 지원한다.
Tumbling Window
고정 크기의 겹치지 않는 윈도우이다.
KTable<Windowed<String>, Long> hourlyCounts = stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count();
|---1시간---|---1시간---|---1시간---|
| window1 | window2 | window3 |
Hopping Window
고정 크기이지만, 일정 간격으로 겹치는 윈도우이다.
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
.advanceBy(Duration.ofMinutes(5));
|----10분----|
|----10분----|
|----10분----|
5분 간격으로 슬라이딩
Session Window
이벤트 간의 비활동 간격(gap) 을 기준으로 동적 크기의 윈도우를 생성한다. 사용자 세션 분석에 적합하다.
KTable<Windowed<String>, Long> sessionCounts = stream
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
30분 동안 이벤트가 없으면 세션이 종료된다. 사용자마다 세션 길이가 다르다.
조인(Join)
KStream-KTable Join
스트림의 각 이벤트를 테이블의 최신 값으로 보강(enrichment) 한다. 가장 흔한 패턴이다.
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");
KStream<String, EnrichedOrder> enriched = orders.join(
users,
(order, user) -> new EnrichedOrder(order, user.getName(), user.getEmail())
);
주문 이벤트가 들어올 때마다, 해당 사용자의 최신 프로필 정보를 붙여서 출력한다. users 테이블이 업데이트되면, 이후 들어오는 주문에는 업데이트된 정보가 반영된다.
KStream-KStream Join
두 스트림을 시간 윈도우 내에서 조인한다. 같은 키를 가진 이벤트가 지정된 시간 범위 내에 양쪽 스트림에서 모두 도착해야 매칭된다.
KStream<String, Payment> payments = builder.stream("payments");
KStream<String, Shipment> shipments = builder.stream("shipments");
KStream<String, String> matched = payments.join(
shipments,
(payment, shipment) -> "Paid & Shipped: " + payment.getOrderId(),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1))
);
결제 후 1시간 이내에 배송이 시작된 주문만 매칭된다.
KTable-KTable Join
두 테이블의 현재 상태를 조인한다. 어느 쪽이든 업데이트되면 결과가 자동으로 갱신된다.
KTable<String, User> users = builder.table("users");
KTable<String, Address> addresses = builder.table("addresses");
KTable<String, String> userWithAddress = users.join(
addresses,
(user, address) -> user.getName() + " @ " + address.getCity()
);
Exactly-once 처리
Kafka Streams는 processing.guarantee 설정으로 Exactly-once를 지원한다.
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
내부적으로 Kafka 트랜잭션을 사용하여, 읽기(offset 커밋) + 상태 업데이트(changelog) + 쓰기(출력 토픽) 를 원자적으로 수행한다.
하나의 트랜잭션 내에서:
1. 입력 토픽의 offset 커밋
2. 상태 저장소 changelog 쓰기
3. 출력 토픽에 결과 쓰기
→ 모두 성공하거나 모두 실패 (원자적)
이 방식은 Flink의 Two-Phase Commit과 목적은 같지만, Kafka 내부 트랜잭션만으로 구현된다는 차이가 있다.
토폴로지와 실행
Processor Topology
Kafka Streams의 처리 파이프라인은 토폴로지(Topology) 라 부르는 DAG(방향 비순환 그래프)로 표현된다.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.filter((key, value) -> value != null)
.mapValues(String::toUpperCase)
.to("output-topic");
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
수평 확장
Kafka Streams의 병렬성은 입력 토픽의 파티션 수에 의해 결정된다. 각 파티션은 하나의 Stream Task에 할당되고, Task들은 애플리케이션 인스턴스 간에 분배된다.
input-topic (4 파티션)
인스턴스 1: Task 0 (파티션 0), Task 1 (파티션 1)
인스턴스 2: Task 2 (파티션 2), Task 3 (파티션 3)
인스턴스를 추가하면 Task가 자동으로 재분배된다. 최대 병렬도는 파티션 수와 같다.
Kafka Streams vs Flink
| 특성 | Kafka Streams | Flink |
|---|---|---|
| 배포 | 라이브러리 (앱에 내장) | 클러스터 (별도 인프라) |
| 입출력 | Kafka 전용 | Kafka, DB, 파일 등 다양 |
| 상태 관리 | RocksDB + Changelog | RocksDB + Checkpoint |
| 윈도우 | 기본 윈도우 지원 | 고급 윈도우/트리거 지원 |
| 적합한 규모 | 중소 규모, 마이크로서비스 | 대규모, 복잡한 파이프라인 |
| 학습 곡선 | 낮음 | 높음 |
Kafka 에코시스템 안에서 비교적 단순한 스트림 처리를 해야 한다면 Kafka Streams가 적합하다. 복잡한 이벤트 처리, 다양한 소스/싱크, 대규모 클러스터가 필요하다면 Flink를 선택한다.
정리
- Kafka Streams는 별도 클러스터 없이 Kafka 기반 스트림 처리를 구현하는 클라이언트 라이브러리이다
- KStream은 이벤트의 무한한 흐름, KTable은 키 기준 최신 상태를 나타낸다
- 상태 저장소(RocksDB) 와 Changelog Topic으로 상태 기반 연산과 장애 복구를 지원한다
- 윈도우 연산(Tumbling, Hopping, Session)으로 시간 기반 집계가 가능하다
- Exactly-once를 Kafka 트랜잭션으로 보장하며, 입력 파티션 수만큼 수평 확장된다