前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用Flume 汇入数据到HBase:Flume-hbase-sink 使用方法详解

利用Flume 汇入数据到HBase:Flume-hbase-sink 使用方法详解

原创
作者头像
mikealzhou
发布2018-01-20 22:49:04
5.9K1
发布2018-01-20 22:49:04
举报
文章被收录于专栏:大数据平台TBDS

导读:作者在上一篇文章中:https://cloud.tencent.com/developer/article/1025430《Flume-Hbase-Sink针对不同版本flume与HBase的适配研究与经验总结》,详细描述了两大类HBaseSinks:org.apache.flume.sink.hbase.HBaseSink(简称HBaseSink),以及org.apache.flume.sink.hbase.AsyncHBaseSink(简称AsyncHBaseSink)对于不同版本的flume以及不同版本的HBase之间的兼容性问题,并给出了详细源码解读。

本文作者将会详细描述这两大类HBaseSinks 对应的三种序列化模式的使用方法。本文在第一章节详细解读官方文档的使用说明,第二章节里面使用具体的案例来说明具体用法,在第三章节里面给出一个多source、多channel、多sink的复杂案例。

一、HBasesinks的三种序列化模式使用说明

1.1 HBasesink--SimpleHbaseEventSerializer

如下是展示如何使用 HBasesink--SimpleHbaseEventSerializer:

代码语言:javascript
复制
agenttest.channels = memoryChannel-1
agenttest.sinks = hbaseSink-1
agenttest.sinks.hbaseSink-1.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-1.table = test_hbase_table  //HBase表名
agenttest.sinks.hbaseSink-1.columnFamily = familycolumn-1  //HBase表的列族名称
agenttest.sinks.hbaseSink-1.serializer= org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agenttest.sinks.hbaseSink-1.serializer.payloadColumn = columnname  //HBase表的列族下的某个列名称
agenttest.sinks.hbaseSink-1.channels = memoryChannel-1

注:当指定存入到HBase表的某个列族的指定列column时,不能写成:

代码语言:javascript
复制
agenttest.sinks.hbaseSink-1.columnName = columnname
或者:
agenttest.sinks.hbaseSink-1.column = columnname

这些都是网上的错误写法!另外两个序列化模式也是不能这样使用。

1.2 HBasesink--RegexHbaseEventSerializer

如下是展示如何使用 HBasesink--RegexHbaseEventSerializer(使用正则匹配切割event,然后存入HBase表的多个列):

代码语言:javascript
复制
agenttest.channels = memoryChannel-2
agenttest.sinks = hbaseSink-2
agenttest.sinks.hbaseSink-2.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-2.table = test_hbase_table
agenttest.sinks.hbaseSink-2.columnFamily = familycolumn-2
agenttest.sinks.hbaseSink-2.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer
// 比如我要对nginx日志做分割,然后按列存储HBase,正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) 这种格式, 所以用下面的正则:
agent.sinks.hbaseSink-2.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
// 指定上面正则匹配到的数据对应的hbase的familycolumn-2 列族下的4个cloumn列名
agent.sinks.hbaseSink-2.serializer.colNames = column-1,column-2,column-3,column-4
#agent.sinks.hbaseSink-2.serializer.payloadColumn = test
agenttest.sinks.hbaseSink-2.channels = memoryChannel-2

1.3 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer

如下是展示如何使用 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer: 

代码语言:javascript
复制
agenttest.channels = memoryChannel-3
agenttest.sinks = hbaseSink-3
agenttest.sinks.hbaseSink-3.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agenttest.sinks.hbaseSink-3.table = test_hbase_table
agenttest.sinks.hbaseSink-3.columnFamily = familycolumn-3
agenttest.sinks.hbaseSink-3.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agenttest.sinks.hbaseSink-3.serializer.payloadColumn = columnname  //HBase表的列族下的某个列名称
agenttest.sinks.hbaseSink-3.channels = memoryChannel-3 

二、具体案例示例---利用flume+HBase构建大数据采集汇总系统

2.1 利用SimpleHbaseEventSerializer序列化模式

我们首先在HBase里面建立一个表mikeal-hbase-table,拥有familyclom1和familyclom2两个列族:

