Debezium Connector가 새벽 3시에 죽었다. 알람을 보고 달려갔더니 binlog 포지션이 날아가 있었다. 재시작하면 처음부터 스냅샷을 다시 떠야 하는데, 테이블에는 행이 5천만 개다.
내부 동작을 모르면 이 상황에서 할 수 있는 게 없다. 어디서부터 왜 잘못됐는지, 어떻게 복구해야 하는지. 프레임워크를 믿으려면 먼저 그게 어떻게 돌아가는지 알아야 한다.
Kafka Connect가 왜 나왔을까?
“MySQL 데이터를 Kafka로 보내야 한다.”
간단해 보인다. Producer 코드 짜면 되는 거 아닌가? 직접 짜보자.
// "간단한" MySQL → Kafka 프로듀서
while (true) {
// 어디서부터 읽어야 하지? 마지막 읽은 위치를 어딘가 저장해야 한다
long lastId = readLastOffset(); // 이걸 어디 저장하지? 파일? Redis?
List<Row> rows = jdbc.query(
"SELECT * FROM orders WHERE id > ? ORDER BY id LIMIT 1000", lastId
);
for (Row row : rows) {
producer.send(new ProducerRecord<>("orders", row.toJson()));
saveOffset(row.getId()); // 메시지 전송과 offset 저장이 원자적이지 않다
}
Thread.sleep(1000); // 폴링 주기. 1초? 5초? 어떻게 정하지?
}
이미 문제가 보이기 시작한다.
- 서버가 죽으면 어디서부터 다시 읽나?
saveOffset()을 호출하기 직전에 죽으면? - 이 프로세스를 N개 띄워서 병렬로 읽게 하려면 어떻게 해야 하나?
- 폴링이다.
id > lastId로 읽으면 UPDATE는 어떻게 잡나? DELETE는?
그리고 이 코드는 MySQL 하나에 대한 코드다.
사내에 MySQL이 5개, PostgreSQL이 2개, MongoDB가 1개, S3가 10개의 소스가 있다고 해보자. 각 소스마다 이 코드를 짜야 한다. offset 관리 코드, 재시작 복구 코드, 병렬화 코드, 에러 처리 코드 — 모두 비슷하게 반복된다.
Kafka Connect는 이 반복을 없애기 위해 나왔다.
“어떻게 데이터를 읽을 것인가”와 “어떻게 데이터를 쓸 것인가”만 구현하면 된다. offset 관리, 재시작 복구, Worker 간 조정, 에러 격리 — 이건 프레임워크가 처리한다.
Connector, Task, Worker를 왜 세 개로 나눠놨나?
하나의 클래스에 다 넣으면 안 되는 이유가 있다.
만약 하나로 합쳤다면:
MySqlConnectorTask {
- 설정 파싱
- "Task를 몇 개 띄울지" 결정
- 실제 binlog 읽기
- Kafka에 쓰기
}
MySQL 테이블이 10개고 이걸 병렬로 읽고 싶다. “Task를 10개 띄워서 테이블마다 하나씩 맡기자.” 근데 설정 파싱과 실제 I/O가 같은 클래스에 있으면 10개를 어떻게 뜨나? 하나의 설정이 10개로 분리되는 로직이 실제 I/O 로직과 뒤엉킨다.
그래서 나눴다.
┌─────────────────────────────────────────────────┐
│ Worker Process │
│ ┌──────────────┐ ┌────────────────────────┐ │
│ │ Connector │ │ REST API (:8083) │ │
│ │ (설정/감독) │ └────────────────────────┘ │
│ └──────┬───────┘ │
│ │ "테이블 10개, Task 10개 띄워" │
│ ┌──────▼──────┐ ┌──────────┐ ┌──────────┐ │
│ │ Task 0 │ │ Task 1 │ │ Task 2 │ │
│ │ (orders) │ │ (users) │ │(products)│ │
│ └─────────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────┘
Worker는 JVM 프로세스다. 포트 8083으로 REST API를 열고, Connector와 Task의 생명주기를 관리한다. Worker가 죽으면 Distributed mode에서 다른 Worker가 Task를 이어받는다.
Connector는 감독자다. 설정을 파싱하고 “이 설정으로 Task를 몇 개, 어떻게 나눌 것인지”를 결정한다. 데이터를 직접 건드리지 않는다.
Task가 실제로 poll()을 수행하고 데이터를 Kafka에 쓴다.
이 분리 덕분에 Connector 코드를 바꾸지 않고도 Task 수를 늘려서 병렬성을 높일 수 있다. tasks.max=10 설정 하나로.
Standalone vs Distributed:
| Standalone | Distributed | |
|---|---|---|
| offset 저장 | 로컬 파일 | Kafka 토픽 (connect-offsets) |
| 장애 복구 | Worker 죽으면 끝 | 다른 Worker가 Task 인계 |
| 설정 방법 | properties 파일 | REST API |
| 용도 | 개발, 테스트 | 프로덕션 |
프로덕션에서는 무조건 Distributed다. Worker 하나가 죽어도 파이프라인이 살아있다.
MySQL 데이터를 어떻게 읽어오나? 폴링이면 어떤 문제가 있나?
Source Connector는 폴링이다. Kafka Connect 프레임워크가 poll()을 무한 루프로 호출한다.
// Kafka Connect 프레임워크가 반복 호출
public interface SourceTask {
List<SourceRecord> poll() throws InterruptedException;
void commitRecord(SourceRecord record, RecordMetadata metadata);
}
“폴링이면 문제 없는 거 아닌가?” — 아니다.
폴링 방식의 근본적인 문제가 있다. SELECT * FROM orders WHERE id > ? LIMIT 1000 으로 읽으면:
- UPDATE를 잡을 수 없다. 행이 변경됐는데 id는 그대로다. 이미 지나쳤다.
- DELETE를 잡을 수 없다. 행이 사라지면 폴링 쿼리로 감지 방법이 없다.
- updated_at 컬럼으로 잡으면? 인덱스가 있으면 괜찮지만, 폴링 주기 사이에 UPDATE가 여러 번 발생하면 중간 상태를 놓친다.
updated_at이 없는 테이블은?
그래서 CDC(Change Data Capture)가 나왔다.
CDC의 핵심 아이디어는 간단하다. “SELECT로 읽지 말고, DB가 내부적으로 기록하는 변경 로그를 읽자.”
MySQL은 복제(replication)를 위해 binlog라는 변경 로그를 이미 쓰고 있다. CREATE든 UPDATE든 DELETE든 모든 변경이 binlog에 순서대로 기록된다. Debezium은 이걸 읽는다.
binlog가 뭔데 Debezium이 읽나? 없으면 어떻게 변경을 감지하나?
MySQL binlog는 MySQL이 복제(replication)를 위해 만든 내부 변경 이력 파일이다.
마스터 MySQL이 있고 슬레이브 MySQL이 있을 때, 슬레이브는 마스터의 binlog를 읽어서 “마스터가 한 것과 똑같이” 실행한다. 이 복제 프로토콜이 이미 잘 만들어져 있다.
Debezium은 이 프로토콜을 그대로 가져다 쓴다. MySQL에 “나 슬레이브야”라고 연결하고 binlog 스트리밍을 받는다. MySQL 입장에서는 슬레이브 하나가 더 붙은 것처럼 보인다.
binlog가 없다면 — 실제로 binlog 비활성화 상태에서 변경을 감지하려면:
-- 폴링으로 UPDATE 잡기 시도
SELECT * FROM orders WHERE updated_at > '2026-06-29 10:00:00'
이게 바로 “한계가 있는 폴링”이다. DELETE는 잡지 못하고, updated_at이 없는 테이블은 UPDATE도 못 잡는다.
binlog를 쓰면 완전히 다르다. 행이 INSERT됐는지, 어떤 컬럼이 어떤 값에서 어떤 값으로 UPDATE됐는지, DELETE됐는지 — 정확히 알 수 있다.
Debezium이 MySQL에 연결하기 위한 설정:
-- binlog 활성화 확인
SHOW VARIABLES LIKE 'log_bin';
-- Value: ON 이어야 한다
-- Debezium 전용 계정 (복제 슬레이브 권한 필요)
CREATE USER 'debezium'@'%' IDENTIFIED BY 'secret';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
REPLICATION SLAVE 권한이 없으면 Debezium은 binlog 스트리밍을 시작할 수 없다.
중요한 것이 하나 있다. Debezium이 사용하는 server.id는 MySQL 복제 토폴로지 안에서 유일해야 한다. 이미 슬레이브가 있다면 그 슬레이브의 ID와 달라야 한다. 겹치면 기존 복제가 끊어진다.
binlog_format=ROW가 왜 필수인가? STATEMENT 방식이 왜 위험한가?
MySQL binlog는 변경 이벤트를 세 가지 방식으로 기록할 수 있다.
STATEMENT 방식 — SQL 문 자체를 기록한다.
-- binlog에 이렇게 기록됨
UPDATE orders SET status = 'shipped' WHERE customer_id = 42
Debezium이 이 로그를 읽는다. 그러면 “status가 shipped로 바뀌었다”는 건 알겠는데, 어떤 행들이 바뀌었는지 어떻게 알까?
모른다.
customer_id = 42인 주문이 1개인지 100개인지 binlog만 봐서는 알 수 없다. 실제로 실행해야 알 수 있는데, 재실행하는 시점에 DB 상태가 달라져 있을 수 있다.
더 심각한 문제가 있다.
-- 비결정적 함수를 쓴 쿼리
INSERT INTO events VALUES (UUID(), NOW(), 'payment_completed')
Debezium이 이 SQL을 재실행하면 다른 UUID, 다른 NOW()가 나온다. 완전히 다른 데이터가 만들어진다.
ROW 방식 — 변경된 행의 before/after 이미지를 기록한다.
// ROW 방식 binlog 내용 (개념적 표현)
operation: UPDATE
table: orders
before: { id: 1001, status: 'pending', customer_id: 42 }
after: { id: 1001, status: 'shipped', customer_id: 42 }
operation: UPDATE
table: orders
before: { id: 1002, status: 'pending', customer_id: 42 }
after: { id: 1002, status: 'shipped', customer_id: 42 }
customer_id = 42인 주문이 2개였다면 ROW 이벤트가 2개 나온다. 각각 어떤 행이 어떤 값에서 어떤 값으로 바뀌었는지 정확히 담겨 있다.
이것이 CDC에서 ROW가 필수인 이유다. “무엇이 어떻게 바뀌었는지”를 정확히 알아야 한다.
-- 현재 설정 확인
SHOW VARIABLES LIKE 'binlog_format'; -- ROW 이어야 함
SHOW VARIABLES LIKE 'binlog_row_image'; -- FULL 이어야 함
# my.cnf
[mysqld]
binlog_format = ROW
binlog_row_image = FULL # before/after 모두 기록. MINIMAL이면 변경된 컬럼만 기록
log_bin = /var/log/mysql/mysql-bin.log
binlog_expire_logs_seconds = 604800 # 7일 보존 (MySQL 8.0+)
binlog_row_image=FULL이 아니면 변경되지 않은 컬럼의 before 이미지가 없다. Debezium은 FULL을 기대한다. MINIMAL로 설정하면 특정 상황에서 before 이미지가 불완전하게 온다.
Connector가 재시작되면 어디서부터 읽나? offset이 없으면 어떻게 되나?
Worker가 재시작됐다. binlog 어디서부터 다시 읽어야 할까?
offset 정보가 없으면 두 가지 선택지밖에 없다.
- 처음부터 다시 읽는다 → 이미 Kafka에 쓴 데이터를 또 쓴다. 중복.
- 현재 시점부터 읽는다 → 재시작 전에 발생한 변경을 놓친다. 유실.
둘 다 나쁘다.
Kafka Connect는 이 문제를 connect-offsets 토픽으로 해결한다. Task가 처리한 위치를 이 토픽에 기록하고, 재시작하면 여기서 읽어서 이어서 시작한다.
poll() 호출
→ binlog에서 이벤트 읽기
→ SourceRecord 생성 (sourcePartition + sourceOffset 포함)
→ Kafka 토픽에 쓰기 성공
→ commitRecord() 호출
→ connect-offsets 토픽에 offset flush
여기서 sourceOffset의 내용이 중요하다.
// connect-offsets 토픽에 저장되는 Debezium offset
{
"sourcePartition": { "server": "mysql-prod" },
"sourceOffset": {
"file": "mysql-bin.000042",
"pos": 195621,
"snapshot": false
}
}
“mysql-bin.000042 파일의 195621번 위치까지 읽었다.” Worker가 재시작되면 이 값을 읽어서 여기서부터 binlog를 이어서 읽는다.
Consumer offset(__consumer_offsets)과 Source connector offset(connect-offsets)이 별개 토픽인 이유가 있다. Consumer offset은 Kafka 파티션 내 숫자 위치다. Source connector offset은 MySQL binlog 파일명과 위치 — 구조 자체가 다르다. 같은 저장소에 넣으면 의미가 충돌한다.
Distributed mode에서 offset 관련 필수 설정:
# connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# 토픽 replication factor — 클러스터라면 3, 단일 브로커라면 1
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
# offset을 얼마나 자주 flush하나 (기본 60000ms = 60초)
# 60초면 재시작 시 최악의 경우 60초치 중복이 생긴다
offset.flush.interval.ms=10000 # 10초로 줄이면 중복 윈도우 축소
offset.flush.timeout.ms=5000
Connect는 at-least-once 보장이다. flush 직전에 Worker가 죽으면 마지막 flush 이후 처리된 이벤트가 재시작 후 중복으로 온다. 완전히 없애려면 Sink에서 idempotent write가 필요하다 — 이건 나중에 다룬다.
snapshot.mode 옵션들이 왜 이렇게 많은가?
처음 Connector를 배포했다. connect-offsets에 저장된 offset이 없다. “어디서부터 읽지?”
기존 데이터가 100만 행 있다. 이걸 어떻게 처리할 것인가 — 이게 snapshot.mode다.
시나리오 1: 이 데이터를 다른 시스템으로 모두 이관해야 한다
snapshot.mode: initial 을 쓴다. 현재 테이블 전체를 읽어서 Kafka로 보내고(스냅샷), 이후부터는 binlog를 실시간으로 추적한다.
스냅샷 중에도 binlog는 계속 쌓인다. 스냅샷이 끝나면 스냅샷 시작 시점의 binlog 위치부터 이어서 읽는다. 데이터 누락이 없다.
시나리오 2: 기존 데이터는 이미 다른 방법으로 이관했다. 앞으로의 변경만 캡처하면 된다
snapshot.mode: schema_only 를 쓴다. 스키마 정보만 가져오고, 기존 행은 읽지 않는다. 이 순간 이후의 INSERT/UPDATE/DELETE만 Kafka로 보낸다.
시나리오 3: 데이터 이관은 필요 없고, 딱 지금 이 순간부터의 변경만 필요하다
snapshot.mode: never 를 쓴다. 스냅샷 없이 현재 binlog 위치부터 시작한다.
| 모드 | 기존 데이터 처리 | 언제 쓰나 |
|---|---|---|
initial | 전체 스냅샷 후 binlog 추적 | 최초 세팅, 기존 데이터 필요 시 |
initial_only | 스냅샷만 하고 종료 | 일회성 데이터 이관 |
schema_only | 스키마만, 기존 행 건너뜀 | 이미 다른 방법으로 초기 동기화 완료 |
never | 스냅샷 없이 현재 시점부터 | 실시간 변경분만 필요 |
when_needed | offset 없을 때만 스냅샷 | 재시작 시 안전 |
initial 모드의 함정 — 테이블 잠금:
{
"snapshot.locking.mode": "minimal"
}
initial 스냅샷을 시작할 때 Debezium은 잠깐 테이블 잠금을 건다. “지금 이 순간의 일관된 상태를 읽겠다”는 보장을 위해서다.
잠금을 none으로 설정하면 잠금 없이 읽는다. 스냅샷 도중 다른 트랜잭션이 데이터를 변경하면 스냅샷 데이터가 일관되지 않을 수 있다. minimal이 기본값이고 권장값이다. 잠금은 아주 잠깐(초 단위)이고, 스냅샷 진행 중에는 일반 SELECT 모드로 읽는다.
왜 schema.history.internal.kafka.topic이 별도로 있나:
{
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.mysql-prod"
}
binlog 위치만으로는 충분하지 않다. “파일 X, 위치 Y에 있는 이벤트가 어떤 스키마를 기준으로 쓰여진 것인지”도 알아야 한다.
6개월 전 binlog 이벤트를 재처리해야 하는 상황을 생각해보자. 그 당시 테이블 스키마가 지금과 다르다. 그 시점의 스키마를 알아야 이벤트를 올바르게 파싱할 수 있다.
schema.history.internal.kafka.topic이 바로 그 스키마 변경 이력을 저장하는 곳이다. DDL이 바뀔 때마다 여기에 기록된다.
Sink에서 중복 메시지가 오면 어떻게 되나?
at-least-once 보장이라고 했다. Sink Connector가 재시작되면 이미 처리한 메시지가 또 올 수 있다.
중복을 처리하지 않으면?
orders 테이블에 id=1001, amount=50000인 주문이 있다.
Kafka에서 이 INSERT 이벤트를 Sink가 두 번 처리하면?
→ Sink DB에 id=1001짜리 행이 두 개 생긴다 (PK 제약 위반 또는 중복 데이터)
JDBC Sink Connector에서 이 문제를 해결하는 방법이 insert.mode: upsert다.
{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://target-db:3306/mydb",
"connection.user": "writer",
"connection.password": "secret",
"topics": "mysql-prod.mydb.orders",
"auto.create": "false",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"delete.enabled": "true",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"
}
}
insert.mode: upsert는 INSERT ... ON DUPLICATE KEY UPDATE다. 같은 PK가 두 번 와도 두 번째는 그냥 덮어쓴다. 데이터 오염이 없다.
at-least-once + upsert = effectively exactly-once.
delete.enabled: true는 DELETE 이벤트를 처리한다. Debezium이 DELETE 이벤트를 보낼 때 value가 null인 tombstone 메시지를 보내는데, 이걸 Sink에서 실제 DELETE로 처리한다.
DLQ(Dead Letter Queue) 없이 errors.tolerance: all은 위험하다:
errors.tolerance: none (기본값)
→ 첫 번째 에러에서 Connector가 FAILED 상태
→ 운영자가 즉시 인지, 조치 필요
errors.tolerance: all (DLQ 없이)
→ 에러 레코드를 조용히 건너뜀
→ 데이터가 처리 안 됐는데 아무도 모름
→ Kafka consumer group lag은 줄어드는데 Sink에는 데이터가 없다
DLQ를 설정하면 에러 레코드가 거기로 간다. DLQ 헤더에 에러 원인이 담긴다:
__connect.errors.exception.class.name: org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message: Failed to deserialize data for topic...
__connect.errors.topic: mysql-prod.mydb.orders
__connect.errors.offset: 14523
DLQ 모니터링 없이 errors.tolerance: all을 쓰면 데이터 손실이 발생한다. DLQ에 메시지가 쌓이면 알람이 가야 한다.
Spring Boot + JPA 코드가 Debezium으로 어떻게 보이나?
JPA로 저장하면 binlog에는 언제 기록되나?
트랜잭션이 커밋되는 순간이다.
@Transactional
public Order createOrder(OrderRequest req) {
Order order = Order.builder()
.customerId(req.getCustomerId())
.amount(req.getAmount())
.status(OrderStatus.PENDING)
.build();
orderRepository.save(order);
// 이 시점에는 binlog에 아무것도 없다
// DB에 쓰기는 했지만 트랜잭션이 아직 열려 있다
notificationService.send(order); // 이것도 같은 트랜잭션 안에서
return order;
// @Transactional 메서드 종료 = 트랜잭션 커밋
// 이 시점에 binlog에 모든 변경이 원자적으로 기록된다
}
같은 트랜잭션에서 여러 엔티티를 저장하면 binlog에는 커밋 시점에 모두 한꺼번에 들어간다. 트랜잭션이 롤백되면 binlog에는 아무것도 없다.
Debezium 이벤트 구조:
{
"op": "c",
"before": null,
"after": {
"id": 1001,
"customer_id": 42,
"amount": 50000,
"status": "PENDING",
"created_at": 1719619200000
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-prod",
"ts_ms": 1719619200123,
"snapshot": "false",
"db": "mydb",
"table": "orders",
"server_id": 1,
"file": "mysql-bin.000042",
"pos": 195621
},
"ts_ms": 1719619200456
}
op 값: c=create, u=update, d=delete, r=read(스냅샷).
@Version이 있는 낙관적 락 엔티티의 함정:
@Entity
public class Product {
@Id private Long id;
private int stock;
@Version private Long version; // JPA 낙관적 락
}
stock을 10 → 9로 변경하면 Debezium 이벤트:
{
"op": "u",
"before": { "id": 42, "stock": 10, "version": 5 },
"after": { "id": 42, "stock": 9, "version": 6 }
}
version 컬럼이 이벤트에 포함된다.
문제는 Sink 쪽이 JPA 엔티티로 이 데이터를 저장할 때다. JPA가 version=6인 행을 저장하려고 할 때, 현재 DB에 있는 version이 다르면 OptimisticLockException이 난다. Sink는 보통 메시지를 순서대로 처리하지만 재시작 시 재처리되면 version 불일치가 생긴다.
해결책 두 가지:
// SMT로 version 컬럼 제거
"transforms": "removeVersion",
"transforms.removeVersion.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.removeVersion.blacklist": "version"
또는 Sink를 순수 JDBC(JPA 아님)로 처리하고 insert.mode: upsert로 덮어쓴다.
“DB에 저장됐다 = Debezium이 곧 이벤트를 발행한다”는 맞는데, “즉시”는 아니다. binlog → Debezium poll() → Kafka 쓰기까지 수십~수백 ms가 있다. 이 사이에 downstream이 DB를 직접 읽으면 불일치가 생긴다. CDC와 직접 DB 읽기를 혼용할 때 주의해야 한다.
DDL이 바뀌면 왜 Connector가 죽는가? 어떻게 방어하나?
ALTER TABLE orders ADD COLUMN discount_amount DECIMAL(10,2) 를 실행했다.
Debezium이 이 DDL 이벤트를 binlog에서 읽는다. schema history 토픽에 기록하고 새 스키마로 업데이트한다. 새 컬럼이 포함된 이벤트부터 discount_amount 필드가 이벤트에 나타난다.
대부분의 경우 이건 자동으로 처리된다.
문제가 되는 케이스:
1. 컬럼 삭제: Sink가 해당 컬럼을 필수로 기대하고 있다면 에러.
2. 컬럼 타입 변경 (INT → VARCHAR): Sink가 숫자로 받아야 할 컬럼에 문자열이 온다. DataException.
3. NOT NULL 컬럼 추가 (DEFAULT 없이): 기존 행을 재처리하거나 스냅샷할 때 NULL 값이 온다. Sink의 NOT NULL 제약을 위반한다.
4. DDL 변경 중 Connector가 죽으면: schema history가 부분 적용 상태가 된다. 재시작 시 Debezium이 스키마 불일치를 감지하고 에러를 낸다.
에러 예시:
Schema not found in database history for column 'discount_amount'
방어 절차:
# 1. DDL 변경 전 Connector pause
curl -X PUT http://connect:8083/connectors/mysql-source/pause
# 2. PAUSED 상태 확인 (바로 안 될 수 있다. Task가 진행 중인 poll을 마쳐야 PAUSED가 된다)
watch 'curl -s http://connect:8083/connectors/mysql-source/status | jq ".tasks[].state"'
# "PAUSED" 가 나올 때까지 기다린다
# 3. DDL 실행
mysql -e "ALTER TABLE orders ADD COLUMN discount_amount DECIMAL(10,2) DEFAULT 0;"
# 4. Connector resume
curl -X PUT http://connect:8083/connectors/mysql-source/resume
10초 pause가 파이프라인 재설치 작업보다 훨씬 낫다.
처음부터 영향을 줄이는 방법 — column.exclude.list:
"column.exclude.list": "mydb.orders.internal_temp,mydb.users.password_hash"
민감 데이터나 자주 변경되는 임시 컬럼을 CDC 이벤트에서 처음부터 제외하면, 해당 컬럼의 DDL 변경이 Connector에 영향을 주지 않는다.
Debezium 이벤트를 Sink 친화적으로 변환하는 SMT:
Debezium이 만드는 이벤트는 op, before, after, source 필드를 가진 envelope 구조다. Sink로 보낼 때는 after 내용만 필요한 경우가 많다.
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms"
ExtractNewRecordState가 after 내용을 최상위로 올린다. add.fields로 op(변경 종류)와 ts_ms(변경 시각)를 메타 필드로 추가할 수 있다.
토픽 이름을 바꾸고 싶으면:
"transforms": "routeByTable",
"transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByTable.regex": "mysql-prod\\.mydb\\.(.*)",
"transforms.routeByTable.replacement": "cdc-$1"
mysql-prod.mydb.orders → cdc-orders로 바뀐다.
최적화 — 무엇을 어떻게 튜닝하나
Source Connector 핵심 설정:
{
"max.batch.size": "2048",
"max.queue.size": "16384",
"poll.interval.ms": "500",
"snapshot.fetch.size": "10240",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO mydb.debezium_heartbeat (ts) VALUES (NOW()) ON DUPLICATE KEY UPDATE ts=NOW()"
}
max.batch.size:poll()이 한 번에 반환하는 최대 레코드 수. 높이면 throughput 증가, 처리 지연 증가. 기본값 2048이 보통 적절하다.max.queue.size: 내부 큐 크기.poll()이 가져온 레코드를 Kafka에 쓰기 전에 여기에 쌓인다. 큐가 가득 차면poll()호출이 블록된다. 기본값 8192.heartbeat.interval.ms: 변경이 없을 때도 heartbeat 이벤트를 보내 binlog 포지션을 advance시킨다. 변경이 오래 없으면 MySQL이 오래된 binlog를 purge할 수 있다. 그 이후에 재시작하면 저장된 포지션이 이미 없어진 binlog를 가리킨다.heartbeat.action.query: heartbeat용 테이블에 직접 써서 binlog에 실제 이벤트를 만든다. 이 쿼리 없이 heartbeat만 설정하면 Debezium 내부 이벤트만 보내고 binlog 포지션이 advance되지 않는 경우가 있다.
Worker 핵심 설정:
# converter — JSON은 간단하지만 스키마 정보가 메시지를 크게 만든다
# 프로덕션에서는 Avro + Schema Registry가 표준
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false # true면 메시지 크기 2~3배
value.converter.schemas.enable=false
# offset flush 주기
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
튜닝 — 병목 지표로 무엇을 봐야 하나
측정 없이 설정을 바꾸면 어디서 개선됐는지 알 수 없다.
Source Connector 상태 확인:
# Connector 전체 상태
curl http://connect:8083/connectors/mysql-source/status | jq
# 핵심 Debezium JMX/Prometheus 지표
debezium_metrics_MilliSecondsSinceLastEvent # 마지막 이벤트 이후 경과 시간
debezium_metrics_QueueRemainingCapacity # 내부 큐 여유 용량
debezium_metrics_TotalNumberOfEventsSeen # 총 처리 이벤트 수
# MilliSecondsSinceLastEvent > 30000 (30초) → Source가 막혀 있거나 변경이 없는 것
# QueueRemainingCapacity가 0에 가까움 → Kafka 쓰기가 병목
Sink Connector lag 확인:
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group connect-jdbc-sink-connector
# LAG 컬럼이 지속적으로 증가 → Sink 처리량이 Source를 못 따라가는 것
# tasks.max 증가 + 파티션 수 맞추기로 해결
병목 시나리오별 대응:
| 증상 | 원인 | 조치 |
|---|---|---|
MilliSecondsSinceLastEvent 급증 | binlog 접근 지연 또는 MySQL 부하 | poll.interval.ms 줄이기, MySQL 서버 상태 확인 |
QueueRemainingCapacity = 0 | Kafka 쓰기 병목 | max.queue.size 늘리기, 브로커 상태 확인 |
| Sink LAG 증가 | Sink 처리 느림 | tasks.max 증가, 파티션 수 조정 |
| Connector 반복 restart | 처리 못하는 레코드 존재 | DLQ 설정, 에러 원인 분석 |
운영상 주의사항 — 프로덕션에서 실제로 깨지는 것들
1. schema.history 토픽이 삭제되면 Connector를 재설치해야 한다
이 토픽이 없어지면 Debezium이 스키마 히스토리를 재구성할 수 없다. initial 모드로 처음부터 다시 스냅샷해야 한다. 5천만 행 테이블이면 몇 시간짜리 작업이다.
# 이 토픽은 절대 retention을 짧게 설정하면 안 된다
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name schemahistory.mysql-prod \
--alter --add-config retention.ms=-1 # 무한 보존
2. binlog가 purge되면 Connector가 죽는다
binlog_expire_logs_seconds=604800(7일)이면 7일 넘게 Connector가 멈춰 있다가 재시작하면 저장된 포지션이 이미 없어진 binlog를 가리킨다.
에러: The connector is trying to read binlog starting at
file 'mysql-bin.000010', position 12345,
but this is no longer available on the server.
이 경우 offset을 수동으로 리셋하거나 snapshot.mode=initial로 재설치해야 한다.
heartbeat.action.query를 설정해두면 binlog 포지션이 계속 advance되므로 오래된 포지션을 가리키는 상황을 방지할 수 있다.
3. connect-offsets 토픽의 cleanup.policy가 잘못되면 offset이 사라진다
# connect-offsets 설정 확인
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name connect-offsets \
--describe
# cleanup.policy=compact 이어야 한다
# delete면 retention 기간 이후 offset이 사라진다
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name connect-offsets \
--alter --add-config cleanup.policy=compact
compact 정책이면 같은 key의 가장 최신 값만 보존된다. offset은 절대 만료되지 않는다.
4. DLQ를 모니터링하지 않으면 조용히 데이터가 사라진다
# DLQ lag 확인 (이 값이 0이 아니면 알람 필요)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group dlq-monitor-group
# Prometheus alert rule 예시
# kafka_consumer_records_lag{topic="dlq-jdbc-sink"} > 0
errors.tolerance: all 설정의 의미를 다시 생각해보자. “에러가 나도 파이프라인을 멈추지 않겠다”는 것이지 “에러를 무시하겠다”가 아니다. DLQ는 반드시 모니터링해야 한다.
5. Worker JVM 메모리와 GC
# Connect worker 기본 힙 설정 확인
export KAFKA_HEAP_OPTS="-Xmx2g -Xms2g"
Worker는 브로커가 아니라 Connector 로직을 실행하는 JVM이다. 힙을 너무 작게 주면 OOM이 나고, 너무 크게 주면 GC pause가 길어진다. 2~4GB가 보통 적절하다. GC 로그를 확인해서 Full GC가 자주 발생하는지 확인해야 한다.
핵심 세 가지:
-
Offset은 Kafka 토픽에, 스키마 히스토리도 Kafka 토픽에 — 둘 다 retention.ms=-1(무한 보존)로 설정한다. 이 토픽이 날아가면 파이프라인 재설치다.
-
At-least-once가 기본 — Sink는 항상 upsert로, DLQ는 항상 모니터링과 함께.
-
DDL 변경 전 pause — 10초 pause이 몇 시간짜리 스냅샷 재설치보다 낫다.