도커로 Kafka 실습 로컬 서버 환경 구축하기

도커를 통한 Kafka 실습 환경 구축

들어가기 전에

  • 해당 포스팅은 로컬 환경에서 Kafka 동작을 테스트하기 위해 Kafka 서버 환경을 구축해 본 내용을 담았다.
  • 본 구축을 하기 전에 우선 도커가 로컬 환경에 설치되어 있어야 한다.

1. docker-compose.yml 작성 및 도커 이미지 실행

해당 실습에서 Kafka image는 현재 가장 많이 사용하는 wurstmeister/kafka를 사용해 구축해 볼 것이다. wurstmeister Github에서 릴리즈 버전을 확인할 수 있고, 튜토리얼 문서 또한 확인해볼 수 있다. (wurstmeister/kafka github / wurstmeister/kafka 튜토리얼)

version: "3.6"

networks:
  kafka_network: # zookeeper, kafka, cmak을 한 네트워크로 묶기 위함

services:
  zookeeper:
    container_name: local-zookeeper
    image: zookeeper:3.5.8 # CMAK을 사용하려면 zookeeper 3.5.x 이상의 버전을 사용해야 함
    ports:
      - "2181:2181"
    networks:
      - kafka_network
  kafka:
    container_name: local-kafka
    image: wurstmeister/kafka:2.13-2.8.1
    depends_on:
      - zookeeper # zookeeper가 실행된 이후 kafka 실행
    ports:
      - "9092:9092"
    environment:
      JMX_PORT: 9093  # CMAK 이용을 위해 추가해야 할 설정
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true
                      -Dcom.sun.management.jmxremote.authenticate=false
                      -Dcom.sun.management.jmxremote.ssl=false
                      -Djava.rmi.server.hostname=127.0.0.1
                      -Dcom.sun.management.jmxremote.rmi.port=9393
                      -Djava.net.preferIPv4Stack=true
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka_network
  kafka_manager:
    container_name: cmak-kafka-manager
    image: hlebalbau/kafka-manager:stable # CMAK image
    restart: on-failure
    ports:
      - "9000:9000"
    depends_on:
      - kafka
      - zookeeper
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: "random-secret"
    networks:
      - kafka_network


Apache zookeeper란 공개 분산형 구성 서비스, 동기 서비스 및 대용량 분산 시스템을 위한 네이밍 레지스트리를 제공하는 소프트웨어이다. Kafka가 메시지 큐 역할을 할 때 zookeeper가 메시지 큐로 들어오는 메시지들을 관리하는 역할을 담당한다. 해당 포스팅에서는 로컬 환경에서의 Kafka 서버 구축을 위한 것이므로 1대만 구성한다.

CMAK은 Yahoo에서 제작한 GUI 기반 Kafka 관리 도구로, 웹 환경에서 클러스터, 토픽 등의 생성이나 변경이 가능하도록 해주며 Consumer group을 확인할 수도 있게 해주어 Kafka 관리에 편의성을 높여주는 툴이다.

위와 같이 docker-compose.yml 파일을 작성한 후, 터미널로 해당 파일이 존재하는 디렉토리 경로에서 docker compose 실행 명령어로 이미지 다운로드 및 컨테이너 실행을 수행한다.

 > docker-compose up -d
  • -d: 도커 컨테이너 실행을 백그라운드에서 실행하도록 하는 명령어

명령어를 실행하면 문제가 없다면 정상적으로 이미지가 다운로드되고 컨테이너가 실행되는 것을 확인할 수 있다.

docker-compose-up-terminal

2. Kafka 클러스터 생성

CMAK을 접속하기 위해 compose 파일에 작성한 설정 포트로 접속한다.

cmak

Cluster 메뉴의 Add Cluster로 Kafka Cluster를 추가해줄 수 있다.

cmak-add-cluster

본 포스팅에서는 간단한 Kafka 메시징 테스트만 진행할 것이므로 나머지 Cluster 설정들은 기본값으로 두고, Cluster Zookeeper Hosts과 Kafka Version 설정만 입력해준다.
Cluster Zookeeper Hosts에는 compose 파일에 작성한 CMAK의 ZK_HOSTS 설정에 입력했던 내용을 입력하고, Kafka Version은 wurstmeister/kafka 문서를 살펴보면 버전을 <scalar version>-<kafka version> 이런 형식으로 작성하고 있다고 한다. 즉 compose 파일에 작성한 wurstmeister/kafka의 scalar version은 2.13으로 Kafka 2.4.0 버전까지 지원되는 버전이다.

cmak-cluster-setting

3. Kafka 토픽 생성 및 Pub/Sub 테스트

Kafka Cluster를 생성하고 해당 클러스터 내 토픽을 생성한다. 카프카의 메시지는 토픽으로 분류된다. 하나의 토픽에는 여러 개의 파티션으록 구성될 수 있고 메시지는 파티션에 추가되는 형태로 기록된다.

CMAK에서 생성한 Cluster로 들어온 후 Topic Create 메뉴로 접속해 토픽을 추가해줄 수 있다.

cmak-topic

토픽 생성 옵션으로 파티션 수와 Replication Factor 수를 설정해줄 수 있다. 주의할 점은 토픽의 파티션은 개수를 늘릴 수는 있지만, 줄일 수는 없다. 파티션을 제거하려면 제거할 파티션에 배치된 메시지 세그먼트를 재배치해줘야 하는데 이 기능은 복잡하고 리소스가 너무 많이 소요되는 작업으로 현재로써는 지원하고 있지 않기 때문이다.

따라서 토픽을 생성할 때는 파티션 수를 최소의 개수로 먼저 만든 후 운영 상황에 맞춰 점차 늘려갈 것을 추천한다.
로컬 환경에서는 그리 많은 파티션은 필요하지 않기 때문에 1개만 생성하도록 했다.

cmak-create-topic

Kafka 메시지 Pub/Sub 테스트를 하는 방법은 다양한 방법이 있겠지만, 나는 해당 kafka 토픽을 Publishing 및 Consuming 하는 애플리케이션을 작성하여 테스트했다.

우선 해당 토픽으로 메시지를 Producing 하는 메시지 Sender를 구현한다.

import org.springframework.kafka.core.KafkaTemplate;

@Component
@RequiredArgsConstructor
public final class KafkaMessageSender {
    private final KafkaTemplate kafkaTemplate;
    
    public void sendEventMessage(final EventRequest eventRequest) {
        String topic = "queuing.event.request";
        kafkaTemplate.send(topic, eventRequest);
    }
}


그 다음 해당 메시지를 Comsuming 하는 메시지 Listener를 구현한다.

@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaMessageListener {
    @KafkaListener(topic = "queuing.event.request", autoStartup = "false")
    public void listenEventRequest(final EventRequest eventRequest, final Acknowledgment acknowledgment) {
        try {
            System.out.println("queue.event.request 컨슈밍");
        } catch (Exception e) {
            log.error("에러 로그 작성");
        } finally {
            acknowledgment.acknowledge();
        }
    }
}