kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
kafka里面的一些概念:
producer:生产者。
consumer:消费者。
topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
kafka有四个核心API:producer API,consumer API,streams API,connector API
kafka有什么用?
可它以有效的获取系统和应用程序之间的数据,对数据流进行转换或者反应。
关于kafka的下载安装就不过多介绍了,下面主要介绍的是使用python操作kafka。
首先安装kafka的模块:
pip install kafka
安装完我们就可以尝试着去跑个例子:
首先看看producer是怎么跑起来的:
fromkafkaimportKafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
foriinrange(3):
msg ="msg%d"% i
producer.send('test',msg)
producer.close()
调用KafkaProducer指定server地址即可
类似的来看看consumer例子:
fromkafkaimportKafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers=['127.0.0.1:9092'])
formessageinconsumer:
print("%s:%d:%d: key=%s value=%s"% (message.topic,message.partition,
message.offset,message.key,
message.value))
对于consumer group(消费者群组),我们需要给一个群组id(用来区分单个消费者或是群组):
fromkafkaimportKafkaConsumer
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['127.0.0.1:9092'])
formessageinconsumer:
print("%s:%d:%d: key=%s value=%s"% (message.topic,message.partition,
message.offset,message.key,
message.value))
使用consumer订阅多个主题,需要使用subscribe方法,传入需要订阅的标题:
fromkafkaimportKafkaConsumer
fromkafka.structsimportTopicPartition
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))#订阅要消费的主题
printconsumer.topics()
printconsumer.position(TopicPartition(topic=u'test',partition=))#获取当前主题的最新偏移量
formessageinconsumer:
print("%s:%d:%d: key=%s value=%s"% (message.topic,message.partition,
message.offset,message.key,
message.value))
如果需要手动拉取信息,那我们需要加一个循环,在这个循环里监听,一直获取服务器信息:
fromkafkaimportKafkaConsumer
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))
while True:
msg = consumer.poll(timeout_ms=5)#从kafka获取消息
printmsg
如果想挂起consumer可以调用pause()方法,恢复调用resume()方法:
fromkafkaimportKafkaConsumer
fromkafka.structsimportTopicPartition
importtime
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test',partition=))
num =
while True:
printnum
printconsumer.paused()#获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
printmsg
time.sleep(2)
num = num +1
ifnum ==10:
consumer.resume(TopicPartition(topic=u'test',partition=))
print"resume......"
关于简单的操作就介绍到这里了,想了解更多:
https://pypi.org/project/kafka-python/
Pls follow It!!
领取专属 10元无门槛券
私享最新 技术干货