前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >读猿码系列——3. 从filebeat和go-stash深入日志收集及处理(filebeat篇)

读猿码系列——3. 从filebeat和go-stash深入日志收集及处理(filebeat篇)

作者头像
才浅Coding攻略
发布于 2022-12-12 10:08:24
发布于 2022-12-12 10:08:24
66600
代码可运行
举报
文章被收录于专栏:才浅coding攻略才浅coding攻略
运行总次数:0
代码可运行

阿巩

伙伴们,端午节快乐哦!

提到容器的日志采集,在实际生产开发流程中,我们通常是先自己封装的日志库,然后走 filebeat + kafka + logstash + es这个完整的日志收集处理流程。关于kafka和es的资料网上比较多,这两块我们暂且不细看。go-satsh是logstash 的 Go 语言替代版,是go-zero生态中的一个组件,这部分我们将在go-satsh篇介绍。事不宜迟,日拱一卒,我们开始吧!

filebeat官方文档:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

filebeat项目地址:

https://github.com/elastic/beats/tree/main/filebeat

在基于elk的日志系统中,filebeat几乎是其中必不可少的一个组件,也是云原生时代下足够轻量级和高性能的容器日志采集工具。filebeat归属于beats项目,beats项目的设计初衷是为了采集各类的数据,和其他beats一样都基于libbeat库实现。其中,libbeat是一个提供公共功能的库,它实现了内存缓存队列memqueue、几种output日志发送客户端,数据的过滤处理processor等通用功能,filebeat只需要实现日志文件的读取等和日志相关的逻辑即可。

filebeat目录组织如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
├── autodiscover        # 包含filebeat的autodiscover适配器(adapter),当autodiscover发现新容器时创建对应类型的输入
├── beater              # 包含与libbeat库交互相关的文件
├── channel             # 包含filebeat输出到pipeline相关的文件
├── config              # 包含filebeat配置结构和解析函数
├── fileset             # 包含module和fileset相关的结构
├── harvester           # 包含Harvester接口定义、Reader接口及实现等
├── input               # 包含所有输入类型的实现(比如: log, stdin, syslog)
├── inputsource         # 在syslog输入类型中用于读取tcp或udp syslog
├── module              # 包含各module和fileset配置
├── modules.d           # 包含各module对应的日志路径配置文件,用于修改默认路径
├── processor           # 用于从容器日志的事件字段source中提取容器id
├── registrar           # 包含Registrar结构和方法
└── ...

在filebeat的入口函数main.go中作者提到了几个执行的基本模块:

  • input:找到配置的日志文件,启动harvester;
  • harvester: 读取文件,发送至spooler;
  • spooler: 缓存日志数据,直到可以发送至publisher;
  • publisher: 发送日志至后端,同时通知registrar;
  • registrar: 记录日志文件被采集的状态。

在filebeat中日志被采集经过以下流程:首先找到日志文件——>读取日志文件——>将数据存放到缓存队列中——>通知消费者到缓存队列中消费日志数据——>消费者获取日志数据发送到管道中供client读取——>从消费者发送的管道读日志数据调用client.Publish批量发送日志。

接下来我们展开来看上述流程。首先对于日志文件的采集和生命周期的管理,filebeat抽象出一个crawler结构体,其数据结构如下(filebeat/beate/crawler.go):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type crawler struct {
    log             *logp.Logger
    inputs          map[uint64]cfgfile.Runner // 包含所有输入的runner
    inputConfigs    []*conf.C+
    wg              sync.WaitGroup
    inputsFactory   cfgfile.RunnerFactory
    modulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatDone        chan struct{}
}

