Back to Blog
SparkPerformanceShufflePartitioningCachingBroadcastAQECatalyst

0x04. Spark 성능 최적화

Spark 애플리케이션의 성능을 좌우하는 Shuffle, 파티셔닝, 캐싱 전략과 Catalyst Optimizer, AQE 등 최적화 메커니즘을 알아본다.

Spark는 빠르다. 인메모리 처리, DAG 기반 실행 계획, Catalyst Optimizer까지 갖추고 있어서, 같은 작업을 MapReduce보다 수십 배 빠르게 처리한다. 하지만 "Spark를 쓴다"와 "Spark를 쓴다" 사이에는 큰 간극이 있다. 동일한 데이터, 동일한 클러스터에서도 코드 한 줄 차이로 실행 시간이 수 배에서 수십 배까지 벌어질 수 있다.

성능 차이의 핵심에는 Shuffle이 있다. 그리고 Shuffle을 제어하는 열쇠가 파티셔닝이다. 이 글에서는 Spark 성능을 좌우하는 핵심 요인들을 하나씩 짚어보고, Spark가 내부적으로 수행하는 자동 최적화 메커니즘까지 살펴본다.


Shuffle: 성능의 최대 병목

Shuffle이란?

이전 글에서 Transformation을 NarrowWide로 구분했다. Narrow Transformation은 하나의 입력 파티션이 하나의 출력 파티션에만 기여하는 반면, Wide Transformation은 하나의 입력 파티션 데이터가 여러 출력 파티션으로 흩어진다. 이때 발생하는 데이터 재분배 과정이 바로 Shuffle이다.

Shuffle을 택배 물류에 비유하면 이렇다. 전국 10개 물류센터에 무작위로 쌓인 택배를 지역별로 다시 분류해야 한다고 하자. 서울행 택배는 서울 센터로, 부산행은 부산 센터로 옮겨야 한다. 이 과정에서 모든 센터가 모든 다른 센터에 택배를 보내야 하므로, 운송 비용이 폭발적으로 증가한다. Spark의 Shuffle도 마찬가지다.

Shuffle은 다음 단계를 거친다.

  1. Map 단계: 각 Executor가 자기 파티션의 데이터를 출력 파티션 번호(키의 해시값)에 따라 분류하고 디스크에 기록한다
  2. 네트워크 전송: 분류된 데이터를 해당 출력 파티션을 담당하는 Executor로 전송한다
  3. Reduce 단계: 수신한 데이터를 읽고 병합한다

문제는 이 과정에서 디스크 I/O(중간 결과 기록/읽기), 네트워크 I/O(Executor 간 데이터 전송), 직렬화/역직렬화(데이터 변환) 비용이 모두 발생한다는 점이다. Spark 작업에서 가장 비싼 연산이며, 성능 튜닝의 첫 번째 목표는 언제나 불필요한 Shuffle을 줄이는 것이다.

Shuffle을 유발하는 연산

어떤 연산이 Shuffle을 발생시키는지 알아야 피할 수 있다.

연산Shuffle 여부설명
groupByKey, reduceByKeyO같은 키의 데이터를 한 곳에 모아야 한다
join (일반)O양쪽 테이블의 같은 키를 매칭해야 한다
distinctO중복 제거를 위해 같은 값을 비교해야 한다
repartitionO명시적으로 데이터를 재분배한다
sortByKey, orderByO전역 정렬을 위해 데이터 이동이 필요하다
map, filter, flatMapX파티션 내부에서만 처리된다
coalesce (축소)X파티션을 합치기만 하므로 전체 셔플 없이 가능하다

Shuffle 최소화 전략

1. reduceByKey > groupByKey

같은 키의 값을 합산하는 경우를 생각해 보자.

# 비효율: 모든 값을 네트워크로 보낸  집계
rdd.groupByKey().mapValues(sum)

# 효율:  파티션에서 먼저 로컬 집계  네트워크 전송
rdd.reduceByKey(lambda a, b: a + b)

groupByKey는 모든 값을 Shuffle한 뒤 집계한다. 반면 reduceByKey는 각 파티션에서 로컬 집계(combiner)를 먼저 수행한 후 집계된 결과만 전송한다. 전송 데이터량이 크게 줄어든다. DataFrame API에서는 groupBy().agg()가 내부적으로 이 최적화를 자동 적용한다.