代码语言:javascript
复制
hbase(main):102:0> create 'mikeal-hbase-table','familyclom1','familyclom2'
0 row(s) in 1.2490 seconds
=> Hbase::Table - mikeal-hbase-table

然后写一个flume的配置文件test-flume-into-hbase.conf:

代码语言:javascript
复制
# 从文件读取实时消息,不做处理直接存储到Hbase
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

# logfile-source配置
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50
# 组合source和channel
agent.sources.logfile-source.channels = file-channel

# channel配置,使用本地file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# sink 配置为HBaseSink 和 SimpleHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
#HBase表名
agent.sinks.hbase-sink.table = mikeal-hbase-table
#HBase表的列族名称
agent.sinks.hbase-sink.columnFamily  = familyclom1
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
#HBase表的列族下的某个列名称
agent.sinks.hbase-sink.serializer.payloadColumn = cloumn-1
# 组合sink和channel
agent.sinks.hbase-sink.channel = file-channel

从配置文件可以看出,我们选择本地的/data/flume-hbase-test/mkhbasetable/data/nginx.log日志目录作为实时数据采集源,选择本地文件目录/data/flume-hbase-test/data作为channel,选择HBase为为sink(也就是数据流向写入HBase)。

注意:提交 flume-ng 任务的用户,比如flume用户,必须要有/data/flume-hbase-test/mkhbasetable/data/nginx.log /data/flume-hbase-test/data 目录与文件的读写权限;也必须要有HBase的读写权限。

启动Flume:

代码语言:javascript
复制
bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase.conf -Dflume.root.logger=DEBUG,console

在另外一个shell客户端,输入:

代码语言:javascript
复制
echo "nging-1" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "nging-2" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;

再查看mikeal-hbase-table表:

数据已经作为value插入到表里面。

2.2 利用SimpleAsyncHbaseEventSerializer序列化模式

为了示例清晰,先把mikeal-hbase-table表数据清空:

代码语言:javascript
复制
truncate 'mikeal-hbase-table'

然后写一个flume的配置文件test-flume-into-hbase-2.conf:

代码语言:javascript
复制
# 从文件读取实时消息,不做处理直接存储到Hbase
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink# logfile-source配置
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50

# channel配置,使用本地file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# sink 配置为 Hbase
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table
agent.sinks.hbase-sink.columnFamily  = familyclom1
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent.sinks.hbase-sink.serializer.payloadColumn = cloumn-1

# 组合source、sink和channel
agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel

启动Flume:

代码语言:javascript
复制
bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-2.conf -Dflume.root.logger=DEBUG,console

在另外一个shell客户端,输入:

代码语言:javascript
复制
echo "nging-1" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "nging-two" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "nging-three" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;

再查看mikeal-hbase-table表:

2.3 利用RegexHbaseEventSerializer序列化模式

RegexHbaseEventSerializer可以使用正则匹配切割event,然后存入HBase表的多个列。因此,本文简单展示如何使用RegexHbaseEventSerializer对event进行切割然后存存入HBase的多个列。

为了示例清晰,先把mikeal-hbase-table表数据清空:

代码语言:javascript
复制
truncate 'mikeal-hbase-table'

然后写一个flume的配置文件test-flume-into-hbase-3.conf:

代码语言:javascript
复制
# 从文件读取实时消息,不做处理直接存储到Hbase
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

# logfile-source配置
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50

# channel配置,使用本地file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# sink 配置为 Hbase
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table
agent.sinks.hbase-sink.columnFamily  = familyclom1
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 比如我要对nginx日志做分割,然后按列存储HBase,正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) 这种格式, 所以用下面的正则:
agent.sinks.hbase-sink.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
agent.sinks.hbase-sink.serializer.colNames = time,url,number

# 组合source、sink和channel
agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel

启动Flume:

代码语言:javascript
复制
bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-3.conf -Dflume.root.logger=DEBUG,console

在另外一个shell客户端,输入:

代码语言:javascript
复制
echo "[2016-12-22-19:59:59] [http://www.qq.com] [10]" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "[2016-12-22 20:00:12] [http://qzone.qq.com] [19]" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;

