背景:
现有资源及框架是基于python语言编写;部分松耦合资源的串行导致效率较低;
思路:
核心思路:采用数据分片,将存在冲突的数据分到不同数据块上,通过数据块之间的并行及数据块内的串行,尽可能使中间过程并行;提高整体速度 ,就kafka而言,可以帮助解耦部分中间过程处理,下面给出python下生产者与消费者使用的demo,注意:消费者是采用的multiprocessing.Process非线程
import threading, logging, timeimport multiprocessingfrom kafka import KafkaConsumer, KafkaProducer# 生产者 基于线程class Producer(threading.Thread):def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() def stop(self): self.stop_event.set() def run(self): producer = KafkaProducer(bootstrap_servers='localhost:9092') while not self.stop_event.is_set(): producer.send('my-topic', b"test") producer.send('my-topic', b"Hola, mundo!") time.sleep(1) producer.close()
# 消费者 基于多进程class Consumer(multiprocessing.Process):def __init__(self): multiprocessing.Process.__init__(self) self.stop_event = multiprocessing.Event() def stop(self): self.stop_event.set() def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', consumer_timeout_ms=1000) consumer.subscribe(['my-topic']) while not self.stop_event.is_set(): for message in consumer: print(message) if self.stop_event.is_set(): break consumer.close()
#main方法中具体进行具体消息的生产及消费时,实例化生产/消费者后,依次调用.start() .stop() .join()方法
具体参见git:https://github.com/dpkp/kafka-python下的example.py文件
领取专属 10元无门槛券
私享最新 技术干货