启动filebeat后crawler会根据配置创建,然后遍历并运行每个input。在每个input运行逻辑中,采用linux glob的规则(而非正则)来根据配置获取匹配的日志文件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
for _, inputConfig := range c.inputConfigs {
    err := c.startInput(pipeline, inputConfig)
    if err != nil {
        return fmt.Errorf("starting input failed: %w", err)
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
for _, path := range p.config.Paths {
    matches, err := filepath.Glob(path)
    if err != nil {
        logger.Errorf("glob(%s) failed: %v", path, err)
        continue
    }...
}

其中log类型Input结构体数据结构如下(filebeat/input/log/input.go):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Input contains the input and its config
type Input struct {
    cfg                 *conf.C
    logger              *logp.Logger
    config              config
    states              *file.States
    harvesters          *harvester.Registry
    outlet              channel.Outleter
    stateOutlet         channel.Outleter
    done                chan struct{}
    numHarvesters       atomic.Uint32
    meta                map[string]string
    stopOnce            sync.Once
    fileStateIdentifier file.StateIdentifier
}

匹配到需要采集的日志文件后,filebeat会对每个文件启动harvester goroutine,在该goroutine中不停的读取日志,并发送给内存缓存队列memqueue。harvester模块对应一个输入源,是收集数据的实际工作者。在配置中,一个具体的Input可以包含多个输入源(harvester)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (h *Harvester) Run() error {
    ...
    for {
        ...
        message, err := h.reader.Next()
        if err != nil {
            switch err {
            case ErrFileTruncate:
                logger.Info("File was truncated. Begin reading file from offset 0.")
                h.state.Offset = 0
                filesTruncated.Add(1)
            case ErrRemoved:
                logger.Info("File was removed. Closing because close_removed is enabled.")
            case ErrRenamed:
                logger.Info("File was renamed. Closing because close_renamed is enabled.")
            case ErrClosed:
                logger.Info("Reader was closed. Closing.")
            case io.EOF:
                logger.Info("End of file reached. Closing because close_eof is enabled.")
            case ErrInactive:
                logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive)
            default:
                logger.Errorf("Read line error: %v", err)
            }
            return nil
        }
        ...
        // Stop harvester in case of an error
        if !h.onMessage(forwarder, state, message, startingOffset) {
            return nil
        }
        ...
    }
}

我们看到harvester.reader.Next()方法会不停读取日志,如果没有异常返回,则发送日志数据到缓存队列中。其中h.onMessage()方法中完成更新状态及发送事件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
err := forwarder.Send(beat.Event{
    Timestamp: timestamp,
    Fields:    fields,
    Meta:      meta,
    Private:   state,
})    

harvester结构体数据结构如下(filebeat/input/log/harvester.go):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Harvester contains all harvester related data
type Harvester struct {
    logger *logp.Logger

    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    doneWg   *sync.WaitGroup
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(file.State) bool

    metrics *harvesterProgressMetrics

    onTerminate func()
}

之后的流程是将数据存放到缓存队列中,缓存队列bufferingEventLoop数据结构如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type bufferingEventLoop struct {
    broker *broker

    buf       *batchBuffer
    flushList flushList
    eventCount int

    unackedEventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // ack handling
    pendingACKs chanList

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}    

bufferingEventLoop的run()方法同样是被放到一个无限循环中,这里可以看做一个日志事件调度器。之前提到的harvester goroutine不断读取日志,并发送给内存缓存队列memqueue,即日志数据转换成的事件会被发送到bufferingEventLoop的PushRequest channel中。

被发送进来的事件被放入pushChan,并触发执行handleInsert方法,将

数据添加到bufferingEventLoop的buf中,buf即是实际缓存日志数据的队列。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (l *bufferingEventLoop) run() {
    broker := l.broker

    for {
        var pushChan chan pushRequest
        if l.eventCount < l.maxEvents {
            pushChan = l.broker.pushChan
        }

        var getChan chan getRequest
        if !l.flushList.empty() {
            getChan = l.broker.getChan
        }

        var schedACKs chan chanList
        if !l.pendingACKs.empty() {
            schedACKs = l.broker.scheduledACKs
        }

        select {
        case <-broker.done:
            return

        case req := <-pushChan: 
            l.handleInsert(&req)

        case req := <-l.broker.cancelChan: 
            l.handleCancel(&req)

        case req := <-getChan:
            l.handleGetRequest(&req)

        case schedACKs <- l.pendingACKs:
            l.pendingACKs = chanList{}

        case count := <-l.broker.ackChan:
            l.handleACK(count)

        case req := <-l.broker.metricChan: 
            l.handleMetricsRequest(&req)

        case <-l.idleC:
            l.idleC = nil
            l.timer.Stop()
            if l.buf.length() > 0 {
                l.flushBuffer()
            }
        }
    }
}

当flushList不为空时触发req := <-getChan,执行handleGetRequest方法,获取到消费者的response channel,并通知消费者到缓存队列中消费日志数据,其中核心逻辑是这句:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
req.responseChan <- getResponse{acker.ackChan, entries}

消费的逻辑在libeat/publisher/pipeline/consumer.go中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *eventConsumer) run() {
    queueReader := makeQueueReader()
    go func() {
        queueReader.run(log)
    }()
outerLoop:
    for {
        if queueBatch == nil && !pendingRead {
            pendingRead = true
            queueReader.req <- queueReaderRequest{
                queue:      c.queue,
                retryer:    c,
                batchSize:  target.batchSize,
                timeToLive: target.timeToLive,
            }
        }
        ...
        var outputChan chan publisher.Batch
        if active != nil {
            outputChan = target.ch
        }
        select {
        case outputChan <- active:
            if len(retryBatches) > 0 {
                c.observer.eventsRetry(len(active.Events()))
                retryBatches = retryBatches[1:]
            } else {
                queueBatch = nil
            }

        case target = <-c.targetChan:

        case queueBatch = <-queueReader.resp:
            pendingRead = false

        case req := <-c.retryChan:
            ...
            retryBatches = append(retryBatches, req.batch)

        case <-c.done:
            break outerLoop
        }
    }
}

