Infrastructure/CDC

[CDC] MySQL Debezium Change Data Capture 따라해보기 - 2

상쾌한기분 2024. 2. 10. 13:05
728x90
반응형

[CDC] MySQL Debezium Change Data Capture 따라해보기 - 2 

MySQL Master INSERT 실행

마스터 DB에 데모 테이블을 생성하고 로우를 하나 삽입한다.

-- Demo table created on the MySQL master for the CDC walkthrough.
-- Identifiers are backtick-quoted because USER and PASSWORD are MySQL keywords.
CREATE TABLE `user` (
    `id`          INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `email`       VARCHAR(124) NOT NULL,
    `password`    VARCHAR(256) NOT NULL,
    `nickname`    VARCHAR(124) NULL,
    -- Nullable, but falls back to 'ACTIVE' when the column is omitted on insert.
    `user_status` VARCHAR(32)  NULL DEFAULT 'ACTIVE',
    PRIMARY KEY (`id`)
);

-- Insert a single demo row; `id` is left out so AUTO_INCREMENT assigns it.
INSERT INTO `user`
    (`email`, `password`, `nickname`, `user_status`)
VALUES
    ('user@dev.com', 'hashed_password', 'user-1', 'ACTIVE');

로그 확인

2024-02-10 03:57:20,525 INFO   MySQL|cdc-demo|snapshot  1 records sent during previous 01:06:20.512, last recorded offset of {server=cdc-demo} partition is {transaction_id=null, ts_sec=1707537440, file=mysql-bin.000003, pos=2693, row=1, server_id=1, event=2}   [io.debezium.connector.common.BaseSourceTask]

Kafka Message

카프카 토픽을 확인하면 다음과 같은 메시지가 생성된 것을 볼 수 있다. payload의 op 값이 "r"이고 source.snapshot 값이 채워져 있으므로, 이 레코드는 binlog 스트리밍이 아니라 스냅샷 단계에서 읽힌 것이다.

{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "struct",
				"fields": [
					{
						"type": "int64",
						"optional": false,
						"field": "id"
					},
					{
						"type": "string",
						"optional": false,
						"field": "email"
					},
					{
						"type": "string",
						"optional": false,
						"field": "password"
					},
					{
						"type": "string",
						"optional": true,
						"field": "nickname"
					},
					{
						"type": "string",
						"optional": true,
						"default": "ACTIVE",
						"field": "user_status"
					}
				],
				"optional": true,
				"name": "cdc-demo.demo.user.Value",
				"field": "before"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "int64",
						"optional": false,
						"field": "id"
					},
					{
						"type": "string",
						"optional": false,
						"field": "email"
					},
					{
						"type": "string",
						"optional": false,
						"field": "password"
					},
					{
						"type": "string",
						"optional": true,
						"field": "nickname"
					},
					{
						"type": "string",
						"optional": true,
						"default": "ACTIVE",
						"field": "user_status"
					}
				],
				"optional": true,
				"name": "cdc-demo.demo.user.Value",
				"field": "after"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": false,
						"field": "version"
					},
					{
						"type": "string",
						"optional": false,
						"field": "connector"
					},
					{
						"type": "string",
						"optional": false,
						"field": "name"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "ts_ms"
					},
					{
						"type": "string",
						"optional": true,
						"name": "io.debezium.data.Enum",
						"version": 1,
						"parameters": {
							"allowed": "true,last,false,incremental"
						},
						"default": "false",
						"field": "snapshot"
					},
					{
						"type": "string",
						"optional": false,
						"field": "db"
					},
					{
						"type": "string",
						"optional": true,
						"field": "sequence"
					},
					{
						"type": "string",
						"optional": true,
						"field": "table"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "server_id"
					},
					{
						"type": "string",
						"optional": true,
						"field": "gtid"
					},
					{
						"type": "string",
						"optional": false,
						"field": "file"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "pos"
					},
					{
						"type": "int32",
						"optional": false,
						"field": "row"
					},
					{
						"type": "int64",
						"optional": true,
						"field": "thread"
					},
					{
						"type": "string",
						"optional": true,
						"field": "query"
					}
				],
				"optional": false,
				"name": "io.debezium.connector.mysql.Source",
				"field": "source"
			},
			{
				"type": "string",
				"optional": false,
				"field": "op"
			},
			{
				"type": "int64",
				"optional": true,
				"field": "ts_ms"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": false,
						"field": "id"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "total_order"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "data_collection_order"
					}
				],
				"optional": true,
				"name": "event.block",
				"version": 1,
				"field": "transaction"
			}
		],
		"optional": false,
		"name": "cdc-demo.demo.user.Envelope",
		"version": 1
	},
	"payload": {
		"before": null,
		"after": {
			"id": 1,
			"email": "user@dev.com",
			"password": "hashed_password",
			"nickname": "user-1",
			"user_status": "ACTIVE"
		},
		"source": {
			"version": "2.5.1.Final",
			"connector": "mysql",
			"name": "cdc-demo",
			"ts_ms": 1707533438000,
			"snapshot": "first",
			"db": "demo",
			"sequence": null,
			"table": "user",
			"server_id": 0,
			"gtid": null,
			"file": "mysql-bin.000003",
			"pos": 2267,
			"row": 0,
			"thread": null,
			"query": null
		},
		"op": "r",
		"ts_ms": 1707533438578,
		"transaction": null
	}
}

 

Ref

https://debezium.io/documentation/reference/stable/connectors/mysql.html

 

Debezium connector for MySQL :: Debezium Documentation

Time, date, and timestamps can be represented with different kinds of precision, including: adaptive_time_microseconds (the default) captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column's type.

debezium.io

 

728x90
반응형