2024. 3. 11. 00:00ㆍKafka
Kafka
실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산형 데이터 스트리밍 플랫폼이다. 하루에 1조 4천억 건의 메시지를 처리하기 위해 LinkedIn이 개발한 내부 시스템으로 시작했으나, 현재 이는 다양한 기업의 요구 사항을 지원하는 애플리케이션을 갖춘 오픈소스 데이터 스트리밍 솔루션이 되었다.
일단 요즘 같은 micro service (??) 환경 또는 대규모 데이터 처리가 필요한 상황에서
실시간으로 data 를 주고 받는 처리를 일반적인 tcp/ip 기반의 app 을 개발하여 처리하기에는 한계가 있다.
그래서 위와 같은 대용량에 특화된 솔루션을 이용하게 된다.
Apache Kafka는 ZooKeeper라고 하는 컴포넌트들과 Cluster로 구성되어 있고 Kafka Cluster 내에는 여러개의 Broker들로 구성되어 있다
이 zookeeper 를 이용하던것이 초기 broker 관리 모델이고 현재는 자체 관리 모델을 사용한다.
KRaft 프로토콜을 사용하는 Quorum Controller 이 그 자체 관리 모델이다.
docker-compose.yml
networks:
kafka_network:
volumes:
Kafka00:
driver: local
Kafka01:
driver: local
Kafka02:
driver: local
services:
##Kafka 00
Kafka00Service:
image: bitnami/kafka:latest
restart: unless-stopped
container_name: Kafka00Container
ports:
- '10000:9094'
environment:
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# KRaft settings
- KAFKA_CFG_BROKER_ID=0
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# Listeners
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- kafka_network
volumes:
- "Kafka00:/bitnami/kafka"
##Kafka 01
Kafka01Service:
image: bitnami/kafka:latest
restart: unless-stopped
container_name: Kafka01Container
ports:
- '10001:9094'
environment:
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# KRaft settings
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# Listeners
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- kafka_network
volumes:
- "Kafka01:/bitnami/kafka"
##Kafka 02
Kafka02Service:
image: bitnami/kafka:latest
restart: unless-stopped
container_name: Kafka02Container
ports:
- '10002:9094'
environment:
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# KRaft settings
- KAFKA_CFG_BROKER_ID=2
- KAFKA_CFG_NODE_ID=2
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# Listeners
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- kafka_network
volumes:
- "Kafka02:/bitnami/kafka"
KafkaWebUiService:
image: provectuslabs/kafka-ui:latest
restart: unless-stopped
container_name: KafkaWebUiContainer
ports:
- '8080:8080'
environment:
- KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
#- KAFKA_CLUSTERS_0_METRICS_PORT=9999
networks:
- kafka_network
그다음 아래 명령 실행
docker compose -f docker-compose.yml up -d
그리고 localhost:8080 으로 접속하면 다음과 같은 화면이 나온다.
Brokers 를 확인해 보면 아래와 같다.
정상적으로 설치된걸 확인 할 수 있다.
이제 solution 을 하나 생성하고
producer 와 consumer 를 consol app project 로 만들어 추가 하자
그리고 각각에 다음 package 를 설치 하자
dotnet add package Confluent.Kafka
아래는 producer 와 consumer 예제이다.
Producer.cs
using Confluent.Kafka;
Console.WriteLine("Producer");
var config = new ProducerConfig
{
BootstrapServers = "localhost:10001", // Kafka broker 주소
ClientId = "my-producer"
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
var topic = "my-topic"; // 원하는 토픽 이름
for (int i = 0; i < 10; i++)
{
var message = new Message<Null, string>
{
Value = $"Hello, Kafka! Message {Guid.NewGuid().ToString("N")}"
};
producer.Produce(topic, message, deliveryReport =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError)
{
Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
}
else
{
Console.WriteLine($"Delivered message to {deliveryReport.TopicPartitionOffset}");
}
});
}
producer.Flush();
Consumer
using Confluent.Kafka;
Console.WriteLine("Consumer");
var config = new ConsumerConfig
{
BootstrapServers = "localhost:10002", // Kafka broker 주소
GroupId = "my-consumer-group", // Consumer Group ID
AutoOffsetReset = AutoOffsetReset.Earliest // 메시지 읽기 시작 위치 (가장 처음부터)
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var topic = "my-topic"; // 원하는 토픽 이름
consumer.Subscribe(topic);
while (true)
{
var consumeResult = consumer.Consume(TimeSpan.FromSeconds(1));
if (consumeResult != null)
{
Console.WriteLine($"Received message: {consumeResult.Message.Value}");
}
}
}
위에 BootstraServers 주소를 Producer 는 1001 에 Consumer 는 1002 에 둔것을 확인하자.
Clustering 이 정상동작하는지 확인하기 위해 두 port 를 나누었고
정상적으로 data 가 전달되는 것을 확인할 수 있다.
실행결과
주의할점 cluster 관련 meta 를 가져오는 과정에서 초기에 10~30 초 가량의 delay 가 있을 수 있다.
이 delay 를 줄일 수 있지만 만약 그렇게 한다면 consumer 의 갯수에 따라 kafka 에 많은 부하를 줄 수 있다.
관련영상
- YouTube
www.youtube.com