데이터 파이프라인을 구축할 때 가장 반복적인 작업은 "A 시스템에서 데이터를 읽어 Kafka에 넣고, Kafka에서 읽어 B 시스템에 쓴다" 는 것이다. 이 작업을 매번 Producer/Consumer 코드로 직접 구현하면, 에러 핸들링, Offset 관리, 스키마 변환 등 동일한 문제를 반복적으로 풀어야 한다.
Kafka Connect는 이 문제를 해결하는 데이터 통합 프레임워크이다. 코드를 작성하지 않고 설정만으로 외부 시스템과 Kafka 사이의 데이터 파이프라인을 구축할 수 있다.
Kafka Connect 아키텍처
핵심 개념
외부 시스템 ──[Source Connector]──→ Kafka Topic ──[Sink Connector]──→ 외부 시스템
- Source Connector: 외부 시스템에서 데이터를 읽어 Kafka Topic에 쓴다 (DB → Kafka)
- Sink Connector: Kafka Topic에서 데이터를 읽어 외부 시스템에 쓴다 (Kafka → DB, ES, S3 등)
- Worker: Connector를 실행하는 프로세스. Standalone 또는 Distributed 모드
- Task: 실제 데이터 복사를 수행하는 단위. 하나의 Connector는 여러 Task로 병렬화된다
- Converter: Kafka 메시지의 직렬화/역직렬화를 담당한다 (JSON, Avro, Protobuf 등)
Standalone vs Distributed 모드
Standalone 모드: 단일 프로세스에서 실행. 개발/테스트용이다.
connect-standalone.sh config/connect-standalone.properties \
config/my-connector.properties
Distributed 모드: 여러 Worker가 클러스터를 구성한다. 프로덕션 환경의 표준이다.
connect-distributed.sh config/connect-distributed.properties
Distributed 모드의 장점:
- 자동 부하 분산: Task가 Worker 간에 자동으로 분배된다
- 고가용성: Worker 장애 시 Task가 다른 Worker로 재할당된다
- 수평 확장: Worker를 추가하면 처리량이 증가한다
- REST API 관리: Connector를 REST API로 생성/수정/삭제한다
Source Connector: 외부 → Kafka
JDBC Source Connector
관계형 데이터베이스의 데이터를 Kafka로 가져오는 가장 기본적인 Source Connector이다.
POST /connectors
{
"name": "postgres-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "user",
"connection.password": "password",
"table.whitelist": "orders,users",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db-",
"poll.interval.ms": "5000"
}
}
이 설정으로 orders와 users 테이블의 데이터가 db-orders, db-users Topic으로 자동 전송된다.
캡처 모드:
| 모드 | 동작 | 한계 |
|---|---|---|
bulk | 매 폴링마다 전체 테이블 복사 | 대량 데이터에 비효율적 |
incrementing | 증가하는 ID 기준으로 새 행만 캡처 | UPDATE/DELETE 감지 불가 |
timestamp | 타임스탬프 기준으로 변경된 행 캡처 | 타임스탬프 열 필요 |
timestamp+incrementing | 두 방식 결합 | 가장 안정적이지만 DELETE 감지 불가 |
JDBC Source의 근본적 한계는 DELETE를 감지할 수 없다는 점이다. 이 문제를 해결하려면 CDC(Change Data Capture) 방식이 필요하다.
Debezium: CDC 기반 Source Connector
Debezium은 데이터베이스의 변경 로그(binlog, WAL 등) 를 직접 읽어 변경 사항을 캡처하는 CDC 플랫폼이다. INSERT, UPDATE, DELETE를 모두 실시간으로 캡처할 수 있다.
POST /connectors
{
"name": "postgres-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db",
"database.port": "5432",
"database.user": "debezium",
"database.password": "password",
"database.dbname": "mydb",
"database.server.name": "pgserver",
"table.include.list": "public.orders,public.users",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"topic.prefix": "cdc"
}
}
Debezium이 캡처하는 메시지에는 변경 전/후 값이 모두 포함된다.
{
"before": { "id": 1, "status": "pending" },
"after": { "id": 1, "status": "shipped" },
"op": "u",
"ts_ms": 1707696000000,
"source": { "table": "orders", "lsn": "0/15D68C8" }
}
op: 연산 유형 (c=create,u=update,d=delete,r=read/snapshot)before/after: 변경 전/후 레코드 값
Debezium이 지원하는 데이터베이스:
- PostgreSQL (WAL 기반)
- MySQL (Binlog 기반)
- MongoDB (Oplog 기반)
- SQL Server (CT 기반)
- Oracle (LogMiner 기반)
Sink Connector: Kafka → 외부
Elasticsearch Sink Connector
Kafka의 데이터를 Elasticsearch에 자동으로 인덱싱한다.
POST /connectors
{
"name": "es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "db-orders",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.null.values": "delete",
"write.method": "upsert"
}
}
write.method: upsert와 behavior.on.null.values: delete를 조합하면, CDC로 캡처된 INSERT/UPDATE/DELETE가 Elasticsearch에 그대로 반영된다.
S3 Sink Connector
Kafka 데이터를 S3에 파일로 저장한다. 데이터 레이크 구축에 활용된다.
POST /connectors
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.bucket.name": "my-data-lake",
"s3.region": "ap-northeast-2",
"topics": "raw-events",
"flush.size": "10000",
"rotate.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"locale": "ko_KR",
"timezone": "Asia/Seoul"
}
}
Parquet 포맷으로 시간 기반 파티셔닝하여 S3에 저장한다. Hive, Spark, Athena 등에서 바로 쿼리할 수 있는 형태이다.
Converter와 Schema Registry
Kafka Connect에서 데이터의 직렬화 형식을 결정하는 것이 Converter이다.
# Worker 레벨 기본 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
주요 Converter:
| Converter | 특징 |
|---|---|
JsonConverter | 사람이 읽을 수 있음. 크기가 큼 |
AvroConverter | 스키마 관리 가능. 압축 효율적 |
ProtobufConverter | 언어 중립적. 높은 성능 |
StringConverter | 단순 문자열. 스키마 없음 |
Avro를 사용할 때는 Schema Registry와 함께 쓴다. Schema Registry는 스키마 버전을 중앙에서 관리하여, Producer와 Consumer 간의 스키마 호환성을 보장한다.
관리와 모니터링
REST API
Distributed 모드에서는 REST API로 Connector를 관리한다.
# 전체 Connector 목록
curl localhost:8083/connectors
# 특정 Connector 상태 확인
curl localhost:8083/connectors/postgres-source/status
# Connector 일시 중지
curl -X PUT localhost:8083/connectors/postgres-source/pause
# Connector 재개
curl -X PUT localhost:8083/connectors/postgres-source/resume
# Connector 삭제
curl -X DELETE localhost:8083/connectors/postgres-source
# Connector 설정 변경
curl -X PUT localhost:8083/connectors/postgres-source/config \
-H "Content-Type: application/json" \
-d '{ "config": "..." }'
에러 처리
Connector가 처리할 수 없는 메시지를 만나면 기본적으로 실패한다. Dead Letter Queue(DLQ) 패턴으로 문제 메시지를 별도 토픽에 격리할 수 있다.
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-orders",
"errors.deadletterqueue.topic.replication.factor": 3,
"errors.deadletterqueue.context.headers.enable": true
}
errors.tolerance: all이면 에러가 발생해도 Connector가 중단되지 않고, 문제 메시지만 DLQ 토픽으로 보낸다. DLQ에 쌓인 메시지를 나중에 분석하고 재처리할 수 있다.
정리
- Kafka Connect는 코드 없이 설정만으로 외부 시스템과 Kafka를 연결하는 데이터 통합 프레임워크이다
- Source Connector는 외부 → Kafka, Sink Connector는 Kafka → 외부 방향이다
- Debezium은 CDC 기반 Source Connector로, DB의 INSERT/UPDATE/DELETE를 모두 실시간 캡처한다
- Distributed 모드에서는 Worker 간 자동 부하 분산과 고가용성이 보장된다
- Dead Letter Queue로 에러 메시지를 격리하여 파이프라인의 안정성을 확보한다