[CDC] Following Along with MySQL Debezium Change Data Capture
I wanted to configure this setup with io.apicurio.registry.utils.converter.AvroConverter, but I couldn't get it working. If you know how, please leave a comment.
Here is the repository with the actual demo implementation:
https://github.com/sanggi-wjg/spring-cdc-debezium-demo
docker-compose.yaml
version: "3.8"
name: cdc-docker-work-space
services:
cdc-demo-mysql-master:
image: mysql:8.0
container_name: cdc-demo-mysql-master
networks:
- cdc_network
ports:
- "10001:3306"
environment:
- MYSQL_DATABASE=demo
- MYSQL_ROOT_PASSWORD=rootroot
volumes:
- ./config/master-my.cnf:/etc/mysql/my.cnf
cdc-demo-mysql-slave:
image: mysql:8.0
container_name: cdc-demo-mysql-slave
networks:
- cdc_network
depends_on:
- cdc-demo-mysql-master
ports:
- "10002:3306"
environment:
- MYSQL_DATABASE=demo
- MYSQL_ROOT_PASSWORD=rootroot
volumes:
- ./config/slave-my.cnf:/etc/mysql/my.cnf
cdc-demo-mysql-target:
image: mysql:8.0
container_name: cdc-demo-mysql-target
networks:
- cdc_network
depends_on:
- cdc-demo-mysql-master
- cdc-demo-mysql-slave
ports:
- "10003:3306"
environment:
- MYSQL_DATABASE=target
- MYSQL_ROOT_PASSWORD=rootroot
volumes:
- ./config/master-my.cnf:/etc/mysql/my.cnf
cdc-zookeeper:
image: debezium/zookeeper:2.5.1.Final # https://hub.docker.com/r/debezium/zookeeper
platform: linux/arm64/v8
hostname: cdc-zookeeper
container_name: cdc-zookeeper
networks:
- cdc_network
depends_on:
- cdc-demo-mysql-master
- cdc-demo-mysql-slave
- cdc-demo-mysql-target
ports:
- "2181:2181"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
cdc-kafka:
image: debezium/kafka:2.5.1.Final # https://hub.docker.com/r/debezium/kafka
platform: linux/arm64/v8
container_name: cdc-kafka
networks:
- cdc_network
depends_on:
- cdc-zookeeper
ports:
- "9092:9092"
environment:
ZOOKEEPER_CONNECT: cdc-zookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cdc-kafka:29092,EXTERNAL://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
cdc-kafka-ui:
image: provectuslabs/kafka-ui
container_name: cdc-kafka-ui
networks:
- cdc_network
depends_on:
- cdc-zookeeper
- cdc-kafka
ports:
- "8084:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=Local-Cluster
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=cdc-kafka:29092
- KAFKA_CLUSTERS_0_ZOOKEEPER=cdc-zookeeper:2181
schema-registry:
image: confluentinc/cp-schema-registry
container_name: schema-registry
networks:
- cdc_network
depends_on:
- cdc-zookeeper
- cdc-kafka
ports:
- "8181:8181"
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
# SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: cdc-zookeeper:2181 # deprecated
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://cdc-kafka:29092'
SCHEMA_REGISTRY_DEBUG: true
# https://debezium.io/documentation/reference/stable/configuration/avro.html#deploying-with-debezium-containers
connector:
image: debezium/connect:2.5.1.Final # https://hub.docker.com/r/debezium/connect
platform: linux/arm64/v8
container_name: connector
networks:
- cdc_network
ports:
- "8083:8083"
depends_on:
- cdc-zookeeper
- cdc-kafka
- schema-registry
environment:
- BOOTSTRAP_SERVERS=cdc-kafka:29092
- GROUP_ID=debezium-connector-1
- CONFIG_STORAGE_TOPIC=cdc_connect_config_storage
- CONFIG_STORAGE_REPLICATION_FACTOR=1
- OFFSET_STORAGE_TOPIC=cdc_connect_offset_storage
- OFFSET_STORAGE_REPLICATION_FACTOR=1
- STATUS_STORAGE_TOPIC=cdc_connect_status_storage
- STATUS_STORAGE_REPLICATION_FACTOR=1
- ENABLE_DEBEZIUM_KC_REST_EXTENSION=true
- ENABLE_DEBEZIUM_SCRIPTING=true
- LOG_LEVEL=INFO
- KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
# - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
# - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
# - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
# - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
# - CONNECT_PLUGIN_PATH=/kafka/connect
volumes:
- ./plugins/common-config.jar:/kafka/connect/common-config.jar
- ./plugins/common-utils.jar:/kafka/connect/common-utils.jar
- ./plugins/kafka-avro-serializer.jar:/kafka/connect/kafka-avro-serializer.jar
- ./plugins/kafka-connect-avro-data.jar:/kafka/connect/kafka-connect-avro-data.jar
- ./plugins/kafka-connect-avro-converter.jar:/kafka/connect/kafka-connect-avro-converter.jar
- ./plugins/kafka-schema-converter.jar:/kafka/connect/kafka-schema-converter.jar
- ./plugins/kafka-schema-registry-client.jar:/kafka/connect/kafka-schema-registry-client.jar
- ./plugins/kafka-schema-serializer.jar:/kafka/connect/kafka-schema-serializer.jar
debezium-ui:
image: debezium/debezium-ui:2.1.2.Final # https://hub.docker.com/r/debezium/debezium-ui
platform: linux/arm64/v8
container_name: debezium-ui
networks:
- cdc_network
ports:
- "8082:8080"
depends_on:
- cdc-zookeeper
- cdc-kafka
- schema-registry
- connector
environment:
- DEPLOYMENT_MODE=default
- KAFKA_CONNECT_URIS=http://connector:8083
networks:
cdc_network:
name: cdc_docker_network
driver: bridge
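Once the stack is brought up (the docker compose command appears in the next section), a quick sanity check that every service started and that the port mappings from the file above took effect — a minimal sketch:
docker compose ps
# All services should be "running": MySQL on host ports 10001-10003, Kafka on 9092,
# Kafka Connect on 8083, Kafka-UI on 8084, Debezium-UI on 8082, Schema Registry on 8081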
MySQL Master my.cnf
[mysqld]
log-bin=mysql-bin
binlog_format=row
server-id=1
read_only=0
general_log=1
slow_query_log=1
long_query_time=2
log-error = /var/lib/mysql/mysql.err
character-set-server=utf8mb4
collation-server=utf8mb4_0900_ai_ci
init-connect='SET NAMES utf8mb4'
log_queries_not_using_indexes=1
transaction-isolation=REPEATABLE-READ
max_connections = 500
default_authentication_plugin=mysql_native_password
[client]
default-character-set=utf8mb4
[mysql]
default-character-set=utf8mb4
MySQL Slave my.cnf
[mysqld]
log-bin=mysql-bin
binlog_format=row
server-id=2
read_only=1
general_log=1
slow_query_log=1
long_query_time=2
log-error = /var/lib/mysql/mysql.err
character-set-server=utf8mb4
collation-server=utf8mb4_0900_ai_ci
init-connect='SET NAMES utf8mb4'
log_queries_not_using_indexes=1
transaction-isolation=REPEATABLE-READ
max_connections = 500
default_authentication_plugin=mysql_native_password
[client]
default-character-set=utf8mb4
[mysql]
default-character-set=utf8mb4
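Once the containers are running (next section), it is worth confirming that these settings were actually picked up. A minimal check, assuming the container names and root password from the compose file above:
# Master: binary logging must be ON and row-based for Debezium to read the binlog
docker exec cdc-demo-mysql-master mysql -uroot -prootroot -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; SHOW VARIABLES LIKE 'server_id';"
# Slave: should report server_id=2 and read_only=ON
docker exec cdc-demo-mysql-slave mysql -uroot -prootroot -e "SHOW VARIABLES LIKE 'server_id'; SHOW VARIABLES LIKE 'read_only';"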
Running docker-compose and Setting Up MySQL Replication
After starting the Docker containers, configure replication between the master and the slave.
docker compose up -d --build
# After connecting to the master MySQL
show master status;
# After connecting to the slave MySQL
STOP SLAVE;
# Point the slave at the master's binlog file and position (values from SHOW MASTER STATUS above)
CHANGE MASTER TO MASTER_HOST='cdc-demo-mysql-master', MASTER_USER='root', MASTER_PASSWORD='rootroot', MASTER_LOG_POS=157, MASTER_LOG_FILE='mysql-bin.000003';
START SLAVE;
SHOW SLAVE STATUS\G
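A quick smoke test for the replication link — a minimal sketch that writes a throwaway table on the master and reads it back from the slave (the repl_check table is only for illustration):
# Write on the master
docker exec cdc-demo-mysql-master mysql -uroot -prootroot demo -e "CREATE TABLE IF NOT EXISTS repl_check (id INT PRIMARY KEY); INSERT INTO repl_check VALUES (1);"
# Read on the slave; the row should show up almost immediately
docker exec cdc-demo-mysql-slave mysql -uroot -prootroot demo -e "SELECT * FROM repl_check;"
# SHOW SLAVE STATUS\G should also report Slave_IO_Running: Yes and Slave_SQL_Running: Yes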
Registering the Debezium MySQL Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "demo-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "cdc-demo-mysql-slave",
"database.port": "3306",
"database.user": "root",
"database.password": "rootroot",
"database.server.id": "2",
"topic.prefix": "cdc-demo",
"database.include.list": "demo",
"schema.history.internal.kafka.bootstrap.servers": "cdc-kafka:29092",
"schema.history.internal.kafka.topic": "schema-changes.demo",
"include.schema.changes": "true",
"table.whitelist": "demo.user",
}
}'
# 201 Created on success
HTTP/1.1 201 Created
Date: Fri, 09 Feb 2024 12:34:29 GMT
Location: http://localhost:8083/connectors/demo-mysql-connector
Content-Type: application/json
Content-Length: 503
Server: Jetty(9.4.52.v20230823)
{
"name": "demo-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "cdc-demo-mysql-slave",
"database.port": "3306",
"database.user": "root",
"database.password": "rootroot",
"database.server.id": "2",
"topic.prefix": "cdc-demo",
"database.include.list": "demo",
"schema.history.internal.kafka.bootstrap.servers": "cdc-kafka:29092",
"schema.history.internal.kafka.topic": "schema-changes.demo",
"include.schema.changes": "true",
"table.whitelist": "demo.user",
"name": "demo-mysql-connector"
},
"tasks": [],
"type": "source"
}
Verifying the Connector Registration
curl -H "Accept:application/json" localhost:8083/connectors/
# Response
["demo-mysql-connector"]
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/demo-mysql-connector
# Response
{
"name": "demo-mysql-connector",
...
"type": "source"
}
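Kafka Connect also exposes a status endpoint, which is useful when a connector registers fine but its task fails afterwards — a minimal sketch:
curl -s -H "Accept:application/json" localhost:8083/connectors/demo-mysql-connector/status
# Both "connector.state" and "tasks[0].state" should be RUNNING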
Checking the Logs to Confirm the Connector Registered Properly
If the connections between the Docker container services are working, the log output below should appear. Check the Connect container's logs.
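The worker log can be tailed straight from the Connect container (named connector in the compose file above):
docker logs -f connector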
2024-02-10 02:49:21,273 INFO || [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=5, memberId='connect-1-ba424f7b-4e29-4909-83ee-51742760506f', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-10 02:49:21,313 INFO || [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=5, memberId='connect-1-ba424f7b-4e29-4909-83ee-51742760506f', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-10 02:49:21,313 INFO || [Worker clientId=connect-1, groupId=1] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ba424f7b-4e29-4909-83ee-51742760506f', leaderUrl='http://192.168.112.7:8083/', offset=4, connectorIds=[demo-mysql-connector], taskIds=[demo-mysql-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:49:21,314 WARN || [Worker clientId=connect-1, groupId=1] Catching up to assignment's config offset. [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:49:21,314 INFO || [Worker clientId=connect-1, groupId=1] Current config state offset -1 is behind group assignment 4, reading to end of config log [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:49:21,323 INFO || [Worker clientId=connect-1, groupId=1] Finished reading to end of log and updated config snapshot, new config log offset: 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:49:21,323 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:49:21,325 INFO || [Worker clientId=connect-1, groupId=1] Starting task demo-mysql-connector-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:49:21,326 INFO || [Worker clientId=connect-1, groupId=1] Starting connector demo-mysql-connector [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-10 02:50:38,101 INFO || Kafka version: 3.6.1 [org.apache.kafka.common.utils.AppInfoParser]
2024-02-10 02:50:38,101 INFO || Kafka commitId: 5e3c2b738d253ff5 [org.apache.kafka.common.utils.AppInfoParser]
2024-02-10 02:50:38,101 INFO || Kafka startTimeMs: 1707533438101 [org.apache.kafka.common.utils.AppInfoParser]
2024-02-10 02:50:38,145 INFO || Database schema history topic '(name=schema-changes.demo, numPartitions=1, replicationFactor=default, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2024-02-10 02:50:38,145 INFO || App info kafka.admin.client for cdc-demo-schemahistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
2024-02-10 02:50:38,146 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2024-02-10 02:50:38,146 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2024-02-10 02:50:38,146 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2024-02-10 02:50:38,146 INFO || Reconnecting after finishing schema recovery [io.debezium.connector.mysql.MySqlConnectorTask]
2024-02-10 02:50:38,170 INFO || Requested thread factory for connector MySqlConnector, id = cdc-demo named = SignalProcessor [io.debezium.util.Threads]
2024-02-10 02:50:38,179 INFO || Requested thread factory for connector MySqlConnector, id = cdc-demo named = change-event-source-coordinator [io.debezium.util.Threads]
2024-02-10 02:50:38,179 INFO || Requested thread factory for connector MySqlConnector, id = cdc-demo named = blocking-snapshot [io.debezium.util.Threads]
2024-02-10 02:50:38,181 INFO || Creating thread debezium-mysqlconnector-cdc-demo-change-event-source-coordinator [io.debezium.util.Threads]
2024-02-10 02:50:38,184 INFO MySQL|cdc-demo|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
2024-02-10 02:50:38,184 INFO MySQL|cdc-demo|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
2024-02-10 02:50:38,193 INFO MySQL|cdc-demo|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,195 INFO MySQL|cdc-demo|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,195 INFO MySQL|cdc-demo|snapshot Read list of available databases [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,197 INFO MySQL|cdc-demo|snapshot list of available databases is: [demo, information_schema, mysql, performance_schema, sys] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,198 INFO MySQL|cdc-demo|snapshot Read list of available tables in each database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,208 INFO MySQL|cdc-demo|snapshot snapshot continuing with database(s): [demo] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,208 INFO MySQL|cdc-demo|snapshot Adding table demo.user to the list of capture schema tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,209 INFO MySQL|cdc-demo|snapshot Created connection pool with 1 threads [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,209 INFO MySQL|cdc-demo|snapshot Snapshot step 3 - Locking captured tables [demo.user] [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,212 INFO MySQL|cdc-demo|snapshot Flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,213 INFO MySQL|cdc-demo|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,216 INFO MySQL|cdc-demo|snapshot Read binlog position of MySQL primary server [io.debezium.connector.mysql.strategy.mysql.MySqlConnectorAdapter]
2024-02-10 02:50:38,217 INFO MySQL|cdc-demo|snapshot using binlog 'mysql-bin.000003' at position '2267' and gtid '' [io.debezium.connector.mysql.strategy.mysql.MySqlConnectorAdapter]
2024-02-10 02:50:38,217 INFO MySQL|cdc-demo|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,218 INFO MySQL|cdc-demo|snapshot All eligible tables schema should be captured, capturing: [demo.user] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,459 INFO MySQL|cdc-demo|snapshot Reading structure of database 'demo' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,513 INFO MySQL|cdc-demo|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,534 INFO MySQL|cdc-demo|snapshot Already applied 1 database changes [io.debezium.relational.history.SchemaHistoryMetrics]
2024-02-10 02:50:38,557 INFO MySQL|cdc-demo|snapshot Releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,559 INFO MySQL|cdc-demo|snapshot Writes to MySQL tables prevented for a total of 00:00:00.346 [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,559 INFO MySQL|cdc-demo|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,560 INFO MySQL|cdc-demo|snapshot Creating snapshot worker pool with 1 worker thread(s) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,561 INFO MySQL|cdc-demo|snapshot For table 'demo.user' using select statement: 'SELECT `id`, `email`, `password`, `nickname`, `user_status` FROM `demo`.`user`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,568 INFO MySQL|cdc-demo|snapshot Estimated row count for table demo.user is OptionalLong[4] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2024-02-10 02:50:38,572 INFO MySQL|cdc-demo|snapshot Exporting data from table 'demo.user' (1 of 1 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,579 INFO MySQL|cdc-demo|snapshot Finished exporting 5 records for table 'demo.user' (1 of 1 tables); total duration '00:00:00.007' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2024-02-10 02:50:38,583 INFO MySQL|cdc-demo|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2024-02-10 02:50:38,583 INFO MySQL|cdc-demo|snapshot Snapshot completed [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2024-02-10 02:50:38,617 INFO MySQL|cdc-demo|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=2267, currentRowNumber=0, serverId=0, sourceTime=2024-02-10T02:50:38Z, threadId=-1, currentQuery=null, tableIds=[demo.user], databaseName=demo], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=2267, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
Checking Kafka-UI and Debezium-UI
If Kafka and the Debezium connector are wired up correctly, you can confirm it through the UIs.
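The change events themselves can also be inspected straight from the topic. Debezium names topics as <topic.prefix>.<database>.<table>, so changes to the user table should land in cdc-demo.demo.user. A minimal sketch using the console consumer bundled with Kafka (the script path inside the cdc-kafka container is an assumption based on the Debezium Kafka image layout):
docker exec -it cdc-kafka bin/kafka-console-consumer.sh --bootstrap-server cdc-kafka:29092 --topic cdc-demo.demo.user --from-beginning
# Each message is a JSON envelope whose payload contains "before"/"after" fields describing the row change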
Ref
https://debezium.io/documentation/reference/stable/tutorial.html
https://github.com/debezium/debezium-examples