最近项目的消息中间件从nsq切换至kafka,说是为了避免消息丢失的问题。 没有项目管理,让我去推进,大家吭呲吭呲切换了,结果测试的时候发现性能跟不上,功能上没有问题。 kafka的基础组件是由工程院提供,是将官方的sdk包了一层,供各个业务方调用。是一个应届生写的,真搞不懂这么重要的东西,交给应届生去弄。 然后问题来了,业务方怎么用都是性能有问题,很卡顿,性能很差。工程院死活不承认,也不测试,拒不接受问题,反复让业务方提供证据。当业务方一份又一份报告给出的时候,他们就是不认可,极限拉扯。
也难怪,他们也没有几个人懂,就一个人弄这个东西,其它的事情特别多,也懒得看。 反复拉扯了个把月, 天天让我这样测,那样测,问题摆在那,就是没人去分析和解决,推进这个都搞得好烦躁,向上反馈也没啥用,上面也不懂,提供不了任何帮助,锅一直挂我头上,压力山大。
突然有一天,也许是良心发现,也许是受到某种压力,工程院去看问题了,马上就改了个参数就解决问题了。美丽的话语就不问候了。只能说这公司的管理水平和技术水平就这么奇葩。
先不吐槽了,现在来讨论如何测试kafka如何测试。
高吞吐、低延迟
:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。高伸缩性
:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。持久性、可靠性
:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。容错性
:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作高并发
:支持数千个客户端同时读写消息:Kafka 中的数据单元被称为消息
,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率, 消息会分批次
写入 Kafka,批次就代指的是一组消息。
主题:消息的种类称为 主题
(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性
,单一主题中的分区有序,但是无法保证主题中所有的分区有序
生产者:向主题发布消息的客户端应用程序称为生产者
(Producer),生产者用于持续不断的向某个主题发送消息。
消费者:订阅主题消息的客户端程序称为消费者
(Consumer),消费者用于处理生产者产生的消息。
消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组
(Consumer Group)指的就是由一个或多个消费者组成的群体。
偏移量:偏移量
(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
broker: 一个独立的 Kafka 服务器就被称为 broker
,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 集群:broker 是集群
的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器
的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本
(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
可以理解为,以前取快递,需要快递员送到手中,mq作为快递站,可以存和收快递,快递就很方便了。 这样就解耦了必须本人签字收快递。 但是有个原则,快递不能错,不能丢失,不能超期,不能没人领取。 其它的快递站因为快递都丢一起,快递有丢失风险。所以用kafka, kafka可以理解为顺丰的丰巢。
生产者可以理解为快递小哥。 消息可以理解为快递。 消费者可以理解为收快递的。 消费者群组可以理解为一个小区收快递的。 broker可以理解为快递柜。 偏移量可以理解为快递柜上的小仓格。
测试kafka, 要保证消息没堆积,如果堆积消息要能消费完,如果发生故障消息不丢失, 而且要保证错误消息要重发,重消费,能消费正确。
如果要测试性能,需要用脚本压一下,以下是示例:
from confluent_kafka import Producer
import json
import time
import uuid
from datetime import datetime
# 生成随机UUID
kafka_conf = {
'bootstrap.servers': 'kafka-headless.resource.svc.cluster.local:9097',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.username': 'bot',
'sasl.password': 'com123'
}
producer = Producer(kafka_conf)
# with open('test.json', 'r') as file:
# body_data = json.load(file)
topic = 'as.audit_log.log_login'
def sent_message(body_data):
producer.produce(topic, value=json.dumps(body_data))
producer.flush()
time.sleep(0.007)
if __name__ == "__main__":
data_count = 500000 # 总数据量
sleep_interval = 1000 # 每5万条数据后暂停
sleep_duration = 1 # 暂停1秒
# 获取当前时间
now = datetime.now()
# 按照指定格式输出
start_time = now.strftime('%Y-%m-%d %H:%M:%S')
print("start: {}".format(start_time))
for i in range(data_count):
uuid4 = uuid.uuid4()
body_data = {
"user_id": "c8d8a054-3a9d-11ef-a871-0ebc48156fd7",
"user_name": "anna",
"user_type": "authenticated_user",
"level": 1,
"op_type": 2,
"date": 1720061706335227,
"ip": "10.4.36.215",
"mac": "",
"msg": "上传文件“归档库使用明细表.2024-06-12.csv”成功",
"ex_msg": "文档唯一标识:59FBDF50B015411C9B1DD093C2D226F6,父路径: AnyShare://darren; 文件大小:129 字节(Bytes); 文件密级:非密 UserAgent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"obj_id": "59FBDF50B015411C9B1DD093C2D226F6",
"additional_info": "{\"user_account\":\"admin\"}",
"out_biz_id": str(uuid4),
"dept_paths": "组织结构, 组织结构/一个新的部门001"
}
sent_message(body_data)
# if (i+1) % sleep_interval == 0:
# time.sleep(1)
now = datetime.now()
# 按照指定格式输出
end_time = now.strftime('%Y-%m-%d %H:%M:%S')
print("end: {}".format(end_time))
测试场景: