나의개발일지

Kafka Docker에 올리기 본문

Infra/Kafka

Kafka Docker에 올리기

아. 이렇게 하면 될거 같은데.. 2024. 9. 3. 05:36
728x90



결과 화면

 

[Docker 결과 화면]

 

[PostMan 요청]

 

[InteliJ Consol]


폴더 구조

push-notification-project/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── example/
│   │   │           └── notification/
│   │   │               ├── PushNotificationApplication.java
│   │   │               ├── config/
│   │   │               │   └── KafkaConfig.java
│   │   │               ├── consumer/
│   │   │               │   └── KafkaConsumerService.java
│   │   │               ├── producer/
│   │   │               │   └── KafkaProducerService.java
│   │   │               └── model/
│   │   │                   └── NotificationMessage.java
│   │   └── resources/
│   │       └── application.yml
├── docker/
│   └── kafka/
│       └── docker-compose.yml
└── README.md

설정파일

[appication.yml]

spring:
  autoconfigure:
    exclude:
      - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: notification-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

autoconfigure : 현재는 kafka 환경 테스트 환경이므로 DB 설정은 하지 않았다.

 

consumer : 

  • bootstrap-server : Kafka 클러스터의 주소를 설정한다. 브로커 서버 목록을 지정하며, 지금은 localhost:9092로 설정
  • group-id : Kafka 소비자 그룹의 ID를 설정, 같은 group-id를 가진 소비자들은 메시지를 나누어 처리
  • key-deserializer : Kafka가 수신한 메시지의 키를 역직렬화 하는데 사용되는 클래스 지정
  • value-deserializer : Kafka가 수신한 메시지의 값을 역직렬화 하는데 사용되는 클래스 지정

producer :

  • bootstrap-server : kafka 프로듀서가 메시지를 전송할 Kafka 클러스터의 브로커 주소를 지정
  • key-serializer : kafka에 메시지를 전송할 때 메시지의 키를 직렬화 하는데 사용
  • value-serializer : kafka에 메시지를 전송할 때 메시지의 값을 직렬화 하는데 사용

 

[docker-compose.yml]

 

 

version: '3.8'

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka-network

  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    networks:
      - kafka-network
    depends_on:
      - zookeeper

networks:
  kafka-network:
    driver: bridge

 

service : 여러개의 서비스를 정의한다.

zookeeper : 

  • image : 최신버전의 zookeeper를 사용
  • ports : 컨테이너의 포트를 호스트의 포트와 매핑한다. zookeeper는 기본적으로 2181 포트를 사용한다
  • environment : 컨테이너의 환경 변수 설정 현재는 익명의 로그인을 허용한 상태
  • networks : 컨테이너가 연결될 네트워크를 지정한다

kafka :

image : 사용할 Docker 이미지 최신버전의 Kafka이미지를 사용

ports : 컨테이너의 포트를 호스트의 포트와 매핑 kafka는 기본적으로 9092포트를 사용

environment : kafka의 환경 변수를 설정한다.

  • KAFKA_BROKER_ID: Kafka 브로커의 ID를 설정합니다. 클러스터 내에서 각 브로커를 식별하는 데 사용됩니다.
  • KAFKA_LISTENERS: Kafka가 수신 대기할 주소와 포트를 설정합니다. PLAINTEXT://:9092는 모든 네트워크 인터페이스의 9092 포트에서 수신합니다.
  • KAFKA_ADVERTISED_LISTENERS: Kafka 클라이언트가 브로커에 연결할 때 사용할 주소를 설정합니다. PLAINTEXT://localhost:9092로 설정하여 클라이언트가 localhost:9092를 통해 Kafka에 연결하도록 합니다.
  • KAFKA_ZOOKEEPER_CONNECT: Zookeeper의 주소를 설정합니다. Kafka는 Zookeeper를 사용하여 메타데이터를 저장하고 클러스터를 관리합니다. zookeeper:2181로 설정하여 zookeeper라는 서비스 이름과 2181 포트를 통해 연결합니다.
  • ALLOW_PLAINTEXT_LISTENER: 평문 연결을 허용하는 설정입니다. "yes"로 설정하여 보안되지 않은 연결을 허용합니다.

networks: Kafka 컨테이너가 연결될 네트워크를 지정합니다. kafka-network라는 사용자 정의 네트워크에 연결됩니다.

depends_on: Kafka 컨테이너가 Zookeeper 컨테이너에 의존하고 있음을 나타냅니다. Zookeeper가 시작된 후에 Kafka 컨테이너가 시작됩니다.


컨트롤러

[NotificationController.java]

package com.example.notification.controller;

import com.example.notification.producer.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class NotificationController {

    private final KafkaProducerService kafkaProducerService;

    @Autowired
    public NotificationController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    @PostMapping("/notification")
    public String notification(@RequestBody String message) {
        kafkaProducerService.sendMessage(message);
        return message;
    }
}

 

/notification 엔드포인트로 메시지를 보내는 컨트롤러

서비스 ( Producer )

[KafkaProducerService.java]

package com.example.notification.producer;

import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "notification_topic";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

 

Kafkatemplate를 Spring 컨테이너에서 주입받아서 메세지를 전송하는 비즈니스 로직 부분

 

서비스 ( Consumer )

[KafkaConsumerService.java]

package com.example.notification.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "notification_topic", groupId = "notification-group")
    public void consume(String message) {
        System.out.println("Received Message: " + message);
    }
}

 

@KafkaListener 어노테이션을 사용하여 Kafka에서 메시지를 수신한다.

Topic 과 groupId를 지정해줘서 어디에서 메시지를 가져올지 설정한다.

위의 코드에서는 "notification_topic" 이라는 토픽에서 메시지를 가져온다.

728x90
반응형