Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flume-Hbase-Sink针对不同版本flume与HBase的适配研究与经验总结

Flume-Hbase-Sink针对不同版本flume与HBase的适配研究与经验总结

原创
作者头像
mikealzhou
发布于 2018-01-20 14:43:52
发布于 2018-01-20 14:43:52
4.6K00
代码可运行
举报
文章被收录于专栏:大数据平台TBDS大数据平台TBDS
运行总次数:0
代码可运行

导语:本文细致而全面地讲解使用flume输出数据到HBase的三种不同 Flume-Hbase-Sink 之间的差异性,以及技术细节。并且透彻而全面地总结了不同版本flume和HBase之间的兼容性问题。 为了更加详细说明三种不同hbasesink的差异性,本文在附录附上详细的源码解读。

一、Flume 的HBaseSinks 详细介绍

Flume 有两大类 HBasesinks: HBaseSink (org.apache.flume.sink.hbase.HBaseSink) 和 AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink) 。

1.1、HBasesink

提供两种序列化模式:

1.1.1、SimpleHbaseEventSerializer

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column;

1.1.2、RegexHbaseEventSerializer

根据正则表达式将event 的body拆分到不同的列当中,因此在插入HBase的时候,支持用户自定义插入同一个rowkey对应的同一个columnFamily 的多个column。

【优点】

(a) 安全性较高:支持secure HBase clusters (FLUME-1626) ,支持往secure hbase写数据(hbase可以开启kerberos校验);

(b) 支持0.96及以上版本的HBase 的IPC通信----- the new HBase IPC which was introduced in HBase 0.96 and up。

【缺点】

性能没有AsyncHBaseSink高。因为HBaseSink采用阻塞调用(blocking calls),而AsyncHBaseSink采用非阻塞调用(non-blocking calls)。

1.2、AsyncHBaseSink

目前只提供一种序列化模式:SimpleAsyncHbaseEventSerializer:

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column。

【优点】

AsyncHBaseSink采用非阻塞调用(non-blocking calls),因此,性能比HBaseSink高;

【缺点】

(a) 不支持secure HBase clusters (FLUME-1626),不支持往secure hbase写数据;

(b) 不支持0.96及以上版本的HBase 的IPC通信----- the new HBase IPC which was introduced in HBase 0.96 and up。

二、两大类HBasesinks的详细用法

2.1 HBasesink--SimpleHbaseEventSerializer

Required properties 如下表格所示:

Property Name

Default

Description

channel

type

The component type name, needs to be org.apache.flume.sink.HBaseSink

table

The name of the table in Hbase to write to.

columnFamily

The column family in Hbase to write to.

batchSize

100

Number of events to be written per txn.

serializer

org.apache.flume.sink.hbase.SimpleHbaseEventSerializer;org.apache.flume.sink.hbase.RegexHbaseEventSerializer

two serializers are provided with flume inHBasesink

serializer.*

Properties to be passed to the serializer

如下是展示如何使用 HBasesink--SimpleHbaseEventSerializer:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
agenttest.channels = memoryChannel-1
agenttest.sinks = hbaseSink-1
agenttest.sinks.hbaseSink-1.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-1.table = test_hbase_table  //HBase表名
agenttest.sinks.hbaseSink-1.columnFamily = familycolumn-1  //HBase表的列族名称
agenttest.sinks.hbaseSink-1.serializer= org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agenttest.sinks.hbaseSink-1.serializer.payloadColumn = columnname  //HBase表的列族下的某个列名称
agenttest.sinks.hbaseSink-1.channels = memoryChannel-1

注:当指定存入到HBase表的某个列族的指定列column时,不能写成:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
agenttest.sinks.hbaseSink-1.columnName = columnname
或者:
agenttest.sinks.hbaseSink-1.column = columnname

这些都是网上的错误写法!另外两个序列化模式也是不能这样使用。

2.2 HBasesink--RegexHbaseEventSerializer

如下是展示如何使用 HBasesink--RegexHbaseEventSerializer(使用正则匹配切割event,然后存入HBase表的多个列):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
agenttest.channels = memoryChannel-2
agenttest.sinks = hbaseSink-2
agenttest.sinks.hbaseSink-2.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-2.table = test_hbase_table
agenttest.sinks.hbaseSink-2.columnFamily = familycolumn-2
agenttest.sinks.hbaseSink-2.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer
// 比如我要对nginx日志做分割,然后按列存储HBase,正则匹配分成的列为: ([xxx] [yyy] [zzz] [nnn] ...) 这种格式, 所以用下面的正则:
agent.sinks.hbaseSink-2.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
// 指定上面正则匹配到的数据对应的hbase的familycolumn-2 列族下的4个cloumn列名
agent.sinks.hbaseSink-2.serializer.colNames = column-1,column-2,column-3,column-4
#agent.sinks.hbaseSink-2.serializer.payloadColumn = test
agenttest.sinks.hbaseSink-2.channels = memoryChannel-

