前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >读猿码系列——3. 从filebeat和go-stash深入日志收集及处理(filebeat篇)

读猿码系列——3. 从filebeat和go-stash深入日志收集及处理(filebeat篇)

作者头像
才浅Coding攻略
发布于 2022-12-12 10:08:24
发布于 2022-12-12 10:08:24
66600
代码可运行
举报
文章被收录于专栏:才浅coding攻略才浅coding攻略
运行总次数:0
代码可运行

阿巩

伙伴们,端午节快乐哦!

提到容器的日志采集,在实际生产开发流程中,我们通常是先自己封装的日志库,然后走 filebeat + kafka + logstash + es这个完整的日志收集处理流程。关于kafka和es的资料网上比较多,这两块我们暂且不细看。go-satsh是logstash 的 Go 语言替代版,是go-zero生态中的一个组件,这部分我们将在go-satsh篇介绍。事不宜迟,日拱一卒,我们开始吧!

filebeat官方文档:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

filebeat项目地址:

https://github.com/elastic/beats/tree/main/filebeat

在基于elk的日志系统中,filebeat几乎是其中必不可少的一个组件,也是云原生时代下足够轻量级和高性能的容器日志采集工具。filebeat归属于beats项目,beats项目的设计初衷是为了采集各类的数据,和其他beats一样都基于libbeat库实现。其中,libbeat是一个提供公共功能的库,它实现了内存缓存队列memqueue、几种output日志发送客户端,数据的过滤处理processor等通用功能,filebeat只需要实现日志文件的读取等和日志相关的逻辑即可。

filebeat目录组织如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
├── autodiscover        # 包含filebeat的autodiscover适配器(adapter),当autodiscover发现新容器时创建对应类型的输入
├── beater              # 包含与libbeat库交互相关的文件
├── channel             # 包含filebeat输出到pipeline相关的文件
├── config              # 包含filebeat配置结构和解析函数
├── fileset             # 包含module和fileset相关的结构
├── harvester           # 包含Harvester接口定义、Reader接口及实现等
├── input               # 包含所有输入类型的实现(比如: log, stdin, syslog)
├── inputsource         # 在syslog输入类型中用于读取tcp或udp syslog
├── module              # 包含各module和fileset配置
├── modules.d           # 包含各module对应的日志路径配置文件,用于修改默认路径
├── processor           # 用于从容器日志的事件字段source中提取容器id
├── registrar           # 包含Registrar结构和方法
└── ...

在filebeat的入口函数main.go中作者提到了几个执行的基本模块:

  • input:找到配置的日志文件,启动harvester;
  • harvester: 读取文件,发送至spooler;
  • spooler: 缓存日志数据,直到可以发送至publisher;
  • publisher: 发送日志至后端,同时通知registrar;
  • registrar: 记录日志文件被采集的状态。

在filebeat中日志被采集经过以下流程:首先找到日志文件——>读取日志文件——>将数据存放到缓存队列中——>通知消费者到缓存队列中消费日志数据——>消费者获取日志数据发送到管道中供client读取——>从消费者发送的管道读日志数据调用client.Publish批量发送日志。

接下来我们展开来看上述流程。首先对于日志文件的采集和生命周期的管理,filebeat抽象出一个crawler结构体,其数据结构如下(filebeat/beate/crawler.go):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type crawler struct {
    log             *logp.Logger
    inputs          map[uint64]cfgfile.Runner // 包含所有输入的runner
    inputConfigs    []*conf.C+
    wg              sync.WaitGroup
    inputsFactory   cfgfile.RunnerFactory
    modulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatDone        chan struct{}
}

