Capturing SQL Server Change Data with Debezium

Introduction

I previously wrote an article on capturing MySQL change data with Debezium. This time, let's look at how to capture SQL Server change data with Debezium.

I referred to the following two links:
– [https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server](https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server)
– [https://debezium.io/documentation/reference/0.9/connectors/sqlserver.html](https://debezium.io/documentation/reference/0.9/connectors/sqlserver.html)

Environment

AWS EC2 (Amazon Linux 2)
Docker must be installed beforehand, along with Docker Compose, which the commands below use.

Setup

The Debezium examples repository provides the files needed, so start by downloading them.

curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/register-sqlserver.json
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/docker-compose-sqlserver.yaml
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
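
The downloaded files are:

    • register-sqlserver.json: defines the SQL Server connector instance
    • docker-compose-sqlserver.yaml: starts the zookeeper, kafka, SQL Server, and connect containers
    • debezium-sqlserver-init/inventory.sql (saved locally as inventory.sql): creates the database and tables in SQL Server and loads the data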

Starting all containers

Start the Zookeeper, Kafka, SQL Server, and Connect containers.

export DEBEZIUM_VERSION=0.9
docker-compose -f docker-compose-sqlserver.yaml up

Confirm with docker ps that the containers are running.

CONTAINER ID        IMAGE                                        COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
899bba830789        debezium/connect:0.9                         "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, 9779/tcp                                         ec2-user_connect_1
4fabee9ff69e        debezium/kafka:0.9                           "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp                                                   ec2-user_kafka_1
73765146588s        debezium/zookeeper:0.9                       "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, 9779/tcp   ec2-user_zookeeper_1
cf0b6e715693        microsoft/mssql-server-linux:2017-CU9-GDR2   "/opt/mssql/bin/sqls…"   2 minutes ago       Up 2 minutes        0.0.0.0:1433->1433/tcp                                                                       ec2-user_sqlserver_1

Create the database, tables, and data in SQL Server.

cat inventory.sql | docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

You can confirm that the data was inserted with the following command.

docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -q "use testDB; select * from customers;"'

Configuring the SQL Server connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
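
The same registration could also be done from a script. The following is a minimal sketch, assuming Python 3 with only the standard library. The helper name build_register_request is hypothetical, the payload is illustrative, and the request is only constructed here, not sent. The URL and headers mirror the curl command above.

```python
import json
import urllib.request

# Kafka Connect REST endpoint used by the curl command in this article.
CONNECT_URL = "http://localhost:8083/connectors/"


def build_register_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector.

    `connector` is the parsed content of register-sqlserver.json.
    """
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
    )


# Illustrative payload only; in practice, load register-sqlserver.json instead.
example = {"name": "inventory-connector", "config": {"database.dbname": "testDB"}}
request = build_register_request(example)
print(request.get_method(), request.full_url)
```

To actually send it, the request could be passed to urllib.request.urlopen(request) once the connect container is up.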

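register-sqlserver.json has the following structure. Note that the sample values below appear to follow the connector documentation rather than the downloaded file: the events captured later in this article report the logical server name server1, not fullfillment.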

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "1433", (4)
    "database.user": "sa", (5)
    "database.password": "Password!", (6)
    "database.dbname": "testDB", (7)
    "database.server.name": "fullfillment", (8)
    "table.whitelist": "dbo.customers", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
  }
}
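
    • (1) Name of the connector when it is registered with the Kafka Connect service.
    • (2) Name of the SQL Server connector class.
    • (3) Address of the SQL Server instance.
    • (4) Port number of the SQL Server instance (1433).
    • (5) SQL Server user name.
    • (6) SQL Server user password.
    • (7) Name of the database whose changes are captured.
    • (8) Logical name of the SQL Server instance/cluster.
    • (9) List of all tables whose changes Debezium should capture.
    • (10) List of Kafka brokers the connector uses to write DDL statements to, and recover them from, the database history topic.
    • (11) Name of the database history topic where the connector writes and recovers DDL statements.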
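
Items (8) and (9) together determine the Kafka topic names: the SQL Server connector writes each captured table to a topic named serverName.schemaName.tableName. Below is a minimal sketch of that mapping, assuming Python 3. The function name topics_for is hypothetical, the config values are illustrative, and the sketch assumes the whitelist entries are plain schema-qualified names rather than regular expressions.

```python
def topics_for(config: dict) -> list:
    """Return the change-event topic names implied by a connector config.

    Topic names follow <database.server.name>.<schema>.<table>.
    """
    server = config["database.server.name"]
    # table.whitelist is treated here as a comma-separated list of plain names.
    tables = [t.strip() for t in config["table.whitelist"].split(",") if t.strip()]
    return [f"{server}.{table}" for table in tables]


config = {
    "database.server.name": "server1",
    "table.whitelist": "dbo.customers",
}
print(topics_for(config))
```

With the logical server name server1 and the table dbo.customers, this gives the topic server1.dbo.customers that is consumed later in this article.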

For details on each property, see the connector documentation linked at the top of this article.

Setup is now complete.

Consuming change messages

Let's capture actual changes as JSON.

docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic server1.dbo.customers

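The customer rows inserted into the customers table are displayed as JSON. Each line shows the message key followed by the message value. Because these rows were read during the initial snapshot, op is "r" and snapshot is true.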

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1001}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118560,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118560}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1002}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1003}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1004}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
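
Each line printed by the console consumer contains the message key and the message value. The following is a minimal sketch of how such a line could be parsed, assuming Python 3 and that key and value are separated by a tab (the console consumer's default key separator). The names parse_line and OP_NAMES are hypothetical, and the sample line is shortened to the payload parts for brevity.

```python
import json

# Debezium operation codes found in the "op" field.
OP_NAMES = {"r": "snapshot read", "c": "create", "u": "update", "d": "delete"}


def parse_line(line: str) -> dict:
    """Split one 'key<TAB>value' line and summarize the change event."""
    key_json, value_json = line.rstrip("\n").split("\t", 1)
    key = json.loads(key_json)["payload"]
    value = json.loads(value_json)["payload"]
    return {
        "key": key,
        "operation": OP_NAMES.get(value["op"], value["op"]),
        "snapshot": value["source"]["snapshot"],
        "after": value["after"],
    }


# Shortened sample: the "schema" parts of the real messages are omitted.
sample = (
    '{"payload":{"id":1001}}\t'
    '{"payload":{"before":null,"after":{"id":1001,"first_name":"Sally"},'
    '"source":{"snapshot":true},"op":"r"}}'
)
event = parse_line(sample)
print(event["operation"], event["after"])
```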

Next, let's update a customer.
Change the first_name of the customer with id=1004 from "Anne" to "123".

docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -q "use testDB; update customers set first_name = 123 where id=1004;"'

The change was captured as an update event (op is "u").

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"123","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569825287100,"change_lsn":"00000028:00003640:0002","commit_lsn":"00000028:00003640:0003","snapshot":false},"op":"u","ts_ms":1569825292094}}

Extracting the changed portion of the payload shows that first_name changed.

    "payload": {
        "before": {
            "id": 1004,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": {
            "id": 1004,
            "first_name": "123",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
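
Comparing before and after by eye works for four columns, but a small helper makes the difference explicit. Below is a minimal sketch, assuming Python 3. The function name changed_fields is hypothetical, and the payload is copied from the update event above.

```python
def changed_fields(payload: dict) -> dict:
    """Map each changed column to its (before, after) pair."""
    # "before" is null for inserts and snapshot reads; "after" is null for deletes.
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


payload = {
    "before": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "123", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
}
print(changed_fields(payload))
```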

Conclusion

Now that verification is done, remove the containers.

docker-compose -f docker-compose-sqlserver.yaml down