首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >使用生成器把Kafka写入速度提高1000倍

使用生成器把Kafka写入速度提高1000倍

作者头像
青南
发布于 2018-08-31 06:11:05
发布于 2018-08-31 06:11:05
1.5K00
代码可运行
举报
文章被收录于专栏:未闻Code未闻Code
运行总次数:0
代码可运行

[如果代码显示有问题,请点击阅读原文]

通过本文你会知道Python里面什么时候用yield最合适。本文不会给你讲生成器是什么,所以你需要先了解Python的yield,再来看本文。

疑惑

多年以前,当我刚刚开始学习Python协程的时候,我看到绝大多数的文章都举了一个生产者-消费者的例子,用来表示在生产者内部可以随时调用消费者,达到和多线程相同的效果。这里凭记忆简单还原一下当年我看到的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import time
def consumer():
    product = None
    while True:
        if product is not None:
            print('consumer: {}'.format(product))
        product = yield None

def producer():
    c = consumer()
    next(c)
    for i in range(10):
        c.send(i)

start = time.time()
producer()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

运行效果如下图所示。

这些文章的说法,就像统一好了口径一样,说这样写可以减少线程切换开销,从而大大提高程序的运行效率。但是当年我始终想不明白,这种写法与直接调用函数有什么区别,如下图所示。

直到后来我需要操作Kafka的时候,我明白了使用yield的好处。

探索

为了便于理解,我会把实际场景做一些简化,以方便说明事件的产生发展和解决过程。事件的起因是我需要把一些信息写入到Kafka中,我的代码一开始是这样的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product):
    with topic.get_producer(delivery_reports=True) as producer:
        producer.produce(str(product).encode())
def feed():
    for i in range(10):
        consumer(i)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

这段代码的运行效果如下图所示。

写入10条数据需要100秒,这样的龟速显然是有问题的。问题就出在这一句代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
with topic.get_producer(delivery_reports=True) as producer

获得Kafka生产者对象是一个非常耗费时间的过程,每获取一次都需要10秒钟才能完成。所以写入10个数据就获取十次生产者对象。这消耗的100秒主要就是在获取生产者对象,而真正写入数据的时间短到可以忽略不计。

由于生产者对象是可以复用的,于是我对代码作了一些修改:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
products = []
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    for i in range(10):
        products.append(i)
    consumer(products)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

首先把所有数据存放在一个列表中,最后再一次性给consumer函数。在一个Kafka生产者对象中展开列表,再把数据一条一条塞入Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。

这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。

于是我又修改了代码。每100条数据保存一次,并清空暂存的列表:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    products = []
    for i in range(1003):
        products.append(i)
        if len(products) >= 100:
            consumer(products)
            products = []
    if products:
        consumer(products)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

由于最后一轮循环可能无法凑够100条数据,所以feed函数里面,循环结束以后还需要判断products列表是否为空,如果不为空,还要再消费一次。这样的写法,在上面这段代码中,一共1003条数据,每100条数据获取一次生产者对象,那么需要获取11次生产者对象,耗时至少为110秒。

显然,要解决这个问题,最直接的办法就是减少获取Kafka生产者对象的次数并最大限度复用生产者对象。如果读者举一反三的能力比较强,那么根据开关文件的两种写法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 写法一

with open('test.txt', 'w', encoding='utf-8') as f:
    f.write('xxx')
# 写法二

f = open('test.txt', 'w', encoding='utf-8')
f.write('xxx')
f.close()

可以推测出获取Kafka生产者对象的另一种写法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 写法二

producer = topic.get_producer(delivery_reports=True)
producer.produce(b'xxxx')
producer.close()

这样一来,只要获取一次生产者对象并把它作为全局变量就可以一直使用了。

然而,pykafka的官方文档中使用的是第一种写法,通过上下文管理器with来获得生产者对象。暂且不论第二种方式是否会报错,只从写法上来说,第二种方式必需要手动关闭对象。开发者经常会出现开了忘记关的情况,从而导致很多问题。而且如果中间出现了异常,使用上下文管理器的第一种方式会自动关闭生产者对象,但第二种方式仍然需要开发者手动关闭。

函数VS生成器

但是如果使用第一种方式,怎么能在一个上下文里面接收生产者传进来的数据呢?这个时候才是yield派上用场的时候。