2.3 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer

Required properties 如下表格所示:

Property Name

Default

Description

channel

type

The component type name, needs to be org.apache.flume.sink.AsyncHBaseSink

table

The name of the table in Hbase to write to.

columnFamily

The column family in Hbase to write to.

batchSize

100

Number of events to be written per txn.

timeout

The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction. If no timeout is specified, the sink will wait forever.

serializer

org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

serializer.*

Properties to be passed to the serializer.

如下是展示如何使用 AsyncHBaseSink--SimpleAsyncHbaseEventSerializer:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
agenttest.channels = memoryChannel-3
agenttest.sinks = hbaseSink-3
agenttest.sinks.hbaseSink-3.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agenttest.sinks.hbaseSink-3.table = test_hbase_table
agenttest.sinks.hbaseSink-3.columnFamily = familycolumn-3
agenttest.sinks.hbaseSink-3.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agenttest.sinks.hbaseSink-3.serializer.payloadColumn = columnname  //HBase表的列族下的某个列名称
agenttest.sinks.hbaseSink-3.channels = memoryChannel-3

如果读者感兴趣,可以仔细阅读Apache flume官网,上面有一些更加详细的信息:

http://archive.cloudera.com/cdh/3/flume-ng/FlumeUserGuide.html

三、使用flume-hbase-sink的常见错误总结

3.1、无HBase读写权限

如果提交./flume-ng 任务的用户没有HBase的读写权限,那么就会出现无权限读写HBase:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)]Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@f46fdc1 counterGroup:{ name:null counters:{} } } - Exception follows.
org.apache.flume.FlumeException: Could not start sink. Table or column family does not exist in Hbase (Permission denied).

【解决方法】

需要为用户赋予HBase读写权限,或者超级管理员权限。

3.2、低版本flume使用错误的序列化模式,导致与HBase版本接口不匹配

本文作者使用 flume-1.6.0 的RegexHbaseEventSerializer(属于 HBasesink)对HBase-1.1.3 和 HBase-1.2.1进行插入数据,出现以下错误:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
2016-12-22 12:14:50 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hbase.HBaseSink.process:351)  - Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
    at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
    at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
    at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)

错误信息提示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V

  查看源码,SimpleHbaseEventSerializer和 RegexHbaseEventSerializer的getActions函数产生的是put对象实例,也就是org.apache.hadoop.hbase.client.Put实例(想要了解更详细的内容,可以阅读本文的【附录:源码解读】章节)。org.apache.hadoop.hbase.client.Put里的确包含setWriteToWAL(boolean write)这个函数。新版本的hbase(0.98以上版本),setWriteToWAL(boolean write)方法改变了返回值,从void 变成了 Mutation。而flume-1.6.0以及以下版本,无法适配setWriteToWAL(boolean write)的改变,从而导致出错。

与SimpleHbaseEventSerializer和 RegexHbaseEventSerializer不同的是,SimpleAsyncHbaseEventSerializer的getActions函数不是产生put实例,而是生成PutRequest实例。而PutRequest实例是可以与任意版本的HBase接口适配的。

想要了解更详细的内容,可以阅读本文的【附录:源码解读】章节。

【解决方法】

(1) 如果不改变flume的版本,那么需要将HBase降级到0.98 及以下版本;

(2) 如果不改变HBase版本,需要将flume升级到 1.7.0 及以上版本。

四、总结flume与HBase版本适配问题&&用户自定义HBase 的column

总结:经过上述解读,以及作者本人验证,有以下几条经验总结:

4.1 flume与HBase版本适配问题

4.1.1 对于HBasesink

(a) 对于Flume-1.6.0 及以下版本:HBasesink目前只支持往 HBase-0.98 版本及以下版本写入数据,当HBase超过0.98版本,1.6.0 及以下版本的Flume则不支持往HBase写入数据;

(b) 对于Flume-1.7.0 及以上版本:HBasesink目前支持往0.98及以上版本的HBase写入数据(当然也支持往0.98及以下版本的HBase写入数据);

4.1.2 对于AsyncHBaseSink

(a) 支持所有版本的HBase写入数据。