2. Broadcast Join

두 테이블을 조인할 때, 한쪽이 충분히 작다면 Broadcast Join을 사용할 수 있다. 작은 테이블을 모든 Executor에 통째로 복제(broadcast)하면, 큰 테이블의 데이터를 이동시키지 않아도 각 Executor가 로컬에서 조인을 수행할 수 있다. Shuffle이 완전히 제거된다.

from pyspark.sql.functions import broadcast

#  테이블 (수억 )
orders = spark.read.parquet("orders.parquet")

# 작은 테이블 (수천 )
products = spark.read.parquet("products.parquet")

# Broadcast Join: products를 모든 Executor에 복제
result = orders.join(broadcast(products), "product_id")

Spark는 기본적으로 한쪽 테이블이 10MB 이하일 때 자동으로 Broadcast Join을 선택한다. 이 임계값은 설정으로 조절할 수 있다.

# Broadcast 임계값을 50MB로 변경
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Broadcast Join 비활성화
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

3. repartition vs coalesce

파티션 수를 변경할 때, repartitioncoalesce의 차이를 이해해야 한다.

# repartition: Full Shuffle 발생 (파티션 증가/감소 모두 가능)
df = df.repartition(200)
df = df.repartition("date")  # 특정  기준으로 재분배

# coalesce: Shuffle 없이 파티션 축소만 가능
df = df.coalesce(10)

repartition은 데이터를 완전히 재분배하므로 항상 Shuffle이 발생한다. coalesce는 인접한 파티션을 합치기만 하므로 Shuffle 없이 파티션 수를 줄일 수 있다. 단, coalesce는 파티션을 늘릴 수 없고, 합치는 과정에서 파티션 간 크기가 불균형해질 수 있다.

일반적인 가이드라인은 다음과 같다.

  • 파티션을 줄일 때: coalesce 사용 (예: 파일 저장 전 소수의 파일로 합치기)
  • 파티션을 늘릴 때: repartition 사용
  • 파티션을 특정 열 기준으로 재분배할 때: repartition("col") 사용

파티셔닝 전략

적절한 파티션 수 결정

파티션 수는 Spark 작업의 병렬성을 직접 결정한다. 파티션이 너무 적으면 일부 Executor만 일하고 나머지는 놀게 되며, 파티션이 너무 많으면 Task 스케줄링 오버헤드가 커지고 소량의 데이터를 처리하는 작은 Task가 넘쳐난다.

경험적 가이드라인은 다음과 같다.

  • 파티션 하나당 128MB 내외의 데이터가 적절하다
  • 클러스터 전체 코어 수의 2~4배를 파티션 수로 설정하면 균형 잡힌 병렬 처리가 가능하다
  • Shuffle 후 파티션 수는 spark.sql.shuffle.partitions 설정으로 제어한다 (기본값: 200)
# Shuffle  파티션  설정
spark.conf.set("spark.sql.shuffle.partitions", 100)

기본값 200은 소규모 데이터에는 과도하고, 대규모 데이터에는 부족할 수 있다. 데이터 규모에 맞게 조정하거나, 뒤에서 다룰 AQE(Adaptive Query Execution) 를 활성화하여 자동으로 결정하게 할 수 있다.

Data Skew 문제

Data Skew(데이터 편향) 는 특정 파티션에 데이터가 집중되는 현상이다. 예를 들어, 전자상거래 플랫폼에서 product_id를 기준으로 주문 데이터를 조인한다고 하자. 대부분의 상품은 주문이 수백 건이지만, 인기 상품은 수백만 건의 주문을 가진다. 이 인기 상품의 데이터가 하나의 파티션에 몰리면, 해당 파티션을 처리하는 Task 하나가 전체 Job의 병목이 된다.

Spark UI에서 한 Task만 유독 오래 걸리거나, 특정 Task의 데이터 크기가 다른 Task의 수십 배라면 Data Skew를 의심해야 한다.

해결 방법 1: Salting

Salting은 편향된 키에 랜덤 접두사(salt)를 추가하여 데이터를 인위적으로 분산시키는 기법이다.

from pyspark.sql.functions import col, lit, rand, floor, concat

num_salts = 10

#  테이블: 키에 랜덤 salt 추가
orders_salted = orders.withColumn(
    "salted_key",
    concat(col("product_id"), lit("_"), floor(rand() * num_salts).cast("string"))
)

