Back to Blog
FlinkWindowTumblingSlidingSessionWatermarkTriggerEvictor

0x03. Flink Window와 시간 - 스트림에 구간을 만들다

Flink의 윈도우 개념(Tumbling, Sliding, Session)과 Watermark, Trigger, Evictor를 통한 시간 기반 스트림 처리를 알아본다.

스트림 데이터는 끝이 없다. 센서 데이터, 클릭 로그, 금융 거래 기록 등 실시간으로 쏟아지는 데이터에서 "최근 5분간 평균 온도"나 "지난 1시간 매출 합계"를 구하려면 어떻게 해야 할까? 무한한 데이터 흐름에 유한한 구간을 만들어야 한다. 이것이 바로 Apache Flink의 윈도우(Window) 가 하는 일이다.


왜 윈도우가 필요한가?

배치 처리에서는 데이터의 시작과 끝이 명확하다. CSV 파일 하나를 읽어서 집계하면 된다. 하지만 스트림은 다르다. 데이터가 끊임없이 들어오기 때문에, 어디서부터 어디까지를 하나의 단위로 묶을지 기준이 필요하다.

윈도우는 무한 스트림을 유한한 조각으로 나누는 메커니즘이다. 마치 컨베이어 벨트 위를 흘러가는 물건들을 일정 구간마다 상자에 담아 처리하는 것과 같다. 상자에 담긴 데이터에 대해 합계, 평균, 카운트 같은 집계 연산을 수행할 수 있다.


Keyed Window vs Non-Keyed Window

Flink에서 윈도우를 사용하는 방식은 크게 두 가지로 나뉜다.

Keyed Window

keyBy()로 스트림을 키별로 분류한 뒤 윈도우를 적용하는 방식이다. 같은 키를 가진 이벤트끼리 독립적인 윈도우에서 처리된다. 병렬 처리가 가능하므로 대부분의 경우 이 방식을 사용한다.

stream
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

Non-Keyed Window

keyBy() 없이 전체 스트림에 윈도우를 적용하는 방식이다. window() 대신 windowAll()을 사용한다. 모든 이벤트가 하나의 윈도우로 모이기 때문에, 병렬 처리가 불가능하다. 전체 스트림에 대한 글로벌 집계가 필요한 경우에만 사용해야 한다.

stream
    .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

윈도우 구성 요소

Keyed Window의 전체 구조를 코드로 표현하면 다음과 같다.

stream
    .keyBy(...)                        // 키 분류
    .window(...)                       // 필수: Window Assigner
   [.trigger(...)]                     // 선택: Trigger
   [.evictor(...)]                     // 선택: Evictor
   [.allowedLateness(...)]             // 선택: 허용 지연 시간
   [.sideOutputLateData(...)]          // 선택: 지연 데이터 사이드 출력
    .reduce() / .aggregate() / .apply()  // 필수: Window Function

Window Assigner가 이벤트를 어떤 윈도우에 배정할지 결정하고, Trigger가 언제 결과를 방출할지, Evictor가 어떤 요소를 제거할지를 담당한다. 이 세 가지 조합으로 매우 유연한 윈도우 처리가 가능하다.


Window 종류

Flink는 네 가지 기본 윈도우 타입을 제공한다. 각각의 특성을 이해하면, 상황에 맞는 윈도우를 선택할 수 있다.

Tumbling Window (텀블링 윈도우)

고정 크기의 윈도우가 겹치지 않고 연속으로 이어지는 방식이다. 각 이벤트는 정확히 하나의 윈도우에만 속한다. 가장 직관적이고 자주 사용되는 윈도우 타입이다.

|--- Window 1 ---|--- Window 2 ---|--- Window 3 ---|
0               5               10              15  ()

"5분마다 매출을 집계한다"처럼 고정 간격 집계에 적합하다.

// 이벤트 타임 기준 5분 텀블링 윈도우
stream
    .keyBy(event -> event.getSensorId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("value");

// 처리 시간 기준 10초 텀블링 윈도우
stream
    .keyBy(event -> event.getSensorId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .sum("value");

Sliding Window (슬라이딩 윈도우)

고정 크기의 윈도우가 일정 간격(slide)으로 이동하면서 생성되는 방식이다. 윈도우끼리 겹칠 수 있으며, 하나의 이벤트가 여러 윈도우에 동시에 속할 수 있다.

|------ Window 1 ------|
      |------ Window 2 ------|
            |------ Window 3 ------|
0     2     4     6     8    10    12  ()
      (slide=2분, size=6분)

두 개의 파라미터가 필요하다. 윈도우 크기(size)슬라이드 간격(slide) 이다. "최근 10분간의 이동 평균을 2분마다 갱신한다"처럼, 중첩된 집계가 필요한 경우에 사용한다.

// 10분 크기, 2분 간격 슬라이딩 윈도우
stream
    .keyBy(event -> event.getSensorId())
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))
    .aggregate(new AverageAggregate());

