最近因为云原生日志收集的需要,我们打算使用Filebeat作为容器日志收集工具,并对其进行二次开发。开源日志收集组件众多,之所以选择Filebeat,主要基于以下几点:
本文不涉及过具体的源码分析,其中略去了一些实现上的具体细节,希望通过阅读您可以了解filebeat的基本使用方法和原理,姑且算是filebeat的入门吧。
简单来说Filebeat就是数据的搬运工,只不过除了搬运还可以对数据作一些深加工,为业务增加一些附加值。
至少发送一次
的语义;filebeat_overview.png
说到Filebeat,它其实只是beats家族众多成员中的一个。除了Filebeat, 还有很多其他的beat小伙伴:
beat | 功能 |
---|---|
Filebeat | 收集日志文件 |
Metricbeat | 收集各种指标数据 |
Packetbeat | 收集网络数据包 |
Auditbeat | 收集审计数据 |
Heartbeat | 收集服务运行状态监测数据 |
... | ... |
如果你愿意的话,你也可以按照beat的规范来写自己的beat。
能实现以上这些beat,都离不开beats家族真正的“老大”—— libbeat, 它是beat体系的核心库。我们接下来看一下libbeat到底都作了些什么
publisher
组件,用于对接input
;libbeat
后,首先会经过各种 processor
的加工处理,比如过滤添加字段,多行合并等等;input
组件通过publisher
组件将收集到的数据推送到publisher
内部的队列;output
, 因此它负责将处理好的数据通过output组件发送出去;由此可见,大部分活儿都是libbeat来作,当“老大”不容易啊~。
input
仅需要作两件事:
Filebeat本身的使用很简单,我们只需要按需写好相应的input
和output
配置就好了。下面我们以一个收集磁盘日志文件到Kafka集群的例子来讲一下。
filebeat.config.inputs:
enabled: true
path: inputs.d/*.yml
- type: log
# Change to true to enable t
enabled: true
# Paths that should be crawl
paths:
- /home/lw/test/filebeat/*.log
fields:
log_topic: lw_filebeat_t_2
kafka output
: output.kafka:
hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"]
version: 0.9.0.1
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: true
compression: none
required_acks: 1
max_message_bytes: 1000000
codec.format:
string: '%{[host.name]}-%{[message]}'
启动就很简单了,filebeat和filebeat.yml, inputs.d都在同一目录下,然后 ./filebeat run
就好了。
filebeat本身有很多全局的配置,每种input和output又有很多各自的配置,关乎日志收集的内存使用,是不是会丢失日志等方方面面,大家在使用时还需要仔细阅读,这里不赘述。
Pipeline.queue.Producer
创建producer
,用于将处理好的文件内容投递到libbeat的内部队列;filebeat_input.png
filebeat_output.png
ch <-chan *sarama.ProducerError
可以获取到所有发送失败的msg;ErrInvalidMessage
, ErrMessageSizeTooLarge
和 ErrInvalidMessageSize
这三种错误,无需重发;Batch
, 这里重发的时候也是调用Batch.RetryEevnts
;retryer.retry
将需要重新的events再次写入到上图中黄色所示的 workQueue
中,重新进入发送流程;Guaranteed
属性的event来发送;在本文里,我们没有深入到源码层次,为了讲清filebeat运作的原理,我们也忽略了一些实现细节,后续将会从源码层面作进一步剖析。