前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从源码来分析kafka生产者原理

从源码来分析kafka生产者原理

作者头像
崩天的勾玉
发布2024-03-12 17:33:48
1020
发布2024-03-12 17:33:48
举报
文章被收录于专栏:崩天的勾玉

源码学习是一种挺好的方式,不过根据我的经验最好是先学习大致的原理,再回头看源码,更能抓住重点。

今天带大家过一遍 kafka-python 最新v2.0.2生产者源码,为啥是python,当然是因为我比较熟悉,而且各语言实现都差不多。

本文分2个部分说明:

  • kafka生产者初始化做了什么
  • 发送消息时做了什么

喜欢可以收藏。

例行先上快速开始的代码😁:

代码语言:javascript
复制
from kafka import KafkaProducer


producer = KafkaProducer(
    bootstrap_servers=["ip:9092"],
    retries=3, 
    batch_size=524288, 
    linger_ms=400, 
    buffer_memory=134217728,     # 缓冲区内存调大到128mb
    max_request_size=5048576,  # 每次请求的最大体积
    compression_type='gzip',  # 可选,使用lz4压缩,能极大提高性能。需要安装依赖pip install lz4
)

for i in range(10000):
    producer.send('test', "测试".encode('utf-8'))
producer.flush()

异步发送,做了点参数调优,无甚稀奇。

1、生产者初始化

点开KafkaProducer 类,看看初始化了啥:

代码语言:javascript
复制
    def __init__(self, **configs):
        log.debug("Starting the Kafka producer")  # trace
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs.pop(key)

        # Only check for extra config keys in top-level class
        assert not configs, 'Unrecognized configs: %s' % (configs,)

        if self.config['client_id'] is None:
            self.config['client_id'] = 'kafka-python-producer-%s' % \
                                       (PRODUCER_CLIENT_ID_SEQUENCE.increment(),)

        if self.config['acks'] == 'all':
            self.config['acks'] = -1

        # api_version was previously a str. accept old format for now
        if isinstance(self.config['api_version'], str):
            deprecated = self.config['api_version']
            if deprecated == 'auto':
                self.config['api_version'] = None
            else:
                self.config['api_version'] = tuple(map(int, deprecated.split('.')))
            log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
                        str(self.config['api_version']), deprecated)

        # Configure metrics
        metrics_tags = {'client-id': self.config['client_id']}

第一步是处理配置参数,你没指定的参数,那就给默认值,你传了,那我就校验。

比如client_id 没给我就自动生成一个,带一个固定前缀;

比如api_version 、compression_type 、max_in_flight_requests_per_connection参数获取或生成。

再往下看,到了第一个重点:RecordAccumulator

代码语言:javascript
复制
    self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.

点进去

代码语言:javascript
复制

    def __init__(self, **configs):
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs.pop(key)

        self._closed = False
        self._flushes_in_progress = AtomicInteger()
        self._appends_in_progress = AtomicInteger()
        self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
        self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
        self._free = SimpleBufferPool(self.config['buffer_memory'],
                                      self.config['batch_size'],
                                      metrics=self.config['metrics'],
                                      metric_group_prefix=self.config['metric_group_prefix'])
        self._incomplete = IncompleteProducerBatches()
        # The following variables should only be accessed by the sender thread,
        # so we don't need to protect them w/ locking.
        self.muted = set()
        self._drain_index = 0

注意这行:

self._batches = collections.defaultdict(collections.deque)

RecordAccumulator类是啥呢?熟悉kafka的都知道,这是一个容器,存储消息批次_batches 的。

RecordAccumulator内的_batches在这里的实现是字典,键是TopicPartition,也就是主题+分区号,值是个队列collections.deque,队列内的元素是[ProducerBatch],也就是批次。

所以消息在生产者里,是这样存储的:

  • 一定数量的消息,组成一个批次batch
  • 一个主题的一个分区的所有batch,被放到一个队列里
  • 所有分区及各自的batch队列,共同在一个容器RecordAccumulator里

RecordAccumulator的大小由参数buffer_memory控制,batch的大小由参数batch_size控制。

再往下走,看到第二个重点:Sender线程

代码语言:javascript
复制
        self._sender = Sender(client, self._metadata,
                              self._accumulator, self._metrics,
                              guarantee_message_order=guarantee_message_order,
                              **self.config)
        self._sender.daemon = True
        self._sender.start()

生产者还初始化了个Sender实例,内部继承了线程类,并实现了run方法。并且是一个守护线程,在后台不停轮询:

代码语言:javascript
复制
    def run(self):
        """The main run loop for the sender thread."""
        log.debug("Starting Kafka producer I/O thread.")

        # main loop, runs until close is called
        while self._running:
            try:
                self.run_once()
            except Exception:
                log.exception("Uncaught error in kafka producer I/O thread")

这个Sender线程不停执行run_once方法,点进去看看:(只摘要了重点)

代码语言:javascript
复制
requests = self._create_produce_requests(batches_by_node)

for node_id, request in six.iteritems(requests):
    batches = batches_by_node[node_id]
    log.debug('Sending Produce Request: %r', request)
    (self._client.send(node_id, request, wakeup=False)
         .add_callback(
             self._handle_produce_response, node_id, time.time(), batches)
         .add_errback(
             self._failed_produce, batches, node_id))

Sender线程将消息批次按node归类,发往同一个node的批次放一个请求里,然后进行发送,并传递回调函数。

所以,Sender线程才是真正发送消息的发送者。

2、send()

那问题来了,下面发送消息的send()方法又做了啥?

producer.send('test', "测试".encode('utf-8'))

代码语言:javascript
复制
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):

        self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
        
        key_bytes = self._serialize(
            self.config['key_serializer'],
            topic, key)
        value_bytes = self._serialize(
            self.config['value_serializer'],
            topic, value)

        partition = self._partition(topic, partition, key, key_bytes, value_bytes)

        message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
        self._ensure_valid_record_size(message_size)

        result = self._accumulator.append(tp, timestamp_ms,
                                          key_bytes, value_bytes, headers,
                                          self.config['max_block_ms'],
                                          estimated_size=message_size)

只摘了重要部分,如下:

  • 刷新元数据
  • 对key、value序列化
  • 获取要发送的分区
  • 校验消息size
  • 将消息添加到_accumulator

我们看看添加消息的步骤

添加图片注释,不超过 140 字(可选)

首先取分区对应的队列,往队列的最后一个批次batch里塞消息

要是批次已经满了,就开辟一个新的批次,将消息塞入,并把批次放入队列里。

看到这里是不是很清晰了?send方法实际只是往RecordAccumulator容器里塞消息,Sender线程则在后台不停轮训,符合条件就发送。

总结

细节还有很多,比如api_version怎么生成的,参数怎么处理的,发送体积怎么限制的,具体发送过程是怎么样的,内部实现的什么消息协议,为什么生产者是线程安全的,在源码里你可以看到用了大量的锁。

感兴趣可以自己研究细节,但只要我们把握住了主干脉络,懂了大致原理,就能驾轻就熟,做到胸有成竹。

原创内容,欢迎关注我的专栏,谢谢😀

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-03-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 崩天的勾玉 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、生产者初始化
  • 2、send()
  • 总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档