消费者consumer从Broker中获取日志数据,然后发送至out的channel中被output client发送代码如下(libbeat/publisher/queue/memqueue/broker.go)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (b *broker) Get(count int) (queue.Batch, error) {
    responseChan := make(chan getResponse, 1)
    select {
    case <-b.done:
        return nil, io.EOF
    case b.getChan <- getRequest{
        entryCount: count, responseChan: responseChan}:
    }
    resp := <-responseChan
    return &batch{
        queue:   b,
        entries: resp.entries,
        ackChan: resp.ackChan,
    }, nil
}

getRequest和getResponse的结构如下。其中getResponse里包含了日志的数据,而getRequest包含了一个发送至消费者的channel。在之前bufferingEventLoop缓冲队列的handleGetRequest方法中接收到的参数为getRequest,里面包含了consumer请求的getResponse channel。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type getRequest struct {
    entryCount   int              
    responseChan chan getResponse 
}

type getResponse struct {
    ackChan chan batchAckMsg
    entries []queueEntry
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type queueEntry struct {
    event  interface{}
    client clientState
}

从消费者发送的管道中读日志数据调用client.Publish批量发送日志。libbeat的outputs下包含了kafka、logstash、redis、es等等几种client,它们均实现了client接口,其中最重要是实现Publish接口,将日志发送出去。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Client interface {
    Close() error
    Publish(context.Context, publisher.Batch) error
    String() string
}

例如kafka下的client.go中Publish接口实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *client) Publish(_ context.Context, batch publisher.Batch) error {
    events := batch.Events()
    c.observer.NewBatch(len(events))

    ref := &msgRef{
        client: c,
        count:  int32(len(events)),
        total:  len(events),
        failed: nil,
        batch:  batch,
    }

    ch := c.producer.Input()
    for i := range events {
        d := &events[i]
        msg, err := c.getEventMessage(d)
        if err != nil {
            c.log.Errorf("Dropping event: %+v", err)
            ref.done()
            c.observer.Dropped(1)
            continue
        }

        msg.ref = ref
        msg.initProducerMessage()
        ch <- &msg.msg
    }

    return nil
}

本篇中我们从源码层面了解了日志文件是如何被filbebeat发现又是如何被采集的整个流程。在go-stash篇中,将介绍有着logstash 5倍的吞吐性能,并且通过一个可执行文件便可部署的处理工具。

参考:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

https://cloud.tencent.com/developer/article/1367784