슬라이드 간격이 윈도우 크기보다 작으면 윈도우가 겹치고, 같으면 텀블링 윈도우와 동일해진다.

Session Window (세션 윈도우)

텀블링이나 슬라이딩과 달리, 세션 윈도우는 고정된 크기가 없다. 대신, 이벤트 간의 비활성 간격(gap) 을 기준으로 윈도우를 구분한다. 일정 시간 동안 이벤트가 들어오지 않으면 세션이 종료되고, 새 이벤트가 오면 새 세션이 시작된다.

|-- Session 1 --|  gap  |---- Session 2 ----|  gap  |- Session 3 -|
 e  e  e   e           e   e   e    e   e           e   e

웹사이트 사용자의 활동 세션 분석이 대표적인 사용 사례다. 사용자가 30분간 아무 행동도 하지 않으면 세션이 끝났다고 판단하는 것이다.

// 고정 갭: 30분간 비활성이면 세션 종료
stream
    .keyBy(event -> event.getUserId())
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .process(new SessionAnalyzer());

// 동적 갭: 이벤트에 따라 갭이 달라짐
stream
    .keyBy(event -> event.getUserId())
    .window(EventTimeSessionWindows.withDynamicGap(event -> {
        // 프리미엄 사용자는 1시간, 일반 사용자는 30분
        return event.isPremium() ? 3600000 : 1800000;
    }))
    .process(new SessionAnalyzer());

세션 윈도우의 내부 동작은 특이하다. 새 이벤트가 들어올 때마다 개별 윈도우가 생성되고, 간격이 충분히 가까운 윈도우끼리 병합(merge) 된다. 따라서 세션 윈도우와 함께 사용하는 Trigger나 Window Function도 병합을 지원해야 한다.

Global Window

모든 이벤트를 하나의 윈도우에 담는 방식이다. 기본 트리거가 NeverTrigger이므로, 커스텀 트리거를 반드시 지정해야 결과가 방출된다. 트리거 없이는 윈도우가 절대 닫히지 않는다.

stream
    .keyBy(event -> event.getKey())
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))  // 100개 이벤트마다 방출
    .aggregate(new MyAggregateFunction());

일반적인 시간 기반 집계에는 잘 사용하지 않지만, 특수한 커스텀 윈도우 로직이 필요할 때 유용하다.

윈도우 타입 비교

윈도우크기겹침주요 용도
Tumbling고정없음고정 간격 집계 (분/시간/일 단위 리포트)
Sliding고정있음이동 평균, 중첩 집계
Session가변없음사용자 세션, 활동 기반 분석
Global무한-커스텀 트리거와 조합한 특수 로직

시간의 종류: Event Time vs Processing Time

윈도우가 "어떤 구간"을 의미한다면, 그 구간의 기준이 되는 시간도 중요하다. Flink는 두 가지 주요 시간 개념을 제공한다.

Processing Time (처리 시간)

이벤트를 처리하는 머신의 시스템 시계 기준이다. 구현이 단순하고 지연이 가장 적다. 하지만 네트워크 지연이나 장애 복구 시 결과가 달라질 수 있어 결정적(deterministic) 결과를 보장하지 않는다.

Event Time (이벤트 시간)

이벤트가 실제로 발생한 시점을 기준으로 한다. 이벤트 자체에 포함된 타임스탬프를 사용한다. 이벤트가 늦게 도착하더라도, 발생 시점 기준으로 올바른 윈도우에 배정할 수 있어 정확하고 재현 가능한 결과를 보장한다.

다만 Event Time을 사용하려면 한 가지 문제를 해결해야 한다. "지금까지 어디까지 이벤트가 도착했는지" 를 시스템이 어떻게 알 수 있을까? 5분 윈도우의 끝 시점인 10:05가 지났는데, 10:04에 발생한 이벤트가 아직 안 왔을 수도 있다. 이 문제를 해결하는 것이 바로 Watermark이다.


Watermark: "여기까지는 다 왔다"는 신호

Watermark(워터마크) 는 스트림 속에 흐르는 특수한 타임스탬프 마커로, "이 시각 이전의 이벤트는 더 이상 오지 않을 것" 이라는 선언이다.

Watermark(t)가 발생하면, Flink는 타임스탬프가 t 이하인 이벤트는 모두 도착했다고 간주한다. 이를 통해 이벤트 타임 윈도우가 "이제 닫아도 되겠다"는 판단을 내릴 수 있다.

왜 Watermark가 필요한가?

현실 세계에서 이벤트는 순서대로 도착하지 않는다. 네트워크 지연, 시스템 장애, 파티션 간 속도 차이 등 다양한 이유로 늦게 도착하는 이벤트(late event) 가 발생한다.

