今天我们来通过源码来分析一下,生产者发送一条消息的所有流程~~~
生产者客户端代码
public class SzzTestSend { public static final String bootStrap = "xxxxxx:9090"; public static final String topic = "t_3_1"; public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap); // 序列化协议 下面两种写法都可以 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //过滤器 可配置多个用逗号隔开 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest"); //构造 KafkaProducer KafkaProducer producer = new KafkaProducer(properties); // 发送消息, 并设置 回调(回调函数也可以不要) ProducerRecord record = new ProducerRecord(topic,"Hello World!"); try { producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value())); }catch (Exception e){ e.printStackTrace(); } } /** * 发送成功回调类 */ public static class SzzTestCallBack implements Callback{ private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class); private String topic; private String key; private String value; public SzzTestCallBack(String topic, String key, String value) { this.topic = topic; this.key = key; this.value = value; } public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { log.error("Error when sending message to topic {} with key: {}, value: {} with error:", topic, key,value, e); }else { log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}", topic, key,value,metadata.partition(),metadata.offset()); } } }} KafkaProducer通过解析producer.propeties文件里面的属性来构造自己。 例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
//构造 KafkaProducerKafkaProducer producer = new KafkaProducer(properties);我们之前有讲过. 客户端都会保存集群的元信息,例如生产者的元信息是 ProducerMetadata. 消费组的是ConsumerMetadata 。
元信息都会有自己的自动更新逻辑, 详细请看Kafka的客户端发起元信息更新请求
相关的Producer配置有:
属性 | 描述 | 默认 |
metadata.max.age.ms | 即使我们没有看到任何分区领导层更改以主动发现任何新代理或分区,我们也强制刷新元数据的时间段(以毫秒为单位)。。 | 300000(5分钟) |
retry.backoff.ms | 如果上次更新失败,发起重试的间隔时间 | 100 |
虽然Producer元信息会自动更新, 但是有可能在生产者发送消息的时候,发现某个TopicPartition不存在,这个时候可能就需要立刻发起一个元信息更新了。
生产者拦截器在消息发送之前可以做一些准备工作, 比如 按照某个规则过滤某条消息, 又或者对 消息体做一些改造, 还可以用来在发送回调逻辑之前做一些定制化的需求,例如统计类的工作! 拦截器的执行时机在最前面,在消息序列化和分区计算之前
相关的Producer配置有:
属性 | 描述 | 默认 |
interceptor.classes | 生产者拦截器配置,填写全路径类名,可用逗号隔开配置多个,执行顺序就是配置的顺序。 | 空 |
用来设置发送的消息具体要发送到哪个分区上
相关的Producer配置有:
属性 | 描述 | 默认值 |
partitioner.class | 消息的分区分配策略 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
Sender是专门负责将消息发送到Broker的I/O线程。
相关的Producer配置有:
属性 | 描述 | 默认值 |
max.in.flight.requests.per.connection | 客户端能够允许的最大未完成请求(在请求中)的请求数量, 如果该值大于1, 并且请求发送失败可可能导致消息重排序的风险(如果重试启用的话) | 5 |
request.timeout.ms | 控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败 | 30000(30 秒) |
connections.max.idle.ms | 在此配置指定的毫秒数后关闭空闲连接。 | 540000(9 分钟) |
reconnect.backoff.ms | 在尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试 | 50 |
reconnect.backoff.max.ms | 重新连接到反复连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,每台主机的退避将在每次连续连接失败时呈指数增长,直至达到此最大值。在计算回退增加后,添加 20% 的随机抖动以避免连接风暴。 | 1000(1 秒) |
retry.backoff.ms | 在尝试重试对给定主题分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧密循环中重复发送请求。 | 100 |
send.buffer.bytes | 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。如果值为 -1,将使用操作系统默认值。 | 131072(128 千字节) |
receive.buffer.bytes | 读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。如果值为 -1,将使用操作系统默认值。 | 32768 |
acks | 生产者要求Leader在决定是否完成请求之前收到的确认数量. 这控制了发送的记录的持久性 可配置的参数如下: | 1 |
transactional.id | ||
enable.idempotence | 是否启动幂等。当设置为true时候, 生产者将确保每条消息被最多写入一个副本,如果未false,生产者由于Broker失败等原因重试,可能会写入到多个副本中。注意:启动幂等性的要求max.in.flight.requests.per.connection<=5 retries>0并且 acks=all .如果设置了不兼容的值则会抛出异常 | false |
max.request.size | 请求的最大大小(以字节为单位)。此设置将限制生产者在单个请求中发送的记录批次的总数据量,以避免发送大量请求。这实际上也是最大未压缩记录批量大小的上限。请注意,服务器对记录批量大小有自己的上限(如果启用压缩,则在压缩之后),这可能与此不同。 | 1048576 |
retries | 生产者重试次数,当max.in.flight.requests.per.connection>1的情况发生重试可能会导致顺序问题. | 2147483647 |
delivery.timeout.ms | 最大交付时间, 调用send()方法后不管是成功还是失败的时间上限。例如重试太多次之后达到次配置时间的时候也会停止重试了。此配置值应该大于等于request.timeout.ms 和linger.ms总和 | 120000 (2 minutes). 如果这个值你没有主动设置并且request.timeout.ms +linger.ms > 120000(默认值) ,那么它最终的值是request.timeout.ms +linger.ms |
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));发送消息的第一步就是执行拦截器
一般情况下我们可能不需要拦截器, 但是我们需要用拦截器的时候按照下面操作执行:
这个interceptor.classes中的属性可以配置多个拦截器, 用逗号隔开,并且执行顺序就是按照配置的顺序执行的。
拦截器的执行时机在最前面,在消息序列化和分区计算之前
ProducerInterceptor
org.apache.kafka.clients.producer.ProducerInterceptor
public ProducerRecord onSend(ProducerRecord record); public void onAcknowledgement(RecordMetadata metadata, Exception exception); public void close(); onSend(ProducerRecord
当客户端将记录发送到 KafkaProducer 时,在键和值被序列化之前调用。 该方法调用ProducerInterceptor.onSend(ProducerRecord)方法。 从第一个拦截器的onSend()返回的 ProducerRecord传递给第二个拦截器 onSend(),在拦截器链中依此类推。 从最后一个拦截器返回的记录就是从这个方法返回的。 此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。 如果链中间的拦截器(通常会修改记录)抛出异常,则链中的下一个拦截器将使用前一个未抛出异常的拦截器返回的记录调用。
调用地方
①. 拦截器执行时机在键值序列化之前 ②. 拦截器抛出异常会被捕获,并打印日志,那么也意味着这个拦截器所做的修改不会生效 ③.拦截器中修改的消息体会被传递到下一个拦截器
onAcknowledgement(RecordMetadata metadata, Exception exception)方法:
当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时,将调用此方法。 此方法通常在用户设置的Callback之前调用,此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。这个方法运行在Producer的I/O线程中,所以这个方法中的代码逻辑需要越简单越好。 否则,来自其他线程的消息发送可能会延迟。
参数: metadata – 已发送记录的元数据(即分区和偏移量)。 如果发生错误,元数据将只包含有效的主题和分区。 如果 ProducerRecord 中没有给出 partition 并且在分配 partition 之前发生错误,则 partition 将设置为 RecordMetadata.NO_PARTITION。 如果客户端将空记录传递给KafkaProducer.send(ProducerRecord)则元数据可能为空。 exception– 在处理此记录期间抛出的异常。 如果没有发生错误,则为空。
close()
主要用于在关闭拦截器时自行一些资源清理工作。
configure(Map
ProducerInterceptor接口中集成了一Configurable接口,接口有个方法
void configure(Map configs); 也就是说在拦截器中,我们可以拿到所有的配置属性了; 这个方法在这几个方法中最早执行
将发送的消息加上后缀 注意这里消息value的类型是String ,如果是byte则需要处理一下
@Override public ProducerRecord onSend(ProducerRecord record) { System.out.println("生产者拦截器 onSend() run ."+record); return new ProducerRecord<>( record.topic(), record.partition(), record.key(), record.value().concat("_后缀")); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("生产者拦截器 onAcknowledgement run ."+metadata.toString() +" exception:"+exception); } @Override public void close() { System.out.println("生产者拦截器 close() run ."); } @Override public void configure(Map configs) { this.configs = configs; System.out.println("生产者拦截器 configure run ."+configs); } 在发送消息之前,要先获取一下将要发送的TopicPartition的元信息。这个获取元信息的请求也是通过唤醒 Sender线程进行发送的。
1 . ProducerMetadata 元信息Map
org.apache.kafka.common.errors.TimeoutException: Topic t_3_1 not present in metadata after 60000 ms.相关的Producer配置有:
属性 | 描述 | 默认 |
max.block.ms | 生产者发送消息过程中,获取元信息的最大超时时间 | 60000(1分钟) |
metadata.max.idle.ms | Topic的最大空闲时间. 如果一个主题在这么多毫秒内没有被访问过,它就会从缓存中删除。并且下一次对其的访问将强制执行元数据获取请求。 | 300000(5分钟) |
KafkaProducer producer = new KafkaProducer(properties); 在构建KafkaProducer对象的时候, 有构建 producer I/O thread, 并且启动了, Runnable是 sender
最终调用NetworkClient.poll(long timeout, long now)里面maybeUpdate()方法 这个方法会获取 前Node中负载最少的节点发起网络请求, 如果所有Node都是满负载则请求不会被发起。
如何判断哪个节点负载最少?
通过每个节点的InFlightRequests(空中请求数量)里面的最小数量判断,这个表示当前正在发起的请求,但是还没有收到回复的请求数量; 保存形式是一个HashMap,key是Node的Id, value是所有当前还在请求中的节点; 当请求完成,请求就会在这个队列里面移除; 如果这个队列一直是满的,说明当前负载很高或者网络连接有问题。如果所有Node都是满负载则请求不会被发起,除非等到队列数量减少。
private final Map> requests = new HashMap<>(); 每个Node最大负载数 ?
每个客户端在发起请求还没有收到回复的时候都会被缓存到InFlightRequests(空中请求数量)里面,但是这个数量是有限制的,这个可以通过配置max.in.flight.requests.per.connection 进行设置, 默认是: 5; 也就是每个客户端对每个Node最多也就同时发起 5 个未完成的请求; 如果超时这个数量就会等待有请求完成并释放额度了才可以发起新的请求;
相关的Producer配置有:
属性 | 描述 | 默认 |
max.in.flight.requests.per.connection | 每个客户端对每个Node发起请求的最大并发数 | 5 |
将key和Value先序列化。
自定义序列化器,需要实现org.apache.kafka.common.serialization.Serializer接口。 我们简单看下StringSerializer序列化器
public class StringSerializer implements Serializer { private String encoding = "UTF8"; @Override public void configure(Map configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); if (encodingValue instanceof String) encoding = (String) encodingValue; } @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } }} configure(Map
Kafka客户端提供了很多种序列化器供我们选择,如果这些序列化器你都不满意,你也可以选择其他一些开源的序列化工具,或者自己进行实现。
将序列化后的key、 value 调用合适的分区器选择将要发送的分区号。
分区三种策略
图解Kafka Producer中的消息缓存模型
Sender线程在构造KafkaProducer的时候就已经启动了,它的职责就是从
以下忽略部分代码省略
void runOnce() { long currentTimeMs = time.milliseconds(); long pollTimeout = sendProducerData(currentTimeMs); client.poll(pollTimeout, currentTimeMs); } private long sendProducerData(long now) { // 获取哪些数据准备好了发送 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); }我们都知道生产者生产的消息是暂时缓存在消息累加器RecordAccumulator中的, Sender负责从RecordAccumulator里面获取准备好的数据进行发送
那么 ,哪些属于准备好的数据呢?
我们先回顾一下 RecordAccumulator的结构。
每个TopicPartition的消息都会被暂存在ProducerBatch Deque 阻塞队列中的其中一个ProducerBatch中,每个ProducerBatch都存放着一条或者多条消息。
具体请看 图解Kafka Producer 消息缓存模型
遍历每个TopicPartition里面的Deque, 获取队列中的第一个ProducerBatch 如果该TopicPartition不存在Leader,则忽略该Batch,如果有则进入判断流程
因为消息是要发Leader所在的Broker发送的, 所以必须要有Leader。
在满足条件
不属于重试或者属于重试并且等待的时候大于 retry.backoff.ms 的前提下,满足下面条件的均可发送
(该条件就是要排除那些是属于重试,但是还没有到达重试间隔时间的情况。)
该ProducerBatch还没有被发送过. 该Batch能否发送判断条件如下
上面是讲哪些Batch属于可发送的逻辑判断,但是实际上,真正发送的时候并不是以每个Batch维度来判断发送的,而是以Node维度来发送的,上面我们知道了哪些Batch能够发送,然后我们就可以推断出Batch对应的TopicPartition所属的Broker。有了这些可发送的Broker,然后再来遍历Broker上的每个TopicPartition中的First Batch
文字不好理解,我们看看下图
上图是生产者的RecordAccumulator消息累加器, 消息累加成上图所示。
每个TopicPartition队列都有很多Batch, 我们知道了TopicPartition 是不是就能够确定它所在的Broker?
例如上图中
那么最终得到的ReadyNodes就是Broker-0、Broker-2
上面我们在获取 哪些Batch准备好发送的时候,也会找到哪些TopicPartition没有Leader。
那么这个时候就需要强制的去更新一下这些TopicPartition的元信息了,否则就发送不了。
上面我们已经获取了ReadyNodes
那么在真正的向对应的ReadyNodes 发起请求之前, 我们还是需要判断一下 我们的生产者客户端是否准备好了跟ReadyNodes 发起请求.
那么客户端的准备条件有哪些呢?
生产者客户端在最开始的时候都没有跟任何Node建立连接的, 当我们尝试发送之前会去检验一下连接是否建立成功(就是当前这一步), 如果没有的话,则会去尝试建立连接。并且当前这次是会把这个Node过滤掉的,因为还没有建立成功链接,等到下一次循环的时候,可能已经建立成功了。
当然客户端是否准备好,不仅仅是判断 连接是否建立成功。
还需要判断 当前未完成的请求队列数量是否 < max.in.flight.requests.per.connection
到现在为止,我们已经得到了可以发送请求的ReadyNodes了。那么接下来就是分别解析这些ReadyNode 他们能够发送的Batch打包发送了。
这一步最重要的作用是将 ProducerBatch 跟Node映射,也就是知道当前批次想哪个Broker发送哪些Batch
public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap();// 遍历ReadyNodes 每个Node下的队列都获取一遍 Map> batches = new HashMap<>(); for (Node node : nodes) { List ready = drainBatchesForOneNode(cluster, node, maxSize, now); batches.put(node.id(), ready); } return batches; } 那么应该选择哪些Batch来发送呢?
遍历每个ReadyNode节点下面的每个TopicPartition 队列的首个Batch
上面我们已经得到了
Map> batches 也就是Node.id 和对应要发往该Node的Request请求携带的ProducerBatch列表。
发送成功之后,会返回Response,根据Response情况处理不同的逻辑
Response处理逻辑 每个Batch都会对应着一个PartitionResponse, 不同的PartitionResponse对应的不同处理逻辑。
假如你想确定某个消息是否发送成功, 那么你可以自己定义一个拦截器。 并重写接口onAcknowledgement(RecordMetadata metadata, Exception exception) 在这里面来判断你的消息是否发送成功。
整个生产者客户端是由主线程和Sender线程协调运行的, 主线程创建消息, 然后通过 拦截器、元信息更新、序列化、分区器、缓存消息等等流程。
然后Sender线程在初始化的时候就已经运行了,并且是一个while循环。
Sender线程里面主要工作是:
1.寻找ReadyNodes: 去消息累加器里面获取有哪些Node是能够发送Request的。只要该Node有一个TopicPartition队列中有符合发送条件的Batch。那么这个Node就应该是ReadyNode。具体的筛选逻辑请看上文有具体分析。
2.构建Request: 过滤之后, 拿到了所有的ReadyNodes。接下来就是遍历该Node下所有的TopicPartition队列里面的FirstBatch, 组装到Request请求里面。发往一个Node的请求Request,可以包含多个ProducerBatch,能够一次发送多少个Batch是由配置max.request.size决定的,一个Node对应一个Request。 注意: 这个时候映射关系已经是 Map
3.将Request放入inFightRequest中: 上面是组装好了Request, 组装好了之后要先把这个Request放到inFightRequest对象中, 它保存着每个Node当前已经发送的Request但是还没有收到Response的请求。每个Node最多能够存放多少个未完成的Request,是由max.in.flight.requests.per.connection控制的。需要注意的是, 如果队列已经满了, Request是放入不了这个对象里面的,并且会抛出异常:
"Attempt to send a request to node " + nodeId + " which is not ready." 它决定着生产者针对某个Node的并发度。
4.Request通过Selector发起通信.
5.返回Response: 服务端处理完成, 返回Response信息。
6.从inFightRequest中移除完成Request
7.释放内存回消息累加器: 请求结束,清理消息累加器,将发送成功的ProducerBatch占用的内存大小加回到消息累加器中。 注意:这里纯粹的是数字的加减,不涉及内存的处理, 因为发送成功之前的Batch占用了消息累加器的剩余可用内存。发送成之后要加回来。否则消息累加器满了会导致阻塞。
作者:石臻臻的杂货铺
链接:https://juejin.cn/post/7124563723645091876
来源:稀土掘金
| 留言与评论(共有 0 条评论) “” |