01
前言
今天给大家带来的是 Kafka Producer 的全方位解析(基于 Apache Kafka 3.72)。考虑到篇幅限制,本文分为上下两篇,上篇将介绍 Kafka Producer 的使用方法与实现原理,下篇将介绍 Kafka Producer 的实现细节与常见问题。
02
使用方法
在介绍 Kafka Producer 的具体实现前,首先看一下如何使用。用 Kafka Producer 向指定 topic 发送一条消息的示例代码如下:
// 配置并创建一个 Producer
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
// 向指定 topic 发送一条消息
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "my-key", "my-value");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 发送失败
exception.printStackTrace();
} else {
// 发送成功
System.out.println("Record sent to partition " + metadata.partition() + " with offset " + metadata.offset());
}
});
// 关闭 Producer,释放资源
producer.close();
接下来详细介绍一下 Kafka Producer 的主要接口。
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
public interface Callback {
void onCompletion(RecordMetadata metadata, Exception exception);
}
public interface Producer<K, V> {
// ...
Future<RecordMetadata> send(ProducerRecord<K, V> record);
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
void flush();
void close();
// ...
}
注:在接口 Producer中,还有一些事务相关的接口,例如beginTransaction、commitTransaction等,其在我们另外一篇内容原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息中已经详细介绍过,此处不再赘述。
2.1 ProducerRecord
Producer 发送出的一条消息,包含以下属性
Partitioner
或内置的 BuiltInPartitioner
选择分区(详见下文)send
方法的时间)2.2 Callback
用于发送消息 ack 后的回调。可能发生的 Exception 有:
InvalidTopicException
:topic 的名称不合法,例如过长、为空、使用非法字符等OffsetMetadataTooLarge
:调用 Producer#sendOffsetsToTransaction
时,使用的 Metadata 字符串过长(由 offset.metadata.max.bytes 控制,默认 4 KiB)RecordBatchTooLargeException
:发送的 batch 的大小RecordTooLargeException
:单条消息的大小超过了 producer 单个请求的最大大小(producer 配置 max.request.size,默认 1MiB)TopicAuthorizationException
、ClusterAuthorizationException
:鉴权失败UnknownProducerIdException
:事务请求中,PID 已过期或 PID 关联的 record 均已过期InvalidProducerEpochException
:事务请求中,epoch 非法UnknownServerException
:未知错误CorruptRecordException
:CRC 校验失败,通常由网络错误导致InvalidMetadataException
:Client 侧的 metadata 过期UnknownTopicOrPartitionException
:topic 或 partition 不存在,可能由 metadata 过期导致NotLeaderOrFollowerException
:请求的 broker 不是 leader,可能正在选举 leaderFencedLeaderEpochException
:请求中的 leader epoch 过期,可能由 metadata 刷新慢导致NotEnoughReplicasException
、NotEnoughReplicasAfterAppendException
:insync replica 数量不足(broker 配置 min.insync.replicas 或同名 topic 配置,默认 1)。注意,NotEnoughReplicasAfterAppendException
会在 record 写入完成后发现,producer 的重试会导致数据重复TimeoutException
:处理超时,有两种可能2.3 Producer#send
异步地发送一条消息,如果需要,在本条消息 ack 后触发 Callback。
保证向同一个 partition 发送的 send 请求的 Callback 会按调用顺序依次触发。
2.4 Producer#flush
标记 producer 缓存中的所有消息立即可用于发送,并阻塞当前线程,直至在此之前的所有消息都被 ack。
注:仅会阻塞当前线程,其他线程仍可正常发送,但对调用 flush 方法后发送的其他消息的完成时机没有保证。
2.5 Producer#close
关闭 producer,并阻塞等待至所有消息发送完成。
注:
03
核心组件
接下来介绍 Kafka Producer 的具体实现,它由以下几个核心组件组成
04
发送流程
一条消息的发送流程如下图:
分为以下几步:
接下来介绍其中的各项细节
4.1 刷新元数据
ProducerMetadata负责 Producer 侧所需元数据的缓存与刷新,它会维护一个 topic 视图,其中包含 producer 所需的所有 topic。它会
相关配置有
4.2 分区选择
在 KIP-7943 中,为了解决之前版本中的 Sticky Partitioner 导致的“向更慢的 broker 发送了更多的消息”的问题,提出了一个新的 Uniform Sticky Partitioner(并作为默认的内置 Partitioner)。在没有 key 的限制时,它会向更快的 broker 发送更多的消息。在进行分区选择时,分为以下两种情况:
BuiltInPartitioner
相关配置有
分区选择器的类名,可以由用户根据需求自行实现。提供了一些默认实现
DefaultPartitioner
与 UniformStickyPartitioner
:会 "sticky" 地向各 partition 分配消息,即,在某个 partition 攒满一个 batch 后,切换至下一个 partition。但其实现上存在问题,会导致向更慢的 broker 发送更多消息,现已标记为废弃。RoundRobinPartitioner
:将会忽略 record key,循环(round robin)地向每个 partition 分配消息。注意,它存在一个已知问题:在创建新的 batch 时,会导致不平均的分配。
目前建议使用内置 partitioner 或者自行实现 partitioner。是否根据 broker 的速度决定发送消息的数量,若不开启,则会随机地选择 partition。仅在未配置 partitioner.class 时生效。默认为 "true"。
仅在 partitioner.adaptive.partitioning.enable 设置为 "true" 时生效。当“为指定 broker 攒出一批消息的时间点”和“向指定 broker 发送消息的时间点”相差超过此配置时,则不再向指定 broker 分配消息;设置为 0 意味着不开启此逻辑。仅在未配置 partitioner.class 时生效。默认为 0。
选择 partition 时是否忽略消息的 key,若为 "false",则根据 key 的哈希值选择 partition,否则忽略 key 值。仅在未配置 partitioner.class 时生效。默认为 "false"。
4.3 消息攒批
在 RecordAccumulator 中,按照 partition 维度维护了所有待发送的 batch。有以下几个重要方法:
public RecordAppendResult append(String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException;
public ReadyCheckResult ready(Metadata metadata, long nowMs);
public Map<Integer, List<ProducerBatch>> drain(Metadata metadata, Set<Node> nodes, int maxSize, long now);
相关配置有
4.4 超时处理
Kafka Producer 定义了一系列超时相关的配置,用于控制发送消息的各个阶段允许耗时的最大值。梳理如下图:
具体地说,相关配置有
05
小结
我们的项目 AutoMQ1 致力于构建下一代云原生 Kafka 系统,解决过去 Kafka 的成本、弹性问题。作为 Kafka 生态的忠实拥护者和参与者,我们将持续为 Kafka 技术爱好者带来优质的 Kafka 技术内容分享。在上篇中,我们介绍了 Kafka Producer 的使用方法以及基础的实现原理;在下篇中,我们将介绍 Kafka Producer 的更多实现细节与使用中的常见问题。欢迎关注我们以了解更多。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。