(b) 不支持0.96及以上版本的HBase 的IPC通信方式------ the new HBase IPC which was introduced in HBase 0.96 and up。

4.2 flume-hbase-sink支持用户自定义HBase的column

4.2.1 对于HBasesink

(a)序列化模式SimpleHbaseEventSerializer

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column;

(b) 序列化模式RegexHbaseEventSerializer

根据正则表达式将event body拆分到不同的列当中,因此在插入HBase的时候,支持用户自定义插入同一个rowkey对应的同一个columnFamily 的多个column。

4.2.2 对于AsyncHBaseSink

目前只提供一种序列化模式:SimpleAsyncHbaseEventSerializer:

将整个事件event的body部分当做完整的一列写入hbase,因此在插入HBase的时候,一个event的body只能被插入一个column。

【附录:源码解读】

1、HBasesink---SimpleHbaseEventSerializer

SimpleHbaseEventSerializer类中包括的函数有:

l  SimpleHbaseEventSerialzer函数:这是构造函数

l  configure(ComponentConfiguration conf)函数:这是配置函数,目前是空白。

l  close()函数:这是个关闭函数,估计是用于关闭调用资源的。

l  configure(Context context)函数:配置函数,主要作用是从flume的配置文件中读取信息。

l  initalize(Event event, byte[] cf): 从event中读取内容,并配置HBase的column family

l  getActions():创建put对象,每个put对应着hbase的一行数据。

l  getIncrements():将hbase的自增列加1

Configure(Context context)函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 // 读取flume配置文件中的rowPrefix,rowPrefix的默认值是default 
·         rowPrefix = context.getString("rowPrefix", "default"); 
·         // 读取flume配置文件中的incrementRow,默认值是inRow
·         incrementRow = 
·             context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); 
·         // 读取flume配置文件中的suffix,默认值是uuid 
·         String suffix = context.getString("suffix", "uuid"); 
·         // 读取flume配置文件的payloadColumn,默认之是pCol。payloadColumn对应这hbase的列名 
·         String payloadColumn = context.getString("payloadColumn","pCol"); 
·         // 读取flume配置文件中的incrementColumn,默认值是iCol 
·         String incColumn = context.getString("incrementColumn","iCol"); 
·         if(payloadColumn != null && !payloadColumn.isEmpty()) { 
·           // 这几行代码是配置hbase中的rowkey前缀     
·           if(suffix.equals("timestamp")){ 
·             keyType = KeyType.TS; 
·           } else if (suffix.equals("random")) { 
·             keyType = KeyType.RANDOM; 
·           } else if(suffix.equals("nano")){ 
·             keyType = KeyType.TSNANO; 
·           } else { 
·             keyType = KeyType.UUID; 
·           } 
·           plCol = payloadColumn.getBytes(Charsets.UTF_8); 
·         } 
·         if(incColumn != null && !incColumn.isEmpty()) { 
·           incCol = incColumn.getBytes(Charsets.UTF_8); 
·         } 

对于Configure函数,主要需要说明的flume配置文件和代码之间的对应关系。

比如,如果你在flume的配置文件中有一行如:a1.sinks.k1.serializer.payloadColumn=colName。 那么Configure函数中的context.getString("payloadColumn", "pCol")的返回值就是colName.

同样如果你设置 a1.sinks.k1.serializer.rowPrefix=123456, 那么context.getString("rowPrefix", "default")的返回值就是123456.

initalize(Event event, byte[] cf)函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
·         public void initialize(Event event, byte[] cf) { 
·             this.payload = event.getBody();
·             this.cf = cf; 
·           }

这个函数代码简单,cf表示hbase中的column family; event是flume的一个事件,是flume数据流中的一个data object。如果flume的source是文本文件的话,文件中的每一行就会产生一个flume event。

getActions()函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
·         public List<Row> getActions() throws FlumeException { 
·           List<Row> actions = new LinkedList<Row>(); 
·           if(plCol != null){ 
·             byte[] rowKey; 
·             try { 
·               // 配置rowkey,具体靠参考SimpleRowKeyGenerator类 
·               if (keyType == KeyType.TS) { 
·                 rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); 
·               } else if(keyType == KeyType.RANDOM) { 
·                 rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); 
·               } else if(keyType == KeyType.TSNANO) { 
·                 rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); 
·               } else { 
·                 rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); 
·               } 
·               // 创建rowkey的一个put 
·               Put put = new Put(rowKey); 
·               // 在put中添加一列数据。columnfamily是cf,colunname是plCol,value是payload。 
·               // plCol是payloadColumn的byte形式。而payloadColumn初始化于Configure函数,来自于flume的配置文件 
·               // payload初始化于initalize函数,来自于event 
·               put.add(cf, plCol, payload); 
·               actions.add(put); 
·             } catch (Exception e){ 
·               throw new FlumeException("Could not get row key!", e); 
·             }
·           } 
·           return actions; 
·         } 

