배치 처리는 데이터를 모아서 한 번에 처리한다. 하지만 현실의 데이터는 끊임없이 흘러들어온다. 사용자 클릭 로그, IoT 센서 데이터, 금융 거래 내역 등 실시간으로 발생하는 데이터를 발생 즉시 처리해야 하는 경우가 점점 많아지고 있다. Spark는 이 문제를 어떻게 풀었을까?
Structured Streaming이란?
Structured Streaming은 Spark 2.0에서 도입된 스트림 처리 엔진이다. 핵심 아이디어는 놀라울 정도로 단순하다. 실시간 데이터 스트림을 끝없이 행이 추가되는 테이블로 취급하는 것이다.
이 테이블을 무한 테이블(Unbounded Table) 이라 부른다. 스트림에 새로운 데이터가 도착할 때마다, 그것은 이 테이블에 새로운 행이 추가되는 것과 같다. 개발자는 이 테이블에 대해 일반적인 배치 쿼리를 작성하면 되고, Spark가 이를 증분 쿼리(Incremental Query) 로 변환하여 연속적으로 실행한다.
Structured Streaming은 전체 테이블을 메모리에 올리지 않는다. 최신 데이터만 읽어서 증분 처리하고, 결과를 갱신하는 데 필요한 최소한의 중간 상태만 유지한다.
마이크로 배치(Micro-batch) 모델
내부적으로 Structured Streaming은 마이크로 배치(Micro-batch) 방식으로 동작한다. 연속적인 데이터 스트림을 짧은 간격의 작은 배치로 나누어 처리하는 것이다. 기본 설정에서 약 100밀리초 수준의 지연 시간으로 정확히 한 번(exactly-once) 의 처리 보장을 제공한다.
이 방식의 장점은 배치 처리와 동일한 API를 사용할 수 있다는 점이다. DataFrame과 Dataset API를 그대로 쓸 수 있으므로, 배치 처리를 할 줄 알면 스트림 처리도 거의 같은 방식으로 작성할 수 있다.
DStream과의 차이
Structured Streaming 이전에는 DStream(Discretized Stream) 이라는 API가 Spark의 스트림 처리를 담당했다. 두 방식의 핵심적인 차이는 다음과 같다.
| 구분 | DStream | Structured Streaming |
|---|---|---|
| 기반 API | RDD | DataFrame / Dataset |
| 처리 보장 | At-least-once | Exactly-once |
| 이벤트 시간 처리 | 미지원 (수신 시간만) | 지원 (Watermark) |
| 최적화 | 수동 | Catalyst + Tungsten 자동 최적화 |
| 상태 | Deprecated | 현재 표준 |
DStream은 RDD 기반으로 저수준 제어가 가능했지만, 이벤트 시간(Event Time) 처리를 지원하지 않고 최적화가 어려웠다. Structured Streaming은 Spark SQL의 Catalyst 옵티마이저와 Tungsten 실행 엔진을 활용하여 자동으로 쿼리를 최적화한다. 현재 DStream은 공식적으로 deprecated 상태이며, 새 프로젝트에서는 Structured Streaming을 사용하는 것이 권장된다.
기본 구조: readStream과 writeStream
Structured Streaming의 기본 흐름은 세 단계로 구성된다. 입력(Source) 에서 데이터를 읽고, 변환(Transformation) 을 수행한 뒤, 출력(Sink) 으로 결과를 쓴다.
데이터 읽기 - readStream
배치에서 spark.read를 쓰듯, 스트리밍에서는 spark.readStream을 사용한다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
# 소켓에서 스트리밍 데이터 읽기
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# CSV 파일 디렉토리에서 스트리밍으로 읽기
from pyspark.sql.types import StructType
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory")
readStream으로 생성된 DataFrame은 일반 DataFrame과 동일하게 select, filter, groupBy 등의 변환을 적용할 수 있다. df.isStreaming으로 해당 DataFrame이 스트리밍인지 확인할 수 있다.
결과 쓰기 - writeStream
변환된 결과를 출력하려면 writeStream을 사용한다.
words = lines.select(
explode(split(lines.value, " ")).alias("word")
)
wordCounts = words.groupBy("word").count()
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
start()를 호출하면 백그라운드에서 스트리밍 쿼리가 실행되며, awaitTermination()으로 쿼리가 종료될 때까지 대기한다.
Output Mode
writeStream의 outputMode는 결과 테이블의 어떤 부분을 출력할지 결정한다. 세 가지 모드가 있다.
Append Mode - 마지막 트리거 이후 새로 추가된 행만 출력한다. 결과 테이블의 기존 행이 변경되지 않는 쿼리에만 사용할 수 있다. 집계가 없는 단순 변환에 적합하다.
query = df.writeStream.outputMode("append").format("console").start()
Complete Mode - 매 트리거마다 결과 테이블 전체를 출력한다. 집계 쿼리에서 전체 결과를 항상 확인하고 싶을 때 사용한다. 결과가 커지면 비용이 높아질 수 있다.
query = wordCounts.writeStream.outputMode("complete").format("console").start()
Update Mode - 마지막 트리거 이후 변경된 행만 출력한다. Complete와 달리 변경되지 않은 행은 출력하지 않으므로 효율적이다. 집계가 없으면 Append와 동일하게 동작한다.
query = wordCounts.writeStream.outputMode("update").format("console").start()
택배 회사에 비유하면, Complete는 매번 전체 재고 목록을 보내는 것이고, Append는 새로 입고된 물품만 보내는 것이며, Update는 수량이 변경된 품목만 보내는 것이다.
Trigger 설정
Trigger는 스트리밍 쿼리가 얼마나 자주 실행될지를 결정한다.
# 5초 간격으로 마이크로 배치 실행
query = df.writeStream.trigger(processingTime='5 seconds').start()
# 가용한 데이터를 한 번만 처리하고 종료
query = df.writeStream.trigger(once=True).start()
# 가용한 데이터를 여러 배치로 나눠 처리하고 종료
query = df.writeStream.trigger(availableNow=True).start()
# 연속 처리 모드 (at-least-once, 1ms 수준 지연)
query = df.writeStream.trigger(continuous='1 second').start()
once와 availableNow는 배치처럼 한 번만 실행하고 종료되므로, 스트리밍 파이프라인을 주기적으로 스케줄링할 때 유용하다. Continuous 모드는 마이크로 배치 대신 진정한 연속 처리를 수행하여 1밀리초 수준의 지연을 달성하지만, at-least-once 보장만 제공하며 지원하는 연산이 제한적이다.
Window 연산: 시간 기반 집계
스트리밍 데이터에서 가장 흔한 요구사항 중 하나는 시간 구간별 집계다. "최근 10분간 단어별 등장 횟수"처럼, 특정 시간 범위의 데이터를 묶어서 처리해야 할 때 윈도우(Window) 연산을 사용한다.
이벤트 시간(Event Time)과 처리 시간(Processing Time)
시간 기반 연산에서는 어떤 시간을 기준으로 삼느냐가 중요하다.
- 이벤트 시간(Event Time): 데이터가 실제로 발생한 시간. 데이터 자체에 포함된 타임스탬프.
- 처리 시간(Processing Time): Spark가 데이터를 처리하는 시간.
네트워크 지연이나 시스템 장애로 인해 두 시간 사이에 차이가 생길 수 있다. Structured Streaming은 이벤트 시간 기반의 윈도우 연산을 지원하여, 데이터가 늦게 도착하더라도 올바른 시간 구간에 포함시킬 수 있다.
텀블링 윈도우(Tumbling Window)
텀블링 윈도우는 고정 크기의, 겹치지 않는 연속 시간 구간이다. 10분 텀블링 윈도우라면 [00:00-00:10), [00:10-00:20), [00:20-00:30) 처럼 빈틈없이 이어진다.
from pyspark.sql.functions import window
# words: { timestamp: Timestamp, word: String } 스키마의 스트리밍 DataFrame
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes"), # 10분 텀블링 윈도우
words.word
).count()
window() 함수에 윈도우 크기만 지정하면 텀블링 윈도우가 된다.
슬라이딩 윈도우(Sliding Window)
슬라이딩 윈도우는 윈도우 크기와 슬라이드 간격이 다르기 때문에, 윈도우끼리 겹칠 수 있다. "10분 윈도우를 5분마다" 계산하면, 하나의 데이터가 두 개의 윈도우에 동시에 속하게 된다.
# 10분 윈도우, 5분 슬라이드 간격 -> 슬라이딩 윈도우
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
두 번째 인자가 윈도우 크기, 세 번째 인자가 슬라이드 간격이다. 슬라이드 간격이 윈도우 크기보다 작으면 윈도우가 겹치고, 같으면 텀블링 윈도우와 동일해진다.
세션 윈도우(Session Window)
세션 윈도우는 고정된 크기가 아닌, 데이터의 활동 패턴에 따라 동적으로 결정되는 윈도우다. 일정 시간(갭 기간) 동안 데이터가 없으면 세션이 종료되고, 새 데이터가 오면 새 세션이 시작된다.
from pyspark.sql.functions import session_window
events = spark.readStream. ...
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window(events.timestamp, "5 minutes"), # 5분 갭
events.userId
).count()
사용자별 활동 세션 분석 같은 시나리오에 적합하다. 세션 윈도우는 반드시 Watermark와 함께 사용해야 한다.
Watermark: 지연 데이터 처리
현실에서 데이터는 항상 순서대로 도착하지 않는다. 네트워크 지연, 디바이스 오프라인 등의 이유로 이벤트가 늦게 도착할 수 있다. 10시 5분에 발생한 이벤트가 10시 20분에야 시스템에 도달하는 상황이다.
Watermark는 "얼마나 늦은 데이터까지 허용할 것인가"를 정의하는 임계값이다.
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
withWatermark("timestamp", "10 minutes")는 이벤트 시간 기준으로 최대 10분까지 늦은 데이터를 허용한다는 의미다.
Watermark의 동작 원리
Spark 엔진은 지금까지 본 최대 이벤트 시간을 추적한다. 시간 T에 끝나는 윈도우에 대해, 다음 조건이 충족될 때까지 상태를 유지하고 늦은 데이터를 반영한다.
(엔진이 본 최대 이벤트 시간) - (지연 임계값) > T
예를 들어, 엔진이 본 최대 이벤트 시간이 10시 25분이고 임계값이 10분이면, 10시 15분 이전에 끝나는 윈도우의 상태는 정리된다. 10시 14분 59초에 발생한 이벤트가 지금 도착해도 해당 윈도우는 이미 닫혀 있으므로 무시될 수 있다.
Watermark의 보장 범위
중요한 점이 있다. Watermark는 한 방향으로만 엄격하게 보장된다.
- 임계값 이내의 지연 데이터: 반드시 집계에 포함된다 (보장)
- 임계값을 초과한 지연 데이터: 포함될 수도, 버려질 수도 있다 (비보장)
따라서 Watermark 임계값은 지연 데이터 허용 범위와 메모리 사용량 사이의 트레이드오프다. 임계값이 크면 더 많은 지연 데이터를 수용하지만, 그만큼 더 많은 상태를 메모리에 유지해야 한다.
Watermark 사용 조건
Watermark가 상태를 정리하려면 다음 조건을 만족해야 한다.
- Output Mode가 Append 또는 Update여야 한다. Complete 모드는 모든 집계 데이터를 유지해야 하므로 상태 정리가 불가능하다.
- 집계에 이벤트 시간 컬럼 또는 이벤트 시간 기반의
window가 포함되어야 한다. withWatermark는 집계에 사용되는 타임스탬프 컬럼과 동일한 컬럼에 적용해야 한다.withWatermark는 집계 이전에 호출해야 한다.
외부 시스템 연동
Kafka Source / Sink
Apache Kafka는 Structured Streaming에서 가장 널리 사용되는 외부 소스이자 싱크다. Kafka 연동을 위해서는 별도의 패키지가 필요하다.
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7 \
your_script.py
Kafka에서 읽기
Kafka를 소스로 사용할 때, 토픽을 지정하는 방법은 세 가지다.
# 1) 특정 토픽 구독
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:9092,host2:9092") \
.option("subscribe", "topic1,topic2") \
.load()
# 2) 패턴으로 구독
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:9092") \
.option("subscribePattern", "topic.*") \
.load()
# 3) 특정 파티션 지정
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:9092") \
.option("assign", '{"topicA":[0,1],"topicB":[2,4]}') \
.load()
Kafka 소스에서 읽은 DataFrame은 다음과 같은 스키마를 가진다.
| 컬럼 | 타입 | 설명 |
|---|---|---|
key | binary | 메시지 키 |
value | binary | 메시지 값 |
topic | string | 토픽 이름 |
partition | int | 파티션 번호 |
offset | long | 오프셋 |
timestamp | timestamp | 메시지 타임스탬프 |
timestampType | int | 타임스탬프 유형 |
key와 value가 binary 타입이므로, 문자열로 사용하려면 캐스팅이 필요하다.
parsed = df.selectExpr(
"CAST(key AS STRING)",
"CAST(value AS STRING)",
"topic",
"partition",
"offset"
)
주요 옵션으로는 startingOffsets(시작 위치: "earliest", "latest", 또는 JSON 오프셋 맵), failOnDataLoss(데이터 유실 감지 시 쿼리 실패 여부), maxOffsetsPerTrigger(트리거당 최대 오프셋 수) 등이 있다.
Kafka로 쓰기
Kafka 싱크에 쓸 DataFrame에는 value 컬럼이 필수이며, key와 topic 컬럼은 선택적이다.
query = parsed \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:9092") \
.option("topic", "output_topic") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start()
topic 옵션을 지정하면 모든 메시지가 해당 토픽으로 전송된다. DataFrame에 topic 컬럼이 있으면, 행마다 다른 토픽으로 라우팅할 수도 있다. 체크포인트 경로는 장애 복구를 위해 반드시 설정해야 한다.
파일 Source
디렉토리에 새로 생성되는 파일을 자동으로 읽는 방식이다. CSV, JSON, Parquet, ORC 등 다양한 형식을 지원한다.
from pyspark.sql.types import StructType
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark.readStream \
.format("csv") \
.option("sep", ";") \
.option("maxFilesPerTrigger", 100) \
.option("latestFirst", True) \
.schema(userSchema) \
.load("/path/to/directory")
파일 소스는 스키마를 명시적으로 지정해야 한다. 주요 옵션은 다음과 같다.
| 옵션 | 설명 | 기본값 |
|---|---|---|
maxFilesPerTrigger | 트리거당 처리할 최대 파일 수 | 무제한 |
latestFirst | 최신 파일 우선 처리 | false |
maxFileAge | 무시할 파일의 최대 나이 | 1주 |
cleanSource | 처리 완료 파일 정리 (archive/delete/off) | off |
foreachBatch: 커스텀 싱크
기본 제공 싱크(콘솔, 파일, Kafka)로 충분하지 않을 때, foreachBatch를 사용하면 각 마이크로 배치의 출력을 임의의 로직으로 처리할 수 있다.
def process_batch(batch_df, batch_id):
# 마이크로 배치 하나를 일반 DataFrame처럼 처리
batch_df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost/mydb") \
.option("dbtable", "my_table") \
.option("user", "username") \
.option("password", "password") \
.mode("append") \
.save()
query = streamingDF \
.writeStream \
.outputMode("append") \
.foreachBatch(process_batch) \
.start()
foreachBatch의 콜백 함수는 두 개의 인자를 받는다. batch_df는 해당 마이크로 배치의 데이터를 담은 일반 DataFrame이고, batch_id는 배치의 고유 식별자다. 일반 DataFrame이므로, JDBC 쓰기, 외부 API 호출, 여러 테이블에 동시 쓰기 등 배치 처리에서 가능한 모든 작업을 수행할 수 있다.
이 패턴은 Structured Streaming이 기본으로 지원하지 않는 데이터베이스나 서비스에 결과를 전달할 때 매우 유용하다.
정리
Structured Streaming은 스트림을 무한 테이블로 추상화하여, 배치와 동일한 DataFrame API로 스트림 처리를 가능하게 한 Spark의 핵심 엔진이다.
- 무한 테이블 모델: 스트림 데이터를 끝없이 추가되는 테이블로 취급하고, 증분 쿼리로 처리한다
- Output Mode: Append(새 행만), Complete(전체), Update(변경분만)로 출력 방식을 제어한다
- Window 연산: 텀블링, 슬라이딩, 세션 윈도우로 시간 기반 집계를 수행한다
- Watermark: 지연 데이터의 허용 범위를 정의하고, 오래된 상태를 자동으로 정리한다
- Kafka 연동: subscribe, subscribePattern, assign으로 토픽을 지정하고, readStream/writeStream으로 양방향 통합한다
- foreachBatch: 마이크로 배치 단위로 커스텀 로직을 적용하여 임의의 외부 시스템에 결과를 전달한다
배치 처리에 익숙한 Spark 사용자라면, Structured Streaming은 자연스러운 확장이다. 같은 API, 같은 최적화 엔진 위에서 실시간 처리를 시작할 수 있다.