개요

CDC(Change Data Capture)에 대해서 MYSQL과 Kafka를 통해서 알아보고 실제 구현을 통해서 어떻게 동작하는지 알아봅니다.

CDC란

CDC(Change Data Capture)는 데이터베이스의 변경 사항을 실시간으로 감지하고 이를 다른 시스템으로 전달하는 기술입니다.

img.png

Debezium

Debezium은 CDC를 위한 오픈 소스 플랫폼 중 하나로, Apache Kafka와 연동하여 사용됩니다. Debezium은 MySQL, PostgreSQL, MongoDB, Oracle 등 다양한 데이터베이스 시스템의 로그 파일을 모니터링하면서 데이터베이스에 발생하는 모든 변경 사항을 캡처하고 Kafka 토픽으로 이를 전달합니다. 이렇게 Kafka를 통해 전달된 데이터는 실시간 분석, 데이터 통합, 백업 등 다양한 목적으로 활용될 수 있습니다.

MYSQL과 Kafka CDC 구성

Debezium을 이용하여 MYSQL과 Kafka를 연동하는 방법에 대해서 알아보려고 합니다. 설명할 예제는 Debezium connector for MySQL 를 참고해서 설명하겠습니다.

사전준비

우선 설명할 예제는 Docker를 이용해서 구성할 예정이고 아래 컴포넌트들이 컨테이너로 구동됩니다.

  • Zookeeper
  • Kafka
  • Debezium Connect
  • MYSQL

MYSQL은 테스트를 위해서 데이터가 일부 들어가있고 데이터의 생성/변경/삭제가 이루어 졌을 때 이를 Kafka로 전달하는 것을 확인할 수 있도록 구성하겠습니다.

컨테이너 실행

아래 명령어를 통해서 컨테이너를 실행합니다.

 1# Zookeeper
 2docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.5
 3
 4# Kafka
 5docker run -d -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.5
 6
 7# MYSQL
 8docker run -d -it --rm --name mysql -p 3306:3306 \
 9  -e MYSQL_ROOT_PASSWORD=debezium \
10  -e MYSQL_USER=mysqluser \
11  -e MYSQL_PASSWORD=mysqlpw \
12  quay.io/debezium/example-mysql:2.5
13
14# Connect
15docker run -d -it --rm --name connect -p 8083:8083 \
16  -e GROUP_ID=1 \
17  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
18  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
19  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
20  --link kafka:kafka \
21  --link mysql:mysql \
22  quay.io/debezium/connect:2.5

Connect 확인 및 등록

Debezium Connect가 정상적으로 실행되었는지 확인하고 MYSQL Connector를 등록합니다.

 1# 커넥터 서버 확인
 2curl -H "Accept:application/json" localhost:8083/
 3
 4# 커넥터 등록
 5curl -i -X POST \
 6  -H "Accept:application/json" \
 7  -H "Content-Type:application/json" \
 8  -d '{
 9        "name": "inventory-connector",
10        "config": {
11          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
12          "tasks.max": "1",
13          "database.hostname": "mysql",
14          "database.port": "3306",
15          "database.user": "debezium",
16          "database.password": "dbz",
17          "database.server.id": "184054",
18          "topic.prefix": "dbserver1",
19          "database.include.list": "inventory",
20          "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
21          "schema.history.internal.kafka.topic": "schemahistory.inventory"
22        }
23      }' \
24  localhost:8083/connectors/
25  
26# 커넥터 등록 확인
27curl -H "Accept:application/json" localhost:8083/connectors/
28
29# 커넥터 정보 조회
30curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

inventory-connector를 조회했을 때 나오는 값은 아래와 같습니다.

 1{
 2  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
 3  "database.user": "debezium",
 4  "topic.prefix": "dbserver1",
 5  "schema.history.internal.kafka.topic": "schemahistory.inventory",
 6  "database.server.id": "184054",
 7  "tasks.max": "1",
 8  "database.hostname": "mysql",
 9  "database.password": "dbz",
10  "name": "inventory-connector",
11  "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
12  "database.port": "3306",
13  "database.include.list": "inventory"
14}

