大家好,又见面了,我是你们的朋友全栈君。
一般使用 Flume + Kafka 来完成实时流式的日志处理,后面再连接上Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,当数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = centos1:9092
a1.sources.r1.kafka.topics = mytopic
a1.sources.r1.kafka.consumer.group.id = group1
a1.sources.r1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console
[hadoop@master1 ~]# kafka-console-producer.sh --broker-list master1:2181,slave1:2181,slave2:2181 --topic hello
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# netcat 监听端口
a1.sources.r1.type = netcat
a1.sources.r1.bind =master1
a1.sources.r1.port = 10000
a1.sources.r1.channels = c1
# 一行的最大字节数
a1.sources.r1.max-line-length = 1024000
# channels具体配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# KAFKA_sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hello
a1.sinks.k1.brokerList = master1:9092,slave1:9092,slave2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
[hadoop@master1 ~]# zkServer.sh start
[hadoop@master1 ~]# kafka-server-start.sh /usr/local/src/kafka/config/server.properties
kafka后台运行:kafka-server-start.sh /usr/local/src/kafka/config/server.properties 1>/dev/null 2>&1 &
[hadoop@master1 ~]# kafka-topics.sh --create --zookeeper master1:2181,slave1:2181,slave2:2181 --replication-factor 2 --topic hello --partitions 1
[hadoop@master1 ~]# kafka-topics.sh --list --zookeeper master1:2181,slave1:2181,slave2:2181
[hadoop@master1 ~]# kafal-console-consumer.sh --zookeeper master1:2181,slave1:2181,slave2:2181 --topic hello --from-beginning
[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console
[hadoop@master1 ~]# telnet master1 10000
如有错误,欢迎私信纠正,谢谢支持!
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152369.html原文链接:https://javaforall.cn