启动filebeat后crawler会根据配置创建,然后遍历并运行每个input。在每个input运行逻辑中,采用linux glob的规则(而非正则)来根据配置获取匹配的日志文件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
for _, inputConfig := range c.inputConfigs {
    err := c.startInput(pipeline, inputConfig)
    if err != nil {
        return fmt.Errorf("starting input failed: %w", err)
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
for _, path := range p.config.Paths {
    matches, err := filepath.Glob(path)
    if err != nil {
        logger.Errorf("glob(%s) failed: %v", path, err)
        continue
    }...
}

其中log类型Input结构体数据结构如下(filebeat/input/log/input.go):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Input contains the input and its config
type Input struct {
    cfg                 *conf.C
    logger              *logp.Logger
    config              config
    states              *file.States
    harvesters          *harvester.Registry
    outlet              channel.Outleter
    stateOutlet         channel.Outleter
    done                chan struct{}
    numHarvesters       atomic.Uint32
    meta                map[string]string
    stopOnce            sync.Once
    fileStateIdentifier file.StateIdentifier
}

匹配到需要采集的日志文件后,filebeat会对每个文件启动harvester goroutine,在该goroutine中不停的读取日志,并发送给内存缓存队列memqueue。harvester模块对应一个输入源,是收集数据的实际工作者。在配置中,一个具体的Input可以包含多个输入源(harvester)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (h *Harvester) Run() error {
    ...
    for {
        ...
        message, err := h.reader.Next()
        if err != nil {
            switch err {
            case ErrFileTruncate:
                logger.Info("File was truncated. Begin reading file from offset 0.")
                h.state.Offset = 0
                filesTruncated.Add(1)
            case ErrRemoved:
                logger.Info("File was removed. Closing because close_removed is enabled.")
            case ErrRenamed:
                logger.Info("File was renamed. Closing because close_renamed is enabled.")
            case ErrClosed:
                logger.Info("Reader was closed. Closing.")
            case io.EOF:
                logger.Info("End of file reached. Closing because close_eof is enabled.")
            case ErrInactive:
                logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive)
            default:
                logger.Errorf("Read line error: %v", err)
            }
            return nil
        }
        ...
        // Stop harvester in case of an error
        if !h.onMessage(forwarder, state, message, startingOffset) {
            return nil
        }
        ...
    }
}

我们看到harvester.reader.Next()方法会不停读取日志,如果没有异常返回,则发送日志数据到缓存队列中。其中h.onMessage()方法中完成更新状态及发送事件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
err := forwarder.Send(beat.Event{
    Timestamp: timestamp,
    Fields:    fields,
    Meta:      meta,
    Private:   state,
})    

harvester结构体数据结构如下(filebeat/input/log/harvester.go):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Harvester contains all harvester related data
type Harvester struct {
    logger *logp.Logger

    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    doneWg   *sync.WaitGroup
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(file.State) bool

    metrics *harvesterProgressMetrics

    onTerminate func()
}

之后的流程是将数据存放到缓存队列中,缓存队列bufferingEventLoop数据结构如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type bufferingEventLoop struct {
    broker *broker

    buf       *batchBuffer
    flushList flushList
    eventCount int

    unackedEventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // ack handling
    pendingACKs chanList

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}    

bufferingEventLoop的run()方法同样是被放到一个无限循环中,这里可以看做一个日志事件调度器。之前提到的harvester goroutine不断读取日志,并发送给内存缓存队列memqueue,即日志数据转换成的事件会被发送到bufferingEventLoop的PushRequest channel中。

被发送进来的事件被放入pushChan,并触发执行handleInsert方法,将

数据添加到bufferingEventLoop的buf中,buf即是实际缓存日志数据的队列。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (l *bufferingEventLoop) run() {
    broker := l.broker

    for {
        var pushChan chan pushRequest
        if l.eventCount < l.maxEvents {
            pushChan = l.broker.pushChan
        }

        var getChan chan getRequest
        if !l.flushList.empty() {
            getChan = l.broker.getChan
        }

        var schedACKs chan chanList
        if !l.pendingACKs.empty() {
            schedACKs = l.broker.scheduledACKs
        }

        select {
        case <-broker.done:
            return

        case req := <-pushChan: 
            l.handleInsert(&req)

        case req := <-l.broker.cancelChan: 
            l.handleCancel(&req)

        case req := <-getChan:
            l.handleGetRequest(&req)

        case schedACKs <- l.pendingACKs:
            l.pendingACKs = chanList{}

        case count := <-l.broker.ackChan:
            l.handleACK(count)

        case req := <-l.broker.metricChan: 
            l.handleMetricsRequest(&req)

        case <-l.idleC:
            l.idleC = nil
            l.timer.Stop()
            if l.buf.length() > 0 {
                l.flushBuffer()
            }
        }
    }
}

当flushList不为空时触发req := <-getChan,执行handleGetRequest方法,获取到消费者的response channel,并通知消费者到缓存队列中消费日志数据,其中核心逻辑是这句:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
req.responseChan <- getResponse{acker.ackChan, entries}

消费的逻辑在libeat/publisher/pipeline/consumer.go中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *eventConsumer) run() {
    queueReader := makeQueueReader()
    go func() {
        queueReader.run(log)
    }()
outerLoop:
    for {
        if queueBatch == nil && !pendingRead {
            pendingRead = true
            queueReader.req <- queueReaderRequest{
                queue:      c.queue,
                retryer:    c,
                batchSize:  target.batchSize,
                timeToLive: target.timeToLive,
            }
        }
        ...
        var outputChan chan publisher.Batch
        if active != nil {
            outputChan = target.ch
        }
        select {
        case outputChan <- active:
            if len(retryBatches) > 0 {
                c.observer.eventsRetry(len(active.Events()))
                retryBatches = retryBatches[1:]
            } else {
                queueBatch = nil
            }

        case target = <-c.targetChan:

        case queueBatch = <-queueReader.resp:
            pendingRead = false

        case req := <-c.retryChan:
            ...
            retryBatches = append(retryBatches, req.batch)

        case <-c.done:
            break outerLoop
        }
    }
}