getActions函数,它生成一个put对象实例,put最后插入到hbase中。需要注意的是put实例中所有的数据来源。        plCol来自于payloadColumn, payloadColumn来自于flume的配置文件;cf也是来自于flume配置文件;payload来自于event。

plCol对应hbase中的colum, cf对应hbase中的columnfamily,payload对应hbase中的value。

2、HBasesink---RegexHbaseEventSerializer

RegexHbaseEventSerializer 的源码和 SimpleHbaseEventSerializer 差不多,主要在于以下几个区别:

(1) RegexHbaseEventSerializer.configure(Context context):

此Serializer根据正则可以写入多列:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void configure(Context context) {
    String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);  //获取配置文件中的正则表达式,默认是“(.*)”
    regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG,
        INGORE_CASE_DEFAULT);  //是否忽略大小写
    inputPattern = Pattern.compile(regex, Pattern.DOTALL + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));  //将给定的正则表达式编译到具有给定标志的模式中
    String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);  //获取配置文件中的列名
    String[] columnNames = colNameStr.split(",");  //分割列名获得列名数组
    for (String s: columnNames) {
      colNames.add(s.getBytes(Charsets.UTF_8));
    }
  }

(2) RegexHbaseEventSerializer.getActions()方法

首先会做一些判断匹配成功否?匹配出的个数和指定的列数相同否?,然后是获取rowkey,这里的rowkey是[time in millis]-[random key]-[nonce]三部分组成的字符串。剩下的是依次匹配列组成Put,返回List<Row> actions。

(3) RegexHbaseEventSerializer.getIncrements()

直接返回一个没有数据的List<Increment>,即不设置计数器。

3、AsyncHBasesink---SimpleAsyncHbaseEventSerializer

SimpleAsyncHbaseEventSerializer类和SimpleHbaseEventSerializer的主要区别在于getActions函数。

SimpleAsyncHbaseEventSerializer:getActions()函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
·         public List<PutRequest> getActions() { 
·           List<PutRequest> actions = new ArrayList<PutRequest>(); 
·           if(payloadColumn != null){ 
·             byte[] rowKey; 
·             try { 
·               switch (keyType) { 
·                 case TS: 
·                   rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); 
·                   break; 
·                 case TSNANO: 
·                   rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); 
·                   break; 
·                 case RANDOM: 
·                   rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); 
·                   break; 
·                 default: 
·                   rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); 
·                   break; 
·               } 
·               // table是hbase中的表名 
·               // rowkey是hbase中的rowkey 
·               // cf是hbase中的columnFamily 
·               // payloadColum是hbase中的column 
·               // payload是hbase中的value 
·               PutRequest putRequest =  new PutRequest(table, rowKey, cf, 
·                   payloadColumn, payload); 
·               actions.add(putRequest); 
·             } catch (Exception e){ 
·               throw new FlumeException("Could not get row key!", e); 
·             } 
·           } 
·           return actions; 
·         } 

 与SimpleHbaseEventSerializer的getActions的不同,不是产生put实例,而是生成PutRequest实例。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
