前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

Flume

作者头像
挽风
发布2023-10-17 15:41:16
2320
发布2023-10-17 15:41:16
举报
文章被收录于专栏:小道小道

1 Flume丢包问题

  单机upd的flume source的配置,100+M/s数据量,10w qps flume就开始大量丢包,因此很多公司在搭建系统时,抛弃了Flume,自己研发传输系统,但是往往会参考Flume的Source-Channel-Sink模式。

  一些公司在Flume工作过程中,会对业务日志进行监控,例如Flume agent中有多少条日志,Flume到Kafka后有多少条日志等等,如果数据丢失保持在1%左右是没有问题的,当数据丢失达到5%左右时就必须采取相应措施。

2 Flume与Kafka的选取

  采集层主要可以使用Flume、Kafka两种技术。

  Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API。

  Kafka:Kafka是一个可持久化的分布式的消息队列。

  Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

如果需要向HDFS写入数据,Flume需要安装在Hadoop集群上,否则会找不到HDFS文件系统。

  Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。

  Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

  Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

3 日志数据如何采集到Kafka?

  日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

可选择TaildirSource和KafkaChannel,并配置日志校验拦截器

3.1 TailDirSource

  TailDirSource相比ExecSource、SpoolingDirectorySource的优势:

  TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。(Apache1.7、CDH1.6版本开始存在)

taildir挂了不会丢数(断点续传),但是有可能数据重复,生产环境通常不处理重复数据,出现重复的概率比较低。处理会影响传输效率。可以在下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis;如果需要在Flume处理则可以在taildirsource里面增加自定义事务。

taildir source不支持递归遍历文件夹读取文件。

  ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

  SpoolingDirectorySource监控目录,支持断点续传。

3.2 KafkaChannel

  采用Kafka Channel,可以省去Sink,提高效率。

日志采集Flume关键配置如下:

在这里插入图片描述
在这里插入图片描述

4 flume不采集Nginx日志,通过Logger4j采集日志,优缺点是什么?

  优点:Nginx的日志格式是固定的,但是缺少sessionid,通过logger4j采集的日志是带有sessionid的,而session可以通过redis共享,保证了集群日志中的同一session落到不同的tomcat时,sessionId还是一样的,而且logger4j的方式比较稳定,不会宕机。

  缺点:不够灵活,logger4j的方式和项目结合过于紧密,而flume的方式比较灵活,拔插式比较好,不会影响项目性能。

5 flume和kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志。

Flume采集日志是通过流的方式直接将日志收集到存储层,而kafka是将缓存在kafka集群,待后期可以采集到存储层。

Flume采集中间停了,可以采用文件的方式记录之前的日志,而kafka是采用offset的方式记录之前的日志。

6 flume的source、channel、sink具体是做什么的

  1)source:用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

  Source输入端类型有Avro、Thrift、exec、netcat等,企业中最常用的还是采集日志文件。

  2)channel:用于桥接Sources和Sinks,类似于一个队列。

    ① Channel 被设计为 Event 中转临时缓冲区,存储 Source 收集并且没有被Sink 读取的 Event,为平衡 Source 收集和 Sink 读取的速度,可视为 Flume内部的消息队列。

    ② Channel 线程安全并且具有事务性,⽀持 Source 写失败写,和 Sink 读失败重复读的操作。常⻅的类型包括 Memory Channel, File Channel,Kafka Channel。

  3)sink:从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

Source到Channel是Put事务,Channel到Sink是Take事务

在这里插入图片描述
在这里插入图片描述

7 file channel /memory channel/kafka channel

(1)File Channel

  数据存储于磁盘,优势:可靠性高;劣势:传输速度低

  默认容量:100万event

  注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

(2)Memory Channel

   数据存储于内存,优势:传输速度快;劣势:可靠性差

  默认容量:100个event

(3)Kafka Channel

  数据存储于Kafka,基于磁盘;

  优势:可靠性高;

  传输速度快 Kafka Channel 大于Memory Channel + Kafka Sink 原因省去了Sink阶段

