Back to Blog
SparkDataSourceParquetJDBCKafkaDelta LakeI/O

0x05. Spark 데이터 I/O와 외부 시스템 연동

Spark에서 다양한 데이터 소스(파일, DB, Kafka, Delta Lake)를 읽고 쓰는 방법과 I/O 최적화 전략을 알아본다.

Spark 애플리케이션의 시작과 끝은 데이터 읽기와 쓰기이다. 아무리 복잡한 변환 로직을 작성해도, 데이터를 효율적으로 읽고 쓰지 못하면 전체 파이프라인의 성능이 병목에 걸린다. 이 글에서는 Spark의 DataSource API를 통해 다양한 외부 시스템과 연동하는 방법을 다룬다.


DataSource API 기본

Spark는 통일된 인터페이스로 다양한 데이터 소스를 지원한다.

# 읽기
df = spark.read.format("포맷").option("", "").load("경로")

# 쓰기
df.write.format("포맷").option("", "").save("경로")

축약형도 제공한다.

df = spark.read.parquet("path/to/data")
df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)
df = spark.read.json("path/to/data.json")

파일 포맷

Parquet (기본값)

Parquet은 Spark의 기본 파일 포맷이자, 빅데이터 에코시스템의 사실상 표준이다.

# 읽기
df = spark.read.parquet("s3://bucket/data/")

# 쓰기
df.write.parquet("s3://bucket/output/")

Parquet의 핵심 장점:

  • 컬럼 기반 저장: 필요한 컬럼만 읽어 I/O를 최소화한다
  • 효율적 압축: 같은 타입의 값이 연속으로 저장되어 압축률이 높다
  • 스키마 내장: 파일 자체에 스키마 정보가 포함된다
  • Predicate Pushdown: 필터 조건을 파일 레벨에서 적용하여 불필요한 데이터를 건너뛴다
# Predicate Pushdown: price > 100인 행만 디스크에서 읽음
df = spark.read.parquet("products/").filter("price > 100")

Parquet 파일의 내부 구조는 Row Group → Column Chunk → Page로 구성된다. 각 Row Group에는 컬럼별 최솟값/최댓값 통계가 저장되어, 조건에 맞지 않는 Row Group은 아예 읽지 않는다.

CSV

사람이 읽을 수 있는 텍스트 포맷. 간단하지만 비효율적이다.

df = spark.read.csv("data.csv",
    header=True,
    inferSchema=True,
    sep=",",
    encoding="UTF-8",
    nullValue="NULL"
)

df.write.csv("output/",
    header=True,
    sep=",",
    mode="overwrite"
)

inferSchema=True는 편리하지만, 전체 데이터를 한 번 스캔하므로 대용량 파일에서는 스키마를 직접 지정하는 것이 좋다.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

df = spark.read.csv("data.csv", header=True, schema=schema)

JSON

중첩 구조를 자연스럽게 표현할 수 있지만, 행 기반이라 분석 쿼리에는 비효율적이다.

df = spark.read.json("events.json")

# 중첩 필드 접근
df.select("user.name", "user.email", "event.type")

ORC

Parquet과 유사한 컬럼 기반 포맷. Hive 에코시스템에서 많이 사용된다.

df = spark.read.orc("hive-data/")

포맷 비교

포맷저장 방식압축률읽기 성능쓰기 성능적합한 용도
Parquet컬럼 기반높음매우 빠름보통분석 쿼리 (기본 선택)
ORC컬럼 기반높음매우 빠름보통Hive 연동
CSV행 기반낮음느림빠름데이터 교환, 간단한 작업
JSON행 기반낮음느림빠름API 데이터, 중첩 구조

특별한 이유가 없다면 Parquet을 기본으로 사용한다.


JDBC: 관계형 데이터베이스 연동

기본 읽기/쓰기

# DB에서 읽기
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/mydb") \
    .option("dbtable", "orders") \
    .option("user", "user") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .load()

# DB에 쓰기
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/mydb") \
    .option("dbtable", "order_summary") \
    .option("user", "user") \
    .option("password", "password") \
    .mode("append") \
    .save()

병렬 읽기

기본적으로 JDBC 읽기는 단일 연결로 수행된다. 대용량 테이블에서는 병렬 읽기를 설정해야 한다.

df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/mydb") \
    .option("dbtable", "orders") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "10") \
    .load()

이 설정으로 10개의 병렬 쿼리가 실행된다.

SELECT * FROM orders WHERE id >= 1 AND id < 100001
SELECT * FROM orders WHERE id >= 100001 AND id < 200001
...
SELECT * FROM orders WHERE id >= 900001 AND id <= 1000000

partitionColumn균등 분포된 숫자 컬럼이어야 한다. 분포가 편향되면 특정 파티션에 데이터가 몰려 병렬 효과가 떨어진다.

Pushdown Query

전체 테이블 대신 서브쿼리 결과만 가져올 수 있다.

query = "(SELECT id, amount, status FROM orders WHERE status = 'completed') AS filtered"