10:00~10:05 윈도우를 처리한다고 가정하자. 10:06에 발생한 이벤트가 도착했다고 해서, 10:04에 발생한 이벤트가 아직 오지 않았을 수도 있다. 무한정 기다릴 수도 없고, 너무 일찍 닫으면 데이터를 놓친다. Watermark는 이 완전성(completeness)과 지연(latency) 사이의 트레이드오프를 제어하는 도구다.

Watermark 생성 전략

Flink는 두 가지 내장 Watermark 생성 전략을 제공한다.

forMonotonousTimestamps (단조 증가)

이벤트 타임스탬프가 항상 증가하는 순서로 들어올 때 사용한다. 현재까지 본 최대 타임스탬프를 Watermark로 사용한다. 이벤트가 순서대로 도착하는 이상적인 환경에 적합하다.

WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

forBoundedOutOfOrderness (제한된 비순서)

현실적으로 이벤트 순서가 뒤바뀔 수 있음을 인정하고, 최대 지연 시간을 지정하는 전략이다. "이벤트는 최대 5초까지 늦을 수 있다"고 선언하면, Watermark는 현재 최대 타임스탬프 - 5초가 된다.

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

이 전략이 대부분의 실무 환경에서 사용된다. 지연 허용 시간을 너무 짧게 잡으면 늦은 데이터가 누락되고, 너무 길게 잡으면 결과 출력이 그만큼 늦어진다.

Watermark 적용

Watermark 전략은 소스에 직접 적용하거나, 스트림 중간에 삽입할 수 있다.

// 소스에 직접 적용
DataStream<Event> stream = env
    .fromSource(source,
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestamp()),
        "source");

// 스트림 중간에 적용
DataStream<Event> withWatermarks = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestamp())
    );

지연 데이터 처리

Watermark를 넘긴 뒤에 도착하는 이벤트는 기본적으로 삭제(drop) 된다. 하지만 allowedLateness()를 설정하면, 윈도우가 닫힌 뒤에도 지정된 시간만큼 추가 데이터를 수용할 수 있다.

stream
    .keyBy(event -> event.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateOutputTag)  // 허용 시간도 넘긴 데이터는 사이드 출력
    .sum("value");

// 사이드 출력으로 빠진 늦은 데이터 별도 처리
DataStream<Event> lateEvents = result.getSideOutput(lateOutputTag);

이렇게 하면 Watermark 이후 1분까지는 윈도우에 포함시켜 결과를 재계산하고, 그마저 넘긴 이벤트는 사이드 출력으로 분리하여 별도 처리할 수 있다.


Trigger: 언제 결과를 내보낼 것인가

Trigger(트리거) 는 윈도우가 결과를 방출하는 시점을 결정한다. 윈도우에 이벤트가 쌓이더라도, 트리거가 발동(fire)해야 비로소 윈도우 함수가 실행되어 결과가 출력된다.

기본 트리거

각 윈도우 타입에는 기본 트리거가 내장되어 있다.

윈도우 타입기본 트리거발동 조건
Event Time 윈도우EventTimeTriggerWatermark가 윈도우 끝을 넘을 때
Processing Time 윈도우ProcessingTimeTrigger처리 시간이 윈도우 끝을 넘을 때
Global 윈도우NeverTrigger절대 발동하지 않음 (커스텀 필요)

대부분의 경우 기본 트리거만으로 충분하다.

TriggerResult

트리거가 반환하는 결과는 네 가지다.

  • CONTINUE: 아무것도 하지 않는다. 더 기다린다.
  • FIRE: 윈도우 함수를 실행하고 결과를 방출한다. 윈도우 상태는 유지된다.
  • PURGE: 윈도우 내용을 삭제한다. 결과는 방출하지 않는다.
  • FIRE_AND_PURGE: 결과를 방출하고 윈도우 내용을 삭제한다.

커스텀 트리거

내장 트리거를 조합하거나, Trigger 클래스를 상속하여 커스텀 트리거를 만들 수 있다. 트리거는 세 가지 콜백 메서드를 구현해야 한다.

public class MyCountTrigger extends Trigger<Object, TimeWindow> {
    private final int maxCount;

    // 이벤트가 윈도우에 추가될 때마다 호출
    @Override
    public TriggerResult onElement(Object element, long timestamp,
            TimeWindow window, TriggerContext ctx) {
        // 카운트 상태 업데이트
        ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    // 이벤트 타임 타이머가 발동할 때 호출
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    // 처리 시간 타이머가 발동할 때 호출
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
}

CountTrigger.of(100)처럼 이벤트 개수 기반 트리거를 사용하면, 시간과 무관하게 100개의 이벤트가 쌓일 때마다 결과를 방출한다. Global Window와 조합하여 사용하는 경우가 많다.


Evictor: 윈도우 속 요소를 골라내다

Evictor(이빅터) 는 트리거가 발동한 후, 윈도우 함수가 실행되기 전이나 후에 윈도우 내의 특정 요소를 제거하는 역할을 한다. 선택적 구성 요소이며, 대부분의 윈도우에서는 사용하지 않아도 된다.

Evictor는 두 가지 시점에 개입할 수 있다.

