Back to Blog
KafkaKafka ConnectConnectorSourceSinkCDCDebezium

0x04. Kafka Connect - 데이터 통합 프레임워크

Kafka Connect의 아키텍처와 Source/Sink 커넥터 구성, 그리고 Debezium을 활용한 CDC 파이프라인을 알아본다.

데이터 파이프라인을 구축할 때 가장 반복적인 작업은 "A 시스템에서 데이터를 읽어 Kafka에 넣고, Kafka에서 읽어 B 시스템에 쓴다" 는 것이다. 이 작업을 매번 Producer/Consumer 코드로 직접 구현하면, 에러 핸들링, Offset 관리, 스키마 변환 등 동일한 문제를 반복적으로 풀어야 한다.

Kafka Connect는 이 문제를 해결하는 데이터 통합 프레임워크이다. 코드를 작성하지 않고 설정만으로 외부 시스템과 Kafka 사이의 데이터 파이프라인을 구축할 수 있다.


Kafka Connect 아키텍처

핵심 개념

외부 시스템 ──[Source Connector]──→ Kafka Topic ──[Sink Connector]──→ 외부 시스템
  • Source Connector: 외부 시스템에서 데이터를 읽어 Kafka Topic에 쓴다 (DB → Kafka)
  • Sink Connector: Kafka Topic에서 데이터를 읽어 외부 시스템에 쓴다 (Kafka → DB, ES, S3 등)
  • Worker: Connector를 실행하는 프로세스. Standalone 또는 Distributed 모드
  • Task: 실제 데이터 복사를 수행하는 단위. 하나의 Connector는 여러 Task로 병렬화된다
  • Converter: Kafka 메시지의 직렬화/역직렬화를 담당한다 (JSON, Avro, Protobuf 등)

Standalone vs Distributed 모드

Standalone 모드: 단일 프로세스에서 실행. 개발/테스트용이다.

connect-standalone.sh config/connect-standalone.properties \
  config/my-connector.properties

Distributed 모드: 여러 Worker가 클러스터를 구성한다. 프로덕션 환경의 표준이다.

connect-distributed.sh config/connect-distributed.properties

Distributed 모드의 장점:

  • 자동 부하 분산: Task가 Worker 간에 자동으로 분배된다
  • 고가용성: Worker 장애 시 Task가 다른 Worker로 재할당된다
  • 수평 확장: Worker를 추가하면 처리량이 증가한다
  • REST API 관리: Connector를 REST API로 생성/수정/삭제한다

Source Connector: 외부 → Kafka

JDBC Source Connector

관계형 데이터베이스의 데이터를 Kafka로 가져오는 가장 기본적인 Source Connector이다.

POST /connectors
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/mydb",
    "connection.user": "user",
    "connection.password": "password",
    "table.whitelist": "orders,users",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db-",
    "poll.interval.ms": "5000"
  }
}

이 설정으로 ordersusers 테이블의 데이터가 db-orders, db-users Topic으로 자동 전송된다.

캡처 모드:

모드동작한계
bulk매 폴링마다 전체 테이블 복사대량 데이터에 비효율적
incrementing증가하는 ID 기준으로 새 행만 캡처UPDATE/DELETE 감지 불가
timestamp타임스탬프 기준으로 변경된 행 캡처타임스탬프 열 필요
timestamp+incrementing두 방식 결합가장 안정적이지만 DELETE 감지 불가

JDBC Source의 근본적 한계는 DELETE를 감지할 수 없다는 점이다. 이 문제를 해결하려면 CDC(Change Data Capture) 방식이 필요하다.

Debezium: CDC 기반 Source Connector

Debezium은 데이터베이스의 변경 로그(binlog, WAL 등) 를 직접 읽어 변경 사항을 캡처하는 CDC 플랫폼이다. INSERT, UPDATE, DELETE를 모두 실시간으로 캡처할 수 있다.

POST /connectors
{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "mydb",
    "database.server.name": "pgserver",
    "table.include.list": "public.orders,public.users",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "topic.prefix": "cdc"
  }
}

