[Kafka] Installing and Testing Python confluent-kafka
Installation and testing were done on a Mac.
ZooKeeper and Kafka Cluster docker-compose.yml. Each broker advertises an internal listener (kafka-N:2909N) for clients inside the Docker network and an external listener (localhost:909N) for clients on the host.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    environment:
      TZ: Asia/Seoul
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka-1:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka-1
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      TZ: Asia/Seoul
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:29091,LISTENER_DOCKER_EXTERNAL://localhost:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: '-Xmx512M -Xms512M'
  kafka-2:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka-2
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Seoul
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-2:29092,LISTENER_DOCKER_EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: '-Xmx512M -Xms512M'
  kafka-3:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka-3
    ports:
      - '9093:9093'
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Seoul
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-3:29093,LISTENER_DOCKER_EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: '-Xmx512M -Xms512M'
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8080"
    restart: always
    environment:
      - TZ=Asia/Seoul
      - KAFKA_CLUSTERS_0_NAME=Local-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29091,kafka-2:29092,kafka-3:29093
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
# Start the containers in the background
docker-compose up -d
# List the containers
docker ps -a
# Check the logs to confirm each container came up cleanly
docker logs [zookeeper_container_id | kafka_container_ids]
Once everything is up, open localhost:8081 in a browser to check kafka-ui.
Verifying the Kafka Cluster
# Create a Kafka test topic
docker-compose exec kafka-1 kafka-topics --create --topic test-topic --bootstrap-server kafka-1:29091 # --replication-factor 3 --partitions 3
# Describe the test topic
docker-compose exec kafka-1 kafka-topics --describe --topic test-topic --bootstrap-server kafka-1:29091
# Run the Kafka console producer inside the broker container
docker-compose exec kafka-1 bash
kafka-console-producer --topic test-topic --bootstrap-server kafka-1:29091
>hello
>this is producer
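The same check can also be done from Python once the confluent-kafka package from the next section is installed. A minimal sketch using the client's Admin API; the file name check_cluster.py is hypothetical, and it assumes the external listener localhost:9091 from the compose file above:
# check_cluster.py -- illustrative sketch, not part of the original post
from confluent_kafka.admin import AdminClient

# Connect via one of the externally advertised listeners
admin = AdminClient({'bootstrap.servers': 'localhost:9091'})
metadata = admin.list_topics(timeout=10)

# Print the broker ids and the topics the cluster knows about
print('Brokers:', sorted(metadata.brokers.keys()))
print('Topics:', sorted(metadata.topics.keys()))
With all three brokers up, the broker list should show ids 1, 2, and 3.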
Python Producer and Consumer Test
# Install the project library
pip install confluent-kafka
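As a quick sanity check that the install worked, the package exposes its own version and the bundled librdkafka version (a minimal sketch; run it in any Python shell):
import confluent_kafka

# Both calls return a (version_string, version_int) tuple
print(confluent_kafka.version())     # Python client version
print(confluent_kafka.libversion())  # bundled librdkafka version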
config.ini
[default]
bootstrap.servers=localhost:9092
[consumer]
group.id=python_group_1
# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
kafka_producer.py
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from random import choice

from confluent_kafka import Producer


# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
    if err:
        print('ERROR: Message failed delivery: {}'.format(err))
    else:
        print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
            topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))


if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Produce data by selecting random values from these lists.
    topic = "test-topic"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    # Create Producer instance
    producer = Producer(config)
    for _ in range(10):
        producer.produce(topic, choice(products), choice(user_ids), callback=delivery_callback)

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()
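Run the producer with the config file as its only argument (both files are assumed to be in the same directory):
python kafka_producer.py config.ini
Each of the ten messages should trigger delivery_callback and print a "Produced event to topic test-topic" line once the broker acknowledges it.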
kafka_consumer.py
from argparse import ArgumentParser, FileType
from configparser import ConfigParser

from confluent_kafka import Consumer, OFFSET_BEGINNING


# Set up a callback to handle the '--reset' flag.
def reset_offset(consumer, partitions):
    if args.reset:
        for p in partitions:
            p.offset = OFFSET_BEGINNING
        consumer.assign(partitions)


if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to topic
    topic = "test-topic"
    consumer.subscribe([topic], on_assign=reset_offset)

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to `session.timeout.ms`
                # for the consumer group to rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
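Run the consumer the same way; pass --reset to re-read the topic from the beginning, which works by rewinding every assigned partition to OFFSET_BEGINNING in the on_assign callback:
python kafka_consumer.py config.ini
python kafka_consumer.py --reset config.ini
With the producer run above, it should print ten "Consumed event from topic test-topic" lines. Stop it with Ctrl-C, which falls through to the finally block and closes the consumer cleanly.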