나의개발일지
Kafka CDC설정 (debezium + mysql + apache kafka) 본문
728x90
Kafka docker에 올리기
카프카는 브로커 3개로 구성했고, kafka-ui를 통해 카프카 브로커들의 상태와 토픽에 저장된 내용을 쉽게 확인할 수 있도록 설정하였다.
KAFKA_ADVERTISED_LISTENERS 설정에서 EXTERNAL 포트를 따로 두어 외부에서 (Local spring-boot 개발용) 접근이 가능하게 열어 두었다.
# Docker Compose definition for a 3-node Kafka (KRaft) cluster, Kafka UI and
# a Debezium Kafka Connect worker. All services share the `webnet` network.
networks:
  webnet:
    driver: bridge

services:
  # Kafka
  # Each node runs both the broker and the controller role
  # (KAFKA_PROCESS_ROLES) and declares three listeners:
  #   PLAINTEXT  - advertised under the service name (kafka-0x:1909x) for
  #                containers on webnet
  #   CONTROLLER - used for the controller quorum (2909x)
  #   EXTERNAL   - published to the host and advertised as localhost:909x
  #                so applications running on the host can connect
  kafka-00:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      # Same value on all three nodes.
      # NOTE(review): KRaft cluster IDs are normally base64-encoded UUIDs
      # (e.g. generated with `kafka-storage.sh random-uuid`) - confirm that
      # this free-form value is accepted by the image.
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      # <node id>@<host>:<controller port> for every quorum member.
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19092,CONTROLLER://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-00:19092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - webnet

  kafka-01:
    image: apache/kafka:3.7.0
    ports:
      - "9093:9093"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19093,CONTROLLER://:29093,EXTERNAL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-01:19093,EXTERNAL://localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - webnet

  kafka-02:
    image: apache/kafka:3.7.0
    ports:
      - "9094:9094"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19094,CONTROLLER://:29094,EXTERNAL://:9094"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-02:19094,EXTERNAL://localhost:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - webnet

  # Kafka UI
  # Just for monitoring...
  # Published on host port 9000; connects to the brokers through their
  # PLAINTEXT listeners on webnet.
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka-00
      - kafka-01
      - kafka-02
    ports:
      - "9000:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-00:19092,kafka-01:19093,kafka-02:19094
    networks:
      - webnet

  # Debezium
  # Kafka Connect worker; connectors are registered through port 8083.
  debezium:
    image: debezium/connect:2.6
    ports:
      - "8083:8083"
    depends_on:
      - kafka-00
      - kafka-01
      - kafka-02
    environment:
      - BOOTSTRAP_SERVERS=kafka-00:19092,kafka-01:19093,kafka-02:19094
      - GROUP_ID=debezium-00
      # Topics in which the worker stores its configs, offsets and statuses.
      - CONFIG_STORAGE_TOPIC=DEBEZIUM_CONNECT_CONFIGS
      - OFFSET_STORAGE_TOPIC=DEBEZIUM_CONNECT_OFFSETS
      - STATUS_STORAGE_TOPIC=DEBEZIUM_CONNECT_STATUSES
    networks:
      - webnet
MySQL docker에 올리기
# MySQL 8.0 instance holding the `payment` database.
# This is a service entry: in a Compose file it belongs under the top-level
# `services:` key.
payment-mysql:
  image: mysql:8.0
  container_name: payment-mysql
  ports:
    # Host port 3307 -> container port 3306.
    - "3307:3306"
  environment:
    MYSQL_ROOT_PASSWORD: rootpassword
    TZ: UTC
    MYSQL_DATABASE: payment
    MYSQL_USER: test
    MYSQL_PASSWORD: password
  command:
    - --character-set-server=utf8mb4
    - --collation-server=utf8mb4_0900_ai_ci
  volumes:
    # NOTE(review): `payment-db-data` is a named volume; it needs a matching
    # top-level `volumes:` declaration, which is not shown here - confirm.
    - payment-db-data:/var/lib/mysql
  networks:
    - webnet
각 데이터베이스의 유저에게 binlog를 읽어 올 수 있는 권한을 부여한다.
-- Privileges the Debezium MySQL connector needs for user 'test':
--   SELECT, RELOAD, SHOW DATABASES      - used while taking the snapshot
--   REPLICATION SLAVE                   - read the binlog stream
--   REPLICATION CLIENT                  - SHOW MASTER STATUS / SHOW BINARY LOGS
-- SUPER is not required and is intentionally not granted.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test'@'%';
FLUSH PRIVILEGES;
Debezium connector 설정
Postman으로 Kafka Connect(Debezium) REST API에 POST 요청을 보내 커넥터를 생성한다.
<요청주소>
http://localhost:8083/connectors
{
  "name": "debezium-connector-payment",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "host.docker.internal",
    "database.port": "3307",
    "database.user": "test",
    "database.password": "password",
    "database.server.id": "1234563",
    "topic.prefix": "test_sample_payment",
    "table.include.list": "payment.test",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-00:19092,kafka-01:19093,kafka-02:19094",
    "schema.history.internal.kafka.topic": "SCHEMA_CHANGES_PAYMENT",
    "snapshot.mode": "schema_only",
    "tombstones.on.delete": false,
    "database.allowPublicKeyRetrieval": true,
    "provide.transaction.metadata": true
  }
}
커넥터를 연결할 DB 정보를 입력한다.
"database.port": "3307",
"database.user": "test",
"database.password": "password",
스키마 변경 감지 topic
"schema.history.internal.kafka.topic": "SCHEMA_CHANGES_PAYMENT",
데이터 변경 감지 topic
"topic.prefix": "test_sample_payment",
주의사항: "name"과 "database.server.id"는 고유해야 한다. ("name"은 Kafka Connect 클러스터 안에서, "database.server.id"는 해당 MySQL 복제 구성 안에서 다른 값과 겹치면 안 된다.)
결과 화면
총 4개의 db에 대한 카프카 CDC를 구성하였고, test 테이블에 대한 변경감지를 진행한다.
728x90
반응형
'Infra > Kafka' 카테고리의 다른 글
Apache Kafka (Consumer Groups, Consumer Offset) (1) | 2024.09.03 |
---|---|
Kafka Docker에 올리기 (0) | 2024.09.03 |
Apache Kafka (소개, 특징, 기본용어) (3) | 2024.09.01 |