Loading [MathJax]/jax/input/TeX/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flume对接Kafka详细过程[通俗易懂]

Flume对接Kafka详细过程[通俗易懂]

作者头像
全栈程序员站长
发布于 2022-09-10 01:27:51
发布于 2022-09-10 01:27:51
2.4K01
代码可运行
举报
运行总次数:1
代码可运行

大家好,又见面了,我是你们的朋友全栈君。

Flume对接Kafka

一、为什么要集成Flume和Kafka

一般使用 Flume + Kafka 来完成实时流式的日志处理,后面再连接上Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,当数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。

二、flume 与 kafka 的关系及区别

  • Flume
  1. Flume 是一个分布式、高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也可以将数据写到各种数据接受方,用于转发数据。Flume的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具
  2. flume分为sources,channels,sinks三部分,每一部分都可以根据需求定制。
  3. 与kafka相比,flume 可以定制很多数据源,减少开发量,因此做数据采集很好。
  • Kafka
  1. 是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn 的活跃数据,及日志数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。
  2. kafka 是分布式消息中间件,自带存储,提供 push 和 pull 存取数据的功能,是一个非常通用消息缓存的系统,可以有许多生产者和很多的消费者共享多个主题

三、Flume 对接 Kafka(详细步骤)

(1). Kafka作为source端

1. 配置flume

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
a1.sources = r1  
a1.channels = c1  
a1.sinks = k1  
  
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = centos1:9092
a1.sources.r1.kafka.topics = mytopic
a1.sources.r1.kafka.consumer.group.id = group1
a1.sources.r1.channels=c1  


a1.channels.c1.type=memory  
a1.channels.c1.capacity=1000  
a1.channels.c1.transactionCapacity=100  


a1.sinks.k1.type=logger  
a1.sinks.k1.channel=c1  

2. 启动flume

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console

3. 启动Kafka producer

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# kafka-console-producer.sh --broker-list master1:2181,slave1:2181,slave2:2181 --topic hello

(2). Kafka作为sink端

1. 配置flume

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
a1.sources = r1
a1.sinks = k1
a1.channels = c1


# netcat 监听端口
a1.sources.r1.type = netcat
a1.sources.r1.bind =master1
a1.sources.r1.port = 10000
a1.sources.r1.channels = c1 
# 一行的最大字节数
a1.sources.r1.max-line-length = 1024000


# channels具体配置
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# KAFKA_sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hello
a1.sinks.k1.brokerList = master1:9092,slave1:9092,slave2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

2. 启动zookeeper集群

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# zkServer.sh start

3. 启动kafka集群

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# kafka-server-start.sh /usr/local/src/kafka/config/server.properties

kafka后台运行:kafka-server-start.sh /usr/local/src/kafka/config/server.properties 1>/dev/null 2>&1 &

4.创建并查看topic

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# kafka-topics.sh --create --zookeeper master1:2181,slave1:2181,slave2:2181 --replication-factor 2 --topic hello --partitions 1

[hadoop@master1 ~]# kafka-topics.sh --list --zookeeper master1:2181,slave1:2181,slave2:2181 

5. 创建kafka消费者

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# kafal-console-consumer.sh --zookeeper master1:2181,slave1:2181,slave2:2181 --topic hello --from-beginning

6. 启动flume

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console

7. 向flume端口发送消息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[hadoop@master1 ~]# telnet master1 10000

8. 在kafka消费者接收信息