消费者consumer从Broker中获取日志数据,然后发送至out的channel中被output client发送代码如下(libbeat/publisher/queue/memqueue/broker.go)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (b *broker) Get(count int) (queue.Batch, error) {
    responseChan := make(chan getResponse, 1)
    select {
    case <-b.done:
        return nil, io.EOF
    case b.getChan <- getRequest{
        entryCount: count, responseChan: responseChan}:
    }
    resp := <-responseChan
    return &batch{
        queue:   b,
        entries: resp.entries,
        ackChan: resp.ackChan,
    }, nil
}

getRequest和getResponse的结构如下。其中getResponse里包含了日志的数据,而getRequest包含了一个发送至消费者的channel。在之前bufferingEventLoop缓冲队列的handleGetRequest方法中接收到的参数为getRequest,里面包含了consumer请求的getResponse channel。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type getRequest struct {
    entryCount   int              
    responseChan chan getResponse 
}

type getResponse struct {
    ackChan chan batchAckMsg
    entries []queueEntry
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type queueEntry struct {
    event  interface{}
    client clientState
}

从消费者发送的管道中读日志数据调用client.Publish批量发送日志。libbeat的outputs下包含了kafka、logstash、redis、es等等几种client,它们均实现了client接口,其中最重要是实现Publish接口,将日志发送出去。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Client interface {
    Close() error
    Publish(context.Context, publisher.Batch) error
    String() string
}

例如kafka下的client.go中Publish接口实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *client) Publish(_ context.Context, batch publisher.Batch) error {
    events := batch.Events()
    c.observer.NewBatch(len(events))

    ref := &msgRef{
        client: c,
        count:  int32(len(events)),
        total:  len(events),
        failed: nil,
        batch:  batch,
    }

    ch := c.producer.Input()
    for i := range events {
        d := &events[i]
        msg, err := c.getEventMessage(d)
        if err != nil {
            c.log.Errorf("Dropping event: %+v", err)
            ref.done()
            c.observer.Dropped(1)
            continue
        }

        msg.ref = ref
        msg.initProducerMessage()
        ch <- &msg.msg
    }

    return nil
}

本篇中我们从源码层面了解了日志文件是如何被filbebeat发现又是如何被采集的整个流程。在go-stash篇中,将介绍有着logstash 5倍的吞吐性能,并且通过一个可执行文件便可部署的处理工具。

参考:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

https://cloud.tencent.com/developer/article/1367784