# 작은 테이블: 모든 salt 값으로 복제
from pyspark.sql.functions import explode, array
products_salted = products.withColumn(
    "salt", explode(array([lit(str(i)) for i in range(num_salts)]))
).withColumn(
    "salted_key",
    concat(col("product_id"), lit("_"), col("salt"))
)

# Salted Key로 조인
result = orders_salted.join(products_salted, "salted_key")

원래 하나의 파티션에 몰렸을 데이터가 num_salts개의 파티션으로 분산된다. 조인 후 salt를 제거하면 원래 결과와 동일하다.

해결 방법 2: AQE Skew Join

Spark 3.0 이상에서는 AQE(Adaptive Query Execution) 가 Skew Join을 자동으로 최적화한다. 이에 대해서는 AQE 섹션에서 자세히 다룬다.

Partition Pruning

Partition Pruning(파티션 가지치기) 은 쿼리 조건에 맞는 파티션만 읽고 나머지를 건너뛰는 최적화다. 이전 글에서 partitionBy()로 데이터를 저장하면, 쿼리 시 해당 파티션 열 조건으로 불필요한 I/O를 제거할 수 있다고 했다.

# date 열로 파티셔닝된 Parquet 데이터
df = spark.read.parquet("data/partitioned_by_date/")

# 2026-01-15 데이터만 필요 -> 해당 파티션 디렉토리만 읽는다
result = df.filter(col("date") == "2026-01-15").select("user_id", "amount")

Partition Pruning이 효과적으로 작동하려면 다음을 고려해야 한다.

  • 파티션 열은 쿼리에서 자주 필터링하는 열이어야 한다 (날짜, 지역 등)
  • 파티션의 카디널리티가 적절해야 한다 (수십~수천 수준)
  • WHERE 절에 파티션 열을 직접 사용해야 한다. 파티션 열에 함수를 씌우면 (WHERE year(date) = 2026) 최적화가 적용되지 않을 수 있다

캐싱과 영속화

cache()와 persist()

동일한 DataFrame을 여러 번 사용하는 경우, 매번 처음부터 다시 계산하는 것은 낭비다. cache()persist()는 DataFrame의 연산 결과를 메모리나 디스크에 저장하여 재사용할 수 있게 한다.

# cache(): MEMORY_AND_DISK 레벨로 캐싱 (Spark 3.0+)
df_cached = df.cache()

# persist(): Storage Level을 직접 지정
from pyspark import StorageLevel
df_persisted = df.persist(StorageLevel.MEMORY_ONLY)

cache()persist()의 축약형이다. Spark 3.0 이상에서 cache()는 기본적으로 MEMORY_AND_DISK 레벨을 사용한다. 메모리에 먼저 저장하고, 메모리가 부족하면 나머지를 디스크에 기록한다.

Storage Level 선택

persist()에 전달할 수 있는 Storage Level은 다음과 같다.

Storage Level메모리디스크직렬화복제
MEMORY_ONLYOXX1
MEMORY_AND_DISKOOX1
MEMORY_ONLY_SEROXO1
MEMORY_AND_DISK_SEROOO1
DISK_ONLYXOX1
MEMORY_ONLY_2OXX2
  • MEMORY_ONLY: 가장 빠르지만, 메모리가 부족하면 캐싱되지 않은 파티션을 매번 다시 계산한다
  • MEMORY_AND_DISK: 메모리 부족 시 디스크를 활용하므로 안정적이다. 대부분의 경우 권장된다
  • _SER 접미사: 데이터를 직렬화하여 저장한다. 메모리 사용량은 줄어들지만 CPU 비용이 추가된다
  • _2 접미사: 데이터를 2개 노드에 복제한다. 장애 복구가 빨라지지만 저장 공간이 두 배 필요하다

언제 캐싱하고, 언제 하지 말아야 하는가

캐싱이 항상 유리한 것은 아니다.

캐싱이 효과적인 경우:

  • 동일한 DataFrame을 여러 Action에서 반복 사용할 때
  • 복잡한 Transformation 체인의 결과를 재활용할 때
  • 머신러닝의 반복 학습(iterative training) 처럼 같은 데이터를 여러 번 읽을 때

