pypi: https://pypi.org/project/kafka-python/  kafka-python: https://github.com/dpkp/kafka-python
pip install kafka
pip install kafka-python
Note: the maintained PyPI package is kafka-python (imported in code as kafka); pip install kafka pulls an older release published under the project's deprecated name.
To achieve load balancing you need to understand Kafka's partitioning mechanism. A topic can be split into multiple partitions; when the producer does not specify a partition, Kafka distributes messages across the available partitions. A consumer that subscribes without a consumer group receives messages from every partition. When a group_id is given, consumers in the same group are assigned different partitions: with two partitions and a group of two consumers, each consumer reads one partition; with a group of three consumers, one of them receives no data. If several consumers need to read the same partition, they must belong to different consumer groups.
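A minimal sketch of the group behaviour described above, assuming a topic named 'test' with two partitions and a broker at 127.0.0.1:9092 (both assumptions): run this script twice with the same group_id and each process is assigned one partition; run it under a different group_id (or with none) and it receives messages from every partition.
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'test',                                # assumed topic with two partitions
    group_id='load-balanced-group',        # same group_id => partitions are split across instances
    bootstrap_servers=['127.0.0.1:9092'],  # assumed broker address
)
for msg in consumer:
    print(msg.partition, msg.offset, msg.value)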
Kafka exposes the concept of offsets, which lets a consumer go back and re-read messages it previously missed. This works because Kafka nominally stores the full message log and can retain a large amount of history; the retention period is configurable and is typically 7 days. Seeking to an offset that has already been deleted is a problem, but that situation should be rare. Each log segment file is named after the offset of its first record, so the position of a record inside a segment is the requested offset minus the number in the file name. To consume from a specific offset, the consumer must be explicitly assigned the partition in question; otherwise the code cannot locate a partition and nothing is consumed.
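A minimal sketch of consuming from an explicit offset, assuming the same topic and broker as above; the partition is assigned explicitly before seeking, as the note above requires.
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])  # assumed broker address
tp = TopicPartition('test', 0)  # assumed topic/partition
consumer.assign([tp])           # seek() only works on a partition assigned to this consumer
consumer.seek(tp, 5)            # start reading at offset 5, replaying messages already consumed
for msg in consumer:
    print(msg.offset, msg.value)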
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
Additional example:
import logging
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# configure multiple retries
producer = KafkaProducer(retries=5)
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print(msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
...     print(msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
...     print(msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
Additional examples:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])  # arguments: topic to consume and the kafka broker address
# This loop blocks forever. Produced messages are kept in the log and are not deleted on consumption,
# so every message keeps its offset in the partition.
for message in consumer:  # iterating over the consumer yields records as they arrive and blocks while no data is available
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
# With a consumer group, only one consumer instance in the group reads a given partition's data
consumer = KafkaConsumer('test', group_id='my-group', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
# Start from the earliest messages that are still available
consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
auto_offset_reset: where to start when there is no valid committed offset. 'earliest' moves to the oldest available message, 'latest' to the newest; the default is 'latest'. The source defines the legacy aliases {'smallest': 'earliest', 'largest': 'latest'}.
# ========== Reading from a specified offset ===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for the 'test' topic
print(consumer.topics())  # list of topics
print(consumer.subscription())  # topics this consumer is subscribed to
print(consumer.assignment())  # topic/partitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offset for each assigned partition
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the position: consume starting from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
# ======= Subscribing to multiple topics ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test', 'test0'))  # topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # the consumer's current position (next offset to fetch) in this partition
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch messages from kafka
    print(msg)
    time.sleep(2)
# ============== Pausing and resuming consumption ===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test',))
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))  # after pause(), the consumer reads nothing from this partition until resume() is called
num = 0
while True:
    print(num)
    print(consumer.paused())  # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")
After pause() is executed the consumer cannot read from the partition until resume() is called.
# -*- coding:utf-8 -*-
import sys,time,json
from kafka import KafkaProducer,KafkaConsumer,TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
# from kafka.structs import TopicPartition
'''
pip install kafka==1.3.5
pip install kafka-python==1.3.5
'''
kafka_host = "47.14.12.26"
kafka_port = 9092
kafka_topic = "test"
class Kafka():
    def __init__(self, key='key', group_id='group_id'):
        self.key = key
        bootstrap_servers = [
            '{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port),
        ]
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
        )
        '''
        fetch_min_bytes (int) - minimum amount of data the server should return for a fetch request; otherwise it waits
        fetch_max_wait_ms (int) - maximum time in milliseconds the server will block before answering a fetch request
            when there is not enough data to immediately satisfy fetch_min_bytes
        fetch_max_bytes (int) - maximum amount of data the server should return for a fetch request. This is not an
            absolute maximum: if the first message in the first non-empty partition of the fetch is larger than this
            value, it is still returned so the consumer can make progress. Note: the consumer fetches from multiple
            brokers in parallel, so memory usage depends on how many brokers host partitions of the topic.
            Supported for Kafka >= 0.10.1.0. Default: 52428800 (50 MB).
        enable_auto_commit (bool) - if True, the consumer's offsets are committed periodically in the background. Default: True.
        max_poll_records (int) - maximum number of records returned by a single call to poll(). Default: 500.
        max_poll_interval_ms (int) - maximum delay between calls to poll() when using consumer group management.
            This puts an upper bound on how long the consumer can be idle before fetching more records. If poll() is
            not called before this timeout expires, the consumer is considered failed and the group rebalances so its
            partitions can be reassigned to another member. Default: 300000.
        '''
        self.consumer = KafkaConsumer(
            # kafka_topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # auto_offset_reset='earliest',
            enable_auto_commit=False
        )
        self.topic_partition = TopicPartition(topic=kafka_topic, partition=0)
        self.topic_partition2 = TopicPartition(topic=kafka_topic, partition=1)
        # assign this consumer's TopicPartitions; kafka_topic must not also be passed to KafkaConsumer() above
        self.consumer.assign([
            self.topic_partition,
            # self.topic_partition2
        ])
        # partition info for the topic
        print(self.consumer.partitions_for_topic(kafka_topic))
        print(self.consumer.assignment())
        print(self.consumer.beginning_offsets(self.consumer.assignment()))
        committed_offset = self.consumer.committed(self.topic_partition)
        if committed_offset is None:
            # reset this consumer's starting position
            self.consumer.seek(partition=self.topic_partition, offset=0)
        end_offset = self.consumer.end_offsets([self.topic_partition])[self.topic_partition]
        print('committed offset:', committed_offset, 'latest offset:', end_offset)
    # producer
    def producer_data(self):
        try:
            for _id in range(600, 610):
                params = {"msg": str(_id)}
                params_message = json.dumps(params, ensure_ascii=False)
                v = params_message.encode('utf-8')
                k = self.key.encode('utf-8')
                print("send msg:(k,v)", k, v)
                self.producer.send(kafka_topic, key=k, value=v, partition=0)
                self.producer.flush()
                # time.sleep(0.5)
            self.producer.close()
        except KafkaError as e:
            print(e)

    # consumer
    def consumer_data(self):
        try:
            print('consumer_data start')
            for msg in self.consumer:
                print(msg)
                print('msg----->k,v,offset:', msg.key, msg.value, msg.offset)
                # commit the offset manually; offsets format: {TopicPartition: OffsetAndMetadata(offset_num, None)}
                self.consumer.commit(offsets={self.topic_partition: OffsetAndMetadata(msg.offset + 1, None)})
                committed_offset = self.consumer.committed(self.topic_partition)
                print('committed offset:', committed_offset)
                time.sleep(5)
        except KeyboardInterrupt as e:
            print(e)
if __name__ == '__main__':
    try:
        kafka = Kafka()
        kafka.producer_data()
        kafka.consumer_data()
    except Exception as e:
        import traceback
        ex_msg = '{exception}'.format(exception=traceback.format_exc())
        print(ex_msg)
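The fetch and poll parameters documented in the docstring above map directly to KafkaConsumer constructor arguments. A minimal sketch with illustrative values; the topic, broker address, and the specific numbers are assumptions, and some of these options require a recent kafka-python release.
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'test',                                # assumed topic
    bootstrap_servers=['127.0.0.1:9092'],  # assumed broker
    fetch_min_bytes=1024,                  # wait until at least 1 KB is available...
    fetch_max_wait_ms=500,                 # ...or at most 500 ms before answering a fetch
    fetch_max_bytes=52428800,              # at most 50 MB per fetch request (the default)
    enable_auto_commit=False,              # commit offsets manually instead of in the background
    max_poll_records=200,                  # at most 200 records per poll() call
    max_poll_interval_ms=300000,           # rebalance if poll() is not called within 5 minutes
)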
References: https://www.cnblogs.com/reblue520/p/8270412.html https://blog.csdn.net/luanpeng825485697/article/details/81036028 https://www.cnblogs.com/small-office/p/9399907.html https://blog.csdn.net/xiaofei2017/article/details/80924800
pykafka: https://github.com/Parsely/pykafka
pip install pykafka
The obvious first step was to look for the standard Python libraries for connecting to Kafka: kafka-python and pykafka. The former has more users and is the more mature library; the latter is the successor to Samsa. Connecting to ZooKeeper with Samsa and then using the Kafka cluster fit my needs well, and pykafka's examples also show ZooKeeper support, whereas kafka-python has no ZooKeeper support, so I chose pykafka as the client library.
With a Kafka plus ZooKeeper cluster, Samsa had both producers and consumers connect through ZooKeeper. People I spoke to, however, have their producers connect directly to the Kafka broker list and only use ZooKeeper for consumers. That also cleared up my confusion when reading the pykafka docs, where only the consumer connects to ZooKeeper. With that resolved, I simply followed the documentation.
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092")  # accepts multiple brokers; this is the key point
>>> client.topics  # list all topics
>>> topic = client.topics['my.test']  # pick a topic
>>> producer = topic.get_producer()
>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)])  # str() added: the official example does not run on py2.7 without it
>>> balanced_consumer = topic.get_balanced_consumer(
...     consumer_group='testgroup',
...     auto_commit_enable=True,  # when this is set to False, consumer_group does not need to be provided
...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'  # connect to multiple zookeeper nodes here
... )
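Consuming with the balanced consumer created above is then just iteration, in the same interactive style; a minimal sketch (the None check follows pykafka's own example):
>>> for message in balanced_consumer:
...     if message is not None:
...         print(message.offset, message.value)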
Source: http://opslinux.com/2015/07/14/python%E8%BF%9E%E6%8E%A5kafka/