前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过Flume简单实现Kafka与Hive对接(Json格式)

通过Flume简单实现Kafka与Hive对接(Json格式)

作者头像
栗筝i
发布2022-12-01 08:50:26
9580
发布2022-12-01 08:50:26
举报
文章被收录于专栏:迁移内容

将以下存储在kafka的topic中的JSON格式字符串,对接存储到Hive的表中

代码语言:javascript
复制
{"id":1,"name":"小李"}
{"id":2,"name":"小张"}
{"id":3,"name":"小刘"}
{"id":4,"name":"小王"}

1、在hive/conf/hive-site.xml中添加或修改如下内容:

代码语言:javascript
复制
    <property>
   	 <name>hive.txn.manager</name>
   	 <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    </property>
    <property>
    	<name>hive.support.concurrency</name>
     	<value>true</value>
    </property>
    <property>
    	<name>hive.metastore.uris</name>
    	<value>thrift://localhost:9083</value>
    </property>

2、创建database、table,其中表有id、name这个两个字段

代码语言:javascript
复制
hive> create database hivetokafka;
 
hive> create table kafkatable(id int,name string) 
hive> clustered by(id) into 2 buckets stored as orc tblproperties('transactional'='true');

3、执行 hive --service metastore & 启动元数据服务

代码语言:javascript
复制
 hive --service metastore & 

4、配置conf文件,这里文件名和位置可以随意(我的是放在hive/myconf/新建的目录下,名字为kafkatohive.conf),添加如下内容

代码语言:javascript
复制
a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink


#kafka为souce的配置
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.zookeeperConnect=localhost:2181
a.sources.source_from_kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.topic=testtopic
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
#hive为sink的配置
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=hivetokafka
a.sinks.hive_sink.hive.table=kafkatable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name
#channel的配置
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1500
a.channels.mem_channel.transactionCapacity=1000
#三者之间的关系
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel

5、将/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-x.x.x.jar拷贝到/flume/lib/下

此外还需要注意/hive/lib/guava-xx.x-jre.jar下与/flume/lib/下的版本是否一致。

6、启动flume,命令格式如下

代码语言:javascript
复制
flume-ng agent --conf conf/ --conf-file conf/….  --name a -Dflume.root.logger=INFO,console;

我这里就是(在flume/路径下 ):

代码语言:javascript
复制
bin/flume-ng agent --conf myconf/ --conf-file myconf/kafkatohive.conf  --name a -Dflume.root.logger=INFO,console;

7、新建终端窗口,创建topic(默认已经启动了zookeeper和kafka服务了)

代码语言:javascript
复制
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

8、启动kafka生产者,进行生产消息

启动命令:

代码语言:javascript
复制
kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

生产消息:

代码语言:javascript
复制
>{"id":1,"name":"小李"}
>{"id":2,"name":"小张"}
>{"id":3,"name":"小刘"}
>{"id":4,"name":"小王"}

9、查看结果

代码语言:javascript
复制
hive> select * from student;
OK
1	小李
2	小张
3	小刘
4	小王

Time taken: 0.589 seconds, Fetched: 10 row(s)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档