如何使用Flume采集Kafka数据写入HBase
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github: https://github.com/fayson/cdhproject 提示:代码块部分可以左右滑动查看噢 1.文档编写目的 ---- 在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》和《如何使用Flume采集K
Fayson
2018/07/12
4K0
利用Flume 汇入数据到HBase:Flume-hbase-sink 使用方法详解
本文作者将会详细描述这两大类HBaseSinks 对应的三种序列化模式的使用方法。
mikealzhou
2018/01/20
6K1
利用Flume 汇入数据到HBase:Flume-hbase-sink 使用方法详解
项目三 flume 采集数据至hbase
flume采集数据至hbase有四个实例,本文章一一列举,各实例流程均差不多,区别基本上就是配置文件的编写。其中实例一流程较为详细,后面几个实例参考实例一流程
码农GT038527
2024/09/28
1720
flume安装及配置介绍(二)
注: 环境: skylin-linux Flume的下载方式:   wget http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-bin.tar. 下载完成之后,使用tar进行解压 tar -zvxf apache-flume-1.6..0-bin.tar. 进入flume的conf配置包中,使用命令touch flume.conf,然后cp flume-conf.properties.template flume.co
Gxjun
2018/03/27
8810
flume安装及配置介绍(二)
Flume+Kafka+Storm+Hbase+HDSF+Poi整合
举例:这个网站www.hongten.com(当然这是一个我虚拟的电商网站),用户在这个网站里面可以有很多行为,比如注册,登录,查看,点击,双击,购买东西,加入购物车,添加记录,修改记录,删除记录,评论,登出等一系列我们熟悉的操作。这些操作都被记录在日志信息里面。我们要对日志信息进行分析。
Hongten
2018/12/28
7250
如何在Kerberos环境下使用Flume采集Kafka数据写入HBase
在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》、《如何使用Flume采集Kafka数据写入Kudu》和《如何使用Flume采集Kafka数据写入HBase》。本篇文章Fayson主要介绍在Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。 内容概述 1.环境准备 2.配置Fl
Fayson
2018/07/12
1.1K0
Flume快速入门系列(9) | 如何自定义Sink
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。   Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。   Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的Sink类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Sink。   官方也提供了自定义source的接口: https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink需要继承AbstractSink类并实现Configurable接口。 实现相应方法:
不温卜火
2020/10/28
1.3K0
Flume快速入门系列(9) | 如何自定义Sink
Flume简介
Apache Flume是一个分布式的,可靠的,高可用系统,用于将大量来自不同源的日志数据高效地收集,统计,搬迁到一个集中存储仓库。
悠扬前奏
2019/05/28
8280
大数据日志收集框架之Flume实战
flume官方文档:http://flume.apache.org/documentation.html
静谧星空TEL
2021/04/27
1K0
大数据日志收集框架之Flume实战
Flume-ng配置
Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
星哥玩云
2022/06/29
3390
【Flume】实现MySQL数据增量自动提交到ClickHouse
源码在https://reviews.apache.org/r/50692/diff/1#2 下面的操作需要cd到
大数据真好玩
2022/03/28
2.6K0
【Flume】实现MySQL数据增量自动提交到ClickHouse
实战经验 | Flume中同时使用Kafka Source和Kafka Sink的Topic覆盖问题
场景描述:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。
大数据真好玩
2019/09/12
2K2
如何使用Flume采集Kafka数据写入Kudu
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github: https://github.com/fayson/cdhproject 提示:代码块部分可以左右滑动查看噢 1.文档编写目的 ---- 在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》和《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》,本篇文章Fayson主要介
Fayson
2018/07/11
5.7K1
【从0开始の全记录】Flume+Kafka+Spark+Spring Boot 统计网页访问量项目
新建Scala文件——WebStatStreamingApp.scala,首先使用Direct模式连通Kafka:
王知无-import_bigdata
2020/08/20
2K0
【从0开始の全记录】Flume+Kafka+Spark+Spring Boot 统计网页访问量项目
大数据技术之_09_Flume学习_Flume概述+Flume快速入门+Flume企业开发案例+Flume监控之Ganglia+Flume高级之自定义MySQLSource+Flume企业真实面试题(
  Flume(水槽) 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。   在2009年Flume被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;,同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一。
黑泽君
2019/03/15
1.6K0
Flume(一)Flume原理解析
前言   最近有一点浮躁,遇到了很多不该发生在我身上的事情。没有,忘掉这些。好好的学习,才是正道! 一、Flume简介   flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。   但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.9.
用户1195962
2018/01/18
2.8K0
Flume(一)Flume原理解析
Flume学习笔记「建议收藏」
1.基于尚硅谷做的笔记 2.也参考了几篇我觉得写得比较好的博客,参考链接在文中 3.此外,我也将我在操作过程中遇到的问题以及解决方案都记录了下来
全栈程序员站长
2022/11/16
1.1K0
Flume学习笔记「建议收藏」
快速学习-Flume高级之自定义MySQLSource
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source。
cwl_java
2020/02/19
9300
Flume快速入门系列(10) | 如何自定义MySQLSource
  实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource。   官方也提供了自定义source的接口:   官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source
不温卜火
2020/10/28
6260
Flume快速入门系列(10) | 如何自定义MySQLSource
flume 1.8.0 开发基础
Apache Flume是一个用于高效地从大量异构数据源收集、聚合、传输到一个集中式数据存储的分布式、高可靠、高可用的系统。
皮皮熊
2018/08/26
1.2K0
推荐阅读
相关推荐
如何使用Flume采集Kafka数据写入HBase
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验