首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

kafka-msgpack-json:将msgpack和protobuf转化成json

pic by awesome from Instagram

用kafka-console-consumer.sh消费kafka数据时,如果数据是msgpack或protobuf的二进制格式时,打印出来的是乱码,不方便查看。

如何开发一个程序,直接消费kafka的msgpack和protobuf格式的数据,然后转化成json格式输出到控制台呢?

一种思路是,直接调用KafkaConsumer API,Java程序开发,代价比较高。因为kafka-console-consumer.sh支持很多参数,包括—topic,—bootstrap-server,—zookeeper等,重新开发一套程序,要兼容这些参数,很困难。

另一种思路是,借用linux的管道,前面还是用kafka-console-consumer.sh消费,用管道(|)将流数据传给Java程序,Java程序只负责解析二进制数据即可。

具体做法是:

msgpack2json.sh只要简单调用java -jar msgpack2json.jar即可。

Java程序读取管道也很简单:

这个方案存在问题。由于是二进制流,BufferedInputStream不知道要读多少个字节才是一条完整的kafka数据。缓存的buffer数组设置大一些,可以减少读到不完整记录的情况。kafka数据量小的时候,问题不大,但在kafka数据量大时,还是会经常出现读不到完整记录的情况。

如果是文本记录,用BufferedReader读取,换行符分隔记录,就不会出现这个问题。

所以管道读取不是很好的解决方案。

从kafka-console-consumer.sh这个脚本入手,底层调用的是ConsoleConsumer这个类。

研究ConsoleConsumer.scala的源码,发现打印到控制台的代码是:

只要将convertedBytes转化成json就达到目的了。

改造后的代码如下:

默认按msgpack格式进行解析,如果脚本运行参数里有—proto参数,则按protobuf格式数据进行解析。

由于protobuf格式是需要描述文件进行解析的,是.proto结尾的文件,定义了每个字段的类型和字段的顺序。如果要动态解析protobuf数据,就需要在脚本参数里指定.proto的描述文件。

Google的protobuf API里,有DynamicMessage类型,可以用DynamicMessage.parseFrom(Descriptors.Descriptor, byte[])进行解析。byte[]可以传kafka的二进制数据,而Descriptors.Descriptor对象构造需要如下步骤。

1、生成descriptor文件。

2、从descriptor文件生成Descriptors.Descriptor对象。

这里引用了protobuf-dynamic包。

注意,程序默认取的是proto文件里第1个定义的Message对象。如果proto文件里定义了几个Message对象,则程序很难知道要匹配哪个。建议一个proto文件只定义一个Message。

改造ConsumerConfig.scala程序后,参照kafka-console-consumer.sh生成脚本kafka-msgpack-json.sh。

最终调用方式为:

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190116G1AIAE00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券