캐싱을 피해야 하는 경우:

  • DataFrame을 한 번만 사용할 때 (캐싱 오버헤드만 추가된다)
  • 데이터가 매우 커서 메모리 압박을 유발할 때
  • 이미 빠른 데이터 소스(Parquet + Partition Pruning 등)에서 읽는 경우
# 캐싱이 효과적인 
expensive_df = raw_df.join(lookup_df, "key").filter(col("value") > 100)
expensive_df.cache()

# 여러 Action에서 재사용
count = expensive_df.count()
summary = expensive_df.groupBy("category").agg(avg("value"))
expensive_df.write.parquet("output/")

# 사용 완료  캐시 해제
expensive_df.unpersist()

캐시는 메모리 자원을 점유하므로, 사용이 끝나면 unpersist()로 해제하는 것이 좋다.


Catalyst Optimizer와 Tungsten

0x02에서 Catalyst의 네 단계(Analysis, Logical Optimization, Physical Planning, Code Generation)를 다루었다. 여기서는 성능에 직접적인 영향을 미치는 주요 최적화 기법을 좀 더 깊이 살펴본다.

Predicate Pushdown

Predicate Pushdown(조건절 푸시다운) 은 필터 조건을 데이터 소스에 최대한 가깝게 밀어 넣는 최적화다.

df = spark.read.parquet("orders.parquet")
result = df.join(products, "product_id").filter(col("date") == "2026-01-15")

사용자가 조인 후에 필터를 작성하더라도, Catalyst는 이 필터를 조인 이전으로 이동시킨다. Parquet 파일의 메타데이터(min/max 통계, 딕셔너리 인코딩)를 활용하면, 조건에 맞지 않는 Row Group 전체를 읽지 않고 건너뛸 수 있다. 데이터 소스 단에서 걸러내므로 Spark로 들어오는 데이터 자체가 줄어든다.

Column Pruning

Column Pruning(열 가지치기) 은 쿼리에서 실제로 사용하는 열만 읽는 최적화다.

# 10개 열이 있는 Parquet 파일에서 2개 열만 사용
df = spark.read.parquet("wide_table.parquet")
result = df.select("name", "age")

Parquet 같은 열 기반 포맷에서는 필요한 열의 데이터만 읽으면 되므로, I/O가 대폭 줄어든다. SELECT * 대신 필요한 열만 명시하는 것만으로도 성능이 크게 향상될 수 있다.

Whole-Stage Code Generation

Catalyst의 마지막 단계에서는 Whole-Stage Code Generation이 수행된다. 여러 물리적 연산자(filter, project, aggregation 등)를 하나의 Java 메서드로 합쳐(fuse) 컴파일하는 기술이다.

전통적인 Volcano 모델(반복자 기반 실행)에서는 각 행이 연산자를 하나씩 거치며, 매 단계에서 가상 함수 호출이 발생한다. Whole-Stage Code Generation은 이 체인 전체를 하나의 루프로 합쳐서, 가상 함수 호출 오버헤드를 제거하고 CPU 캐시 효율을 높인다.

explain()으로 실행 계획을 확인할 때 WholeStageCodegen이라는 표시가 보이면, 해당 구간의 연산자들이 하나의 코드로 합쳐진 것이다.

df.filter(col("age") > 25).select("name", "age").explain()
# == Physical Plan ==
# *(1) Project [name#0, age#1]
# +- *(1) Filter (age#1 > 25)
#    +- *(1) ColumnarToRow
#       +- FileScan parquet [name#0,age#1] ...
# * 표시가 WholeStageCodegen을 의미한다

Adaptive Query Execution (AQE)

런타임 최적화의 필요성

Catalyst Optimizer는 강력하지만 한 가지 한계가 있다. 쿼리를 실행하기 전에 수립한 실행 계획이 항상 최적이라는 보장이 없다. 테이블 통계가 부정확하거나, 데이터의 실제 분포가 예상과 다를 수 있기 때문이다.

Adaptive Query Execution(AQE) 은 Spark 3.0에서 도입된 런타임 최적화 프레임워크다. 쿼리를 실행하면서 중간 결과의 실제 통계를 수집하고, 이를 바탕으로 나머지 실행 계획을 동적으로 수정한다.

# AQE 활성화 (Spark 3.2 이상에서는 기본 활성화)
spark.conf.set("spark.sql.adaptive.enabled", True)

