[CDC] MySQL Debezium Change Data Capture Walkthrough - 3
The full demo implementation is available in the repo below.
https://github.com/sanggi-wjg/spring-cdc-debezium-demo
Spring Boot configuration
build.gradle.kts
import com.github.imflog.schema.registry.Subject
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.2.2"
    id("io.spring.dependency-management") version "1.1.4"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
    id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "1.13.0"
    kotlin("jvm") version "1.9.22"
    kotlin("plugin.spring") version "1.9.22"
    kotlin("plugin.jpa") version "1.9.22"
    kotlin("kapt") version "1.9.22"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"

java {
    sourceCompatibility = JavaVersion.VERSION_17
}

repositories {
    mavenCentral()
    gradlePluginPortal()
    maven {
        url = uri("https://packages.confluent.io/maven/")
        name = "confluent"
    }
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.boot:spring-boot-starter-jdbc")
    implementation("org.springframework.boot:spring-boot-starter-validation")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

    // avro + kafka schema registry
    implementation("org.apache.avro:avro:1.11.3")
    implementation("io.confluent:kafka-avro-serializer:7.5.3")
    implementation("io.confluent:kafka-schema-registry:7.5.3")

    runtimeOnly("com.mysql:mysql-connector-j")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.kafka:spring-kafka-test")

    // Query DSL
    implementation("com.querydsl:querydsl-jpa:5.0.0:jakarta")
    implementation("com.querydsl:querydsl-core:5.0.0")
    compileOnly("com.querydsl:querydsl-core:5.0.0")
    kapt("com.querydsl:querydsl-apt:5.0.0:jakarta")
}

avro {
    // https://github.com/davidmc24/gradle-avro-plugin
    setOutputCharacterEncoding("UTF-8")
    setCreateSetters(false)
    setFieldVisibility("PRIVATE")
}

schemaRegistry {
    // https://github.com/ImFlog/schema-registry-plugin
    url.set("http://localhost:8081")
    pretty = true
    // download {
    //     // subjectPattern("avro.*", "src/main/avro")
    //     subject("cdc-demo.demo.user", "src/main/avro/avro-schema.avsc")
    // }
    val localSubject = Subject("localSubject", "src/main/avro/schema.avsc", "AVRO")
    //     .addLocalReference("localAvroSubject", "src/main/avro/schema.avsc")
    register {
        subject(localSubject)
    }
    compatibility {
        subject(localSubject)
    }
}

configurations {
    all {
        // exclude("org.springframework.boot", "spring-boot-starter-logging")
        exclude("org.apache.logging.log4j", "log4j-to-slf4j")
        exclude("ch.qos.logback", "logback-classic")
    }
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs += "-Xjsr305=strict"
        jvmTarget = "17"
    }
    dependsOn("registerSchemasTask")
    dependsOn("downloadSchemasTask")
}

tasks.withType<Test> {
    useJUnitPlatform()
}
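The Avro plugin generates the avro.schema.Event class (used in ProducerService below) from src/main/avro/schema.avsc, which the schemaRegistry block also registers against the local registry. The post does not show the schema file itself; here is a minimal sketch consistent with the Event(id, message, timestamp) constructor used later — the field names are assumptions:

{
  "type": "record",
  "name": "Event",
  "namespace": "avro.schema",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "message", "type": "string" },
    { "name": "timestamp", "type": "int" }
  ]
}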
application.yaml
spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    url: jdbc:mysql://localhost:10003/target
    username: root
    password: rootroot
    driver-class-name: com.mysql.cj.jdbc.Driver
    dbcp2:
      validation-query: SELECT 1
    hikari:
      maximum-pool-size: 10
  jpa:
    hibernate:
      ddl-auto: none

config:
  kafka:
    bootstrap-servers: localhost:9092
    schema-registry-url: http://localhost:8081
    specific-avro-reader: true
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      concurrency: 1
      properties:
        security.protocol: PLAINTEXT
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      # value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      compression-type: gzip
      properties:
        security.protocol: PLAINTEXT

logging:
  level:
    org.springframework.kafka: DEBUG
    com.example.springcdcdebeziumdemo.kafka: DEBUG

server:
  port: 8089
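The config.kafka block is a custom prefix bound to the KafkaProperty class, which KafkaConfiguration below registers via @EnableConfigurationProperties. The post does not show that class; a minimal sketch, with field names inferred from the YAML keys and the property.* accesses in KafkaConfiguration:

package com.example.springcdcdebeziumdemo.kafka

import org.springframework.boot.context.properties.ConfigurationProperties

// Sketch of the properties holder bound to the `config.kafka` prefix.
// Relaxed binding maps bootstrap-servers -> bootstrapServers, etc.
@ConfigurationProperties(prefix = "config.kafka")
data class KafkaProperty(
    val bootstrapServers: String,
    val schemaRegistryUrl: String,
    val specificAvroReader: Boolean,
    val consumer: Consumer,
    val producer: Producer,
) {
    data class Consumer(
        val autoOffsetReset: String,
        val keyDeserializer: String,
        val valueDeserializer: String,
        val concurrency: Int,
        val properties: Map<String, String> = emptyMap(),
    )

    data class Producer(
        val keySerializer: String,
        val valueSerializer: String,
        val acks: String,
        val retries: Int,
        val compressionType: String,
        val properties: Map<String, String> = emptyMap(),
    )
}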
KafkaConfiguration
package com.example.springcdcdebeziumdemo.kafka

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*

@EnableKafka
@Configuration
@EnableConfigurationProperties(KafkaProperty::class)
@ConditionalOnClass(KafkaTemplate::class)
class KafkaConfiguration(
    private val property: KafkaProperty,
) {

    companion object {
        const val KAFKA_TEMPLATE = "kafkaTemplate"
        const val CDC_KAFKA_TEMPLATE = "cdcKafkaTemplate"
        const val LISTENER_CONTAINER_FACTORY = "listenerContainerFactory"
        const val CDC_LISTENER_CONTAINER_FACTORY = "cdcListenerContainerFactory"
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to property.bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to property.consumer.keyDeserializer,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to property.consumer.autoOffsetReset,
                ConsumerConfig.GROUP_ID_CONFIG to KafkaGroup.SPRING_TEST,
            ),
        )
    }

    @Bean
    fun cdcConsumerFactory(): ConsumerFactory<String, GenericRecord> {
        return DefaultKafkaConsumerFactory(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to property.bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to property.consumer.keyDeserializer,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to property.consumer.valueDeserializer,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to property.consumer.autoOffsetReset,
                ConsumerConfig.GROUP_ID_CONFIG to KafkaGroup.SPRING_CDC,
                KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to property.schemaRegistryUrl,
                KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to property.specificAvroReader,
                KafkaAvroDeserializerConfig.AVRO_REFLECTION_ALLOW_NULL_CONFIG to true,
                KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG to true,
            ),
        )
    }

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        return DefaultKafkaProducerFactory(
            mapOf(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to property.bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to property.producer.keySerializer,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to property.producer.valueSerializer,
                ProducerConfig.ACKS_CONFIG to property.producer.acks,
                ProducerConfig.RETRIES_CONFIG to property.producer.retries,
                ProducerConfig.COMPRESSION_TYPE_CONFIG to property.producer.compressionType,
            ),
        )
    }

    @Bean
    fun cdcProducerFactory(): ProducerFactory<String, GenericRecord> {
        return DefaultKafkaProducerFactory(
            mapOf(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to property.bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to property.producer.keySerializer,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to property.producer.valueSerializer,
                ProducerConfig.ACKS_CONFIG to property.producer.acks,
                ProducerConfig.RETRIES_CONFIG to property.producer.retries,
                ProducerConfig.COMPRESSION_TYPE_CONFIG to property.producer.compressionType,
                KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to property.schemaRegistryUrl,
            ),
        )
    }

    @Bean(LISTENER_CONTAINER_FACTORY)
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(property.consumer.concurrency)
        return factory
    }

    @Bean(CDC_LISTENER_CONTAINER_FACTORY)
    fun kafkaCdcListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, GenericRecord> {
        val factory: ConcurrentKafkaListenerContainerFactory<String, GenericRecord> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = cdcConsumerFactory()
        factory.setConcurrency(property.consumer.concurrency)
        return factory
    }

    @Bean(KAFKA_TEMPLATE)
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

    @Bean(CDC_KAFKA_TEMPLATE)
    fun cdcKafkaTemplate(): KafkaTemplate<String, GenericRecord> {
        return KafkaTemplate(cdcProducerFactory())
    }
}
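The KafkaTopic and KafkaGroup constant holders referenced above (and by the listeners below) are not shown in the post. A plausible sketch follows — only DEMO_USER is grounded in the "cdc-demo.demo.user" subject seen in the schemaRegistry Gradle block (Debezium names topics <topic.prefix>.<database>.<table>); the other values are assumptions:

package com.example.springcdcdebeziumdemo.kafka

// Constants must be `const val` so they can be used in @KafkaListener annotations.
object KafkaTopic {
    const val TEST = "test"
    const val TEST_EVENT = "test.event"
    const val DEMO_USER = "cdc-demo.demo.user" // Debezium CDC topic for the demo.user table
}

object KafkaGroup {
    const val SPRING_TEST = "spring-test"
    const val SPRING_CDC = "spring-cdc"
}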
ProducerService
package com.example.springcdcdebeziumdemo.producer

import avro.schema.Event
import com.example.springcdcdebeziumdemo.kafka.KafkaConfiguration
import com.example.springcdcdebeziumdemo.kafka.KafkaTopic
import org.apache.avro.generic.GenericRecord
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import java.sql.Timestamp
import java.time.Instant
import kotlin.random.Random

interface ProducerService {
    fun publishTest()
    fun publishTestEvent()
}

@Service
class BasicProducerService(
    private val kafkaProducer: KafkaTemplate<String, String>,
    @Qualifier(KafkaConfiguration.CDC_KAFKA_TEMPLATE)
    private val cdcKafkaProducer: KafkaTemplate<String, GenericRecord>,
) : ProducerService {

    private val log = LoggerFactory.getLogger(this::class.java)

    // @TransactionalEventListener
    override fun publishTest() {
        log.info("Publishing test message")
        kafkaProducer.send(KafkaTopic.TEST, "test")
    }

    override fun publishTestEvent() {
        log.info("Publishing test event message")
        // Note: epoch millis do not fit in an Int, so .toInt() overflows here;
        // this only matches an Avro schema whose timestamp field is an int.
        cdcKafkaProducer.send(
            KafkaTopic.TEST_EVENT,
            Event(Random.nextInt(100), "Random event created", Timestamp.from(Instant.now()).time.toInt()),
        )
    }
}
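The post does not show how the producers are invoked. As a hypothetical convenience for manual testing (not confirmed to exist in the demo repo), a small REST controller could expose them:

package com.example.springcdcdebeziumdemo.producer

import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

// Hypothetical trigger endpoints, e.g.:
//   curl -X POST http://localhost:8089/produce/test-event
@RestController
@RequestMapping("/produce")
class ProducerController(
    private val producerService: ProducerService,
) {
    @PostMapping("/test")
    fun produceTest() = producerService.publishTest()

    @PostMapping("/test-event")
    fun produceTestEvent() = producerService.publishTestEvent()
}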
ConsumerService
package com.example.springcdcdebeziumdemo.consumer

import avro.schema.Event
import avro.schema.User
import com.example.springcdcdebeziumdemo.kafka.KafkaConfiguration
import com.example.springcdcdebeziumdemo.kafka.KafkaGroup
import com.example.springcdcdebeziumdemo.kafka.KafkaTopic
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

interface ConsumerService {
    fun test(message: String)
    fun testUser(@Payload message: ConsumerRecord<String, Event>)
    fun changedDataCapture(@Payload message: ConsumerRecord<String, String>)
}

@Transactional
@Service
class BasicConsumerService(
    private val objectMapper: ObjectMapper,
) : ConsumerService {

    private val log = LoggerFactory.getLogger(BasicConsumerService::class.java)

    @KafkaListener(
        topics = [KafkaTopic.TEST],
        groupId = KafkaGroup.SPRING_TEST,
        containerFactory = KafkaConfiguration.LISTENER_CONTAINER_FACTORY,
    )
    override fun test(@Payload message: String) {
        log.info(message)
    }

    @KafkaListener(
        topics = [KafkaTopic.TEST_EVENT],
        groupId = KafkaGroup.SPRING_CDC,
        containerFactory = KafkaConfiguration.CDC_LISTENER_CONTAINER_FACTORY,
    )
    override fun testUser(@Payload message: ConsumerRecord<String, Event>) {
        log.info("testUser")
        log.info(message.key())
        log.info(message.value().toString())
    }

    // The value arrives as a String here because the configured
    // ErrorHandlingDeserializer delegates to StringDeserializer (see application.yaml).
    @KafkaListener(
        topics = [KafkaTopic.DEMO_USER],
        groupId = KafkaGroup.SPRING_CDC,
        containerFactory = KafkaConfiguration.CDC_LISTENER_CONTAINER_FACTORY,
    )
    override fun changedDataCapture(@Payload message: ConsumerRecord<String, String>) {
        log.info("changedDataCapture")
        log.info(message.key())
        log.info(message.value().toString())
    }
}
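For reference, the value logged by changedDataCapture is a Debezium change-event envelope. Its exact wire format depends on the converter configured for the connector (see parts 1-2 of this series), but the logical structure is the standard envelope. An illustrative insert event for demo.user, with the source block trimmed and values made up:

{
  "before": null,
  "after": { "id": 1, "name": "jane" },
  "source": { "connector": "mysql", "db": "demo", "table": "user" },
  "op": "c",
  "ts_ms": 1707550000000
}

Here op is "c"/"u"/"d"/"r" for create, update, delete, and snapshot read; before is null for inserts, and after is null for deletes.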