https://zhuanlan.zhihu.com/p/72912085

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 才浅coding攻略 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
[oeasy]python093_find方法_指数为负数_index_实际效果
负数索引 char_list = list("oeasy") char_list-1
oeasy
2025/05/14
670
[oeasy]python093_find方法_指数为负数_index_实际效果
[oeasy]python092_eval_衡量_转化为列表
绕道 s = "oeasy" char_list = list(s) char_list char_list2 = "A" char_list s = "" for c in char_list: s += c
oeasy
2025/04/27
470
[oeasy]python092_eval_衡量_转化为列表
[oeasy]085_列表_列表项的追加_append
list1 = ["刘备", "关羽", "张飞"] liubei = list1[0] # 获取索引为0的元素,即“刘备” print(liubei) # 输出: 刘备
oeasy
2025/04/08
640
[oeasy]085_列表_列表项的追加_append
[oeasy]python089_列表_删除列表项_remove_列表长度_len
clist = list("oeasy") clist clist.clear()
oeasy
2025/04/19
700
[oeasy]python089_列表_删除列表项_remove_列表长度_len
[oeasy]python0071_字符串类型_str_string_下标运算符_中括号
oeasy
2023/07/07
1710
[oeasy]python0071_字符串类型_str_string_下标运算符_中括号
[oeasy]python090_列表_构造_范围_range_start_end_step_步长
从空列表增加列表项 my_list = [] my_list.append(1) my_list.append(2) my_list.append(3) my_list
oeasy
2025/04/21
980
[oeasy]python090_列表_构造_范围_range_start_end_step_步长
[oeasy]python0045_四种进制_binary_octal_decimal_hexadecimal
四种进制回忆上次内容 上次研究了 通过 八进制数值 转义 \ooo把(ooo)8进制对应的ascii字符输出 转义序列 \n、\t 是 转义序列\xhh 也是 转义序列\ooo 还是 转义序列现在 总共有 几种进制 了呢?🤔先数一下 树数树树 就是这么多棵树用八进制的方式 数树八进制八根手指头 (13)8进制棵这是用 八根手指头 数的 如果换成 十根手指头呢?10进制用十根手指头数树 (11)10进制棵到底多少棵树?哪个才对呢? (13)8进制棵(11)10进制棵数树在不同进制下 有不同的数值都是正
oeasy
2023/01/12
2080
[oeasy]python0045_四种进制_binary_octal_decimal_hexadecimal
[oeasy]python086方法_method_函数_function_区别
[oeasy]python086方法_method_函数_function_区别 回忆
oeasy
2025/04/10
610
[oeasy]python086方法_method_函数_function_区别
[oeasy]python0015_十六进制_hexadecimal_字节形态_hex函数
​十六进制(hexadecimal)回忆上次内容上次数制可以转化bin(n)可以把数字转化为 ​​2进制​binary接收一个整数(int)得到一个二进制数形式的字符串​编辑数字在计算机中是用二进制存储的但是展示给我们的时候用的是十进制​编辑也就是0-9这10个字符的形式都说字节是计算机存储的最小单位这些数字在字节里面长什么样子呢?🤔字节首先明确字节长什么样子?​编辑1 个 字节(byte)正好 8 个 位(bit)​编辑如果我们用一个数字来表示一个字节的话需要表示[0,28-1]至少需要3
oeasy
2022/11/15
4870
[oeasy]python0015_十六进制_hexadecimal_字节形态_hex函数
[oeasy]python035_根据序号得到字符_chr函数_字符_character_
oeasy
2024/09/23
1110
[oeasy]python035_根据序号得到字符_chr函数_字符_character_
[oeasy]python0131_[趣味拓展]各种符号_汉语拼音符号_中文全角英文字母_中文全角标点
1.[á]/[â] 2.[ǎ] 3.[ā] 4.[a̖]/[ȁ] 5.[a̗] 6.[à] 总结
oeasy
2023/04/09
9730
[oeasy]python0131_[趣味拓展]各种符号_汉语拼音符号_中文全角英文字母_中文全角标点
[oeasy]python054_python有哪些关键字_keyword_list_列表_reserved_words
oeasy
2024/12/20
590
[oeasy]python054_python有哪些关键字_keyword_list_列表_reserved_words
[oeasy]python0014_二进制_binary_bin
​二进制(binary)回忆上次内容上次我们了解了​​ASCII​​码表​ASCII​​ 码表就是​​A​​merican ​​S​​tandard ​​C​​ode for ​​I​​nformation ​​I​​nterchange美国信息交换标准代码ASCII 码表范围​0x30-0x39​​ 这个范围是 ​​数字​数字的编码减去 ​​0x30​​ 正好得到数字本身​​0x41-0x5A​​ 这个范围是 ​​大​​ 写字母​​0x61-0x7A​​ 这个范围是 ​​小​​ 写字母​​0x20-0x7
oeasy
2022/11/13
4450
[oeasy]python0014_二进制_binary_bin
[oeasy]python0012_字符_character_chr函数_根据序号得到字符
​字符(character)回忆上次内容上次了解了ord函数这个函数可以通过字符得到序号那么可以反过来吗?通过序号得到字符可以吗?​编辑ord的逆运算chr有来就有回​编辑好像可以我们可以把 104 作为参数给到 函数chr()注意给的参数是数字 104而不是字符串"104"没有引号我们可以发现 ​​数字104​​ 和 ​​字符'h'​​ 是有关联的有两个单词了ordchr他们都是什么意思来着?🤔ord是ordinal 序号看一下帮助ordhelp(ord)​编辑看完之后可以q退回来​ord​​
oeasy
2022/11/09
3290
[oeasy]python0020换行字符_feed_line_lf_反斜杠n_B语言_安徒生童话
Basic Combined Programming Language(BCPL)
oeasy
2022/11/25
1.1K0
[oeasy]python0020换行字符_feed_line_lf_反斜杠n_B语言_安徒生童话
[oeasy]python078_字符串类型怎么用_str_string_下标运算符_中括号
oeasy
2025/03/25
490
[oeasy]python078_字符串类型怎么用_str_string_下标运算符_中括号
[oeasy]python0017_解码_decode_字节序列_bytes_字符串_str
​解码 decode回忆上次内容code就是码最早也指电报码后来有各种编码、密码、砝码、条码都指的是把各种事物编个号encode就是编码编码就是给事物编个号​编辑编码基本了解了给事物编号就是编码怎么通过编号找到原来的事物呢?解码解码是编码的逆运算解铃换需系铃人​编辑上次把白菜编上号这次扫到号知道是白菜扫到码就知道这个条码对应这个大白菜并知道价格​编辑这解码用英文怎么说呢?解码(decode)de 的意思是相反的defuse 解除保险炸弹引信decolor 漂白defame 中伤destruct
oeasy
2022/11/19
4870
[oeasy]python0017_解码_decode_字节序列_bytes_字符串_str
[oeasy]python0041_ 转义字符_转义序列_escape_序列_sequence
转义序列回忆上次内容上次回顾了5bit-Baudot博多码的来历从 莫尔斯码 到 博多码 原来 人 来 收发电报现在 机器 来 收发电报输入方式 从 电键改成 键盘输出方式 从 纸带变成 打印纸张后来 电传打字机ASR-33成为 初代 经典终端除了 \n 和 \r 之外 还有什么 特殊字符 吗?🤔搜索 ASCII找到 ascii的定义还有 好多 类似于\n、\r的 特殊字符动手试试总结一下 各种 转义字符转义总结\a 响铃 ␇ (bell)电传打字机 回车前 都会预警响铛 避免 回车过程中 误打字符可以
oeasy
2023/01/09
1.5K0
[oeasy]python0041_ 转义字符_转义序列_escape_序列_sequence
[oeasy]python061_如何接收输入_input函数_字符串_str_容器_ 输入输出
061_python如何接收输入_input函数_字符串_str_容器 输入输出0 播放 · 0 赞同视频
oeasy
2025/01/17
780
[oeasy]python061_如何接收输入_input函数_字符串_str_容器_ 输入输出
[oeasy]python088_列表_清空列表_clear
while True: numbers = [] while True: num = input("请输入商品价格(输入end停止输入):") if num == "end": break try: num = float(num) numbers.append(num) except ValueError: print("输入不合法,请输入有效的浮点数或end。") total = sum(numbers) print(f"本次输入的数字总和为:{total}")
oeasy
2025/04/17
450
[oeasy]python088_列表_清空列表_clear
推荐阅读
相关推荐
[oeasy]python093_find方法_指数为负数_index_实际效果
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验