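Capturing Change Data from MySQL with Debezium

Introduction

Debezium is a distributed platform that turns database changes into event streams. For example, you can inspect row-level updates to MySQL as JSON.

~omitted~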
  "payload": {
    "before": {
      "id": 100,
      "first_name": "Oliver",
      "last_name": "Jack",
      "email": "jack@test.com"
    },
    "after": {
      "id": 100,
      "first_name": "Oliver",
      "last_name": "Harry",
      "email": "harry@test.com"
    }
~omitted~
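
As a rough illustration of how a consumer might use such an event, the sketch below compares the "before" and "after" row images and reports which columns changed. The helper name changed_columns is made up for this example and is not part of Debezium; the payload is the one shown above.

```python
import json

# Hypothetical helper (not a Debezium API): compare the two row images.
def changed_columns(payload):
    """Return {column: (old, new)} for every column whose value differs."""
    before = payload.get("before") or {}   # null when the row is newly created
    after = payload.get("after") or {}     # null when the row is deleted
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }

event = json.loads("""
{
  "payload": {
    "before": {"id": 100, "first_name": "Oliver", "last_name": "Jack", "email": "jack@test.com"},
    "after": {"id": 100, "first_name": "Oliver", "last_name": "Harry", "email": "harry@test.com"}
  }
}
""")

diff = changed_columns(event["payload"])
for column, (old, new) in diff.items():
    print(f"{column}: {old!r} -> {new!r}")
```

Treating a missing image as an empty dict lets the same function handle events where only one side is present.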

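Debezium is built on top of Apache Kafka and records the history of data changes in Kafka logs. Even if an application stops, it can restart and pick up the change events from the point where it left off.
It also supports multiple databases, including MySQL, PostgreSQL, MongoDB, SQL Server, and Oracle.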
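
The idea of resuming after a restart can be pictured with a toy model. This is only a sketch: a Python list stands in for the Kafka log, an integer stands in for the consumer's saved position, and ToyConsumer is a made-up class, not a Kafka client.

```python
# Toy model only: no Kafka client is involved here.
class ToyConsumer:
    def __init__(self, log, committed=0):
        self.log = log
        self.committed = committed      # index of the next event to read

    def poll(self, max_events):
        """Read up to max_events events and advance the saved position."""
        batch = self.log[self.committed:self.committed + max_events]
        self.committed += len(batch)
        return batch

log = ["insert id=1", "update id=1", "insert id=2"]

consumer = ToyConsumer(log)
first = consumer.poll(2)                # the application handles two events...
saved_offset = consumer.committed       # ...and then stops

log.append("delete id=2")               # changes keep arriving while it is down

restarted = ToyConsumer(log, committed=saved_offset)
missed = restarted.poll(10)             # resumes exactly where it left off
print(missed)
```

Because the log keeps every event and the position is stored separately, nothing written during the downtime is lost.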

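There is an official tutorial, so I tried it out.
https://debezium.io/documentation/reference/0.9/tutorial.html

The tutorial is long, so I'll keep this write-up as short as possible.
It covers capturing change data from MySQL.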

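Environment

I used Amazon Linux 2 on AWS EC2. Docker must be installed beforehand.

Setup

Running Debezium involves three main services: ZooKeeper, Kafka, and the Debezium connector service. So that you can watch what each service does, run each docker run command in its own new terminal. The -it option runs the container interactively, so its output is shown in the terminal. The --rm option removes the container once it is stopped at the end of the tutorial.

Starting ZooKeeper

ZooKeeper is middleware for building distributed systems.
Kafka can form a cluster across multiple Kafka servers, and ZooKeeper coordinates those servers.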

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

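Starting Kafka

Kafka is a distributed message queue: middleware that holds messages temporarily between the systems that write them and the systems that read them.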

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9

Starting MySQL

MySQL is a relational database.
Here we use the tutorial's example image, in which the tables have already been created.

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

Starting the MySQL command-line client

To work with MySQL, start a command-line client.

docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

In the MySQL command-line client, run "use inventory;" and then "show tables;" to confirm that the tables already exist.

Starting Kafka Connect

Kafka Connect is a framework, with its libraries and tools, for streaming data between Kafka and surrounding systems.

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9

Run curl -H "Accept:application/json" localhost:8083/ and, if it returns a response, the Kafka Connect REST API is up.
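
The same check can be scripted. The sketch below is an illustration, not part of the tutorial: it assumes Kafka Connect is published on localhost:8083 as in the docker run command above, and that the root endpoint returns a JSON object containing a "version" field. The function names are made up for this example.

```python
import json
import urllib.request

# Assumption: Kafka Connect is reachable on the port published above.
CONNECT_URL = "http://localhost:8083/"

def parse_connect_info(body):
    """Extract the version from the JSON returned by the root endpoint."""
    return json.loads(body).get("version")

def connect_version(url=CONNECT_URL, timeout=5):
    """Return the Kafka Connect version, or None if the API is unreachable."""
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return parse_connect_info(response.read().decode("utf-8"))
    except OSError:   # URLError and refused connections are OSError subclasses
        return None

# Usage, once the connect container is running:
#   print(connect_version())
```

Returning None instead of raising makes it easy to call this in a loop while waiting for the container to finish starting.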

Monitoring MySQL

Register the information the connector needs to connect to the database and to Kafka.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
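
The one-line JSON above is easier to review when built as a structure. The following sketch assembles the same request in Python without sending it. The function name build_connector_request is made up for this example; all values are copied from the curl command.

```python
import json
import urllib.request

def build_connector_request(connect_url="http://localhost:8083/connectors/"):
    """Build (but do not send) the POST request that registers the connector."""
    connector = {
        "name": "inventory-connector",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            # Where and how to reach MySQL
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "184054",
            # Logical name; it shows up as the prefix of the topic names
            "database.server.name": "dbserver1",
            "database.whitelist": "inventory",
            # Kafka topic in which the connector keeps the schema history
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "dbhistory.inventory",
        },
    }
    return urllib.request.Request(
        connect_url,
        data=json.dumps(connector).encode("utf-8"),
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )

request = build_connector_request()
print(request.get_method(), request.full_url)
print(json.dumps(json.loads(request.data), indent=2))
# To actually register the connector: urllib.request.urlopen(request)
```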

Confirm that HTTP/1.1 201 Created is displayed.

Check the registered configuration with the following command.

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

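That completes the setup.

Verifying behavior when data changes

I'll modify data in MySQL and check whether the change can be captured.

To check whether Kafka has received the messages, start a watcher.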

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers
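
The topic name in the command above combines the logical server name set in database.server.name, the database name, and the table name, joined by dots. A minimal sketch of that pattern; the helper name topic_for is made up for this example.

```python
def topic_for(server_name, database, table):
    """Build the topic name for one table: <server>.<database>.<table>."""
    return ".".join([server_name, database, table])

print(topic_for("dbserver1", "inventory", "customers"))
# prints: dbserver1.inventory.customers
```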

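On the first connection the connector reads the current contents of the tables, so every existing row is shown as a change event (note "snapshot":true and "op":"c" in the output below). From then on, it continues reading from the point in the log it has already reached.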

WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}

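Each line of the watcher output holds two JSON documents: the message key, then the message value. The sketch below shows one way to split such a line and to tell snapshot events from later changes. The function names are made up for this example, and the sample line is shortened (its "schema" parts are left out) to keep it readable.

```python
import json

def split_key_value(line):
    """Split one watcher output line into its key and value JSON documents."""
    decoder = json.JSONDecoder()
    text = line.strip()
    key, end = decoder.raw_decode(text)                  # first document: key
    value, _ = decoder.raw_decode(text[end:].lstrip())   # second document: value
    return key, value

def is_snapshot_event(value):
    """True when the event was produced while reading the initial table contents."""
    return bool(value["payload"]["source"].get("snapshot"))

# Shortened line for illustration only.
line = ('{"payload":{"id":1001}}        '
        '{"payload":{"before":null,"after":{"id":1001,"first_name":"Sally"},'
        '"source":{"snapshot":true},"op":"c"}}')

key, value = split_key_value(line)
print(key["payload"]["id"], value["payload"]["op"], is_snapshot_event(value))
```

raw_decode is used because json.loads rejects a string that contains more than one document.
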
Updating data

I'll try updating a row from the MySQL command-line client.

use inventory;
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

Output:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "first_name" },
          { "type": "string", "optional": false, "field": "last_name" },
          { "type": "string", "optional": false, "field": "email" }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "first_name" },
          { "type": "string", "optional": false, "field": "last_name" },
          { "type": "string", "optional": false, "field": "email" }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "field": "version" },
          { "type": "string", "optional": true, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "int64", "optional": false, "field": "ts_sec" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "boolean", "optional": true, "default": false, "field": "snapshot" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "db" },
          { "type": "string", "optional": true, "field": "table" },
          { "type": "string", "optional": true, "field": "query" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "0.9.5.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1569745138,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": false,
      "thread": 2,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "u",
    "ts_ms": 1569745138230
  }
}

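The output is long, so only part of it is excerpted here. You can see that the data from before and after the change has been captured.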

    "payload": {
        "before": {
            "id": 1004,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": {
            "id": 1004,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
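
Besides the row images, the payload carries metadata about the change. The sketch below turns the update event from this run into a short summary. The function name describe and the label processed_at are made up for this example. "c" and "u" are the operation codes that appear in this tutorial's output; "d" (delete) is included on the assumption that deletes are reported the same way.

```python
from datetime import datetime, timedelta, timezone

# Any operation code not listed here is reported unchanged.
OPERATIONS = {"c": "create", "u": "update", "d": "delete"}

def describe(payload):
    """Summarize the metadata of one change event payload."""
    source = payload["source"]
    seconds, millis = divmod(payload["ts_ms"], 1000)
    when = datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "binlog": f'{source["file"]}:{source["pos"]}',
        "processed_at": when.isoformat(),
    }

# Values copied from the update event shown in this article.
payload = {
    "source": {"file": "mysql-bin.000003", "pos": 364, "db": "inventory", "table": "customers"},
    "op": "u",
    "ts_ms": 1569745138230,
}

info = describe(payload)
for name, value in info.items():
    print(f"{name}: {value}")
```

Splitting ts_ms with divmod keeps the timestamp arithmetic in integers and avoids floating-point rounding of the milliseconds.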

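Wrapping up

Once you have finished the tutorial, stop the services started in Docker as needed.
Because --rm was specified on docker run, each container is removed when it stops.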

docker stop mysqlterm watcher connect mysql kafka zookeeper

Confirm that no containers remain, and you are done.

docker ps -a