pic by awesome from Instagram
用kafka-console-consumer.sh消费kafka数据时,如果数据是msgpack或protobuf的二进制格式时,打印出来的是乱码,不方便查看。
如何开发一个程序,直接消费kafka的msgpack和protobuf格式的数据,然后转化成json格式输出到控制台呢?
一种思路是,直接调用KafkaConsumer API,Java程序开发,代价比较高。因为kafka-console-consumer.sh支持很多参数,包括—topic,—bootstrap-server,—zookeeper等,重新开发一套程序,要兼容这些参数,很困难。
另一种思路是,借用linux的管道,前面还是用kafka-console-consumer.sh消费,用管道(|)将流数据传给Java程序,Java程序只负责解析二进制数据即可。
具体做法是:
msgpack2json.sh只要简单调用java -jar msgpack2json.jar即可。
Java程序读取管道也很简单:
这个方案存在问题。由于是二进制流,BufferedInputStream不知道要读多少个字节才是一条完整的kafka数据。缓存的buffer数组设置大一些,可以减少读到不完整记录的情况。kafka数据量小的时候,问题不大,但在kafka数据量大时,还是会经常出现读不到完整记录的情况。
如果是文本记录,用BufferedReader读取,换行符分隔记录,就不会出现这个问题。
所以管道读取不是很好的解决方案。
从kafka-console-consumer.sh这个脚本入手,底层调用的是ConsoleConsumer这个类。
研究ConsoleConsumer.scala的源码,发现打印到控制台的代码是:
只要将convertedBytes转化成json就达到目的了。
改造后的代码如下:
默认按msgpack格式进行解析,如果脚本运行参数里有—proto参数,则按protobuf格式数据进行解析。
由于protobuf格式是需要描述文件进行解析的,是.proto结尾的文件,定义了每个字段的类型和字段的顺序。如果要动态解析protobuf数据,就需要在脚本参数里指定.proto的描述文件。
Google的protobuf API里,有DynamicMessage类型,可以用DynamicMessage.parseFrom(Descriptors.Descriptor, byte[])进行解析。byte[]可以传kafka的二进制数据,而Descriptors.Descriptor对象构造需要如下步骤。
1、生成descriptor文件。
2、从descriptor文件生成Descriptors.Descriptor对象。
这里引用了protobuf-dynamic包。
注意,程序默认取的是proto文件里第1个定义的Message对象。如果proto文件里定义了几个Message对象,则程序很难知道要匹配哪个。建议一个proto文件只定义一个Message。
改造ConsumerConfig.scala程序后,参照kafka-console-consumer.sh生成脚本kafka-msgpack-json.sh。
最终调用方式为:
领取专属 10元无门槛券
私享最新 技术干货