A look at the message transformation mechanism in Kafka connectors

Developer of the 时光闹钟 app. Follow me for more content to come!

Sticking to original writing, let's improve together!

Background

A previous article, "MongoDB data sync to Kafka", covered how MongoDB pushes data to Kafka through the Kafka source connector. The messages it delivers, however, use the connector's default format, which can differ quite a bit from what the business actually needs. Can the Kafka connector itself convert the message format, so that the messages landing in Kafka are directly usable? After some research and testing, I found that the connector's transform mechanism does exactly this: with a custom transform implementation, all kinds of flexible format conversions become possible. I am writing it down here in the hope that it helps anyone with the same need.

kafka connector transforms

The Connect framework uses transforms to modify the original records, for example converting or filtering them.

Configuration options for transforms

  • transforms - comma-separated list of aliases for the transforms
  • transforms.$alias.type - fully qualified class name of the transform implementation for that alias
  • transforms.$alias.$transformationSpecificConfig - configuration properties of that transform implementation

For example:

# ... other connector parameters omitted
# declare two transform aliases: MakeMap, InsertSource
transforms=MakeMap, InsertSource
# implementation class for the MakeMap alias
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
# the "field" property of the MakeMap transform
transforms.MakeMap.field=line
# implementation class for the InsertSource alias
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
# the "static.field" property of the InsertSource transform
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

With the configuration above but without the transforms settings, the output is:

"foo"
"bar"
"hello world"

When the transforms configuration takes effect, the output becomes:

# the result is emitted as JSON: the "line" field carries the original value, and each record gains a data_source:test-file-source key/value pair
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

kafka connector predicates

Transforms can be combined with predicates, so that a transform is only applied to messages that satisfy a given condition.

Configuration options for predicates

  • predicates - comma-separated list of predicate aliases
  • predicates.$alias.type - fully qualified class name of the predicate implementation
  • predicates.$alias.$predicateSpecificConfig - additional configuration properties for the predicate

Example: the configuration below declares a transform aliased Filter and binds it to a predicate aliased IsFoo, so that all messages whose topic name is foo are filtered out.

transforms=Filter
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo

predicates=IsFoo
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
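
Besides the built-in predicates such as TopicNameMatches, HasHeaderKey and RecordIsTombstone, the matching condition itself can also be customized, since a predicate is just a small SPI. The class below is a minimal sketch, assuming Kafka 2.6+ where the Predicate interface is available; the class name, package and the "suffix" config key are made up for illustration.

package com.example.connect.predicates;              // hypothetical package

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
 * Hypothetical predicate: matches records whose topic name ends with a configured suffix.
 */
public class TopicNameEndsWith<R extends ConnectRecord<R>> implements Predicate<R> {

    private static final String SUFFIX_CONFIG = "suffix";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(SUFFIX_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
                    ConfigDef.Importance.MEDIUM, "topic name suffix to match");

    private String suffix;

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        suffix = new SimpleConfig(CONFIG_DEF, configs).getString(SUFFIX_CONFIG);
    }

    @Override
    public boolean test(R record) {
        // true means the transform bound to this predicate is applied to the record
        return record.topic() != null && record.topic().endsWith(suffix);
    }

    @Override
    public void close() {
        // nothing to release
    }
}

It would be registered exactly like TopicNameMatches above, via predicates.$alias.type and predicates.$alias.suffix.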

Custom transforms

1. Create a Maven project and add the third-party dependencies


<properties>
    <kafka.version>2.7.1</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>${kafka.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-transforms</artifactId>
        <version>${kafka.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

2. Create a JsonTransform class implementing the Transformation interface

package com.dctl.ea.kafka.connector.transform.ext;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger logger = LoggerFactory.getLogger(JsonTransform.class);

    private static final String DEBUG_CONFIG = "debug";

    private boolean debug;

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(DEBUG_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, "debug logging. default is false");

    /**
     * Configuration definition
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /**
     * Reads the runtime configuration
     */
    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        this.debug = config.getBoolean(DEBUG_CONFIG);
    }

    @Override
    public R apply(R record) {
        if (this.debug) {
            logger.info("--- JsonTransform begin to handle ConnectRecord: {}", record);
        }

        // the converted Java object
        MongoPayload updatedValue = ValueJsonWrapper.transformObject((String) record.value());
        // Convert the Java object to a Map: without a value schema, the Connect framework can only
        // serialize Map values as JSON. To pass a typed object directly, a matching valueSchema must be provided.
        Map<String, Object> mapValue = JsonHelper.getInstance().convertObj2Map(updatedValue);
        if (this.debug) {
            logger.info("--- [topic value] before transform Object: {}", record.value());
            String updateJsonValue = JsonHelper.getInstance().toJsonString(mapValue);
            logger.info("--- [topic value] after  transform Object: {}", updateJsonValue);
        }
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), null, mapValue, record.timestamp());
    }

    @Override
    public void close() {
        // nothing to release
    }

    /**
     * Custom JSON conversion
     */
    public static class ValueJsonWrapper {
        public static MongoPayload transformObject(String json) {
            return JsonHelper.getInstance().readValue(json, MongoPayload.class);
        }
    }
}

Note: implementing the Transformation interface mainly involves the following methods.

# Defines the transform's configuration; the example above defines a single "debug" parameter
ConfigDef config()

# Initializes the transform with its runtime configuration; here the debug value is read at startup
void configure(Map<String, ?> props)

# Cleanup when the transform is closed; may be left empty if there is nothing to release
void close()

# The actual message conversion logic. In the example, the value of the incoming record is converted
# into the desired business structure and finally emitted as a Map. See the code comments for details.
R apply(R record)
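
As mentioned in the comment inside apply(), the transform emits a schemaless Map because, without a value schema, the Connect JSON converter can only serialize Maps and primitive types. If you prefer a typed value, the schema can be built explicitly with Kafka Connect's SchemaBuilder and Struct. The fragment below is only a rough sketch of that alternative (it would replace apply() and needs the listed imports); the two fields are illustrative and not part of the original project.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Illustrative schema covering only two of the MongoPayload fields
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("mongo.payload")
        .field("operationType", Schema.OPTIONAL_STRING_SCHEMA)
        .field("fullDocument", Schema.OPTIONAL_STRING_SCHEMA)
        .build();

@Override
public R apply(R record) {
    MongoPayload payload = ValueJsonWrapper.transformObject((String) record.value());

    Struct value = new Struct(VALUE_SCHEMA)
            .put("operationType", payload.getOperationType())
            // keep fullDocument as a raw JSON string for simplicity
            .put("fullDocument", String.valueOf(payload.getFullDocument()));

    // pass the schema together with the Struct instead of null + Map
    return record.newRecord(record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            VALUE_SCHEMA, value, record.timestamp());
}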

The MongoPayload class used by the conversion

/**
 * mongo Payload json:
 * {
 * "operationType": "replace",
 * 	"fullDocument": {
 * 		"_id": 5.0,
 * 		"purchases": 14.0
 *    },
 * 	"ns": {
 * 		"db": "shopping",
 * 		"coll": "customers"
 *    },
 * 	"documentKey": {
 * 		"_id": 5.0
 *    },
 *   "updateDescription": {
 * 		"updatedFields": {
 * 			"purchases": 400.0
 *        },
 * 		"removedFields": []
 * 	 }
 * }
 */
@Data
public class MongoPayload implements Serializable {
    private String operationType;
    private JsonNode fullDocument;
    private Ns ns;
    private DocumentKey documentKey;
    private UpdateDescription updateDescription;

    @Data
    public static class UpdateDescription{
        private JsonNode updatedFields;
        private List<String> removedFields;
    }

    @Data
    public static class Ns{
        private String db;
        private String coll;
    }

    @Data
    public static class DocumentKey{
        private String _id;
    }
}
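
JsonTransform also calls a JsonHelper utility that the article does not show. Its real implementation is unknown; the sketch below is a minimal Jackson-based reconstruction whose method names simply mirror the calls made in JsonTransform (getInstance, readValue, convertObj2Map, toJsonString). jackson-databind would need to be on the classpath; most Kafka distributions already ship it under libs.

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Hypothetical reconstruction of the JsonHelper used above.
 * Only the method signatures are inferred from usage; the real project may differ.
 */
public class JsonHelper {
    private static final JsonHelper INSTANCE = new JsonHelper();
    private final ObjectMapper mapper = new ObjectMapper();

    private JsonHelper() {
    }

    public static JsonHelper getInstance() {
        return INSTANCE;
    }

    /** Deserialize a JSON string into the given type. */
    public <T> T readValue(String json, Class<T> clazz) {
        try {
            return mapper.readValue(json, clazz);
        } catch (IOException e) {
            throw new RuntimeException("failed to parse json: " + json, e);
        }
    }

    /** Convert a POJO into a Map so the schemaless JSON converter can serialize it. */
    public Map<String, Object> convertObj2Map(Object obj) {
        return mapper.convertValue(obj, new TypeReference<Map<String, Object>>() { });
    }

    /** Serialize any object back to a JSON string (used for debug logging). */
    public String toJsonString(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("failed to serialize json", e);
        }
    }
}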

3. Package and deploy

Package the Maven project into kafka-connector-transform-ext-1.0.0.jar (any file name will do), copy it to {kafka install dir}/libs, then start or restart the Connect cluster and submit the task through the REST API.

curl 'http://dev2:8083/connectors' -X POST -i -H "Content-Type:application/json" -d   \
    '{ "name":"mongo-source",  
       "config":{"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",  
                "tasks.max":1,  
                "connection.uri":"mongodb://dev2:27017,dev3:27017,dev4:27017/?replicaSet=rs0",  
                "database":"shopping",
                "collection":"customers",
                "copy.existing":true,
                "poll.max.batch.size":100,
                "poll.await.time.ms":5000,
                "transforms":"jsonTransform",
                "transforms.jsonTransform.type":"com.dctl.ea.kafka.connector.transform.ext.JsonTransform",
                "transforms.jsonTransform.debug":true,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable":false,
                "value.converter.schemas.enable":false
                }  
    }' 

The configuration parameters above are explained as follows:

# key/value serialization for source connector and sink connector messages
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# do not embed the JSON schema, which reduces overhead
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# data format conversion
transforms=jsonTransform
transforms.jsonTransform.type=com.dctl.ea.kafka.connector.transform.ext.JsonTransform
# debug logging: when true, the message is logged before and after conversion
transforms.jsonTransform.debug=true

Verification: modify a document in the mongo collection, then check the content of the kafka topic and the kafka connector log output.


Appendix

Official documentation: https://kafka.apache.org/documentation/#connect_transforms

Native JSON structure of the mongo-source-connector

Possible values of the operationType field:

  • insert - a document was inserted
  • delete - a document was deleted
  • replace - a document was replaced; when a replace is executed with upsert, the event may be reported as insert
  • update - a document was updated; when an update is executed with upsert, the event may be reported as insert

The JSON structure for each operationType is listed below, as a reference for implementing business conversions.

Insert (existing data copy)

{
  "_id": {
    "_id": 3.0,
    "copyingData": true
  },
  "operationType": "insert",
  "documentKey": {
    "_id": 3.0
  },
  "fullDocument": {
    "_id": 3.0,
    "country": "Mexico",
    "purchases": 200.0,
    "last_viewed": {
      "date": "2021-10-31T20:30:00.245Z"
    }
  },
  "ns": {
    "db": "shopping",
    "coll": "customers"
  }
}

Insert (incremental)

{
	"_id": {
		"_data": {
			"$binary": "gmKn8SoAAAABRh5faWQAKwgAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
			"$type": "00"
		},
		"_typeBits": {
			"$binary": "AQ==",
			"$type": "00"
		}
	},
	"operationType": "insert",
	"fullDocument": {
		"_id": 4.0,
		"country": "Chongqing",
		"purchases": 8.0,
		"last_viewed": {
			"date": "2015-07-20T10:00:00.135Z"
		}
	},
	"ns": {
		"db": "shopping",
		"coll": "customers"
	},
	"documentKey": {
		"_id": 4.0
	}
}

Update (field update)

{
	"_id": {
		"_data": {
			"$binary": "gmKn74IAAAABRh5faWQAKwYAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
			"$type": "00"
		},
		"_typeBits": {
			"$binary": "AQ==",
			"$type": "00"
		}
	},
	"operationType": "update",
	"ns": {
		"db": "shopping",
		"coll": "customers"
	},
	"documentKey": {
		"_id": 3.0
	},
	"updateDescription": {
		"updatedFields": {
			"purchases": 400.0
		},
		"removedFields": []
	}
}

Update (replace)

{
	"_id": {
		"_data": {
			"$binary": "gmKn9ZUAAAABRh5faWQAKwoAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
			"$type": "00"
		},
		"_typeBits": {
			"$binary": "AQ==",
			"$type": "00"
		}
	},
	"operationType": "replace",
	"fullDocument": {
		"_id": 5.0,
		"purchases": 14.0
	},
	"ns": {
		"db": "shopping",
		"coll": "customers"
	},
	"documentKey": {
		"_id": 5.0
	}
}

Delete

{
	"_id": {
		"_data": {
			"$binary": "gmKn8cMAAAABRh5faWQAKwQAWhAEtUiGZFZUSN+PgmsFn7NISQQ=",
			"$type": "00"
		},
		"_typeBits": {
			"$binary": "AQ==",
			"$type": "00"
		}
	},
	"operationType": "delete",
	"ns": {
		"db": "shopping",
		"coll": "customers"
	},
	"documentKey": {
		"_id": 2.0
	}
}