首先需要明白,使用yield以后,函数就变成了一个生成器。生成器与普通函数的不同之处可以通过下面两段代码来进行说明:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def funciton(i):
    print('进入')
    print(i)
    print('结束')
for i in range(5):
    funciton(i)

运行效果如下图所示。

函数在被调用的时候,函数会从里面的第一行代码一直运行到某个return或者函数的最后一行才会退出。

而生成器可以从中间开始运行,从中间跳出。例如下面的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def generator():
    print('进入')
    i = None
    while True:
        if i is not None:
            print(i)
        print('跳出')
        i = yield None

g = generator()
next(g)
for i in range(5):
    g.send(i)

运行效果如下图所示。

从图中可以看到,进入只打印了一次。代码运行到i = yield None后就跳到外面,外面的数据可以通过g.send(i)的形式传进生成器,生成器内部拿到外面传进来的数据以后继续执行下一轮while循环,打印出被传进来的内容,然后到i = yield None的时候又跳出。如此反复。

所以回到最开始的Kafka问题。如果把with topic.get_producer(delivery_reports=True) as producer写在上面这一段代码的print('进入')这个位置上,那岂不是只需要获取一次Kafka生产者对象,然后就可以一直使用了?

根据这个逻辑,设计如下代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer():
    with topic.get_producer(delivery_reports=True) as producer:
        print('init finished..')
        next_data = ''
        while True:
            if next_data:
                producer.produce(str(next_data).encode())
            next_data = yield True

def feed():
    c = consumer()
    next(c)
    for i in range(1000):
        c.send(i)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

这一次直接插入1000条数据,总共只需要10秒钟,相比于每插入一次都获取一次Kafka生产者对象的方法,效率提高了1000倍。运行效果如下图所示。

后记