df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/mydb") \
    .option("dbtable", query) \
    .load()

필요한 컬럼과 행만 DB에서 전달받아 네트워크 전송량을 줄인다.


Kafka 연동

Structured Streaming으로 읽기

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka 메시지는 key, value, topic, partition, offset, timestamp 컬럼을 가짐
parsed = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

배치로 읽기

특정 범위의 Kafka 메시지를 배치로 읽을 수도 있다.

df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

Kafka에 쓰기

df.select(
    col("user_id").cast("string").alias("key"),
    to_json(struct("*")).alias("value")
).write.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "processed-orders") \
    .save()

Delta Lake

Delta Lake는 Parquet 위에 ACID 트랜잭션, 스키마 관리, 타임 트래블을 추가한 오픈소스 스토리지 레이어이다. Databricks가 개발했다.

기본 사용

# Delta 테이블 쓰기
df.write.format("delta").save("/data/delta/orders")

# Delta 테이블 읽기
df = spark.read.format("delta").load("/data/delta/orders")

ACID 트랜잭션

여러 Spark 잡이 동시에 같은 테이블에 쓸 때도 데이터 일관성이 보장된다.

# MERGE (UPSERT): 있으면 업데이트, 없으면 삽입
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/data/delta/orders")

deltaTable.alias("target").merge(
    newData.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

타임 트래블

Delta Lake는 변경 이력을 보존한다. 과거 시점의 데이터를 조회할 수 있다.

# 특정 버전의 데이터
df = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/data/delta/orders")

# 특정 시점의 데이터
df = spark.read.format("delta") \
    .option("timestampAsOf", "2026-02-11") \
    .load("/data/delta/orders")

잘못된 데이터를 쓴 경우 이전 버전으로 되돌릴 수 있다.

deltaTable.restoreToVersion(5)

스키마 관리

# 스키마 검증 (기본: 스키마가 다르면 에러)
df.write.format("delta").mode("append").save("/data/delta/orders")

# 스키마 자동 병합 ( 컬럼 자동 추가)
df.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("/data/delta/orders")

쓰기 모드와 파티셔닝

Save Mode

모드동작
append기존 데이터에 추가
overwrite기존 데이터를 교체
ignore이미 존재하면 무시
error (기본값)이미 존재하면 에러

파티셔닝

디스크에 데이터를 분할 저장하여 쿼리 성능을 향상시킨다.

df.write.partitionBy("year", "month") \
    .parquet("s3://bucket/orders/")

결과 디렉토리 구조:

orders/
├── year=2025/
   ├── month=01/
      └── part-00000.parquet
   └── month=02/
       └── part-00000.parquet
└── year=2026/
    └── month=01/
        └── part-00000.parquet

이후 WHERE year = 2026 AND month = 1로 쿼리하면, 해당 디렉토리만 읽어 I/O를 크게 줄인다(Partition Pruning).

파티션 키 선택 기준:

  • 쿼리에서 자주 필터링하는 컬럼 (날짜, 지역 등)
  • 카디널리티가 적당한 컬럼 (너무 많으면 소파일 문제)
  • 날짜 기반 파티셔닝이 가장 일반적이다

Bucketing

파티셔닝과 다른 차원의 최적화이다. 조인 키 기준으로 데이터를 미리 분배하여, 조인 시 Shuffle을 제거한다.

df.write.bucketBy(16, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("orders_bucketed")

두 테이블이 같은 키로 버킷팅되어 있으면, 조인 시 Shuffle 없이 로컬에서 바로 매칭할 수 있다.


I/O 최적화 팁

소파일 문제(Small File Problem)

파일이 너무 많고 작으면, 파일 목록을 읽는 것만으로도 시간이 소요된다. 쓰기 전에 파티션 수를 조절한다.

# 출력 파일 수를 줄임
df.coalesce(10).write.parquet("output/")

# 또는 repartition으로 균등 분배
df.repartition(10).write.parquet("output/")

압축 설정

df.write.option("compression", "snappy").parquet("output/")  # 기본값
df.write.option("compression", "gzip").parquet("output/")    #  높은 압축률
df.write.option("compression", "zstd").parquet("output/")    # 압축률+속도 균형
압축 코덱압축률속도용도
snappy보통빠름기본 선택
gzip높음느림저장 공간 절약
zstd높음빠름snappy 대체 (권장)

정리

  • Parquet은 Spark의 기본이자 최적의 파일 포맷이다. Predicate Pushdown과 컬럼 프루닝으로 I/O를 최소화한다
  • JDBC 연동 시 partitionColumnnumPartitions으로 병렬 읽기를 설정해야 한다
  • Kafka 연동은 Structured Streaming과 배치 모두 지원한다
  • Delta Lake는 ACID 트랜잭션, MERGE, 타임 트래블로 데이터 레이크의 신뢰성을 높인다
  • 파티셔닝은 쿼리 성능의 핵심이며, 날짜 기반 파티셔닝이 가장 일반적이다