A previous article, "MongoDB 数据同步kafka", covered using the Kafka source connector to sync MongoDB data into Kafka. The messages pushed to Kafka, however, arrive in the connector's default format, which may differ considerably from what the business actually needs. Can the connector convert the message format so that the messages delivered to Kafka are directly usable by the business? After some research and testing, it turns out the transform mechanism in Kafka Connect meets this need: a custom transform implementation supports all kinds of flexible format conversions. This article records the approach in the hope that it helps anyone with a similar requirement.
The Connect framework uses transforms to modify raw records, e.g. converting or filtering them.
transforms configuration options
For example:
# ... other connector parameters omitted
# declare two transform aliases: MakeMap and InsertSource
transforms=MakeMap, InsertSource
# implementation class for the MakeMap alias
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
# custom property "field" of the MakeMap alias
transforms.MakeMap.field=line
# implementation class for the InsertSource alias
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
# custom property "static.field" of the InsertSource alias
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source
Without the transforms configuration, the output is:
"foo"
"bar"
"hello world"
With the transforms configuration in effect, the output is:
# results are emitted as JSON: the line field is populated with the original value, and each record gains a data_source:test-file-source key/value pair
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}
transforms can be combined with predicates so that only messages satisfying a specified condition are processed by a transform.
predicates configuration options
Example: the configuration below declares a transform aliased Filter, binds it to a predicate aliased IsFoo, and thereby filters out all messages whose topic name is foo.
transforms=Filter
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo
predicates=IsFoo
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
1. Create a Maven project and add the third-party dependencies
<properties>
    <kafka.version>2.7.1</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>${kafka.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-transforms</artifactId>
        <version>${kafka.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
2. Create a JsonTransform class that implements the Transformation interface
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger logger = LoggerFactory.getLogger(JsonTransform.class);
    private static final String DEBUG_CONFIG = "debug";
    private boolean debug;

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(DEBUG_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM,
                    "debug logging. default is false");

    /**
     * Configuration definition
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /**
     * Read the runtime configuration values
     */
    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        this.debug = config.getBoolean(DEBUG_CONFIG);
    }

    @Override
    public R apply(R record) {
        if (this.debug) {
            logger.info("--- JsonTransform begin to handle ConnectRecord: {}", record);
        }
        // parse the raw JSON string value into the business object
        MongoPayload updatedValue = ValueJsonWrapper.transformObject((String) record.value());
        // convert the Java object to a Map: without a value schema, the Connect JSON
        // output only supports Map values; to emit the Java object directly, a
        // matching valueSchema would have to be provided instead
        Map<String, Object> mapValue = JsonHelper.getInstance().convertObj2Map(updatedValue);
        if (this.debug) {
            logger.info("--- [topic value] before transform Object: {}", record.value());
            String updateJsonValue = JsonHelper.getInstance().toJsonString(mapValue);
            logger.info("--- [topic value] after transform Object: {}", updateJsonValue);
        }
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
                record.key(), null, mapValue, record.timestamp());
    }

    @Override
    public void close() {
    }

    /**
     * Custom JSON format conversion
     */
    public static class ValueJsonWrapper {
        public static MongoPayload transformObject(String json) {
            return JsonHelper.getInstance().readValue(json, MongoPayload.class);
        }
    }
}
Notes: implementing the Transformation interface involves the following key methods:
# defines the transform's configuration; the example above declares a single debug parameter
ConfigDef config()
# initializes the transform's runtime configuration; here the debug value is read at runtime
void configure(Map<String, ?> props)
# cleanup when the transform is closed; may be left empty if there is nothing to release
void close()
# the transform's message conversion logic; the example converts the raw record's value
# into the required business structure and emits it as a Map. See the code comments for details.
R apply(R record)
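The JsonHelper utility referenced above is not shown in the original project. A minimal Jackson-based sketch matching the calls used by JsonTransform (getInstance, readValue, convertObj2Map, toJsonString) could look like the following; the real implementation may differ:
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonHelper {
    private static final JsonHelper INSTANCE = new JsonHelper();
    // tolerate fields in the change event that MongoPayload does not model
    private final ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static JsonHelper getInstance() {
        return INSTANCE;
    }

    public <T> T readValue(String json, Class<T> clazz) {
        try {
            return mapper.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException("failed to parse json", e);
        }
    }

    public Map<String, Object> convertObj2Map(Object obj) {
        return mapper.convertValue(obj, new TypeReference<Map<String, Object>>() {});
    }

    public String toJsonString(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (Exception e) {
            throw new RuntimeException("failed to serialize json", e);
        }
    }
}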
The target MongoPayload class:
import java.io.Serializable;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;

/**
 * mongo Payload json:
 * {
 *   "operationType": "replace",
 *   "fullDocument": {
 *     "_id": 5.0,
 *     "purchases": 14.0
 *   },
 *   "ns": {
 *     "db": "shopping",
 *     "coll": "customers"
 *   },
 *   "documentKey": {
 *     "_id": 5.0
 *   },
 *   "updateDescription": {
 *     "updatedFields": {
 *       "purchases": 400.0
 *     },
 *     "removedFields": []
 *   }
 * }
 */
@Data
public class MongoPayload implements Serializable {
    private String operationType;
    private JsonNode fullDocument;
    private Ns ns;
    private DocumentKey documentKey;
    private UpdateDescription updateDescription;

    @Data
    public static class UpdateDescription {
        private JsonNode updatedFields;
        private List<String> removedFields;
    }

    @Data
    public static class Ns {
        private String db;
        private String coll;
    }

    @Data
    public static class DocumentKey {
        private String _id;
    }
}
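To sanity-check the transform without a running Connect cluster, it can be invoked directly, e.g. from a unit test or a main method. A minimal sketch, assuming a JsonHelper like the one sketched above is on the classpath; the topic name and sample payload are illustrative:
import java.util.Collections;

import org.apache.kafka.connect.source.SourceRecord;

public class JsonTransformSmokeTest {
    public static void main(String[] args) {
        JsonTransform<SourceRecord> transform = new JsonTransform<>();
        transform.configure(Collections.singletonMap("debug", "true"));

        // a change-stream event as the MongoDB source connector emits it: a JSON string value
        String payload = "{\"operationType\":\"replace\","
                + "\"fullDocument\":{\"_id\":5.0,\"purchases\":14.0},"
                + "\"ns\":{\"db\":\"shopping\",\"coll\":\"customers\"},"
                + "\"documentKey\":{\"_id\":5.0}}";
        SourceRecord record = new SourceRecord(null, null, "shopping.customers", null, payload);

        SourceRecord transformed = transform.apply(record);
        // expected: a Map view of MongoPayload, which the converter later serializes to JSON
        System.out.println(transformed.value());
        transform.close();
    }
}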
3. Package and deploy
Package the Maven project as kafka-connector-transform-ext-1.0.0.jar (any file name works) and copy it to {kafka install dir}/libs. Start or restart the Connect cluster, then submit the task through the REST API:
curl 'http://dev2:8083/connectors' -X POST -i -H "Content-Type:application/json" -d \
'{ "name":"mongo-source",
"config":{"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max":1,
"connection.uri":"mongodb://dev2:27017,dev3:27017,dev4:27017/?replicaSet=rs0",
"database":"shopping",
"collection":"customers",
"copy.existing":true,
"poll.max.batch.size":100,
"poll.await.time.ms":5000,
"transforms":"jsonTransform",
"transforms.jsonTransform.type":"com.dctl.ea.kafka.connector.transform.ext.JsonTransform",
"transforms.jsonTransform.debug":true,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter.schemas.enable":false
}
}'
The configuration parameters above are explained below:
# key/value serialization for the source and sink connector messages
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# skip the JSON schema envelope to reduce overhead
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# data format conversion
transforms=jsonTransform
transforms.jsonTransform.type=com.dctl.ea.kafka.connector.transform.ext.JsonTransform
# debug logging: true prints the message before and after conversion
transforms.jsonTransform.debug=true
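For reference, when schemas.enable is left at its default of true, the JsonConverter wraps every message in a schema envelope, roughly like the following (a simplified illustration, not output captured from this setup):
{"schema":{"type":"string","optional":false},"payload":"foo"}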
Verifying the result: modify a document in the MongoDB collection, then check the Kafka topic contents and the Kafka Connect log output.
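For example, assuming the MongoDB source connector's default topic naming of {database}.{collection} and a broker reachable at dev2:9092 (host and port here are illustrative), the topic contents can be inspected with the console consumer:
bin/kafka-console-consumer.sh --bootstrap-server dev2:9092 --topic shopping.customers --from-beginning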
Official documentation: https://kafka.apache.org/documentation/#connect_transforms
operationType field values:
The JSON structure emitted for each operationType is listed below as a reference for business conversion.
Insert (initial copy of existing data)
{
"_id": {
"_id": 3.0,
"copyingData": true
},
"operationType": "insert",
"documentKey": {
"_id": 3.0
},
"fullDocument": {
"_id": 3.0,
"country": "Mexico",
"purchases": 200.0,
"last_viewed": {
"date": "2021-10-31T20:30:00.245Z"
}
},
"ns": {
"db": "shopping",
"coll": "customers"
}
}
Insert (incremental)
{
"_id": {
"_data": {
"$binary": "gmKn8SoAAAABRh5faWQAKwgAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
"$type": "00"
},
"_typeBits": {
"$binary": "AQ==",
"$type": "00"
}
},
"operationType": "insert",
"fullDocument": {
"_id": 4.0,
"country": "Chongqing",
"purchases": 8.0,
"last_viewed": {
"date": "2015-07-20T10:00:00.135Z"
}
},
"ns": {
"db": "shopping",
"coll": "customers"
},
"documentKey": {
"_id": 4.0
}
}
Update (field update)
{
"_id": {
"_data": {
"$binary": "gmKn74IAAAABRh5faWQAKwYAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
"$type": "00"
},
"_typeBits": {
"$binary": "AQ==",
"$type": "00"
}
},
"operationType": "update",
"ns": {
"db": "shopping",
"coll": "customers"
},
"documentKey": {
"_id": 3.0
},
"updateDescription": {
"updatedFields": {
"purchases": 400.0
},
"removedFields": []
}
}
Update (full-document replace)
{
"_id": {
"_data": {
"$binary": "gmKn9ZUAAAABRh5faWQAKwoAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
"$type": "00"
},
"_typeBits": {
"$binary": "AQ==",
"$type": "00"
}
},
"operationType": "replace",
"fullDocument": {
"_id": 5.0,
"purchases": 14.0
},
"ns": {
"db": "shopping",
"coll": "customers"
},
"documentKey": {
"_id": 5.0
}
}
Delete
{
"_id": {
"_data": {
"$binary": "gmKn8cMAAAABRh5faWQAKwQAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
"$type": "00"
},
"_typeBits": {
"$binary": "AQ==",
"$type": "00"
}
},
"operationType": "delete",
"ns": {
"db": "shopping",
"coll": "customers"
},
"documentKey": {
"_id": 2.0
}
}
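As a rough illustration of how downstream business code might branch on these operationType values using the MongoPayload class above (upsert, applyPatch, and remove are hypothetical stand-ins for real business logic):
import com.fasterxml.jackson.databind.JsonNode;

public class MongoPayloadDispatcher {

    public void dispatch(MongoPayload payload) {
        switch (payload.getOperationType()) {
            case "insert":
            case "replace":
                // the full document is available for inserts and replaces
                upsert(payload.getFullDocument());
                break;
            case "update":
                // only the changed fields are available
                applyPatch(payload.getDocumentKey(), payload.getUpdateDescription());
                break;
            case "delete":
                remove(payload.getDocumentKey());
                break;
            default:
                // ignore other change-stream event types
                break;
        }
    }

    private void upsert(JsonNode fullDocument) { /* business write */ }

    private void applyPatch(MongoPayload.DocumentKey key,
                            MongoPayload.UpdateDescription patch) { /* partial update */ }

    private void remove(MongoPayload.DocumentKey key) { /* business delete */ }
}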