DB 변경 작업

MYSQL에 접속해서 데이터를 변경하면 Kafka로 데이터가 전달되는 것을 확인할 수 있습니다.

 1# 접속
 2docker run -it --rm --name mysqlterm --link mysql mysql:8.2 sh -c \
 3'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" \
 4-uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
 5
 6# DB 사용
 7use inventory;
 8
 9# 테이블 조회
10SELECT * FROM customers;
11
12# 특정 필드 업데이트
13UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
14
15# 필드 삽입
16INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
17
18# 특정 필드 삭제
19DELETE FROM customers WHERE id = 1005;

Kafka 확인

Kafka 토픽에 의도한대로 데이터가 들어가 있는지 확인하는 방법은 여러가지가 있습니다. CLI로 간단하게 확인하는 방법은 kafkacat을 설치해서 사용하는 방법이 있지만 이번에는 AKHQ를 이용해서 확인해보겠습니다. AKHQ는 Kafka GUI로 토픽, 컨슈머 그룹, 스키마 레지스트리, 커넥트 등을 관리할 수 있는 도구입니다.

AKHQ 설치

AKHQ는 Docker를 이용해서 간단하게 설치할 수 있습니다.

 1docker run -d \
 2--link kafka:kafka --link connect:connect \
 3-p 8080:8080 \
 4  -e AKHQ_CONFIGURATION="akhq:
 5    connections:
 6      docker-kafka-server:
 7        properties:
 8          bootstrap.servers: 'kafka:9092'
 9        connect:
10          - name: 'connect'
11            url: 'http://connect:8083'" \
12  --name akhq \
13tchiotludo/akhq

AKHQ 접속

브라우저를 통해서 http://localhost:8080으로 접속하면 AKHQ 화면이 나타납니다.

img.png

토픽 확인

Topics 메뉴를 통해서 dbserver1.inventory.customers 토픽을 확인하면 데이터가 들어가 있는 것을 확인할 수 있습니다. 필드에서 payload 데이터를 확인해보면 아래처럼 명시가 되어있습니다.

img.png

 1{
 2    "before": {
 3      "id": 1004,
 4      "first_name": "Anne",
 5      "last_name": "Kretchmar",
 6      "email": "annek@noanswer.org"
 7    },
 8    "after": {
 9      "id": 1004,
10      "first_name": "Anne Marie",
11      "last_name": "Kretchmar",
12      "email": "annek@noanswer.org"
13    },
14    "source": {
15      "version": "2.5.4.Final",
16      "connector": "mysql",
17      "name": "dbserver1",
18      "ts_ms": 1713319992000,
19      "snapshot": "false",
20      "db": "inventory",
21      "sequence": null,
22      "table": "customers",
23      "server_id": 223344,
24      "gtid": null,
25      "file": "mysql-bin.000003",
26      "pos": 401,
27      "row": 0,
28      "thread": 12,
29      "query": null
30    },
31    "op": "u",
32    "ts_ms": 1713319992707,
33    "transaction": null
34}
  • op 필드를 통해서 어떤 작업이 이루어졌는지 확인할 수 있습니다.
    • u : update
    • c : create
    • d : delete
  • before/after 필드를 통해서 변경 전/후 데이터를 확인할 수 있습니다.
  • source 필드를 통해서 어떤 데이터베이스에서 어떤 테이블에서 변경이 일어났는지 확인할 수 있습니다.

정리

이번 포스팅에서는 CDC(Change Data Capture)에 대해서 MYSQL과 Kafka를 통해서 알아보았습니다. Debezium을 이용해서 MYSQL과 Kafka를 연동하는 방법에 대해서 알아보았고 실제로 데이터베이스에 변경이 일어났을 때 Kafka로 데이터가 전달되는 것을 확인했습니다. CDC는 데이터베이스의 변경 사항을 실시간으로 감지하고 이를 다른 시스템으로 전달하는 기술로 실시간 분석, 데이터 통합, 백업 등 다양한 목적으로 활용될 수 있습니다.