Kafka 를 이용한 실시간 데이터 스트리밍

2024. 3. 11. 00:00Kafka

반응형

 Kafka

실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산형 데이터 스트리밍 플랫폼이다. 하루에 1조 4천억 건의 메시지를 처리하기 위해 LinkedIn이 개발한 내부 시스템으로 시작했으나, 현재 이는 다양한 기업의 요구 사항을 지원하는 애플리케이션을 갖춘 오픈소스 데이터 스트리밍 솔루션이 되었다.

 

일단 요즘 같은 micro service (??) 환경 또는 대규모 데이터 처리가 필요한 상황에서

실시간으로 data 를 주고 받는 처리를 일반적인 tcp/ip 기반의 app 을 개발하여 처리하기에는 한계가 있다. 

그래서 위와 같은 대용량에 특화된 솔루션을 이용하게 된다. 

 

Apache Kafka는 ZooKeeper라고 하는 컴포넌트들과 Cluster로 구성되어 있고 Kafka Cluster 내에는 여러개의 Broker들로 구성되어 있다

 

참고 : https://velog.io/@hyun6ik/Apache-Kafka-Broker-Zookeeper
참고 : https://www.linkedin.com/pulse/how-deploy-kafka-zookeeper-cluster-linux-based-operating-tiwari/

 

 

이 zookeeper 를 이용하던것이 초기 broker 관리 모델이고 현재는 자체 관리 모델을 사용한다. 

KRaft 프로토콜을 사용하는 Quorum Controller 이 그 자체 관리 모델이다. 

 

 

 

docker-compose.yml

networks:
  kafka_network:

volumes:
  Kafka00:
    driver: local
  Kafka01:
    driver: local
  Kafka02:
    driver: local

services:
  ##Kafka 00
  Kafka00Service:
    image: bitnami/kafka:latest
    restart: unless-stopped
    container_name: Kafka00Container
    ports:
      - '10000:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka00:/bitnami/kafka"
  ##Kafka 01
  Kafka01Service:
    image: bitnami/kafka:latest
    restart: unless-stopped
    container_name: Kafka01Container
    ports:
      - '10001:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka01:/bitnami/kafka"
  ##Kafka 02
  Kafka02Service:
    image: bitnami/kafka:latest
    restart: unless-stopped
    container_name: Kafka02Container
    ports:
      - '10002:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka02:/bitnami/kafka"

  KafkaWebUiService:
    image: provectuslabs/kafka-ui:latest
    restart: unless-stopped
    container_name: KafkaWebUiContainer
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
      #- KAFKA_CLUSTERS_0_METRICS_PORT=9999
    networks:
      - kafka_network

 

그다음 아래 명령 실행

docker compose -f docker-compose.yml up -d

 

그리고 localhost:8080 으로 접속하면 다음과 같은 화면이 나온다. 

 

Brokers 를 확인해 보면 아래와 같다. 

 

정상적으로 설치된걸 확인 할 수 있다. 

 

이제 solution 을 하나 생성하고

producer 와 consumer 를 consol app project 로 만들어 추가 하자

그리고 각각에 다음 package 를 설치 하자 

dotnet add package Confluent.Kafka

 

아래는 producer 와 consumer 예제이다. 

Producer.cs

using Confluent.Kafka;

Console.WriteLine("Producer");

var config = new ProducerConfig
{
    BootstrapServers = "localhost:10001", // Kafka broker 주소
    ClientId = "my-producer"
};

using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    var topic = "my-topic"; // 원하는 토픽 이름
    for (int i = 0; i < 10; i++)
    {
        var message = new Message<Null, string>
        {
            Value = $"Hello, Kafka! Message {Guid.NewGuid().ToString("N")}"
        };

        producer.Produce(topic, message, deliveryReport =>
        {
            if (deliveryReport.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"Delivered message to {deliveryReport.TopicPartitionOffset}");
            }
        });
    }
    producer.Flush();

 

 

Consumer

using Confluent.Kafka;

Console.WriteLine("Consumer");
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:10002", // Kafka broker 주소
    GroupId = "my-consumer-group", // Consumer Group ID
    AutoOffsetReset = AutoOffsetReset.Earliest // 메시지 읽기 시작 위치 (가장 처음부터)
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    var topic = "my-topic"; // 원하는 토픽 이름

    consumer.Subscribe(topic);

    while (true)
    {
        var consumeResult = consumer.Consume(TimeSpan.FromSeconds(1));
        if (consumeResult != null)
        {
            Console.WriteLine($"Received message: {consumeResult.Message.Value}");
        }
    }
}

 

위에 BootstraServers 주소를 Producer 는 1001 에 Consumer 는 1002 에 둔것을 확인하자.

Clustering 이 정상동작하는지 확인하기 위해 두 port 를 나누었고 

정상적으로 data 가 전달되는 것을 확인할 수 있다. 

 

 

실행결과

 

 

주의할점 cluster 관련 meta 를 가져오는 과정에서 초기에 10~30 초 가량의 delay 가 있을 수 있다. 

이 delay 를 줄일 수 있지만 만약 그렇게 한다면 consumer 의 갯수에 따라 kafka 에 많은 부하를 줄 수 있다. 

 

 

관련영상

https://youtu.be/u-1rvwnpUfA

 

- YouTube

 

www.youtube.com

 

반응형