前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Kafka】使用Wireshark抓包分析Kafka通信协议

【Kafka】使用Wireshark抓包分析Kafka通信协议

原创
作者头像
图样图森跛
修改2021-05-27 20:12:02
5.1K0
修改2021-05-27 20:12:02
举报
文章被收录于专栏:大数据与实时计算

Wireshark

什么是Wireshark?

Wireshark (前身 Ethereal)是一个网络封包分析软件。网络封包分析软件的功能是撷取网络封包,并尽可能显示出最为详细的网络封包资料。

是目前全球使用最广泛的开源抓包软件,其前身为Ethereal,是一个通用的网络数据嗅探器和协议分析器,由Gerald Combs编写并于1998年以GPL开源许可证发布。如果是网络工程师,可以通过Wireshark对网络进行 故障定位和排错; 如果安全工程师,可以通过Wireshark对网络 黑客渗透攻击进行快速定位并找出攻击源; 如果是测试或软件工程师,可以通过Wireshark 分析底层通讯机制等

Wireshark下载地址

界面介绍

打开WireShark,整个界面分为两部分——工具栏和窗格

image.png
image.png

最上面是工具栏,包含两部分

  • 主工具栏:提供从菜单快速访问常用项目的功能,该工具栏不能由用户自定义
  • “Filter” 工具栏:可以快速编辑和应用显示过滤器。

窗格从上到下总共有3块区域

Packet List窗格:显示当前捕获文件中的所有数据包

Packet Details窗格:数据包详细信息窗格以更详细的形式显示当前数据包

Packet Bytes窗格:数据包字节窗格以十六进制转储样式显示当前数据包的数据

使用显示过滤器

Wireshark 提供了一种显示过滤器语言,可以精确地控制显示哪些数据包。它们可用于检查协议或字段的存在,字段的值,甚至可以将两个字段相互比较。

显示过滤器字段

最简单的显示过滤器是显示单个协议的过滤器。要仅显示包含特定协议的数据包,请在 Wireshark 的显示过滤器工具栏中键入该协议。

例如,要仅显示 Kafka 数据包,请在 Wireshark 的显示过滤器工具栏中键入 kafka.

image.png
image.png

Wireshark内置支持的协议类型非常多,可以参考: https://www.wireshark.org/docs/dfref/

Wireshark支持的Kafka协议字段可参考此链接: https://www.wireshark.org/docs/dfref/k/kafka.html

比较值

可以使用多个不同的比较运算符来构建用于比较值的显示过滤器。

例如,要仅显示去往或来自 IP 地址 192.168.0.1 的数据包,请使用 ip.addr==192.168.0.1。

列出了可用的比较运算符的完整列表

image.png
image.png

Kafka通信协议

Kafka的Producer、Broker和Consumer之间采用的是一套自行设计的基于TCP层的协议。Kafka的这套协议完全是为了Kafka自身的业务需求而定制的,协议定义了所有 API 的请求及响应消息。

概述

Kafka 协议是相当简单的,只有六个核心客户端请求 API:

  1. 元数据(Metadata) – 描述当前可用的 brokers,brokers 的主机和端口信息,并提供了哪个 broker 托管了哪些分区的信息;
  2. 发送(Send) – 发送消息到 broker;
  3. 获取(Fetch) – 从 broker 上获取消息。主要分三类:一个用于获取数据,一个用于获取集群的元数据,还有一个用于获取 topic 的偏移量信息;
  4. 偏移量(Offsets) – 获取给定 topic 分区的可用偏移量信息;
  5. 提交偏移量(Offset Commit) – 提交消费者组(Consumer Group)的一组偏移量;
  6. 获取偏移量(Offset Fetch) – 为消费者组获取一组偏移量

此外,从 0.9 版本开始,Kafka 支持为消费者和 Kafka 连接进行分组管理。客户端 API 包括五个请求:

  1. 分组协调者(GroupCoordinator) – 用来定位一个分组的当前协调者。
  2. 加入分组(JoinGroup) – 成为某个分组的成员,当分组不存在(没有一个成员时)则创建分组。
  3. 同步分组(SyncGroup) – 同步分组中所有成员的状态(例如分发分区分配信息(Partition Assignments)到各个组员)。
  4. 心跳(Heartbeat) – 保持组内成员的活跃状态。
  5. 离开分组(LeaveGroup) – 直接离开一个组。

最后,有几个管理 API,可用于监控/管理 Kafka 集群:

  1. 描述消费者组(DescribeGroups) – 用于检查一组群体的当前状态(如:查看消费者分区分配)。
  2. 列出组(ListGroups) – 列出某个 broker 当前管理的所有组

这里不针对性细讲,完整的协议介绍可以参考:

版本和兼容性

协议的目的要达到在向后兼容的基础上渐进演化。版本是基于每个API基础之上,每个版本包括一个请求和响应对。每个请求包含API Key,里面包含了被调用的API标识,以及表示这些请求和响应格式的版本号。

0.9.0.1 Kafka集群支持如下ApiKey的请求

代码语言:txt
复制
PRODUCE(0, "Produce"),

FETCH(1, "Fetch"),

LIST_OFFSETS(2, "Offsets"),

METADATA(3, "Metadata"),

LEADER_AND_ISR(4, "LeaderAndIsr"),

STOP_REPLICA(5, "StopReplica"),

UPDATE_METADATA_KEY(6, "UpdateMetadata"),

CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),

OFFSET_COMMIT(8, "OffsetCommit"),

OFFSET_FETCH(9, "OffsetFetch"),