AQE는 Shuffle 경계를 기준으로 쿼리를 스테이지(Stage) 로 나누고, 각 스테이지가 완료될 때마다 다음 스테이지의 실행 계획을 재최적화한다.

Coalescing Post-Shuffle Partitions

Shuffle 후 파티션 수를 자동으로 조절하는 기능이다.

spark.sql.shuffle.partitions를 200으로 설정했는데, 실제 Shuffle 후 데이터가 작아서 대부분의 파티션이 거의 비어 있는 경우가 흔하다. AQE는 Shuffle 결과의 실제 크기를 확인하고, 작은 파티션들을 자동으로 합쳐 적절한 수의 파티션으로 만든다.

# AQE Coalescing 관련 설정
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

이 기능 덕분에 spark.sql.shuffle.partitions를 완벽하게 튜닝하지 않아도, AQE가 런타임에 적절한 파티션 수를 결정해 준다. 보수적으로 높은 값을 설정해 두고 AQE에 맡기는 전략이 가능하다.

Converting Sort-Merge Join to Broadcast Join

Catalyst는 컴파일 타임의 테이블 통계를 기반으로 조인 전략을 결정한다. 통계가 부정확하여 Sort-Merge Join으로 계획했지만, 실제로 Shuffle 결과 한쪽 테이블이 충분히 작다면?

AQE는 Shuffle 후 실제 데이터 크기를 확인하고, 한쪽이 Broadcast 임계값 이하라면 Sort-Merge Join을 Broadcast Join으로 전환한다. 불필요한 Shuffle을 제거하여 성능을 크게 개선할 수 있다.

# AQE의 Broadcast Join 전환 설정
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", 30 * 1024 * 1024)

예를 들어 큰 테이블을 필터링한 후 조인하는 경우, 필터링 결과가 작아져서 Broadcast 가능해지더라도 기존 Catalyst는 이를 예측하기 어렵다. AQE는 필터링 후 실제 크기를 보고 판단하므로 더 정확한 결정을 내린다.

Optimizing Skew Joins

앞서 Data Skew를 Salting으로 해결하는 방법을 보았다. AQE는 이를 자동으로 처리한다.

AQE는 Shuffle 단계에서 각 파티션의 크기를 확인한다. 특정 파티션이 다른 파티션보다 현저히 크면(Data Skew), 해당 파티션을 여러 개의 작은 파티션으로 분할하고, 조인 상대 테이블의 해당 파티션을 복제하여 병렬 처리한다. 개발자가 Salting 코드를 작성할 필요 없이, Spark가 알아서 편향을 감지하고 해결한다.

# AQE Skew Join 설정
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  • skewedPartitionFactor: 파티션 크기가 중앙값의 N배를 초과하면 Skew로 판단 (기본값: 5)
  • skewedPartitionThresholdInBytes: Skew 판단의 최소 크기 임계값 (기본값: 256MB)

두 조건을 모두 만족해야 Skew 파티션으로 분류된다.


정리

Spark 성능 최적화의 핵심은 Shuffle을 이해하고 제어하는 것에서 시작된다. 이 글에서 다룬 내용을 다시 정리하면 다음과 같다.

  • Shuffle: Wide Transformation에서 발생하는 데이터 재분배. 디스크 I/O, 네트워크 I/O, 직렬화 비용이 동반되며, Spark 작업에서 가장 비싼 연산이다
  • Shuffle 최소화: reduceByKey > groupByKey, Broadcast Join 활용, coalesce로 파티션 축소
  • 파티셔닝: 적절한 파티션 수 설정, Data Skew 해결(Salting 또는 AQE), Partition Pruning을 통한 I/O 절감
  • 캐싱: 반복 사용하는 DataFrame을 메모리에 유지. 한 번만 쓰는 데이터에는 불필요하다
  • Catalyst Optimizer: Predicate Pushdown, Column Pruning, Whole-Stage Code Generation으로 자동 최적화
  • AQE: 런타임 통계 기반으로 파티션 합치기, Broadcast Join 전환, Skew Join 최적화를 자동 수행

explain()으로 실행 계획을 확인하고, Spark UI에서 Stage별 소요 시간과 Shuffle 데이터 크기를 모니터링하는 습관을 들이면, 병목 지점을 빠르게 파악하고 적절한 최적화를 적용할 수 있다.