나의개발일지

Kafka CDC설정 (debezium + mysql + apache kafka) 본문

Infra/Kafka

Kafka CDC설정 (debezium + mysql + apache kafka)

아. 이렇게 하면 될거 같은데.. 2024. 9. 23. 00:10
728x90



Kafka docker에 올리기

카프카는 브로커를 3개로 구성했고, kafka-ui를 통해 카프카 브로커들의 상태, 토픽의 저장된 내용을 쉽게 확인 할 수 있도록 설정하였다.

KAFKA_ADVERTISED_LISTENERS 설정에서 EXTERNAL 포트를 따로 두어 외부에서 (Local spring-boot 개발용) 접근이 가능하게 열어 두었다.

networks:
  webnet:
    driver: bridge

services:
  # Kafka
  kafka-00:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19092,CONTROLLER://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-00:19092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - webnet

  kafka-01:
    image: apache/kafka:3.7.0
    ports:
      - "9093:9093"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19093,CONTROLLER://:29093,EXTERNAL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-01:19093,EXTERNAL://localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - webnet

  kafka-02:
    image: apache/kafka:3.7.0
    ports:
      - "9094:9094"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-00:29092,2@kafka-01:29093,3@kafka-02:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19094,CONTROLLER://:29094,EXTERNAL://:9094"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-02:19094,EXTERNAL://localhost:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - webnet

  # Kafka UI
  # Just for monitoring...
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka-00
      - kafka-01
      - kafka-02
    ports:
      - "9000:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-00:19092,kafka-01:19093,kafka-02:19094
    networks:
      - webnet


  # Debezium
  debezium:
    image: debezium/connect:2.6
    ports:
      - "8083:8083"
    depends_on:
      - kafka-00
      - kafka-01
      - kafka-02
    environment:
      - BOOTSTRAP_SERVERS=kafka-00:19092,kafka-01:19093,kafka-02:19094
      - GROUP_ID=debezium-00
      - CONFIG_STORAGE_TOPIC=DEBEZIUM_CONNECT_CONFIGS
      - OFFSET_STORAGE_TOPIC=DEBEZIUM_CONNECT_OFFSETS
      - STATUS_STORAGE_TOPIC=DEBEZIUM_CONNECT_STATUSES
    networks:
      - webnet

MySQL docker에 올리기

  payment-mysql:
    image: mysql:8.0
    container_name: payment-mysql
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      TZ: UTC
      MYSQL_DATABASE: payment
      MYSQL_USER: test
      MYSQL_PASSWORD: password
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_0900_ai_ci
    volumes:
      - payment-db-data:/var/lib/mysql
    networks:
      - webnet

 

 

각 데이터베이스의 유저의 binlog를 가져올 수 있는 권한을 부여한다.

GRANT SUPER, REPLICATION CLIENT ON *.* TO 'test'@'%';
FLUSH PRIVILEGES;

Debezium connector 설정

 

포스트맨으로 debezium 커넥터에 요청을 보내 커넥터 생성

<요청주소>

http://localhost:8083/connectors

{
    "name": "debezium-connector-payment",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "host.docker.internal",
        "database.port": "3307",
        "database.user": "test",
        "database.password": "password",
        "database.server.id": "1234563",
        "topic.prefix": "test_sample_payment",
        "table.include.list": "payment.test",
        "schema.history.internal.kafka.bootstrap.servers": "kafka-00:19092,kafka-01:19093,kafka-02:19094",
        "schema.history.internal.kafka.topic": "SCHEMA_CHANGES_PAYMENT",
        "snapshot.mode": "schema_only",
        "tombstones.on.delete": false,
        "database.allowPublicKeyRetrieval": true,
        "provide.transaction.metadata": true
    }
}

 

커넥터를 연결할 DB 정보를 입력한다.

        "database.port": "3307",
        "database.user": "test",
        "database.password": "password",

스키마 변경 감지 topic

        "schema.history.internal.kafka.topic": "SCHEMA_CHANGES_PAYMENT",

데이터 변경 감지 topic

        "topic.prefix": "test_sample_payment",

 

주의사항 : "name" 이랑 "database.server.id" 는 고유해야 한다.

 


결과 화면

총 4개의 db에 대한 카프카 CDC를 구성하였고, test 테이블에 대한 변경감지를 진행한다.

728x90
반응형

'Infra > Kafka' 카테고리의 다른 글

Apache Kafka (Consumer Groups, Consumer Offset)  (1) 2024.09.03
Kafka Docker에 올리기  (0) 2024.09.03
Apache Kafka (소개, 특징, 기본용어)  (3) 2024.09.01