https://zhuanlan.zhihu.com/p/72912085

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 才浅coding攻略 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Filebeat 收集日志的那些事儿
最近因为云原生日志收集的需要,我们打算使用Filebeat作为容器日志收集工具,并对其进行二次开发。开源日志收集组件众多,之所以选择Filebeat,主要基于以下几点:
扫帚的影子
2020/05/27
3K0
filebeat源码解析
在基于elk的日志系统中,filebeat几乎是其中必不可少的一个组件,例外是使用性能较差的logstash file input插件或自己造个功能类似的轮子:)。
franyang
2018/12/04
10.5K0
filebeat源码解析
读猿码系列——4. 从filebeat和go-stash深入日志收集及处理(go-stash篇)
go-stash是一个高效的从Kafka获取,根据配置的规则进行处理,然后发送到ElasticSearch集群的工具。它属于go-zero生态的一个组件,是logstash 的 Go 语言替代版,它相比于原先的 logstash 节省了2/3的服务器资源。
才浅Coding攻略
2022/12/12
7530
读猿码系列——4. 从filebeat和go-stash深入日志收集及处理(go-stash篇)
Elastic Stack最佳实践系列:记一次filebeat内存泄漏问题分析及调优
filebeat是beats套件的核心组件之一(另一个核心是metricbeat),它一般和生成被采集文件(主要是日志)的程序安装在一个地方,根据官方的建议,是filebeat是不建议用来采集NFS(网络共享磁盘)上的数据的。当filebeat运行起来之后,必定会对cpu,内存,网络等资源产生一定的消耗,当这种消耗能够限定在一个可接受的范围时,我觉得没人会限制你在生产环境上使用filebeat。但如果出现一些非预期的情况,比如占用了大量的内存,那么运维团队肯定是优先保障核心业务的资源,把filebeat进程给杀了。很可惜的是,内存泄漏的问题,从filebeat的诞生到现在就一直没有完全解决过,无论是什么版本(最新的6.5版本暂时还没有观测到)在不同场景和配置下,均出现内存占用过多的问题。在这里,我主要描述一下我碰到的在filebeat 6.0上遇到的问题。
点火三周
2022/02/22
7.1K0
Elastic Stack最佳实践系列:记一次filebeat内存泄漏问题分析及调优
Go实现海量日志收集系统(三)
再次整理了一下这个日志收集系统的框,如下图 这次要实现的代码的整体逻辑为: 完整代码地址为: https://github.com/pythonsite/logagent etcd介绍 高可用的分布式
coders
2018/05/28
1K0
ES06# Filebeat采集原理与监控指标梳理
当Filebeat作为日志采集的agent铺开时,对其自身agent的监控以确保稳定就尤为的重要,有几种方式监控agent运行。
瓜农老梁
2022/06/23
4K0
ES06# Filebeat采集原理与监控指标梳理
Go实现海量日志收集系统(二)
一篇文章主要是关于整体架构以及用到的软件的一些介绍,这一篇文章是对各个软件的使用介绍,当然这里主要是关于架构中我们agent的实现用到的内容 关于zookeeper+kafka 我们需要先把两者启动,先启动zookeeper,再启动kafka 启动ZooKeeper:./bin/zkServer.sh start 启动kafka:./bin/kafka-server-start.sh ./config/server.properties 操作kafka需要安装一个包:go get github.com/S
coders
2018/03/30
3.6K1
Go实现海量日志收集系统(二)
Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择
说起开源的日志分析系统,ELK几乎无人不晓,这个生态并非是Elastic特意而为,毕竟Elasticsearch的初心是分布式的搜索引擎,被广泛用作日志系统纯粹一个“美丽的意外”,这是社区使用者推动而成。而现在各大云厂商推广自己的日志服务时,也往往将各种指标对标于ELK,可见其影响之广。
点火三周
2022/03/16
1.6K0
Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择
ELK 背压浅探
我们在日常使用 ELK 链路的时候,经常会碰到一个问题,由于链路涉及的组件较多,一旦当其中某些组件出现问题,就会出现“事件风暴”,如果没有做好相关的告警或者资源管控,很可能会使链路发生崩溃。
布鲁斯鱼
2022/11/02
9710
ELK 背压浅探
ES09# Filebeat配置项及吞吐调优项梳理
公司有使用filebeat作为日志采集的agent,然而最近发现其在一些node采集吞吐不足,现就其配置项与吞吐调优进行梳理。本文的主要内容有:
瓜农老梁
2022/12/03
2.5K0
【全文检索_10】Filebeat 基本使用
  Filebeat 是 Beats 的一员,用于转发和集中日志数据的轻量级传送工具。当面对成百上千、甚至成千上万的服务器、虚拟机和容器生成的日志时,Filebeat 将为您提供一种轻量型方法,监视指定的日志文件或位置,收集日志事件,并将它们转发到 Elasticsearch、 Logstash 等。
