微软生态圈相对比较封闭,SQLServer数据库作为微软系产品在互联网企业应用较少。但在传统的生产制造企业中大量的传统ERP软件以SQLServer作为数据存储载体。网上对于SQLServer的数据实时CDC案例较少。本篇文章将介绍如何通过Debezium+kafka-connnect将SQLServer实时捕获到kafka消息队列上。
SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。开启cdc的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。下图为SQLServer实现CDC机制:
图片源于微软官网
SQLServer 2012版本
统信UOS或其它linux发行版
DBeaver或其它数据可视化工具
本文默认SQLServer 2012版已正常安装。
2.1 创建测试数据库
在可视化工具里执行如下命令创建测试数据库
create database testdb;2.2 开启数据库CDC功能
use testdb;
EXEC sys.sp_cdc_enable_db;开启数据库CDC功能
如果想关闭CDC功能可使用如下命令:
use testdb;
exec sys.sp_cdc_disable_db;查询CDC功能是否开启
select * from sys.databases where is_cdc_enabled = 1;数据库CDC功能已开启
执行命令后,刚刚新建的testdb数据库出现在结果集里,就表示CDC功能开启成功。这时在testdb数据下会自动新建一个名为cdc用户,里面会有一些CDC的功能表。
自动创建cdc用户
2.3 新建监听测试表
SQLServer CDC监听的目标表一定要存在ID主键才能生效,没有主键则无法正常监听该表。
# 创建测试表
use testdb;
CREATE TABLE [dbo].[sales] ( [amt] int NULL,
[id] int NOT NULL,
[dept] varchar(100) COLLATE Chinese_PRC_CI_AS NULL,
CONSTRAINT [PK__sales] PRIMARY KEY CLUSTERED ([id]) WITH (PAD_INDEX = OFF,
STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF,
ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON
) ON
[PRIMARY] ) ON
[PRIMARY];
ALTER TABLE [dbo].[sales] SET(LOCK_ESCALATION = AUTO);
# 插入测试数据
insert into sales(id,dept,amt) values(1,'一部',12.3);
insert into sales(id,dept,amt) values(2,'二部',191);2.4 开启测试表CDC功能
exec sp_cdc_enable_table @source_schema='dbo', @source_name='sales', @role_name=null, @supports_net_changes = 1;2.5 验证测试表CDC功能开启
EXEC sys.sp_cdc_help_change_data_capture表CDC功能已开启
当刚刚创建的测试表sales进入查询结果集里,则表示此表的CDC监听功能已正常开启。
2.6 确认sqlserver agent已开启
虽然上面数据库和表的监听功能都已开启,但是SQLServer依赖于agent代理。所以要确认代理已正常开启。
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'代理正常
由于作者在前次教程中已详细演示kafka和debezium的安装方法,这里就不演示。接下来就略过截图而把命令行和步骤演示出来。有需要详细演示安装的同学可以前往作者前一篇文章阅读安装和使用方法:大数据(一)-实时捕获mysql变化数据到kafka
# 拉取zookeeper镜像
sudo docker pull wurstmeister/zookeeper:latest
# 拉取kakfa镜像
sudo docker pull wurstmeister/kafka:2.13-2.7.0
# 拉取debezium/connect
sudo docker pull debezium/connect
# 启动zookeeper
sudo docker run \
-itd \
-p 2181:2181 \ # zookeeper端口映射
--name zookeeper \
-e TZ="Asia/Shanghai" \
wurstmeister/zookeeper
# 启动kafka
sudo docker run \
-p 9092:9092 \ # kafka端口映射
--name kafka1 \
-itd \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=本机IP:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://本机IP:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_NUM_PARTITIONS=1 \ #分区数
-e KAFKA_DEFAULT_REPLICATION_FACTOR=1 \ # 副本数
-e KAFKA_LOG_RETENTION_HOURS=168 \ #kafka数据保留时长
-e TZ="Asia/Shanghai" \
wurstmeister/kafka:2.13-2.7.0
# 启动监听工具
sudo docker run \
-itd \
--name kafka-connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e TZ="Asia/Shanghai" \
--link zookeeper:zookeeper \ #与zookeeper通信
--link kafka1:kafka1 \ #与容器kafka1通信
debezium/connect工具均已正常运行
4.1 新建监听配置文件
在宿主机新建一个名为sqlserver-cdc-source.json的文件,输入下列配置保存。配置项详细说明可参考Debezium官网。
新建监听配置文件
{"name": "sqlserver-cdc-source",
"config": {
# 驱动
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
# 进程数
"tasks.max" : "1",
# 数据库名
"database.server.name" : "testdb",
# 数据库地址
"database.hostname" : "192.168.31.5",
"database.port" : "1433",
# 数据库账号
"database.user" : "sa",
# 数据库密码
"database.password" : "123456",
"database.dbname" : "testdb",
# 监听表白名单
"table.whitelist":"dbo.sales",
"schemas.enable" : "false",
"mode":"incrementing",
"incrementing.column.name":"id",
# kafka地址
"database.history.kafka.bootstrap.servers" : "192.168.31.5:9092",
# 生成topic名称
"database.history.kafka.topic": "testdb.dbo.sales",
"value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}4.2 启动kafka-connector监听
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.json通过kafka的查询命令可以看到在kafka里出现了监听表产生的topic,监听成功。接来就可以做数据上的实时消费。
sudo docker exec -it kafka1 /opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --list --zookeeper 192.168.31.5:2181监听表对应的topic
此次监听SQLServer数据库遇到一个深坑,kafka和debezium镜像里openjdk version 为"11.0.13"。在11版本下启动kafka-connector监听会报如下错误:
错误信息
解决方案是在容器里将jdk部分配置注释掉,路径为:/etc/java/java-11-openjdk/java-11-openjdk-11.0.13.0.8-2.fc34.x86_64/conf/security/java.security
注释这段代码
码字不易,请转发并收藏。
| 留言与评论(共有 0 条评论) “” |