kafka 自定义了一套网络协议,我们可以使用任意语言来实现这套协议,实现向 kafka 集群 push message 以及从 kafka 集群 pull message 的效果。在 kafka 2.8.0 版本的源码中的 clients 模块就是官方默认提供的 Java 版本 producer、consumer 实现,我们本课时重点关注其中的 producer 部分实现。
按照国际惯例,先来一个 demo 示例,带同学们了解一下 kafka Producer 的基本使用,示例的具体代码如下:
public class ProducerDemo {
public static void main(String[] args) throws Exception {
Properties config = new Properties();
config.put("client.id", "ProducerDemo");
// 指定kafka broker集群的地址
config.put("bootstrap.servers", "localhost:9092");
// 配置kafka集群响应之前,需要有多少replica成功复制了该message,all表示整个ISR集合都复制完成
config.put("acks", "all");
// 指定message key和value的序列化器,它负责将KV序列化成字节数组
config.put("key.serializer", StringSerializer.class);
config.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
for (int i = 0; i < 10; i++) {
// 消息的value
long startTime = System.currentTimeMillis();
// 构造ProducerRecord对象,其中记录了该message的目标topic以及key和value
ProducerRecord<String, String> record =
new ProducerRecord<>("test_topic", String.valueOf(i), "YangSizheng_" + startTime);
// 第二个参数是一个匿名的CallBack对象,当producer接收到kafka集群发来的ACK确认消息的时候,
// 会调用其onCompletion()方法完成回调
Future<RecordMetadata> future = producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
System.out.println("Send failed for record:" + record + ", error message:" + e.getMessage());
}
});
// send()方法是异步发送message,返回的是一个Future对象。如果需要同步发送,可以调用其get()方法,
// 返回的RecordMetadata中包含了该message落到了哪个partition上,以及分配的offset多少
RecordMetadata recordMetadata = future.get();
System.out.println("partition:" + recordMetadata.partition()
+ ", offset:" + recordMetadata.offset());
}
}
}
在执行 ProducerDemo 之前,我们执行kafka-console-consumer.sh
命令启动命令行 consumer:
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic |
---|
然后执行 ProducerDemo 可以在控制台看到如下输出:
在 kafka-console-consumer.sh
命令行中看到如下输出:
了解了 kafka producer 的基本使用之后,我们开始深入 producer 的架构进行介绍,千言万语不及不急一张图,下图就是 kafka producer 的核心架构:
这里描述一下上图中涉及到的核心组件在,这里涉及到两个线程,一个是我们的业务线程(也就是图中的主线程),另一个是 Sender 线程,我们一个个来说。 首先是主线程的逻辑:
下面来看 Sender 线程的逻辑:
介绍完 kafka producer 的核心架构和流程之后,我们开始深入分析 KafkaProducer.send() 方法,即主线程的核心逻辑,还是开局一张图,后面都好说:
下面来描述一下 KafkaProducer.send() 方法的核心逻辑,也就是上图的核心步骤:
首先来看 ProducerInterceptors,其中维护了一个 ProducerInterceptor 集合,其 onSend()方法、onAcknowledgement()方法、onSendError()方法,实际上,是循环调用 ProducerInterceptor 集合的方法。 我们可以通过实现 ProducerInterceptor 接口的 onSend() 方法来拦截或修改待发送的 message,也可以通过实现 onAcknowledgement()方法、onSendError()方法先于用户的 Callback,对kafka集群响应进行预处理。
在我们通过 KafkaProducer 发送 message 的时候,我们只明确指定了 message 要写入哪个 topic ,并没有明确指定要写入的 partition。
但是同一个 topic 的 partition 可能位于 kafka 的不同 broker 上,所以 producer 需要明确的知道该 topic 下所有 partition 的元信息(即所在 broker 的 IP、端口等信息),这样才能与 partition 所在 broker 建立网络连接并发送 message。
在 KafkaProducer 中,使用 Node、TopicPartition、PartitionInfo 三个类来记录 Kafka 集群元数据:
kafka producer 会将上述三个维度的基础信息封装成 Cluster 对象使用,下面是 Cluster 包含的信息:
再向上一层,Cluster对象会被维护到Metadata中,Metadata同时还维护了Cluster的版本号、过期时间、监听器等等信息,如下图所示:
经过上面的分析,我们可以得到下面这张简图:
静态数据结构分析完了之后,我们来看 KafkaProducer.waitOnMetadata()方法是如何工作的:
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
// 获取MetadataCache当前缓存的Cluster对象
Cluster cluster = metadata.fetch();
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
// 更新ProducerMetadata的缓存
metadata.add(topic, nowMs);
// 从partitionsByTopic集合中获取目标topic的partition数量
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// 要是没有目标topic的元数据存在,则直接返回ClusterAndWaitTime对象,无需下面的更新操作
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long remainingWaitMs = maxWaitMs;
long elapsed = 0;
do {
// 更新ProducerMetadata缓存
metadata.add(topic, nowMs + elapsed);
// 更新获取当前updateVersion,并设置相应标识,尽快触发元数据更新
int version = metadata.requestUpdateForTopic(topic);
// 唤醒Sender线程,由Sender线程去完成元数据的更新
sender.wakeup();
try {
// 阻塞等待元数据更新,停止阻塞的条件是:更新后的updateVersion大于当前version,超时的话会直接抛出异常
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
throw new TimeoutException(。。。);
}
cluster = metadata.fetch(); // 获取最新的Cluster
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed; // 计算元数据更新耗时
partitionsCount = cluster.partitionCountForTopic(topic); // 获取partition数
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
这里具体如何更新元数据,我们将在介绍 Sender 线程工作流程的时候,详细分析。
分布式系统中各个节点相互通信,必然涉及到内存对象与字节流之间的转换,也就是序列化与反序列化。
kafka 中的序列化器接口是 Serializer,负责将对象转换成字节数组;反序列化器是 Deserializer 接口,负责将字节数组转换成内存中的对象。
下面展示了 Serializer 和 Deserializer 接口的实现类:
从上图中我们可以看出,kafka 自带了常用 Java 类型的 Serializer 实现和 Deserializer 实现,当然,我们也可以自定义Serializer和Deserializer实现来处理复杂类型。
下面我们以 StringSerializer 实现为例说明一下 Serializer 的核心实现:
在 waitOnMetadata() 方法拿到最新的集群元数据之后,下面就要开始确定待发送的 message 要发送到哪个 partition 了。
如果我们明确指定了目标 partition,则以用户指定的为准,但是一般情况下,业务并不会指定 message 需要写入到哪个 partition,此时就会通过 Partitioner 结合 集群元数据计算出一个目标 partition。
下图展示了 Partitioner 接口的全部实现:
从名字也能看出,DefaultPartitioner 是默认实现,其中的 partition() 方法中:
这里解释一下 StickyPartitionCache 的功能。我们前面介绍整个 KafkaProducer 流程的时候说过,RecordAccumulator 是一个缓冲区,主线程发送的 message 会先进入 RecordAccumulator,然后 Sender 线程攒够了 message 的时候进行批量发送。
触发 Sender 线程批量发送堆积 message 的条件主要有两方面:
StickyPartitionCache 主要实现的是”黏性选择”,就是尽可能的先往一个 partition 发送 message,让发往这个 partition 的缓冲区快速填满,这样的话,就可以降低 message 的发送延迟。我们不用担心出现 partition 数据量不均衡的情况,因为只要业务运行时间足够长,message 还是会均匀的发送到每个 partition 上的。
下面来看 StickyPartitionCache 的实现,其中维护了一个 ConcurrentMap(indexCache字段),key 是 topic,value 是当前黏住了哪个 partition。
在 partition() 方法中,StickyPartitionCache 会先从 indexCache 字段中获取黏住的 partition,如果没有,则调用 nextPartition() 方法向 indexCache 中写入一个。在 nextPartition() 方法中,会先获取目标 topic 中可用的 partition,并从中随机选择一个写入 indexCache。
最后,同学们可能问,什么时候更新黏住的 partition 呢?我们看一下 KafkaProducer.doSend()方法中,有这么一个片段:
// 尝试向RecordAccumulator中追加message
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
// 由于目标partition的当前batch没有空间了,需要更换一个partition,再次尝试
if (result.abortForNewBatch) {
int prevPartition = partition;
// 更换目标partition,同时也会更换StickyPartitionCache黏住的partition
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
// 计算新的目标partition
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// 再次调用append()方法向RecordAccumulator写入message,如果该partition缓冲区中的batch也没有空间,
// 则创建新batch了,不会再次尝试了
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
RecordAccumulator.append()方法我们后面分析。
UniformStickyPartitioner 这个 Partitioner 底层也是依赖 StickyPartitionCache 实现黏性发送的,不再展开介绍。
再来看 RoundRobinPartitioner 实现,从名字也可以看出,它是按照轮训的策略来计算目标 partition,其中也维护了一个 ConcurrentMap 集合(topicCounterMap字段),其中的 key 是 topic 的名称,value 是一个递增的 AtomicInteger。
在 RoundRobinPartitioner.partition() 方法中,会先查找目标 topic 的 partition 总数,然后自增上述 AtomicInteger 值并与 partition 总数取模,得到目标 partition 的编号。
本课时我们首先介绍了 KafkaProducer 的基础使用,然后介绍了 KafkaProducer 的核心架构,最后介绍了 KafkaProducer.send() 方法中主线程的核心操作。
下一课时,我们将开始介绍 KafkaProducer 中 RecordAccumulator 相关的内容。
本课时的文章和视频讲解,还会放到:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。