Demo_Null
2021/03/02
1.7K0
【全文检索_10】Filebeat 基本使用
elk + filebeat,6.3.2版本简单搭建,实现我们自己的集中式日志系统
  刚从事开发那段时间不习惯输出日志,认为那是无用功,徒增代码量,总认为自己的代码无懈可击;老大的叮嘱、强调也都视为耳旁风,最终导致的结果是我加班排查问题,花的时间还挺长的,要复现问题、排查问题等,幸亏那是公司内部员工用的系统,时间长一点也没什么大问题,但是如果是针对客户的,时间就代表很多东西了,那造成的影响就非常大了。自那以后养成了输出日志的习惯。
青石路
2018/09/10
1.4K0
elk + filebeat,6.3.2版本简单搭建,实现我们自己的集中式日志系统
日志收集组件—Flume、Logstash、Filebeat对比
数据的价值在于把数据变成行动。这里一个非常重要的过程是数据分析。提到数据分析,大部分人首先想到的都是Hadoop、流计算、机器学习等数据加工的方式。从整个过程来看,数据分析其实包含了4个过程:采集,存储,计算,展示。大数据的数据采集工作是大数据技术中非常重要、基础的部分,具体场景使用合适的采集工具,可以大大提高效率和可靠性,并降低资源成本。Flume、Logstash和Filebeat都是可以作为日志采集的工具,本报告将针对这三者进行分析。
数据社
2020/06/28
15.7K0
FileBeat 启动假死问题
上周因为 OOM 问题,某个集群内的 Filebeat 被迫重启后,观测了许久,仍不见事件流恢复,查看 Filebeat 输出日志,发现只有其自监控的日志:
布鲁斯鱼
2022/11/02
2.6K0
FileBeat 启动假死问题
【ES私房菜】Filebeat安装部署及配置详解
本文介绍了如何使用ELK进行日志分析,主要包括收集、索引、查询和分析功能。同时,本文还介绍了一些常见的日志分析工具,包括Graylog、SolarWinds和Datadog。此外,本文还提供了在Kubernetes环境中使用ELK进行日志分析的示例。
张戈
2017/09/29
26.2K3
【ES私房菜】Filebeat安装部署及配置详解
微服务架构中进行日志采集以及统一处理
微服务各个组件的相关实践会涉及到工具,本文将会介绍微服务日常开发的一些利器,这些工具帮助我们构建更加健壮的微服务系统,并帮助排查解决微服务系统中的问题与性能瓶颈等。
aoho求索
2020/11/13
1.5K0
微服务架构中进行日志采集以及统一处理
golang源码分析:sarama kafka client(part I:生产者)
https://github.com/Shopify/sarama 是一个纯go实现的kafka客户端,是gopher学习kafka一个很好的资料。说实话sarama的代码组织很烂,密密麻麻一堆源码文件都在一个目录,让人无从下手,下面列出了一部分:
golangLeetcode
2022/08/02
5810
多台filebeat+ELK模拟记录 原
基于 《filebeat+logstash配置搭建 》同一台及其做ELKF,现在做多台filebeat的实践。
用户1409099
2019/05/14
1.2K0
集群日志收集架构ELK
ELK 是elastic公司提供的一套完整的日志收集以及展示的解决方案,是三个产品的首字母缩写,分别是ElasticSearch、Logstash 和 Kibana。
用户7353950
2023/02/23
8600
集群日志收集架构ELK
logstash与filebeat组件的使用
Logstash 作为 Elasicsearch 常用的实时数据采集引擎,可以采集来自不同数据源的数据,并对数据进行处理后输出到多种输出源;
空洞的盒子
2023/11/11
8140
相关推荐
Filebeat 收集日志的那些事儿
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验