导语:前面一章讲了Filebeat对接Ckafka,通常的场景是各种beats将数据存到CKafka,然后Logstash将从Ckafka中消息消息进行过滤,再经过Ckafka存入到Elasticsearch 。
Logstash 是一个开源的日志处理工具,它可以从多个源头收集数据、过滤收集的数据以及对数据进行存储作为其他用途。
Logstash 灵活性强并且拥有强大的语法分析功能,其插件丰富,支持多种输入和输出源;同时其作为水平可伸缩的数据管道与 Elasticsearch 和 Kibana 配合在日志收集检索方面功能强大。
Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。
1. inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。
2. filter:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器如下:grok、mutate、drop 和 clone 等。
3. outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。
同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。
· Java 版本:java 8
· Logstash 版本:5.5.2 :
· Logstash Ckafka 实例,并且创建相应 topic
Logstash下载地址:https://www.elastic.co/cn/downloads/past-releases/logstash-5-5-2
检查Logstash对kafka的支持:
配置文件:
cd /opt/logstash-5.5.2/bin
[root@VM_1_250_centos bin]# cat ../config/output.conf
input {
stdin{}
}
output {
kafka {
bootstrap_servers => "10.1.3.90:9092"
topic_id => "topic_test1"
}
}
./logstash -f ../config/output.conf
用logstash生产消息到Ckafka:
用kafka的客户端从Ckafka中消费消息:
配置文件:
[root@VM_1_250_centos bin]# cat ../config/input.conf
input {
kafka {
bootstrap_servers => "10.1.3.90:9092" # ckafka vip 实例地址
group_id => "console-consumer-92728" # ckafka groupid 名称
topics => ["topic_test1"] # ckafka topic 名字
consumer_threads => 1 # 消费线程数,一般跟 ckafka 分区数一致
auto_offset_reset => "latest"
}
}
output {
stdout{codec=>rubydebug}
}
用kafka的客户端生产消息到CKafka中:
用logstash的input作为Ckafka的消费者:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。