Spark를 처음 배울 때 가장 먼저 만나는 개념이 RDD다. 분산 컬렉션을 다루는 저수준 API로, 강력하지만 사용하기 번거롭다. 스키마 정보가 없어서 데이터 구조를 개발자가 직접 관리해야 하고, 최적화도 수동으로 해야 한다.
DataFrame은 이 문제를 해결하기 위해 등장한 고수준 API다. 관계형 데이터베이스의 테이블처럼 행과 열로 구성되며, 스키마 정보를 기반으로 Spark가 자동 최적화를 수행한다. 여기에 SQL 인터페이스까지 결합한 것이 Spark SQL 모듈이다.
RDD vs DataFrame
RDD와 DataFrame의 차이를 이해하는 것이 출발점이다.
RDD(Resilient Distributed Dataset) 는 타입이 없는 분산 컬렉션이다. Python 리스트나 튜플의 분산 버전이라고 생각하면 된다. 데이터의 구조(열 이름, 타입)를 Spark가 알지 못하므로, 최적화 여지가 제한적이다.
DataFrame은 이름이 붙은 열(named column)로 구성된 분산 데이터셋이다. 관계형 데이터베이스의 테이블, 혹은 Pandas의 DataFrame과 개념적으로 같다. 핵심 차이는 Spark가 스키마(Schema) 를 알고 있다는 점이다. 열 이름과 데이터 타입 정보를 바탕으로, Spark의 최적화 엔진인 Catalyst가 실행 계획을 자동으로 최적화한다.
| 항목 | RDD | DataFrame |
|---|---|---|
| 추상화 수준 | 저수준 (Low-level) | 고수준 (High-level) |
| 스키마 | 없음 | 있음 (열 이름 + 타입) |
| 최적화 | 수동 | Catalyst에 의한 자동 최적화 |
| API 스타일 | 함수형 (map, filter, reduce) | 선언형 (select, groupBy, join) |
| 언어 간 성능 | Python이 Scala보다 느림 | 동일 (최적화된 실행 계획 공유) |
마지막 항목이 중요하다. RDD를 Python으로 다루면 JVM과 Python 프로세스 사이의 직렬화 비용 때문에 Scala보다 느리다. 반면 DataFrame은 어떤 언어로 작성하든 동일한 Catalyst 최적화를 거치므로 언어에 관계없이 성능이 같다.
DataFrame 생성
DataFrame을 만드는 방법은 크게 세 가지다.
파일에서 읽기
가장 일반적인 방법이다. SparkSession의 read 인터페이스를 통해 다양한 포맷의 파일을 DataFrame으로 로드한다.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DataFrame Example") \
.getOrCreate()
# JSON 파일 읽기
df = spark.read.json("people.json")
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
printSchema()로 스키마를 확인할 수 있다. JSON의 경우 Spark가 데이터를 스캔하여 스키마를 자동 추론(infer) 한다.
Python 컬렉션에서 생성
테스트나 프로토타이핑 시 유용하다. 리스트와 스키마를 직접 지정하여 DataFrame을 만든다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
df = spark.createDataFrame(data, schema)
df.show()
# +-------+---+
# | name|age|
# +-------+---+
# | Alice| 30|
# | Bob| 25|
# |Charlie| 35|
# +-------+---+
StructType과 StructField로 스키마를 명시적으로 정의한다. 프로덕션 환경에서는 스키마 추론보다 명시적 스키마 지정이 권장된다. 추론 과정에서 데이터를 한 번 더 읽어야 하고, 추론 결과가 의도와 다를 수 있기 때문이다.
RDD에서 변환
기존 RDD가 있다면 스키마를 부여하여 DataFrame으로 변환할 수 있다.
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
Row(name="Alice", age=30),
Row(name="Bob", age=25)
])
df = spark.createDataFrame(rdd)
Row 객체를 사용하면 필드 이름이 자동으로 열 이름이 된다.
DataFrame Operations
DataFrame의 핵심은 선언형(declarative) 연산이다. "어떻게(how)" 계산할지가 아니라 "무엇을(what)" 원하는지를 표현하면, Catalyst가 최적의 실행 방법을 결정한다.
select: 열 선택
# 단일 열 선택
df.select("name").show()
# 여러 열 선택 + 표현식
df.select(df["name"], df["age"] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# | Alice| 31|
# | Bob| 26|
# |Charlie| 36|
# +-------+---------+
filter (where): 행 필터링
# 조건에 맞는 행만 선택
df.filter(df["age"] > 25).show()
# +-------+---+
# | name|age|
# +-------+---+
# | Alice| 30|
# |Charlie| 35|
# +-------+---+
# where는 filter의 별칭(alias)
df.where(df["age"] > 25).show()
withColumn: 열 추가/변환
기존 DataFrame에 새로운 열을 추가하거나 기존 열을 변환한다. DataFrame은 불변(immutable) 이므로, 원본을 수정하는 것이 아니라 새로운 DataFrame을 반환한다.
from pyspark.sql.functions import col
# 새 열 추가
df.withColumn("age_after_10_years", col("age") + 10).show()
# +-------+---+------------------+
# | name|age|age_after_10_years|
# +-------+---+------------------+
# | Alice| 30| 40|
# | Bob| 25| 35|
# |Charlie| 35| 45|
# +-------+---+------------------+
groupBy + agg: 집계
groupBy()로 그룹을 나누고, agg()로 집계 함수를 적용한다. SQL의 GROUP BY와 같은 역할이다.
from pyspark.sql.functions import avg, max, count
# 단순 집계
df.groupBy("age").count().show()
# 여러 집계 함수를 한 번에
df.groupBy("department").agg(
avg("salary").alias("avg_salary"),
max("salary").alias("max_salary"),
count("*").alias("employee_count")
).show()
alias()로 결과 열의 이름을 지정할 수 있다. 지정하지 않으면 avg(salary) 같은 자동 생성 이름이 붙는다.
join: 테이블 결합
두 DataFrame을 특정 조건으로 결합한다. SQL의 JOIN과 동일한 개념이다.
employees = spark.createDataFrame([
(1, "Alice", 100),
(2, "Bob", 200),
(3, "Charlie", 100)
], ["id", "name", "dept_id"])
departments = spark.createDataFrame([
(100, "Engineering"),
(200, "Marketing")
], ["dept_id", "dept_name"])
# Inner Join
employees.join(departments, "dept_id").show()
# +-------+---+-------+-----------+
# |dept_id| id| name| dept_name|
# +-------+---+-------+-----------+
# | 100| 1| Alice|Engineering|
# | 100| 3|Charlie|Engineering|
# | 200| 2| Bob| Marketing|
# +-------+---+-------+-----------+
# Left Outer Join
employees.join(departments, "dept_id", "left").show()
join()의 세 번째 인자로 조인 타입을 지정한다. inner(기본값), left, right, outer, cross 등을 지원한다.
Spark SQL: SQL로 데이터 다루기
DataFrame API가 프로그래밍적 접근이라면, Spark SQL은 익숙한 SQL 문법으로 같은 작업을 수행하는 방법이다. 내부적으로는 동일한 Catalyst 엔진을 사용하므로 성능 차이가 없다.
Temporary View와 SQL 쿼리
DataFrame을 SQL로 쿼리하려면 먼저 임시 뷰(Temporary View) 로 등록해야 한다.
df.createOrReplaceTempView("people")
# SQL 쿼리 실행
result = spark.sql("SELECT name, age FROM people WHERE age > 25")
result.show()
# +-------+---+
# | name|age|
# +-------+---+
# | Alice| 30|
# |Charlie| 35|
# +-------+---+
createOrReplaceTempView()는 해당 SparkSession 범위 내에서만 유효한 임시 뷰를 생성한다. 같은 이름의 뷰가 이미 있으면 덮어쓴다.
세션을 넘어 공유해야 하는 경우에는 createGlobalTempView()를 사용한다. 글로벌 임시 뷰는 global_temp 데이터베이스에 등록되므로, 쿼리 시 global_temp.people처럼 접두사를 붙여야 한다.
df.createGlobalTempView("people")
# 다른 세션에서도 접근 가능
spark.newSession().sql("SELECT * FROM global_temp.people").show()
SQL vs DataFrame API
같은 연산을 두 가지 방식으로 표현할 수 있다. 어떤 방식을 선택하든 Catalyst가 동일한 실행 계획으로 변환한다.
# DataFrame API
df.filter(df["age"] > 25) \
.select("name", "age") \
.groupBy("age") \
.count() \
.show()
# SQL
df.createOrReplaceTempView("people")
spark.sql("""
SELECT age, count(*) as count
FROM people
WHERE age > 25
GROUP BY age
""").show()
팀 내 SQL에 익숙한 분석가와 협업할 때 Spark SQL이 유리하고, 복잡한 변환 로직이나 UDF(User Defined Function)를 조합할 때는 DataFrame API가 편리하다. 상황에 맞게 혼용하면 된다.
Catalyst Optimizer: Spark SQL의 두뇌
DataFrame과 Spark SQL이 RDD보다 빠른 이유, 그리고 어떤 언어로 작성하든 성능이 같은 이유는 Catalyst Optimizer 덕분이다.
Catalyst는 Spark SQL의 쿼리 최적화 엔진으로, 사용자가 작성한 쿼리를 분석하고 최적의 실행 계획을 자동으로 생성한다. 트리(Tree) 기반의 규칙 적용 프레임워크이며, 네 단계를 거쳐 쿼리를 최적화한다.
1단계: Analysis (분석)
사용자가 작성한 쿼리에서 미해결 참조(unresolved references) 를 해결한다. 예를 들어 SELECT name FROM people에서 name이라는 열이 실제로 존재하는지, 어떤 타입인지를 카탈로그(Catalog)를 참조하여 확인한다. 이 과정을 거치면 Resolved Logical Plan(해결된 논리 계획) 이 만들어진다.
2단계: Logical Optimization (논리적 최적화)
규칙 기반(rule-based)으로 논리 계획을 최적화한다. 대표적인 최적화 기법은 다음과 같다.
- Predicate Pushdown(조건절 푸시다운): 필터 조건을 데이터 소스에 최대한 가깝게 이동시켜 불필요한 데이터 읽기를 줄인다
- Column Pruning(열 가지치기): 실제로 사용하는 열만 읽어 I/O를 절감한다
- Constant Folding(상수 접기):
1 + 2같은 상수 표현식을 컴파일 시점에 미리 계산한다
3단계: Physical Planning (물리적 계획)
최적화된 논리 계획을 실제 실행 가능한 물리적 계획으로 변환한다. 여러 대안을 생성한 후, 비용 모델(cost model)을 기반으로 가장 효율적인 계획을 선택한다. 예를 들어 조인 연산의 경우, 데이터 크기에 따라 BroadcastHashJoin, SortMergeJoin 등 적합한 전략을 결정한다.
4단계: Code Generation (코드 생성)
최종 물리 계획을 최적화된 Java 바이트코드로 컴파일한다. Whole-Stage Code Generation이라 불리는 이 과정은, 여러 연산을 하나의 함수로 합쳐(fuse) 가상 함수 호출 오버헤드를 제거한다.
이 네 단계를 그림으로 정리하면 다음과 같은 흐름이다.
SQL / DataFrame API
|
v
[Unresolved Logical Plan]
| -- Analysis (카탈로그 참조) -->
v
[Resolved Logical Plan]
| -- Logical Optimization (규칙 기반) -->
v
[Optimized Logical Plan]
| -- Physical Planning (비용 기반) -->
v
[Physical Plan]
| -- Code Generation -->
v
[Optimized Java Bytecode]
|
v
실행 (RDD 연산)
핵심은, DataFrame API든 SQL이든 동일한 파이프라인을 거친다는 점이다. 사용자는 "무엇을 원하는지"만 선언하고, "어떻게 실행할지"는 Catalyst에 맡기면 된다.
Data I/O: 데이터 읽기와 쓰기
Spark는 다양한 데이터 포맷을 지원한다. 주요 포맷의 특징과 사용법을 살펴본다.
CSV
가장 널리 사용되는 텍스트 기반 포맷이다. 사람이 읽을 수 있고, 거의 모든 도구에서 지원한다.
# CSV 읽기
df = spark.read.load(
"data.csv",
format="csv",
sep=",",
header="true",
inferSchema="true"
)
# CSV 쓰기
df.write.csv("output.csv", header=True)
header: 첫 행을 열 이름으로 사용할지 여부inferSchema: 데이터 타입 자동 추론 (기본값false-- 모든 열이 문자열로 읽힌다)sep: 구분자 지정
CSV는 편리하지만 스키마 정보가 파일에 포함되지 않고, 텍스트 기반이라 용량이 크고 읽기 속도가 느리다. 분석 초기 단계에서 데이터를 확인할 때 주로 사용하고, 본격적인 파이프라인에서는 Parquet 등 컬럼나 포맷으로 변환하는 것이 일반적이다.
JSON
반정형(semi-structured) 데이터에 적합하다. 중첩 구조를 자연스럽게 표현할 수 있다.
# JSON 읽기
df = spark.read.json("data.json")
# JSON 쓰기
df.write.json("output.json")
Spark는 JSON을 읽을 때 스키마를 자동 추론한다. 다만 Spark가 기대하는 JSON 형식은 JSON Lines(줄 단위 JSON)이다. 한 행이 하나의 JSON 객체에 대응한다.
{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
JSON도 텍스트 기반이므로 CSV와 비슷한 성능 한계를 가진다. 중첩 구조가 필요한 경우가 아니라면, 분석 워크로드에는 Parquet이 더 적합하다.
Parquet
Apache Parquet은 빅데이터 생태계의 사실상 표준 파일 포맷이다. 열 기반(columnar) 저장 방식으로, 분석 쿼리에 최적화되어 있다.
# Parquet 읽기
df = spark.read.parquet("data.parquet")
# Parquet 쓰기
df.write.parquet("output.parquet")
Parquet이 선호되는 이유는 명확하다.
- 열 기반 압축: 같은 타입의 데이터가 연속으로 저장되므로 압축률이 높다. CSV 대비 수 배에서 수십 배 작다.
- Column Pruning: 필요한 열만 읽을 수 있어,
SELECT name FROM table같은 쿼리에서 불필요한 열의 I/O를 완전히 건너뛴다. - 스키마 내장(self-describing): 파일 자체에 스키마 정보가 포함되어 있어, 별도 스키마 지정 없이 읽을 수 있다.
- Predicate Pushdown: 파일 메타데이터(min/max 통계)를 활용하여, 조건에 맞지 않는 데이터 블록을 아예 읽지 않는다.
| 포맷 | 저장 방식 | 스키마 포함 | 압축률 | 읽기 속도 |
|---|---|---|---|---|
| CSV | 행 기반, 텍스트 | X | 낮음 | 느림 |
| JSON | 행 기반, 텍스트 | X | 낮음 | 느림 |
| Parquet | 열 기반, 바이너리 | O | 높음 | 빠름 |
파티셔닝: 대규모 데이터 정리의 핵심
수십 TB 규모의 데이터를 하나의 거대한 파일로 저장하면 어떻게 될까? 특정 날짜의 데이터만 필요한 쿼리도 전체 파일을 스캔해야 한다. 파티셔닝(Partitioning) 은 이 문제를 해결한다.
파티셔닝이란
데이터를 특정 열의 값에 따라 디렉토리로 분리하여 저장하는 방식이다. 예를 들어 date 열로 파티셔닝하면, 각 날짜별로 별도 디렉토리가 생성된다.
# date 열로 파티셔닝하여 저장
df.write.partitionBy("date").parquet("output/")
저장 결과의 디렉토리 구조는 다음과 같다.
output/
├── date=2026-01-01/
│ └── part-00000.parquet
├── date=2026-01-02/
│ └── part-00000.parquet
└── date=2026-01-03/
└── part-00000.parquet
Partition Discovery (파티션 자동 감지)
Spark는 이 디렉토리 구조를 읽을 때 파티션 열을 자동으로 인식한다. date=2026-01-01 형식의 디렉토리 이름에서 열 이름(date)과 값(2026-01-01)을 추출하여, DataFrame의 열로 자동 추가한다.
# 파티셔닝된 데이터 읽기 -- date 열이 자동으로 포함된다
df = spark.read.parquet("output/")
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- date: date (nullable = true) <-- 파티션 열 자동 추가
여러 열로 중첩 파티셔닝도 가능하다.
df.write.partitionBy("year", "month").parquet("output/")
output/
├── year=2026/
│ ├── month=01/
│ │ └── part-00000.parquet
│ └── month=02/
│ └── part-00000.parquet
└── ...
파티셔닝의 효과
WHERE date = '2026-01-01' 조건이 있는 쿼리를 실행하면, Spark는 해당 파티션 디렉토리만 읽는다. 나머지 파티션은 아예 스캔하지 않으므로, 데이터 규모에 비례하여 I/O가 대폭 줄어든다. 이것이 Partition Pruning(파티션 가지치기) 이다.
파티셔닝 열을 선택할 때는 카디널리티(cardinality) 를 고려해야 한다. 고유 값이 너무 많은 열(예: 사용자 ID)로 파티셔닝하면 수백만 개의 작은 디렉토리가 생성되어 오히려 성능이 저하된다. 날짜, 지역, 카테고리처럼 적절한 수의 고유 값을 가진 열이 파티셔닝에 적합하다.
Schema Merging (스키마 병합)
시간이 지나면서 데이터 스키마가 변경될 수 있다. 초기에는 name과 age 열만 있다가, 나중에 email 열이 추가되는 경우다. Parquet은 파일마다 독립적인 스키마를 가지므로, 서로 다른 스키마의 파일이 공존할 수 있다.
# 스키마 병합을 활성화하여 읽기
df = spark.read.option("mergeSchema", "true").parquet("data/")
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- email: string (nullable = true) <-- 일부 파일에만 존재, 나머지는 null
스키마 병합은 모든 파일의 스키마를 읽어야 하므로 비용이 높은 연산이다. 기본적으로 비활성화되어 있으며, 필요한 경우에만 mergeSchema 옵션을 켜서 사용한다.
Save Mode: 저장 전략
데이터를 쓸 때 이미 같은 경로에 데이터가 존재하면 어떻게 할지를 Save Mode로 지정한다.
| Mode | 동작 |
|---|---|
error (기본값) | 데이터가 이미 존재하면 예외 발생 |
append | 기존 데이터에 추가 |
overwrite | 기존 데이터를 삭제하고 새로 쓰기 |
ignore | 데이터가 이미 존재하면 아무것도 하지 않음 |
df.write.mode("overwrite").parquet("output/")
배치 파이프라인에서는 overwrite가 흔하고, 스트리밍이나 증분 로드에서는 append를 사용한다. 기본값이 error이므로, 반복 실행 가능한 파이프라인에서는 반드시 mode를 명시해야 한다.
정리
Spark의 DataFrame과 SQL은 RDD의 저수준 복잡성을 추상화하면서도, Catalyst Optimizer를 통해 동등하거나 더 나은 성능을 제공한다. 핵심을 다시 정리하면 다음과 같다.
- DataFrame: 스키마 기반의 고수준 분산 데이터 구조. 선언형 API로 데이터를 변환한다.
- DataFrame Operations:
select,filter,groupBy,join,withColumn등 SQL과 유사한 연산을 프로그래밍적으로 수행한다. - Spark SQL: DataFrame을 임시 뷰로 등록하고 SQL 문법으로 쿼리한다. DataFrame API와 성능 차이가 없다.
- Catalyst Optimizer: Analysis, Logical Optimization, Physical Planning, Code Generation 네 단계를 거쳐 쿼리를 자동 최적화한다.
- Data I/O: CSV, JSON, Parquet 등 다양한 포맷을 지원하며, 분석 워크로드에는 열 기반 포맷인 Parquet이 표준이다.
- Partitioning: 특정 열 값으로 데이터를 디렉토리 단위로 분리하여, Partition Pruning을 통해 I/O를 대폭 줄인다.
실무에서 Spark를 사용할 때, RDD를 직접 다루는 경우는 거의 없다. DataFrame과 Spark SQL이 대부분의 데이터 처리 요구를 충족하며, Catalyst가 최적화를 알아서 해주기 때문이다.