如有错误,欢迎私信纠正,谢谢支持!

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152369.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flume+Kafka整合案例实现
我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。
全栈程序员站长
2022/09/10
8910
Flume+Kafka整合案例实现
flume整合kafka
Flume 发送数据到 Kafka 上主要是通过 KafkaSink 来实现的,主要步骤如下:
全栈程序员站长
2022/09/10
3850
flume整合kafka
【Kafka】安装及使用
修改默认server.properties(kafka/config/),并修改/etc/profile文件,添加kafka环境变量
Xiongan-桃子
2023/06/10
2590
【Kafka】安装及使用
基于实际业务场景下的Flume部署
这时候在kafka就能看到用户点击行为,也正是nginx记录的内容 不断点击,kafka模拟消费端就能不断看到消息进来。
王知无-import_bigdata
2020/08/13
4080
基于实际业务场景下的Flume部署
重磅:Flume1-7结合kafka讲解
本文主要是将flume监控目录,文件,kafka Source,kafka sink,hdfs sink这几种生产中我们常用的flume+kafka+hadoop场景,希望帮助大家快速入生产。 flume只有一个角色agent,agent里都有三部分构成:source、channel和sink。就相当于source接收数据,通过channel传输数据,sink把数据写到下一端。这就完了,就这么简单。其中source有很多种可以选择,channel有很多种可以选择,sink也同样有多种可以选择,并且都支持
Spark学习技巧
2018/01/31
2.3K0
重磅:Flume1-7结合kafka讲解
Flume-Kafka-Flume对接Kafka以及Kafka数据分类传输
Flume日志采集组件;Flume对接kafka主要是为了通过kafka的topic功能,动态的增加或者减少接收的节点,并且Flume要对接多个节点是需要多个channel和sink的会导致内存不够的情况。
全栈程序员站长
2022/09/10
6590
快速学习-Flume 对接 Kafka
2) 启动 kafkaIDEA 消费者 3) 进入 flume 根目录下,启动 flume
cwl_java
2020/02/20
6490
Flume学习笔记
一、什么是Flume?     Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。 二、flume特性     Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。     Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中     一般的采集需求,通过对flume的简单配置即可实现     Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景 三、flume组件解析     对于每一个Agent来说,它就是一共独立的守护进程(JVM),它从客户端接收数据     1、Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成     2、每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:         a)Source:采集组件,用于跟数据源对接,以获取数据         b)Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据         c)Channel:传输通道组件,用于从source将数据传递到sink         d)event(所传的消息就是event)一行文本内容会被反序列化成一个event(event的最大定义为2048字节,超过,则会切割,剩下的会被放到下一个event中,默认编码是UTF-8。 四、flume安装     1)解压
曼路
2018/10/18
9380
分布式日志收集器 - Flume
Flume是一种分布式、高可靠和高可用的日志数据采集服务,可高效地收集、聚合和移动大量日志数据。它具有一种基于流数据的简单且灵活的体系结构。它具有健壮性和容错性,具有可调整的可靠性机制和许多故障切换和恢复机制。它使用一个简单的可扩展数据模型,允许在线分析应用程序。
端碗吹水
2020/11/04
7060
分布式日志收集器 - Flume
通过Flume简单实现Kafka与Hive对接(Json格式)
将以下存储在kafka的topic中的JSON格式字符串,对接存储到Hive的表中
栗筝i
2022/12/01
1K0
storm基础系列之五---------接入数据收集系统flume
1.基本结构介绍   flume是三层架构,agent,collector,storage。每一层都可水平扩展。   其中,agent就是数据采集方;collector是数据整合方;storage是各种数据落地方,如hdfs。   前两者都是由source和sink组成,source是数据读取组件,sink是数据分发组件。   前两者作为不同类型node统一归master管理。可在master shell活web中动态配置。 2.自带的source   text 文件,按行发送   tail 探测新产生数据
小端
2018/04/16
6360
flume与kafka整合高可靠教程
问题导读 1.安装kafka是否需要安装zookeeper? 2.kafka安装需要哪些步骤? 3.如何验证kafka是否安装成功? 4.flume source目录是哪个? 5.flume在kafka中扮演什么角色? 6.如何测试整合配置是否成功? kafka安装 flume与kafka整合很多人都用到,但是网上却没有一份详细可靠的教程。说的都是些只言片语。这里整理份flume与kafka整合的教程。 flume原先并不兼容kafka。后来兼容添加上去。对于flume及与kafka的相关知识,推荐
用户1410343
2018/03/26
2K0
flume与kafka整合高可靠教程
Flume+Kafka+Storm整合
有一个客户端Client可以产生日志信息,我们需要通过Flume获取日志信息,再把该日志信息放入到Kafka的一个Topic:flume-to-kafka
Hongten
2018/12/27
1.1K0
CentOS7 部署kafaka集群
郑郑SunUp
2025/01/06
1440
flume 整合 kafka
1.0.0 is the latest release. The current stable version is 1.0.0.
WindWant
2020/09/11
4100
2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS
参考:https://blog.csdn.net/m0_38139250/article/details/121155903
IT从业者张某某
2022/11/12
3830
2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS
Flume + Kafka + Spark Streaming整合
参考: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html Logger-->Flume 1/配置Flume配置文件streaming.conf agent1.sources=avro-source agent1.channels=logger-channel agent1.sinks=log-sink #define source agent1.sources.avro-source
sparkle123
2018/06/14
1.3K0
玩转Flume+Kafka原来也就那点事儿
好久没有写分享了,继前一个系列进行了Kafka源码分享之后,接下来进行Flume源码分析系列,望大家继续关注,今天先进行开篇文章Flume+kafka的环境配置与使用。
小程故事多
2018/08/22
5270
玩转Flume+Kafka原来也就那点事儿
flume使用kafka作为sink
启动kafka 先启动zookeeper: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 然后启动kafka: bin/kafka-server-start.sh -daemon config/server.properties 创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitio
用户2936342
2018/08/27
1K0
flume使用教程_三阶魔方初级入门教程详细图解
  Flume 是 Cloudera 提供的一种高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。   Flume 最主要的作用是,实时读取服务器本地磁盘的数据,将数据写到 HDFS。
全栈程序员站长
2022/11/18
9220
flume使用教程_三阶魔方初级入门教程详细图解
相关推荐
Flume+Kafka整合案例实现
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验