Debezium이 캡처하는 메시지에는 변경 전/후 값이 모두 포함된다.

{
  "before": { "id": 1, "status": "pending" },
  "after": { "id": 1, "status": "shipped" },
  "op": "u",
  "ts_ms": 1707696000000,
  "source": { "table": "orders", "lsn": "0/15D68C8" }
}
  • op: 연산 유형 (c=create, u=update, d=delete, r=read/snapshot)
  • before/after: 변경 전/후 레코드 값

Debezium이 지원하는 데이터베이스:

  • PostgreSQL (WAL 기반)
  • MySQL (Binlog 기반)
  • MongoDB (Oplog 기반)
  • SQL Server (CT 기반)
  • Oracle (LogMiner 기반)

Sink Connector: Kafka → 외부

Elasticsearch Sink Connector

Kafka의 데이터를 Elasticsearch에 자동으로 인덱싱한다.

POST /connectors
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "db-orders",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert"
  }
}

write.method: upsertbehavior.on.null.values: delete를 조합하면, CDC로 캡처된 INSERT/UPDATE/DELETE가 Elasticsearch에 그대로 반영된다.

S3 Sink Connector

Kafka 데이터를 S3에 파일로 저장한다. 데이터 레이크 구축에 활용된다.

POST /connectors
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.bucket.name": "my-data-lake",
    "s3.region": "ap-northeast-2",
    "topics": "raw-events",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "ko_KR",
    "timezone": "Asia/Seoul"
  }
}

Parquet 포맷으로 시간 기반 파티셔닝하여 S3에 저장한다. Hive, Spark, Athena 등에서 바로 쿼리할 수 있는 형태이다.


Converter와 Schema Registry

Kafka Connect에서 데이터의 직렬화 형식을 결정하는 것이 Converter이다.

# Worker 레벨 기본 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

주요 Converter:

Converter특징
JsonConverter사람이 읽을 수 있음. 크기가 큼
AvroConverter스키마 관리 가능. 압축 효율적
ProtobufConverter언어 중립적. 높은 성능
StringConverter단순 문자열. 스키마 없음

Avro를 사용할 때는 Schema Registry와 함께 쓴다. Schema Registry는 스키마 버전을 중앙에서 관리하여, Producer와 Consumer 간의 스키마 호환성을 보장한다.


관리와 모니터링

REST API

Distributed 모드에서는 REST API로 Connector를 관리한다.

# 전체 Connector 목록
curl localhost:8083/connectors

# 특정 Connector 상태 확인
curl localhost:8083/connectors/postgres-source/status

# Connector 일시 중지
curl -X PUT localhost:8083/connectors/postgres-source/pause

# Connector 재개
curl -X PUT localhost:8083/connectors/postgres-source/resume

# Connector 삭제
curl -X DELETE localhost:8083/connectors/postgres-source

# Connector 설정 변경
curl -X PUT localhost:8083/connectors/postgres-source/config \
  -H "Content-Type: application/json" \
  -d '{ "config": "..." }'

에러 처리

Connector가 처리할 수 없는 메시지를 만나면 기본적으로 실패한다. Dead Letter Queue(DLQ) 패턴으로 문제 메시지를 별도 토픽에 격리할 수 있다.

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-orders",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true
}

errors.tolerance: all이면 에러가 발생해도 Connector가 중단되지 않고, 문제 메시지만 DLQ 토픽으로 보낸다. DLQ에 쌓인 메시지를 나중에 분석하고 재처리할 수 있다.


정리

  • Kafka Connect는 코드 없이 설정만으로 외부 시스템과 Kafka를 연결하는 데이터 통합 프레임워크이다
  • Source Connector는 외부 → Kafka, Sink Connector는 Kafka → 외부 방향이다
  • Debezium은 CDC 기반 Source Connector로, DB의 INSERT/UPDATE/DELETE를 모두 실시간 캡처한다
  • Distributed 모드에서는 Worker 간 자동 부하 분산과 고가용성이 보장된다
  • Dead Letter Queue로 에러 메시지를 격리하여 파이프라인의 안정성을 확보한다