  • evictBefore: 윈도우 함수 실행 전에 요소를 제거
  • evictAfter: 윈도우 함수 실행 후에 요소를 제거

내장 Evictor

Flink는 세 가지 내장 Evictor를 제공한다.

CountEvictor

윈도우에서 지정된 개수만큼만 유지하고, 나머지는 윈도우 버퍼의 앞쪽부터 제거한다. 최근 N개의 이벤트만 대상으로 집계하고 싶을 때 유용하다.

stream
    .keyBy(event -> event.getKey())
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(10))
    .evictor(CountEvictor.of(5))  // 최근 5개만 남기고 제거
    .process(new MyWindowFunction());

TimeEvictor

윈도우 내에서 현재 최대 타임스탬프 기준으로, 지정된 시간 범위 밖의 오래된 요소를 제거한다. 예를 들어 TimeEvictor.of(Time.seconds(10))으로 설정하면, 윈도우 내 가장 최근 타임스탬프에서 10초 이전의 요소가 제거된다.

stream
    .keyBy(event -> event.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .evictor(TimeEvictor.of(Time.seconds(30)))  // 최근 30초 내 요소만 유지
    .process(new MyWindowFunction());

DeltaEvictor

사용자 정의 DeltaFunction을 기준으로, 윈도우 내 가장 마지막 요소와의 차이(delta)가 임계값(threshold) 이상인 요소를 제거한다. 값의 변화량 기반 필터링에 활용할 수 있다.

stream
    .keyBy(event -> event.getKey())
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(10))
    .evictor(DeltaEvictor.of(
        0.5,  // threshold
        (oldEvent, newEvent) -> Math.abs(oldEvent.getValue() - newEvent.getValue())
    ))
    .process(new MyWindowFunction());

주의사항

Evictor를 사용하면 Flink가 사전 집계(pre-aggregation) 최적화를 적용할 수 없다. 일반적으로 윈도우에 이벤트가 도착할 때마다 점진적으로 집계하지만, Evictor가 있으면 모든 요소를 버퍼에 보관해야 하므로 메모리 사용량이 증가한다. 꼭 필요한 경우에만 사용하는 것이 좋다.


윈도우의 생명주기

마지막으로, 윈도우가 생성되고 소멸하는 전체 흐름을 정리하면 다음과 같다.

  1. 생성: 해당 윈도우에 속하는 첫 번째 이벤트가 도착하면, Window Assigner가 윈도우를 생성한다.
  2. 이벤트 수집: 이후 도착하는 이벤트가 윈도우에 추가된다. Trigger의 onElement()이 매번 호출된다.
  3. 트리거 발동: Trigger 조건이 충족되면 FIRE 또는 FIRE_AND_PURGE를 반환한다.
  4. Evictor 실행 (선택): evictBefore()로 요소를 제거한다.
  5. 윈도우 함수 실행: reduce(), aggregate(), process() 등이 실행되어 결과가 방출된다.
  6. Evictor 실행 (선택): evictAfter()로 요소를 제거한다.
  7. 소멸: Watermark가 윈도우 + allowedLateness를 넘으면, Flink가 윈도우 상태를 정리(garbage collect)한다.

정리

Flink의 윈도우 메커니즘은 무한한 스트림 데이터를 유한한 단위로 나누어 의미 있는 집계를 수행하는 핵심 기능이다.

  • Window Assigner가 이벤트를 윈도우에 배정한다. Tumbling(고정, 비겹침), Sliding(고정, 겹침), Session(가변, 비활성 갭 기준), Global(무한, 커스텀 트리거 필요)의 네 가지 타입이 있다.
  • Watermark는 이벤트 타임 기반 처리에서 "여기까지는 다 왔다"는 진행 신호를 보내, 윈도우가 안전하게 닫힐 수 있도록 한다. forBoundedOutOfOrderness로 지연을 허용하는 전략이 실무에서 가장 많이 쓰인다.
  • Trigger는 윈도우 결과를 언제 방출할지 결정한다. 대부분의 경우 기본 트리거로 충분하지만, 복잡한 요구사항에는 커스텀 트리거를 구현할 수 있다.
  • Evictor는 윈도우 함수 실행 전후에 특정 요소를 제거하는 선택적 도구다. 메모리 최적화와의 트레이드오프를 고려해야 한다.

이 네 가지 구성 요소의 조합으로, Flink는 단순한 시간 기반 집계부터 복잡한 세션 분석까지 다양한 스트림 처리 시나리오를 유연하게 지원한다.