GROUP_COORDINATOR(10, "GroupCoordinator"),

JOIN_GROUP(11, "JoinGroup"),

HEARTBEAT(12, "Heartbeat"),

LEAVE_GROUP(13, "LeaveGroup"),

SYNC_GROUP(14, "SyncGroup"),

DESCRIBE_GROUPS(15, "DescribeGroups"),

LIST_GROUPS(16, "ListGroups");

服务器会拒绝它不支持的版本的请求,并始终返回它期望收到的能够完成请求响应的版本的协议格式。

通用的请求和响应格式

所有请求和响应都从以下语法为基础

代码语言:txt
复制
RequestOrResponse => Size (RequestMessage | ResponseMessage)  Size => int32

描述

MessageSize

MessageSize 域给出了后续请求或响应消息的字节(bytes)长度。客户端可以先读取4字节的长度N,然后读取并解析后续的N字节请求内容

请求(Requests)
代码语言:txt
复制
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage

  ApiKey => int16

  ApiVersion => int16

  CorrelationId => int32

  ClientId => string

  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

描述

ApiKey

这是一个表示所调用的API的数字id(即它表示是一个元数据请求?生产请求?获取请求等)

ApiVersion

这是该API的一个数字版本号。我们为每个API定义一个版本号,该版本号允许服务器根据版本号正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式

CorrelationId

这是一个用户提供的整数。它将会被服务器原封不动地回传给客户端。用于匹配客户机和服务器之间的请求和响应

ClientId

这是为客户端应用程序的自定义的标识。用户可以使用他们喜欢的任何标识符,他们会被用在记录错误时,监测统计信息等场景。例如,你可能不仅想要监视每秒的总体请求,还要根据客户端应用程序进行监视,那它就可以被用上(其中每一个都将驻留在多个服务器上)。这个ID作为特定的客户端对所有的请求的逻辑分组

响应(Responses)
代码语言:txt
复制
Response => CorrelationId ResponseMessage

CorrelationId => int32

ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

描述

CorrelationId

服务器传回给客户端它所提供用作关联请求和响应消息的整数

所有响应都是与请求成对匹配(例如,发送回一个元数据请求,就会得到一个元数据响应)。

案例

kafka高版本Client连接0.9Serve

有高版本客户端连接0.9 Kafka集群时会出现生产和写入问题。

查看服务端日志如下:

image.png
image.png

看到java.lang.ArrayIndexOutOfBoundsException: 18这个关键字报错,可以明确有apikey=18的请求访问0.9集群,从前面可以知道0.9集群ApiKey最大支持到16,当前要找出是哪个任务用高版本客户端访问该0.9集群

使用tcpdump+Wireshark抓包分析

tcpdump抓包

在服务端,根据kafka所使用9092端口抓包

代码语言:txt
复制
tcpdump -i any -nn -vv tcp port 9092 -s 0 -w kafka.cap

wireshark分析

wireshark可能未能自动识别出kafka协议。首先检查一下Wireshark是否支持kafka协议解析。

image.png
image.png

出现以上信息说明wireshark支持Kafka协议,如果没有的话,更新wireshark最新版即可。当前笔者使用的是Version 3.4.5

接下来点选中一条数据消息右键,点击“Decode As”,在弹出窗口的“当前”下拉列表中选择“kafka”,然后点击“OK”。

image.png
image.png

可以看到除了tcp控制报文外,其他报文都被解析成kafka协议(如解析不出来,可尝试退出wireshark重新打开)。

image.png
image.png

Decode As临时设置解码器,退出Wireshark以后,这些设置会丢失

在“Filter” 工具栏中输入kafka.api\_key == 18 搜索apikey=18的请求来自哪个ip和端口

image.png
image.png

根据来源IP找到是实时计算集群,结合作业发布平台找出对应时间段可能的任务一一核实,找开发确认后将任务停掉恢复。

在案例中,之前处理方案是Kafka开启Trace日志重启,根据日志的最近的报错IP来猜测,具有一定的随机性,使用Wireshark工具分析可以又快又准的找出来。

查看Fetch请求或响应报文的详细字段

Kafka Fetch Request

可以看到向两个Partition 分区53和13请求消息,53分区请求offset是3605043491,Max Bytes是1MB。

image.png
image.png

Kafka Fetch Response

可以看到返回两个Partition 分区53和13的消息,53返回的是offset是3605043491,消息大小是2981B

image.png
image.png

总结

Wireshark在支持协议的数量方面是出类拔萃的,目前已提供了超过上千种协议的支持。这些协议包括从最基础的IP协议和DHCP协议到高级的专用协议比如Appletalk和Bittorrente等。

Wireshark从1.12.0版本开始支持Kafka通信协议,到现在最新的3.4.5更完善支持协议。通过Wireshark分析学习Kafka通信协议加深对Kafka的理解和问题处理。

由于Wireshark在开源模式下进行开发,每次更新都会增加一些对新协议的支持。后续鲲鹏运维将考虑对Pulsar协议的支持调研。

参考资料:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Wireshark
    • 什么是Wireshark?
      • 界面介绍
        • 使用显示过滤器
          • 显示过滤器字段
          • 比较值
      • Kafka通信协议
        • 概述
          • 版本和兼容性
            • 通用的请求和响应格式
              • 请求(Requests)
              • 响应(Responses)
          • 案例
            • kafka高版本Client连接0.9Serve
              • 查看Fetch请求或响应报文的详细字段
              • 总结
              相关产品与服务
              云服务器
              云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档