(4)Kafka Channel哪个版本产生的?

  Flume1.6 版本产生=》并没有火;因为有bug:event(header body ) ture 和false 控制是否包含header信息,很遗憾,都不起作用。增加了额外清洗的工作量。Flume1.7解决了这个问题,开始火了。

(5)生产环境如何选择

如果下一级是Kafka,优先选择Kafka Channel

如果是金融、对钱要求准确的公司,选择File Channel

如果就是普通的日志,通常可以选择Memory Channel

8 HDFS sink

  时间(半个小时) – hdfs.rollInterval=1800

  大小128m – hdfs.rollSize=134217728

  event个数(0禁止)-- hdfs.rollCount =0

9 Flume拦截器

9.1 拦截器注意事项

  (1)ETL拦截器:主要是用来判断json是否完整。没有做复杂的清洗操作主要是防止过多的降低传输速率。

  (2)时间戳拦截器:主要是解决零点漂移问题

9.2 自定义拦截器

  Source 将 Event 写⼊到 Channel 之前可以使⽤拦截器对 Event 进⾏各种形式的处理, Source 和 Channel 之间可以有多个拦截器,不同拦截器使⽤不同的规则处理 Event,包括时间、主机、 UUID、正则表达式等多种形式的拦截器。

  自定义拦截器步骤:

(1)实现 Interceptor

(2)重写四个方法

  initialize 初始化

  public Event intercept(Event event) 处理单个Event

  public List intercept(List events) 处理多个Event,在这个方法中调用Event intercept(Event event)

  close方法

(3)静态内部类,实现Interceptor.Builder

9.3 拦截器可以不用吗?

  ETL拦截器可以不用;需要在下一级Hive的dwd层和SparkSteaming里面处理,时间戳拦截器建议使用。 如果不用需要采用延迟15-20分钟处理数据的方式,比较麻烦。

10 Flume Channel选择器

  Source 发送的 Event 通过 Channel 选择器来选择以哪种⽅式写⼊到 Channel中, Flume 提供三种类型 Channel 选择器,分别是复制、复⽤和⾃定义选择器。

  1. 复制选择器(Replicating:默认选择器): ⼀个 Source 以复制的⽅式将⼀个 Event 同时写⼊到多个Channel 中,不同的 Sink 可以从不同的 Channel 中获取相同的 Event,⽐如⼀份⽇志数据同时写 Kafka 和 HDFS,⼀个 Event 同时写⼊两个Channel,然后不同类型的 Sink 发送到不同的外部存储。(将数据发往下一级所有通道

  2. 复⽤选择器(Multiplexing): 需要和拦截器配合使⽤,根据 Event 的头信息中不同键值数据来判断 Event 应该写⼊哪个 Channel 中。(选择性发往指定通道

11 Flume监控器

  1)采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。主要是内存不够导致的。

  2)解决办法?

    (1)自身:flume默认内存2000m。考虑增加flume内存,在flume-env.sh配置文件中修改flume内存为 4-6g

    -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

    (2)找朋友:增加服务器台数

    搞活动 618 =》增加服务器=》用完在退出

    日志服务器配置:8-16g内存、磁盘8T

12 Flume 的负载均衡和故障转移

  ⽬的是为了提⾼整个系统的容错能⼒和稳定性。简单配置就可以轻松实现,⾸先需要设置 Sink 组,同⼀个 Sink 组内有多个⼦ Sink,不同 Sink 之间可以配置成负载均衡或者故障转移。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-02-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Flume丢包问题
  • 2 Flume与Kafka的选取
  • 3 日志数据如何采集到Kafka?
    • 3.1 TailDirSource
      • 3.2 KafkaChannel
      • 4 flume不采集Nginx日志,通过Logger4j采集日志,优缺点是什么?
      • 5 flume和kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志。
      • 6 flume的source、channel、sink具体是做什么的
      • 7 file channel /memory channel/kafka channel
      • 8 HDFS sink
      • 9 Flume拦截器
        • 9.1 拦截器注意事项
          • 9.2 自定义拦截器
            • 9.3 拦截器可以不用吗?
            • 10 Flume Channel选择器
            • 11 Flume监控器
            • 12 Flume 的负载均衡和故障转移
            相关产品与服务
            云数据库 HBase
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档