나의개발일지
Kafka Docker에 올리기 본문
결과 화면
[Docker 결과 화면]
[PostMan 요청]
[InteliJ Consol]
폴더 구조
push-notification-project/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/
│ │ │ └── example/
│ │ │ └── notification/
│ │ │ ├── PushNotificationApplication.java
│ │ │ ├── config/
│ │ │ │ └── KafkaConfig.java
│ │ │ ├── consumer/
│ │ │ │ └── KafkaConsumerService.java
│ │ │ ├── producer/
│ │ │ │ └── KafkaProducerService.java
│ │ │ └── model/
│ │ │ └── NotificationMessage.java
│ │ └── resources/
│ │ └── application.yml
├── docker/
│ └── kafka/
│ └── docker-compose.yml
└── README.md
설정파일
[appication.yml]
spring:
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: notification-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
autoconfigure : 현재는 kafka 환경 테스트 환경이므로 DB 설정은 하지 않았다.
consumer :
- bootstrap-server : Kafka 클러스터의 주소를 설정한다. 브로커 서버 목록을 지정하며, 지금은 localhost:9092로 설정
- group-id : Kafka 소비자 그룹의 ID를 설정, 같은 group-id를 가진 소비자들은 메시지를 나누어 처리
- key-deserializer : Kafka가 수신한 메시지의 키를 역직렬화 하는데 사용되는 클래스 지정
- value-deserializer : Kafka가 수신한 메시지의 값을 역직렬화 하는데 사용되는 클래스 지정
producer :
- bootstrap-server : kafka 프로듀서가 메시지를 전송할 Kafka 클러스터의 브로커 주소를 지정
- key-serializer : kafka에 메시지를 전송할 때 메시지의 키를 직렬화 하는데 사용
- value-serializer : kafka에 메시지를 전송할 때 메시지의 값을 직렬화 하는데 사용
[docker-compose.yml]
version: '3.8'
services:
zookeeper:
image: bitnami/zookeeper:latest
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka-network
kafka:
image: bitnami/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
networks:
- kafka-network
depends_on:
- zookeeper
networks:
kafka-network:
driver: bridge
service : 여러개의 서비스를 정의한다.
zookeeper :
- image : 최신버전의 zookeeper를 사용
- ports : 컨테이너의 포트를 호스트의 포트와 매핑한다. zookeeper는 기본적으로 2181 포트를 사용한다
- environment : 컨테이너의 환경 변수 설정 현재는 익명의 로그인을 허용한 상태
- networks : 컨테이너가 연결될 네트워크를 지정한다
kafka :
image : 사용할 Docker 이미지 최신버전의 Kafka이미지를 사용
ports : 컨테이너의 포트를 호스트의 포트와 매핑 kafka는 기본적으로 9092포트를 사용
environment : kafka의 환경 변수를 설정한다.
- KAFKA_BROKER_ID: Kafka 브로커의 ID를 설정합니다. 클러스터 내에서 각 브로커를 식별하는 데 사용됩니다.
- KAFKA_LISTENERS: Kafka가 수신 대기할 주소와 포트를 설정합니다. PLAINTEXT://:9092는 모든 네트워크 인터페이스의 9092 포트에서 수신합니다.
- KAFKA_ADVERTISED_LISTENERS: Kafka 클라이언트가 브로커에 연결할 때 사용할 주소를 설정합니다. PLAINTEXT://localhost:9092로 설정하여 클라이언트가 localhost:9092를 통해 Kafka에 연결하도록 합니다.
- KAFKA_ZOOKEEPER_CONNECT: Zookeeper의 주소를 설정합니다. Kafka는 Zookeeper를 사용하여 메타데이터를 저장하고 클러스터를 관리합니다. zookeeper:2181로 설정하여 zookeeper라는 서비스 이름과 2181 포트를 통해 연결합니다.
- ALLOW_PLAINTEXT_LISTENER: 평문 연결을 허용하는 설정입니다. "yes"로 설정하여 보안되지 않은 연결을 허용합니다.
networks: Kafka 컨테이너가 연결될 네트워크를 지정합니다. kafka-network라는 사용자 정의 네트워크에 연결됩니다.
depends_on: Kafka 컨테이너가 Zookeeper 컨테이너에 의존하고 있음을 나타냅니다. Zookeeper가 시작된 후에 Kafka 컨테이너가 시작됩니다.
컨트롤러
[NotificationController.java]
package com.example.notification.controller;
import com.example.notification.producer.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class NotificationController {
private final KafkaProducerService kafkaProducerService;
@Autowired
public NotificationController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@PostMapping("/notification")
public String notification(@RequestBody String message) {
kafkaProducerService.sendMessage(message);
return message;
}
}
/notification 엔드포인트로 메시지를 보내는 컨트롤러
서비스 ( Producer )
[KafkaProducerService.java]
package com.example.notification.producer;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private static final String TOPIC = "notification_topic";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
Kafkatemplate를 Spring 컨테이너에서 주입받아서 메세지를 전송하는 비즈니스 로직 부분
서비스 ( Consumer )
[KafkaConsumerService.java]
package com.example.notification.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "notification_topic", groupId = "notification-group")
public void consume(String message) {
System.out.println("Received Message: " + message);
}
}
@KafkaListener 어노테이션을 사용하여 Kafka에서 메시지를 수신한다.
Topic 과 groupId를 지정해줘서 어디에서 메시지를 가져올지 설정한다.
위의 코드에서는 "notification_topic" 이라는 토픽에서 메시지를 가져온다.
'Infra > Kafka' 카테고리의 다른 글
Kafka CDC설정 (debezium + mysql + apache kafka) (0) | 2024.09.23 |
---|---|
Apache Kafka (Consumer Groups, Consumer Offset) (1) | 2024.09.03 |
Apache Kafka (소개, 특징, 기본용어) (3) | 2024.09.01 |