再查看mikeal-hbase-table表:

可以看到数据已经按照规则:正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) ,进行切割,并且顺利地存入到mikeal-hbase-table表的time,url,number的三个column列。

三、多source,多channel和多sink的复杂案例

本文接下来展示一个比较复杂的flume导入数据到HBase的实际案例:多souce、多channel和多sink的场景。为了示例清晰,先把mikeal-hbase-table表数据清空:

代码语言:javascript
复制
truncate 'mikeal-hbase-table'

然后写一个flume的配置文件test-flume-into-hbase-multi-position.conf:

代码语言:javascript
复制
# 从文件读取实时消息,不做处理直接存储到Hbase
agent.sources = logfile-source-1 logfile-source-2
agent.channels = file-channel-1 file-channel-2
agent.sinks = hbase-sink-1 hbase-sink-2

# logfile-source配置
agent.sources.logfile-source-1.type = exec
agent.sources.logfile-source-1.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source-1.checkperiodic = 50

agent.sources.logfile-source-2.type = exec
agent.sources.logfile-source-2.command = tail -f /data/flume-hbase-test/mkhbasetable/data/tomcat.log
agent.sources.logfile-source-2.checkperiodic = 50

# channel配置,使用本地file
agent.channels.file-channel-1.type = file
agent.channels.file-channel-1.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel-1.dataDirs = /data/flume-hbase-test/data

agent.channels.file-channel-2.type = file
agent.channels.file-channel-2.checkpointDir = /data/flume-hbase-test/checkpoint2
agent.channels.file-channel-2.dataDirs = /data/flume-hbase-test/data2

# sink 配置为 Hbase
agent.sinks.hbase-sink-1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-1.table = mikeal-hbase-table
agent.sinks.hbase-sink-1.columnFamily  = familyclom1
agent.sinks.hbase-sink-1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 比如我要对nginx日志做分割,然后按列存储HBase,正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) 这种格式, 所以用下面的正则:
agent.sinks.hbase-sink-1.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
agent.sinks.hbase-sink-1.serializer.colNames = time,url,number

agent.sinks.hbase-sink-2.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-2.table = mikeal-hbase-table
agent.sinks.hbase-sink-2.columnFamily  = familyclom2
agent.sinks.hbase-sink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink-2.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
agent.sinks.hbase-sink-2.serializer.colNames = time,IP,number

# 组合source、sink和channel
agent.sources.logfile-source-1.channels = file-channel-1
agent.sinks.hbase-sink-1.channel = file-channel-1

agent.sources.logfile-source-2.channels = file-channel-2
agent.sinks.hbase-sink-2.channel = file-channel-2

启动Flume:

代码语言:javascript
复制
bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-multi-position.conf -Dflume.root.logger=DEBUG,console

在另外一个shell客户端,输入:

代码语言:javascript
复制
echo "[2016-12-22 20:04:12] [http://music.user.qq.com] [16]" >> nginx.log;
echo "[2016-12-22 20:04:13] [123.41.90.135] [22]" >> tomcat.log;
echo "[2016-12-22 20:05:19] [http://xuetang.vip.qq.com] [24]" >> nginx.log;
echo "[2016-12-22 20:05:21] [134.92.146.109] [25]" >> tomcat.log;

再查看mikeal-hbase-table表:

可以看到数据已经按照规则:正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) ,进行切割,并且顺利地存入到mikeal-hbase-table表,并且按照familyclom1 和 familyclom2 两个列族分配存到三个cloumn列里面。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、HBasesinks的三种序列化模式使用说明
    • 1.1 HBasesink--SimpleHbaseEventSerializer
      • 1.2 HBasesink--RegexHbaseEventSerializer
        • 1.3 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer
        • 二、具体案例示例---利用flume+HBase构建大数据采集汇总系统
          • 2.1 利用SimpleHbaseEventSerializer序列化模式
            • 2.2 利用SimpleAsyncHbaseEventSerializer序列化模式
              • 2.3 利用RegexHbaseEventSerializer序列化模式
              • 三、多source,多channel和多sink的复杂案例
              相关产品与服务
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档