Spark 애플리케이션의 시작과 끝은 데이터 읽기와 쓰기이다. 아무리 복잡한 변환 로직을 작성해도, 데이터를 효율적으로 읽고 쓰지 못하면 전체 파이프라인의 성능이 병목에 걸린다. 이 글에서는 Spark의 DataSource API를 통해 다양한 외부 시스템과 연동하는 방법을 다룬다.
DataSource API 기본
Spark는 통일된 인터페이스로 다양한 데이터 소스를 지원한다.
# 읽기
df = spark.read.format("포맷").option("키", "값").load("경로")
# 쓰기
df.write.format("포맷").option("키", "값").save("경로")
축약형도 제공한다.
df = spark.read.parquet("path/to/data")
df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)
df = spark.read.json("path/to/data.json")
파일 포맷
Parquet (기본값)
Parquet은 Spark의 기본 파일 포맷이자, 빅데이터 에코시스템의 사실상 표준이다.
# 읽기
df = spark.read.parquet("s3://bucket/data/")
# 쓰기
df.write.parquet("s3://bucket/output/")
Parquet의 핵심 장점:
- 컬럼 기반 저장: 필요한 컬럼만 읽어 I/O를 최소화한다
- 효율적 압축: 같은 타입의 값이 연속으로 저장되어 압축률이 높다
- 스키마 내장: 파일 자체에 스키마 정보가 포함된다
- Predicate Pushdown: 필터 조건을 파일 레벨에서 적용하여 불필요한 데이터를 건너뛴다
# Predicate Pushdown: price > 100인 행만 디스크에서 읽음
df = spark.read.parquet("products/").filter("price > 100")
Parquet 파일의 내부 구조는 Row Group → Column Chunk → Page로 구성된다. 각 Row Group에는 컬럼별 최솟값/최댓값 통계가 저장되어, 조건에 맞지 않는 Row Group은 아예 읽지 않는다.
CSV
사람이 읽을 수 있는 텍스트 포맷. 간단하지만 비효율적이다.
df = spark.read.csv("data.csv",
header=True,
inferSchema=True,
sep=",",
encoding="UTF-8",
nullValue="NULL"
)
df.write.csv("output/",
header=True,
sep=",",
mode="overwrite"
)
inferSchema=True는 편리하지만, 전체 데이터를 한 번 스캔하므로 대용량 파일에서는 스키마를 직접 지정하는 것이 좋다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("city", StringType(), True)
])
df = spark.read.csv("data.csv", header=True, schema=schema)
JSON
중첩 구조를 자연스럽게 표현할 수 있지만, 행 기반이라 분석 쿼리에는 비효율적이다.
df = spark.read.json("events.json")
# 중첩 필드 접근
df.select("user.name", "user.email", "event.type")
ORC
Parquet과 유사한 컬럼 기반 포맷. Hive 에코시스템에서 많이 사용된다.
df = spark.read.orc("hive-data/")
포맷 비교
| 포맷 | 저장 방식 | 압축률 | 읽기 성능 | 쓰기 성능 | 적합한 용도 |
|---|---|---|---|---|---|
| Parquet | 컬럼 기반 | 높음 | 매우 빠름 | 보통 | 분석 쿼리 (기본 선택) |
| ORC | 컬럼 기반 | 높음 | 매우 빠름 | 보통 | Hive 연동 |
| CSV | 행 기반 | 낮음 | 느림 | 빠름 | 데이터 교환, 간단한 작업 |
| JSON | 행 기반 | 낮음 | 느림 | 빠름 | API 데이터, 중첩 구조 |
특별한 이유가 없다면 Parquet을 기본으로 사용한다.
JDBC: 관계형 데이터베이스 연동
기본 읽기/쓰기
# DB에서 읽기
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/mydb") \
.option("dbtable", "orders") \
.option("user", "user") \
.option("password", "password") \
.option("driver", "org.postgresql.Driver") \
.load()
# DB에 쓰기
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/mydb") \
.option("dbtable", "order_summary") \
.option("user", "user") \
.option("password", "password") \
.mode("append") \
.save()
병렬 읽기
기본적으로 JDBC 읽기는 단일 연결로 수행된다. 대용량 테이블에서는 병렬 읽기를 설정해야 한다.
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/mydb") \
.option("dbtable", "orders") \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.option("numPartitions", "10") \
.load()
이 설정으로 10개의 병렬 쿼리가 실행된다.
SELECT * FROM orders WHERE id >= 1 AND id < 100001
SELECT * FROM orders WHERE id >= 100001 AND id < 200001
...
SELECT * FROM orders WHERE id >= 900001 AND id <= 1000000
partitionColumn은 균등 분포된 숫자 컬럼이어야 한다. 분포가 편향되면 특정 파티션에 데이터가 몰려 병렬 효과가 떨어진다.
Pushdown Query
전체 테이블 대신 서브쿼리 결과만 가져올 수 있다.
query = "(SELECT id, amount, status FROM orders WHERE status = 'completed') AS filtered"
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/mydb") \
.option("dbtable", query) \
.load()
필요한 컬럼과 행만 DB에서 전달받아 네트워크 전송량을 줄인다.
Kafka 연동
Structured Streaming으로 읽기
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.option("startingOffsets", "latest") \
.load()
# Kafka 메시지는 key, value, topic, partition, offset, timestamp 컬럼을 가짐
parsed = df.select(
col("key").cast("string"),
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
배치로 읽기
특정 범위의 Kafka 메시지를 배치로 읽을 수도 있다.
df = spark.read.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
Kafka에 쓰기
df.select(
col("user_id").cast("string").alias("key"),
to_json(struct("*")).alias("value")
).write.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("topic", "processed-orders") \
.save()
Delta Lake
Delta Lake는 Parquet 위에 ACID 트랜잭션, 스키마 관리, 타임 트래블을 추가한 오픈소스 스토리지 레이어이다. Databricks가 개발했다.
기본 사용
# Delta 테이블 쓰기
df.write.format("delta").save("/data/delta/orders")
# Delta 테이블 읽기
df = spark.read.format("delta").load("/data/delta/orders")
ACID 트랜잭션
여러 Spark 잡이 동시에 같은 테이블에 쓸 때도 데이터 일관성이 보장된다.
# MERGE (UPSERT): 있으면 업데이트, 없으면 삽입
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/data/delta/orders")
deltaTable.alias("target").merge(
newData.alias("source"),
"target.id = source.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
타임 트래블
Delta Lake는 변경 이력을 보존한다. 과거 시점의 데이터를 조회할 수 있다.
# 특정 버전의 데이터
df = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("/data/delta/orders")
# 특정 시점의 데이터
df = spark.read.format("delta") \
.option("timestampAsOf", "2026-02-11") \
.load("/data/delta/orders")
잘못된 데이터를 쓴 경우 이전 버전으로 되돌릴 수 있다.
deltaTable.restoreToVersion(5)
스키마 관리
# 스키마 검증 (기본: 스키마가 다르면 에러)
df.write.format("delta").mode("append").save("/data/delta/orders")
# 스키마 자동 병합 (새 컬럼 자동 추가)
df.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("/data/delta/orders")
쓰기 모드와 파티셔닝
Save Mode
| 모드 | 동작 |
|---|---|
append | 기존 데이터에 추가 |
overwrite | 기존 데이터를 교체 |
ignore | 이미 존재하면 무시 |
error (기본값) | 이미 존재하면 에러 |
파티셔닝
디스크에 데이터를 분할 저장하여 쿼리 성능을 향상시킨다.
df.write.partitionBy("year", "month") \
.parquet("s3://bucket/orders/")
결과 디렉토리 구조:
orders/
├── year=2025/
│ ├── month=01/
│ │ └── part-00000.parquet
│ └── month=02/
│ └── part-00000.parquet
└── year=2026/
└── month=01/
└── part-00000.parquet
이후 WHERE year = 2026 AND month = 1로 쿼리하면, 해당 디렉토리만 읽어 I/O를 크게 줄인다(Partition Pruning).
파티션 키 선택 기준:
- 쿼리에서 자주 필터링하는 컬럼 (날짜, 지역 등)
- 카디널리티가 적당한 컬럼 (너무 많으면 소파일 문제)
- 날짜 기반 파티셔닝이 가장 일반적이다
Bucketing
파티셔닝과 다른 차원의 최적화이다. 조인 키 기준으로 데이터를 미리 분배하여, 조인 시 Shuffle을 제거한다.
df.write.bucketBy(16, "user_id") \
.sortBy("user_id") \
.saveAsTable("orders_bucketed")
두 테이블이 같은 키로 버킷팅되어 있으면, 조인 시 Shuffle 없이 로컬에서 바로 매칭할 수 있다.
I/O 최적화 팁
소파일 문제(Small File Problem)
파일이 너무 많고 작으면, 파일 목록을 읽는 것만으로도 시간이 소요된다. 쓰기 전에 파티션 수를 조절한다.
# 출력 파일 수를 줄임
df.coalesce(10).write.parquet("output/")
# 또는 repartition으로 균등 분배
df.repartition(10).write.parquet("output/")
압축 설정
df.write.option("compression", "snappy").parquet("output/") # 기본값
df.write.option("compression", "gzip").parquet("output/") # 더 높은 압축률
df.write.option("compression", "zstd").parquet("output/") # 압축률+속도 균형
| 압축 코덱 | 압축률 | 속도 | 용도 |
|---|---|---|---|
| snappy | 보통 | 빠름 | 기본 선택 |
| gzip | 높음 | 느림 | 저장 공간 절약 |
| zstd | 높음 | 빠름 | snappy 대체 (권장) |
정리
- Parquet은 Spark의 기본이자 최적의 파일 포맷이다. Predicate Pushdown과 컬럼 프루닝으로 I/O를 최소화한다
- JDBC 연동 시
partitionColumn과numPartitions으로 병렬 읽기를 설정해야 한다 - Kafka 연동은 Structured Streaming과 배치 모두 지원한다
- Delta Lake는 ACID 트랜잭션, MERGE, 타임 트래블로 데이터 레이크의 신뢰성을 높인다
- 파티셔닝은 쿼리 성능의 핵심이며, 날짜 기반 파티셔닝이 가장 일반적이다