카프카 프로듀서
프로듀서는 카프카 프로듀서 API를 포함하여 구성된 애플리케이션을 마라혀 브로커에 특정 토픽의 파티션에 메시지를 전달하는 역할을 합니다.
프로듀서가 전달하는 메시지의 구조는 다음과 같습니다.
- Topic (토픽)
- Partition (특정 파티션 위치)
- Timestamp (생성 시간)
- Header (헤더)
- Key (키)
- Value (값)
카프카 서버에 메시지를 전송하는 책임을 가지고 있으며 데이터를 전송하기 위한 전략을 가지고 있다.
- 재시도는 어떻게 할 것 인지
- 데이터 전송 시 Compression은 어떻게 할 것 인지
- 데이터 전송 시 Serializer는 어떤것을 사용할 것인지
- 어떤 Partition으로 데이터 전송을 할 것인지
Compression (압축)
메시지 압축 옵션을 사용하였다면 설정을 따라 메시지 전송시 압축을 합니다. 메시지 압축 설정은 Client와 Server간에 네트워크 비용 절감 뿐만 아니라 브로커 내부 빠른 복제를 도와줍니다.
- Client에서 Server로 데이터를 전송할 때 네트워크 비용을 위해 압축 후 전송을 위한 방법
- 압축 시 ProducerRecord 단위로 압축 진행
Configuration | Default | Description |
compression.type | producer | 토픽에 대해 최종 압축 타입 정의 ( gzip, snappy, lz4, ztsd, prodcuer, uncompressed ) |
Serializer (시리얼라이저)
전달받은 Key, Value를 직렬화 하며 다음 파티셔닝 과정으로 넘어갑니다.
- Key는 Partition을 선택하기 위한 기준값
- Value는 Topic에 전달할 데이터
Configuration | Default | Description |
key.serializer | - | key 를 serialize 하는 방법에 대한 전략 |
value.serializer | - | value 를 serialize 하는 방법에 대한 전략 |
Partitioner
- Record가 어떤 Partition으로 전송할 것인지 선택의 전략
Configuration | Default | Description |
partitioner.class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | Partitioner 에 대한 전략을 선택 - DefaultPartitioner: key 가 존재하면 hash 값을 기반으로 선택, key 가 존재하지 않으면 UniformStickyPartitioner 전략으로 구현되어 있음 - RoundRobinPartitioner: Partition의 순서대로 하나씩 할당한다 - UniformStickyPartitioner: batch.size 가 꽉차거나 linger.ms 값이 초과되었을 때 쌓아둔 데이터를 하나의 Partition 에게 모두 전송한다 |
batch.size | 16384 | Producer 가 데이터를 전송할 수 있는 최대 데이터 크기. 참고로 batch size 만큼 무조건적으로 전송하는게 아닌 최대 임계값 Record의 크기가 사이즈를 초과하면 전송 시도를 하지 않는다. |
linger.ms | 0 | Producer 가 데이터를 전송하는데 대기하는 시간. 대기하는 시간만큼 batch size 만큼 데이터의 용량을 쌓아둘 수 있음 linger.ms가 0으로 설정이 되어 있다면 누적된 batch size 상관없이 즉시 전송한다는 뜻. |
batch.size 설정 값을 작게 한다면 처리량이 적어질 수 있고 크게 한다면 메모리를 낭비할 수 있다.
UniformStickyPartitioner
Batch Size만큼 어떤 Partition에게 Record를 전송할지 판단
https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/
재시도
재시도를 제어 시에는 delivery.timeout 의 설정값을 기반으로 세팅 필요
Configuration | Default | Description |
retries | 2147483647 | 오류로 인해 실패한 record 전송 이력에 대해 재시도하는 횟수. 단 무한정으로 재시도하는게 아니라 delivery.timeout.ms 내에서만 재시도 따라서 client 는 retries 설정값으로 튜닝하는게 아닌 timeout 값으로 튜닝할 것을 권고 |
delivery.timeout.ms | 120000ms = 2min | Producer 의 send 호출이 성공 또는 실패로 보고하는 최대 시간의 임계값 delivery.timeout.ms >= linger.ms + request.timeout.ms 를 준수할 것 |
linger.ms | 0 | Producer 가 데이터를 전송하는데 대기하는 시간. 대기하는 시간만큼 batch size 만큼 데이터의 용량을 쌓아둘 수 있음 |
request.timeout.ms | 30000 = 30second | Producer client 가 요청응답을 기다리는 최대시간 |
Network timeout 에러로 Client에서 응답을 받지 못한다면 재시도를 하여 Record가 중복으로 생성되는 이슈가 있을 수 있다.
Acks
Configuration | Default | Description |
acks | all | Producer 가 요청을 보내고 Partition의 Leader 가 Replication 의 수신을 확인해야되는 개수. - 0: Producer 는 통보만 하고 수신여부를 확인하지 않음 - 1: Leader 의 Partition 에는 기록했지만, Replication 에게는 이제 기록해야됨을 수신 확인으로 통보 - all: 모든 Partition 에게 기록되었는지 확인을 통보받음 |
enable.idempotence | true | Producer 가 Record 쓰기 작업을 단 한번만 허용할 것인지 멱등성을 보장 |
enable.idempotence는 Producer ID를 통해서 어떤 Producer가 어떤 Record를 생성했는지 체크를 하여 처리하는 Broker에서 Producer ID 기반으로 Record에 대해서 쓰기 작업을 한 번만 할 수 있도록 도와준다.
참고
https://kafka.apache.org/documentation/#configuration
옵션 개많앙
https://always-kimkim.tistory.com/entry/kafka101-producer
https://huisam.tistory.com/entry/kafka-producer?category=849126
'Infrastructure > Kafka' 카테고리의 다른 글
[Kafka] 카프카 컨슈머 (2) | 2023.10.29 |
---|---|
[Kafka] 카프카 메시지 브로커 (0) | 2023.09.17 |
[Kafka] 카프카 에러 핸들링 패턴 (0) | 2023.09.15 |
[Kafka] Python confluent Kafka 설치 및 테스트 (1) | 2023.02.07 |