读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一回事。但是第一段代码,也就是网上很多人讲yield的时候举的生产者-消费者的例子之所以会让人觉得毫无用处,就在于他们的消费者几乎就是秒运行,这样看不出和函数调用的差别。而我最后这一段代码,它的消费者分成两个部分,第一部分是获取Kafka生产者对象,这个过程非常耗时;第二部分是把数据通过Kafka生产者对象插入Kafka,这一部分运行速度极快。在这种情况下,使用生成器把这个消费者代码分开,让耗时长的部分只运行一次,让耗时短的反复运行,这样就能体现出生成器的优势。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-04-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 未闻Code 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
python kafka kerberos 验证 消费 生产
(adsbygoogle = window.adsbygoogle || []).push({});
stys35
2019/03/12
2.2K0
python 操作kafka
最近项目中总是跟java配合,我一个写python的程序员,面对有复杂数据结构的java代码转换成python代码,确实是一大难题,有时候或多或少会留有一点坑,看来有空还得看看java基础。这不今天又开始让我们连接kafka啦。公司的kafka跟zookeeper做了群集,连接比较麻烦,具体如何使用,java那面做的封装我也看不到,所以只能通过简单的沟通。
py3study
2020/01/09
6530
何测试kafka
最近项目的消息中间件从nsq切换至kafka,说是为了避免消息丢失的问题。 没有项目管理,让我去推进,大家吭呲吭呲切换了,结果测试的时候发现性能跟不上,功能上没有问题。 kafka的基础组件是由工程院提供,是将官方的sdk包了一层,供各个业务方调用。是一个应届生写的,真搞不懂这么重要的东西,交给应届生去弄。 然后问题来了,业务方怎么用都是性能有问题,很卡顿,性能很差。工程院死活不承认,也不测试,拒不接受问题,反复让业务方提供证据。当业务方一份又一份报告给出的时候,他们就是不认可,极限拉扯。
赵云龙龙
2024/07/24
1360
何测试kafka
python 生成器&迭代器
1、列表生成器:列表生成式就像是一个厨师,他只会做这n(n为任意整数)道菜,想吃甚麽做甚麽,不吃不做,不浪费空间;而列表表达式就相当于已经做好的n盘菜,占用空间。 2、生成器的创建方法:
py3study
2020/01/21
5130
kafka 认证和鉴权方式_kafka实际应用
在每个Kafka broker的config目录中添加一个类似下面的JAAS文件,我们称之为kafka_server_jaas.conf,这个文件我们用于启动Kafka服务端:
全栈程序员站长
2022/11/17
3.6K0
kafka 认证和鉴权方式_kafka实际应用
Python 生成器 generator
举例: def gen(): for i in range(10): x = yield i print(x) g=ge() print(g.send(None))) print(g.send(2))
py3study
2020/01/14
3710
1.5万字长文:从 C# 入门 Kafka
本教程是关于 Kafka 知识的教程,从 C# 中实践编写 Kafka 程序,一边写代码一边了解 Kafka。
痴者工良
2023/03/11
2.4K0
1.5万字长文:从 C# 入门 Kafka
整合Kafka到spark-streaming实例
在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。
挖掘大数据
2018/01/09
5.1K0
整合Kafka到spark-streaming实例
Python快速学习第十二天--生成器和协程
yield指令,可以暂停一个函数并返回中间结果。使用该指令的函数将保存执行环境,并且在必要时恢复。 生成器比迭代器更加强大也更加复杂,需要花点功夫好好理解贯通。 看下面一段代码: [python] view plain copy def gen():   for x in xrange(4):           tmp = yield x   if tmp == 'hello':   print 'world' else:   print str(tmp)      
汤高
2018/01/11
1.4K0
go的kafka生产和消费
之前有一篇文件聊了聊如何生产不丢失数据,消费不丢失数据。这一篇我们来看下go如何通过参数配置来处理生产和消费的。
公众号-利志分享
2022/04/25
1.7K0
谈谈 Python 的生成器
第一次看到Python代码中出现yield关键字时,一脸懵逼,完全理解不了这个。网上查下解释,函数中出现了yield关键字,则调用该函数时会返回一个生成器。那到底什么是生成器呢?我们经常看到类似下面的代码 def count(n): x = 0 while x < n: yield x x += 1 for i in count(5): print i 这段代码执行后打印序列0到4,所以我一开始以为这个生成器就是生成一个序列呀。那这跟迭代器有什么区别呢?我们来看下迭代器的例子: class C
顶级程序员
2018/04/26
8460
useful-scripts
python使用相对简单,快速便捷,很适合作为脚本开发;作为"资深"的sub3/vscode控,使用编辑器鞋脚本再也适合不过,vscode中的调试功能太好用啦:
LiosWong
2020/09/01
6210
useful-scripts
Kafka 集群配置SASL+ACL
在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。
py3study
2020/02/21
4.7K0
[727]python操作kafka
pypi:https://pypi.org/project/kafka-python/ kafka-python:https://github.com/dpkp/kafka-python
周小董
2020/01/13
2.8K0
Python Kafka客户端confluent-kafka学习总结
Confluent在GitHub上开发和维护的confluent-kafka-python,Apache Kafka®的一个python客户端,提供了一个与所有brokers>=v0.8的kafka 、Confluent Cloud和Confluent Platform兼容的高阶级生产者、消费者和AdminClient。
授客
2023/11/07
1.8K0
Kafka 消息队列深度解析:它到底是怎么运作的?
在现代分布式系统中,消息队列几乎是绕不开的技术。无论是微服务架构、日志收集,还是数据流处理,消息队列都充当着信息的中转站,确保数据高效、可靠地流转。而在众多消息队列解决方案中,Kafka 是一个明星选手,以高吞吐、分布式、高可用而著称。
Echo_Wish
2025/05/07
1960
Kafka 消息队列深度解析:它到底是怎么运作的?
python 生成器
第一种方法很简单,只要把一个列表生成式的[]改成(),就创建了一个generator:
py3study
2018/08/02
5000
python 操作 kafka
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像 Hadoop 一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
用户4945346
2020/11/25
1.6K0
python 操作 kafka
Python 使用python-kafka类库开发kafka生产者&消费者&客户端
http://zookeeper.apache.org/releases.html#download
授客
2019/09/10
4.5K0
Python 检测系统时间,k8s版本,redis集群,etcd,mysql,ceph,kafka
线上有一套k8s集群,部署了很多应用。现在需要对一些基础服务做一些常规检测,比如:
py3study
2020/02/24
1.8K0
相关推荐
python kafka kerberos 验证 消费 生产
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档