使用CDC(Debezium)将RDB(MySQL)的变更事件插入到Kafka中

总结

我会完成以下任务!

image.png

元ネタ

 

如果在以下列出的步骤中遇到了”不知道为什么不起作用”之类的问题,请参考上述Red Hat的教程。

环境构建概要概述

因为使用Docker,环境搭建变得轻松简单。

①在Windows上安装Docker桌面版
②启动Zookeeper
③启动Kafka
④启动MySQL数据库
⑤启动MySQL命令行客户端
⑥启动Debezium(KafkaConnect)
⑦启动事件监控(watch-topic)

在Windows上安装Docker桌面版本

请从以下链接获取并安装:
https://docs.docker.com/desktop/windows/install/

以下是免费提供的。其他则需要付费,请注意。
– 个人使用
– 小型企业
– 教育机构
– 非商业用途的开源项目

启动Zookeeper

以管理员身份启动Windows PowerShell

请执行以下命令。

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9

如果想要确认进程,请启动另一个Windows PowerShell并执行以下命令。

docker ps

启动 Kafka

以管理员身份启动Windows PowerShell。

执行以下命令

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9

启动MySQL数据库。

以管理员模式启动Windows PowerShell。

请执行以下命令

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9

运行MySQL命令行客户端

以管理员模式启动Windows PowerShell。

执行以下命令

docker run -it --rm --name mysqlterm --link mysql --rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

确认数据库的内容

mysql> use inventory;
mysql> show tables;

请启动Debezium(Kafka Connect)。

以管理员模式启动Windows PowerShell

请执行以下命令。

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9

为了监视数据库,注册连接器。

启动KafkaConnect服务器的客户端。

image.png

请执行以下命令。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

启动事件监视(watch-topic)。

以管理员模式启动Windows PowerShell

请执行以下命令。

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.9 watch-topic -a -k dbserver1.inventory.customers

验证 1(更新)

在运行MySQL命令行客户端的终端中,执行以下操作

SELECT * FROM customers;
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

确认事件监视(watch-topic)
※可以确认first_name列的数据已更改为Anne Marie

 "before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}
, "after":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"}

被验证的第二步(DDL)

在运行MySQL命令行客户端的终端中执行以下操作。

ALTER TABLE customers ADD (add_col_test text);

确认事件监视(watch-topic)→ 无响应

请执行以下内容的补充

UPDATE customers SET first_name='Anne Marie Hoge' WHERE id=1004;

确认事件监视(watch-topic)
可以确认是否添加了”add_col_test”列,以及是否将first_name列的数据更改为”Anne Marie Hoge”

"after":{"id":1004,"first_name":"Anne Marie Hoge","last_name":"Kretchmar","email":"annek@noanswer.org","test":null,"add_col_test":null}
bannerAds