文章目录
- [构造KafkaProducer](https://cloud.tencent.com/developer)
- [生产者元信息更新器](https://cloud.tencent.com/developer)
- [生产者拦截器](https://cloud.tencent.com/developer)
- [生产者分区器](https://cloud.tencent.com/developer)
- [Sender线程启动](https://cloud.tencent.com/developer)
- [发送请求](https://cloud.tencent.com/developer)
- [生产者拦截器](https://cloud.tencent.com/developer)
- [生产者拦截器示例](https://cloud.tencent.com/developer)
- [更新元信息waitOnMetadata](https://cloud.tencent.com/developer)
- [KeyValue序列化](https://cloud.tencent.com/developer)
- [计算分区号](https://cloud.tencent.com/developer)
- [将消息缓存进RecordAccumulator累加器中](https://cloud.tencent.com/developer)
- [Sender发送消息](https://cloud.tencent.com/developer)
- [寻找准备好发送的消息Batch,获取对应Leader所在的ReadyNode](https://cloud.tencent.com/developer)
- [满足发送的条件的Batch](https://cloud.tencent.com/developer)
- [获取可发送请求的服务端ReadyNodes](https://cloud.tencent.com/developer)
- [强制更新没有Leader的Topic元信息](https://cloud.tencent.com/developer)
- [过滤一些还未准备好连接的ReadyNodes](https://cloud.tencent.com/developer)
- [遍历ReadNodes上的所有TopicPartition阻塞队列中的FirstBatch进行打包](https://cloud.tencent.com/developer)
- [构造Produce请求并发起接着处理Response](https://cloud.tencent.com/developer)
- [发送流程总结](https://cloud.tencent.com/developer)
- [Kafka Producer 整体架构图](https://cloud.tencent.com/developer)
今天我们来通过源码来分析一下,生产者发送一条消息的所有流程~~~
生产者客户端代码
public class SzzTestSend {
public static final String bootStrap = "xxxxxx:9090";
public static final String topic = "t_3_1";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);
// 序列化协议 下面两种写法都可以
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//过滤器 可配置多个用逗号隔开
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");
//构造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
// 发送消息, 并设置 回调(回调函数也可以不要)
ProducerRecord<String,String> record = new ProducerRecord(topic,"Hello World!");
try {
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发送成功回调类
*/
public static class SzzTestCallBack implements Callback{
private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class);
private String topic;
private String key;
private String value;
public SzzTestCallBack(String topic, String key, String value) {
this.topic = topic;
this.key = key;
this.value = value;
}
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
topic, key,value, e);
}else {
log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}",
topic, key,value,metadata.partition(),metadata.offset());
}
}
}
}
KafkaProducer通过解析producer.propeties
文件里面的属性来构造自己。
例如 :分区器、Key和Value序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
//构造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
我们之前有讲过. 客户端都会保存集群的元信息,例如生产者的元信息是 ProducerMetadata. 消费组的是ConsumerMetadata 。
元信息都会有自己的自动更新逻辑, 详细请看Kafka的客户端发起元信息更新请求
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
metadata.max.age.ms | 即使我们没有看到任何分区领导层更改以主动发现任何新代理或分区,我们也强制刷新元数据的时间段(以毫秒为单位)。。 | 300000(5分钟) |
retry.backoff.ms | 如果上次更新失败,发起重试的间隔时间 | 100 |
虽然Producer元信息会自动更新, 但是有可能在生产者发送消息的时候,发现某个TopicPartition不存在,这个时候可能就需要立刻发起一个元信息更新了。
生产者拦截器在消息发送之前可以做一些准备工作, 比如 按照某个规则过滤某条消息, 又或者对 消息体做一些改造, 还可以用来在发送回调逻辑之前做一些定制化的需求,例如统计类的工作! 拦截器的执行时机在最前面,在消息序列化和分区计算之前
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
interceptor.classes | 生产者拦截器配置,填写全路径类名,可用逗号隔开配置多个,执行顺序就是配置的顺序。 | 空 |
用来设置发送的消息具体要发送到哪个分区上
相关的Producer配置有:
属性 | 描述 | 默认值 |
---|---|---|
partitioner.class | 消息的分区分配策略 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
Sender是专门负责将消息发送到Broker的I/O线程。
相关的Producer配置有:
属性 | 描述 | 默认值 |
---|---|---|
max.in.flight.requests.per.connection | 客户端能够允许的最大未完成请求(在请求中)的请求数量, 如果该值大于1, 并且请求发送失败可可能导致消息重排序的风险(如果重试启用的话) | 5 |
request.timeout.ms | 控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败 | 30000(30 秒) |
connections.max.idle.ms | 在此配置指定的毫秒数后关闭空闲连接。 | 6540000(9 分钟) |
reconnect.backoff.ms | 在尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试 | 50 |
reconnect.backoff.max.ms | 重新连接到反复连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,每台主机的退避将在每次连续连接失败时呈指数增长,直至达到此最大值。在计算回退增加后,添加 20% 的随机抖动以避免连接风暴。 | 1000(1 秒) |
retry.backoff.ms | 在尝试重试对给定主题分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧密循环中重复发送请求。 | 100 |
send.buffer.bytes | 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。如果值为 -1,将使用操作系统默认值。 | 131072(128 千字节) |
receive.buffer.bytes | 读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。如果值为 -1,将使用操作系统默认值。 | 32768 |
acks | 生产者要求Leader在决定是否完成请求之前收到的确认数量. 这控制了发送的记录的持久性 可配置的参数如下:1. acks=0 如果为0, 生产者不会等待服务器的任何确认, 会被立即视为已发送,这种情况下不能保证服务器是否真的已经收到了消息。这个时候retries配置不会生效(客户端都不管服务端的返回了,所以客户端一般是不知道有故障的) 2. acks=1 Leader会将消息写入到它的本地日志中,但是不会等待所有的Follower完全确认就会返回发送成功状态。 这种情况下, 当Follower成功同步数据之前Leader挂掉了会造成数据丢失。 3.acks=all Leader将等待所有的ISR中的副本完成同步之后返回成功状态, 这样子数据就不会丢失,是最高级别的保证。 | 1 |
transactional.id | | |
enable.idempotence | 是否启动幂等。当设置为true时候, 生产者将确保每条消息被最多写入一个副本,如果未false,生产者由于Broker失败等原因重试,可能会写入到多个副本中。注意:启动幂等性的要求max.in.flight.requests.per.connection<=5 retries>0并且 acks=all .如果设置了不兼容的值则会抛出异常 | false |
max.request.size | 请求的最大大小(以字节为单位)。此设置将限制生产者在单个请求中发送的记录批次的总数据量,以避免发送大量请求。这实际上也是最大未压缩记录批量大小的上限。请注意,服务器对记录批量大小有自己的上限(如果启用压缩,则在压缩之后),这可能与此不同。 | 1048576 |
retries | 生产者重试次数,当max.in.flight.requests.per.connection=1的情况发生重试可能会导致顺序问题. | 2147483647 |
delivery.timeout.ms | 最大交付时间, 调用send()方法后不管是成功还是失败的时间上限。例如重试太多次之后达到次配置时间的时候也会停止重试了。此配置值应该大于等于request.timeout.ms 和linger.ms总和 | 120000 (2 minutes). 如果这个值你没有主动设置并且request.timeout.ms +linger.ms > 120000(默认值) ,那么它最终的值是request.timeout.ms +linger.ms |
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
发送消息的第一步就是执行拦截器
一般情况下我们可能不需要拦截器, 但是我们需要用拦截器的时候按照下面操作执行:
interceptor.classes=拦截器1,拦截器2,拦截器3
org.apache.kafka.clients.producer.ProducerInterceptor
这个interceptor.classes
中的属性可以配置多个拦截器, 用逗号隔开,并且执行顺序就是按照配置的顺序执行的。
拦截器的执行时机在最前面,在消息序列化和分区计算之前
ProducerInterceptor
org.apache.kafka.clients.producer.ProducerInterceptor
接口方法讲解:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
onSend(ProducerRecord record)方法 :
当客户端将记录发送到
KafkaProducer
时,在键和值被序列化之前调用。 该方法调用ProducerInterceptor.onSend(ProducerRecord)
方法。 从第一个拦截器的onSend()
返回的ProducerRecord
传递给第二个拦截器 onSend(),在拦截器链中依此类推。 从最后一个拦截器返回的记录就是从这个方法返回的。 此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。 如果链中间的拦截器(通常会修改记录)抛出异常,则链中的下一个拦截器将使用前一个未抛出异常的拦截器返回的记录调用。
调用地方
①. 拦截器执行时机在键值序列化之前
②. 拦截器抛出异常会被捕获,并打印日志,那么也意味着这个拦截器所做的修改不会生效
③.拦截器中修改的消息体会被传递到下一个拦截器
onAcknowledgement(RecordMetadata metadata, Exception exception)方法:
当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时,将调用此方法。 此方法通常在用户设置的Callback之前调用,此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并忽略。这个方法运行在Producer的I/O线程中,所以这个方法中的代码逻辑需要越简单越好。 否则,来自其他线程的消息发送可能会延迟。 参数:
metadata
– 已发送记录的元数据(即分区和偏移量)。 如果发生错误,元数据将只包含有效的主题和分区。 如果 ProducerRecord 中没有给出 partition 并且在分配 partition 之前发生错误,则 partition 将设置为 RecordMetadata.NO_PARTITION。 如果客户端将空记录传递给KafkaProducer.send(ProducerRecord)则元数据可能为空。exception
– 在处理此记录期间抛出的异常。 如果没有发生错误,则为空。
close()
主要用于在关闭拦截器时自行一些资源清理工作。
configure(Map configs)
ProducerInterceptor
接口中集成了一Configurable
接口,接口有个方法
void configure(Map<String, ?> configs);
也就是说在拦截器中,我们可以拿到所有的配置属性了; 这个方法在这几个方法中最早执行
将发送的消息加上后缀
注意这里消息value的类型是String
,如果是byte则需要处理一下
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("生产者拦截器 onSend() run ."+record);
return new ProducerRecord<>(
record.topic(), record.partition(), record.key(), record.value().concat("_后缀")); }
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("生产者拦截器 onAcknowledgement run ."+metadata.toString() +" exception:"+exception);
}
@Override
public void close() {
System.out.println("生产者拦截器 close() run .");
}
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
System.out.println("生产者拦截器 configure run ."+configs);
}
在发送消息之前,要先获取一下将要发送的TopicPartition的元信息。这个获取元信息的请求也是通过唤醒 Sender线程进行发送的。
1 . ProducerMetadata
元信息Map topics
中保存Topic
的有效期时间, metadata.max.idle.ms
控制,默认300000
ProducerMetadata
元信息Set newTopics
中保存所有Topic
元数据集群
以及我们等待的时间(以毫秒为单位), 这个获取元数据不是这里获取的,这里只是判断当前是否已经获取到了元数据,如果没有获取到,则一直等待,最大等待时间由max.block.ms
控制,默认60000(1分钟),关于获取元数据在最上面已经分析过了, 是Sender线程获取并更新的。如果等待时间超过了max.block.ms
,很有可能网络异常,那么会抛出超时异常。Metadata
请求(流程看最上面), 直到超时(max.block.ms
)之后 抛出异常org.apache.kafka.common.errors.TimeoutException: Topic t_3_1 not present in metadata after 60000 ms.
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
max.block.ms | 生产者发送消息过程中,获取元信息的最大超时时间 | 60000(1分钟) |
metadata.max.idle.ms | Topic的最大空闲时间. 如果一个主题在这么多毫秒内没有被访问过,它就会从缓存中删除。并且下一次对其的访问将强制执行元数据获取请求。 | 300000(5分钟) |
KafkaProducer producer = new KafkaProducer(properties);
在构建KafkaProducer
对象的时候, 有构建 producer I/O thread
, 并且启动了, Runnable
是 sender
最终调用NetworkClient.poll(long timeout, long now)
里面maybeUpdate()
方法
这个方法会获取 前Node中负载最少的节点发起网络请求, 如果所有Node都是满负载则请求不会被发起。
如何判断哪个节点负载最少?
通过每个节点的InFlightRequests(空中请求数量)
里面的最小数量判断,这个表示当前正在发起的请求,但是还没有收到回复的请求数量; 保存形式是一个HashMap,key
是Node的Id, value
是所有当前还在请求中的节点; 当请求完成,请求就会在这个队列里面移除; 如果这个队列一直是满的,说明当前负载很高或者网络连接有问题。如果所有Node都是满负载则请求不会被发起,除非等到队列数量减少。
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
每个Node最大负载数 ?
每个客户端在发起请求还没有收到回复的时候都会被缓存到InFlightRequests(空中请求数量)
里面,但是这个数量是有限制的,这个可以通过配置max.in.flight.requests.per.connection
进行设置, 默认是: 5; 也就是每个客户端对每个Node最多也就同时发起 5 个未完成的请求; 如果超时这个数量就会等待有请求完成并释放额度了才可以发起新的请求;
相关的Producer配置有:
属性 | 描述 | 默认 |
---|---|---|
max.in.flight.requests.per.connection | 每个客户端对每个Node发起请求的最大并发数 | 5 |
将key和Value先序列化。
自定义序列化器,需要实现org.apache.kafka.common.serialization.Serializer
接口。
我们简单看下StringSerializer
序列化器
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
}
configure(Map configs, boolean isKey)
这个方法是在构造 KafkaProduce
实例的时候调用的。isKey
表示是 key还是value来进行序列化
这里 serialize(String topic, String data) 方法直接将字符串转换成byte[]类型。
Kafka客户端提供了很多种序列化器供我们选择,如果这些序列化器你都不满意,你也可以选择其他一些开源的序列化工具,或者自己进行实现。
将序列化后的key、 value 调用合适的分区器选择将要发送的分区号。
Sender线程在构造KafkaProducer的时候就已经启动了,它的职责就是从
以下忽略部分代码省略
void runOnce() {
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
private long sendProducerData(long now) {
// 获取哪些数据准备好了发送
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
}
我们都知道生产者生产的消息是暂时缓存在消息累加器RecordAccumulator中的, Sender负责从RecordAccumulator里面获取准备好的数据进行发送
那么 ,哪些属于准备好的数据呢?
我们先回顾一下 RecordAccumulator的结构。
每个TopicPartition的消息都会被暂存在ProducerBatch Deque 阻塞队列中的其中一个ProducerBatch中,每个ProducerBatch都存放着一条或者多条消息。
遍历每个TopicPartition里面的Deque, 获取队列中的第一个ProducerBatch
如果该TopicPartition不存在Leader,则忽略该Batch,如果有则进入判断流程
因为消息是要发Leader所在的Broker发送的, 所以必须要有Leader。
在满足条件
不属于重试或者属于重试并且等待的时候大于 retry.backoff.ms
的前提下,满足下面条件的均可发送
(该条件就是要排除那些是属于重试,但是还没有到达重试间隔时间的情况。)
该ProducerBatch还没有被发送过. 该Batch能否发送判断条件如下
flush()
方法供调用,用于该时刻的消息都满足发送的条件,一般在消息事务的地方有调用。 这里要注意的是,是调用flush()
这一时刻的所有未发送的Batch都需满足发送条件,后面新增的Batch不属于这一范畴linger.ms
的时间上面是讲哪些Batch属于可发送的逻辑判断,但是实际上,真正发送的时候并不是以每个Batch维度来判断发送的,而是以Node维度来发送的,上面我们知道了哪些Batch能够发送,然后我们就可以推断出Batch对应的TopicPartition所属的Broker。有了这些可发送的Broker,然后再来遍历Broker上的每个TopicPartition中的First Batch
文字不好理解,我们看看下图
上图是生产者的RecordAccumulator消息累加器, 消息累加成上图所示。
每个TopicPartition队列都有很多Batch, 我们知道了TopicPartition 是不是就能够确定它所在的Broker?
例如上图中
那么最终得到的ReadyNodes就是Broker-0、Broker-2
上面我们在获取 哪些Batch准备好发送的时候,也会找到哪些TopicPartition没有Leader。
那么这个时候就需要强制的去更新一下这些TopicPartition的元信息了,否则就发送不了。
上面我们已经获取了ReadyNodes
那么在真正的向对应的ReadyNodes 发起请求之前, 我们还是需要判断一下 我们的生产者客户端是否准备好了跟ReadyNodes 发起请求.
那么客户端的准备条件有哪些呢?
生产者客户端在最开始的时候都没有跟任何Node建立连接的, 当我们尝试发送之前会去检验一下连接是否建立成功(就是当前这一步), 如果没有的话,则会去尝试建立连接。并且当前这次是会把这个Node过滤掉的,因为还没有建立成功链接,等到下一次循环的时候,可能已经建立成功了。
当然客户端是否准备好,不仅仅是判断 连接是否建立成功。
还需要判断 当前未完成的请求队列数量是否 < max.in.flight.requests.per.connection
到现在为止,我们已经得到了可以发送请求的ReadyNodes了。那么接下来就是分别解析这些ReadyNode 他们能够发送的Batch打包发送了。
这一步最重要的作用是将 ProducerBatch 跟Node映射,也就是知道当前批次想哪个Broker发送哪些Batch
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
// 遍历ReadyNodes 每个Node下的队列都获取一遍
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
那么应该选择哪些Batch来发送呢?
遍历每个ReadyNode节点下面的每个TopicPartition 队列的首个Batch
retry.backoff.ms
, 则该TopicPartition队列会忽略 例如上图 Topic3Partition-1)max.request.size
. 那么也不会再重新遍历。上面我们已经得到了
Map<Integer, List<ProducerBatch>> batches
也就是Node.id 和对应要发往该Node的Request请求携带的ProducerBatch列表。
发送成功之后,会返回Response,根据Response情况处理不同的逻辑
Response处理逻辑
每个Batch都会对应着一个PartitionResponse, 不同的PartitionResponse对应的不同处理逻辑。
如果Response返回RecordTooLargeException异常,并且Batch里面的消息数量>1.这种情况, 就会尝试的去拆分Batch, 如何拆分呢? 是以大小来拆分成多个Batch。并且重新放入到消息累加器中。
如果返回是其他异常则先判断一下是否能够重试,如果能够重试,则重新入队到消息累加器中。重新入队的Batch会记录重试次数和时间等等信息。是否能够重试判断逻辑:batch没有超过 && 重启次数<
如果是DuplicateSequenceException异常的话,那么并不会做其他的处理,而是当做正常完成。
其他异常或者没有异常则会走正常流程, 并且调用,如果有Exception也会返回。这个InterceptorCallback里面包含在拦截器和(用户自己的回调)。调用顺序如下图:
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
注意: 这里的回调并不是指的一个Batch一个回调,这里是一个Batch里面有多少条消息,就有多少个回调。每个ProducerBatch里面都有一个对象专门保存所有消息的回调信息 thunks
. 在处理ProducerBatch返回信息的时候会遍历这个trunks, 来执行每个消息的回调。
假如你想确定某个消息是否发送成功, 那么你可以自己定义一个拦截器。
并重写接口onAcknowledgement(RecordMetadata metadata, Exception exception)
在这里面来判断你的消息是否发送成功。
整个生产者客户端是由主线程和Sender线程协调运行的, 主线程创建消息, 然后通过 拦截器、元信息更新、序列化、分区器、缓存消息等等流程。
然后Sender线程在初始化的时候就已经运行了,并且是一个while循环。
Sender线程里面主要工作是:
寻找ReadyNodes:
构建Request:FirstBatchProducerBatchmax.request.size
将Request放入inFightRequest中:inFightRequestmax.in.flight.requests.per.connection
"Attempt to send a request to node " + nodeId + " which is not ready."
Request通过Selector发起通信.
返回Response:
从inFightRequest中移除完成Request
释放内存回消息累加器:消息累加器消息累加器
消息累加器