go-stash is an efficient tool that pulls data from Kafka, processes it according to configured rules, and ships it to an ElasticSearch cluster. It is a component of the go-zero ecosystem and a Go-language alternative to logstash, using about 2/3 less server resources than the original logstash.
Project repository: https://github.com/kevwan/go-stash
Let's start with the overall system design as expressed in the YAML configuration (stash/etc/config.yaml): Kafka is the data input, ElasticSearch is the data output, and Filters abstract the processing in between.
Clusters:
  - Input:
      Kafka:
        Name: go-stash
        Log:
          Mode: file
        Brokers:
          - "172.16.48.41:9092"
          - "172.16.48.42:9092"
          - "172.16.48.43:9092"
        Topic: ngapplog
        Group: stash
        Conns: 3
        Consumers: 10
        Processors: 60
        MinBytes: 1048576
        MaxBytes: 10485760
        Offset: first
    Filters:
      - Action: drop
        Conditions:
          - Key: status
            Value: 503
            Type: contains
          - Key: type
            Value: "app"
            Type: match
            Op: and
      - Action: remove_field
        Fields:
          - message
          - source
          - beat
          - fields
          - input_type
          - offset
          - "@version"
          - _score
          - _type
          - clientip
          - http_host
          - request_time
    Output:
      ElasticSearch:
        Hosts:
          - "http://172.16.188.73:9200"
          - "http://172.16.188.74:9200"
          - "http://172.16.188.75:9200"
        Index: "go-stash-{{yyyy.MM.dd}}"
        MaxChunkBytes: 5242880
        GracePeriod: 10s
        Compress: false
        TimeZone: UTC
Input parameters:
Conns is the number of Kafka connections, generally no more than the number of CPU cores;
Consumers is the number of goroutines opened per connection; Conns * Consumers should not exceed the number of topic partitions;
Processors is the number of goroutines that process the data;
MinBytes and MaxBytes bound the size of each data block fetched from Kafka;
Offset can be first or last, defaulting to last; first means reading from the beginning of the topic.
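To make the Conns fan-out concrete, here is a minimal, self-contained sketch of how one Kafka input expands into Conns independent queue configurations. The KqConf struct and the toKqConf signature are simplified assumptions for illustration, not the real go-stash/go-queue types:

```go
package main

import "fmt"

// KqConf is a simplified stand-in for go-queue's kq.KqConf (illustrative only).
type KqConf struct {
	Brokers    []string
	Group      string
	Topic      string
	Consumers  int
	Processors int
}

// toKqConf sketches how go-stash fans one Kafka input out into Conns
// independent queue configs; the field set here is an assumption.
func toKqConf(brokers []string, topic, group string, conns, consumers, processors int) []KqConf {
	var out []KqConf
	for i := 0; i < conns; i++ {
		out = append(out, KqConf{
			Brokers:    brokers,
			Group:      group,
			Topic:      topic,
			Consumers:  consumers,
			Processors: processors,
		})
	}
	return out
}

func main() {
	// Conns: 3, Consumers: 10, Processors: 60 as in the YAML above
	confs := toKqConf([]string{"172.16.48.41:9092"}, "ngapplog", "stash", 3, 10, 60)
	fmt.Println(len(confs)) // 3: one queue instance per connection
}
```

Each of the three resulting queues gets its own connection, so total consumer goroutines are Conns * Consumers = 30, which is why that product should stay within the partition count.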
We'll follow the whole data flow starting from the entry function, stash/stash.go:
func main() {
    // parse command-line flags and set up graceful shutdown
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)
    proc.SetTimeToForceQuit(c.GracePeriod)

    // service group (composite pattern)
    group := service.NewServiceGroup()
    defer group.Stop()

    for _, processor := range c.Clusters {
        // connect to es
        client, err := elastic.NewClient(
            elastic.SetSniff(false),
            elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
            elastic.SetBasicAuth(processor.Output.ElasticSearch.Username, processor.Output.ElasticSearch.Password),
        )
        logx.Must(err)

        // build the filter processors
        filters := filter.CreateFilters(processor)
        writer, err := es.NewWriter(processor.Output.ElasticSearch)
        logx.Must(err)

        var loc *time.Location
        if len(processor.Output.ElasticSearch.TimeZone) > 0 {
            loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
            logx.Must(err)
        } else {
            loc = time.Local
        }

        // prepare the es write side: {target index, writer}
        indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
        handle := handler.NewHandler(writer, indexer)
        handle.AddFilters(filters...)
        handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))

        // start kafka per the config, pass in the consume handler, add to the group
        for _, k := range toKqConf(processor.Input.Kafka) {
            group.Add(kq.MustNewQueue(k, handle))
        }
    }

    // start the service group
    group.Start()
}
The loop takes each processor from the configured clusters. It first establishes the es client connection, then builds the filter processors; filter.CreateFilters() looks like this:
func CreateFilters(p config.Cluster) []FilterFunc {
    var filters []FilterFunc

    for _, f := range p.Filters {
        switch f.Action {
        case filterDrop:
            filters = append(filters, DropFilter(f.Conditions))
        case filterRemoveFields:
            filters = append(filters, RemoveFieldFilter(f.Fields))
        case filterTransfer:
            filters = append(filters, TransferFilter(f.Field, f.Target))
        }
    }

    return filters
}
The cases here correspond to the drop, remove_field, and transfer actions we declared under Filters in the YAML config; the function returns the list of matching filters.
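Each filter shares one shape: a document map goes in, and a map (or nil, meaning "drop this event") comes out. A minimal, standalone sketch of the remove_field action in that shape (a simplified version, not the exact go-stash implementation):

```go
package main

import "fmt"

// FilterFunc mirrors go-stash's filter signature: a document map in,
// a (possibly nil) document map out; nil means the event is dropped.
type FilterFunc func(map[string]interface{}) map[string]interface{}

// RemoveFieldFilter is a minimal sketch of the remove_field action:
// it deletes the listed keys and passes the document on.
func RemoveFieldFilter(fields []string) FilterFunc {
	return func(m map[string]interface{}) map[string]interface{} {
		for _, f := range fields {
			delete(m, f)
		}
		return m
	}
}

func main() {
	f := RemoveFieldFilter([]string{"message", "beat"})
	doc := map[string]interface{}{"message": "x", "status": 200}
	fmt.Println(f(doc)) // map[status:200]
}
```

Because every action compiles down to the same FilterFunc type, the handler can run them as a simple chain and stop as soon as one returns nil.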
es.NewWriter() creates the writer used for es writes:
func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
    client, err := elastic.NewClient(
        elastic.SetSniff(false),
        elastic.SetURL(c.Hosts...),
        elastic.SetGzip(c.Compress),
        elastic.SetBasicAuth(c.Username, c.Password),
    )
    if err != nil {
        return nil, err
    }

    writer := Writer{
        docType: c.DocType,
        client:  client,
    }
    writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
    return &writer, nil
}
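The key detail in NewWriter is the ChunkExecutor from go-zero: individual writes are buffered and flushed to es in batches once MaxChunkBytes accumulates (the real executor also flushes on a timer and runs asynchronously). A toy, synchronous stand-in for that batching behavior, to show the idea rather than the real go-zero API:

```go
package main

import "fmt"

// chunkBuffer is a toy stand-in for go-zero's ChunkExecutor: it accumulates
// documents and flushes a batch once the buffered bytes cross a threshold.
// The real executor is asynchronous and additionally flushes on an interval.
type chunkBuffer struct {
	maxBytes int
	size     int
	tasks    []string
	execute  func([]string) // the batch write, e.g. an es bulk request
}

func (c *chunkBuffer) Add(doc string) {
	c.tasks = append(c.tasks, doc)
	c.size += len(doc)
	if c.size >= c.maxBytes {
		c.Flush()
	}
}

func (c *chunkBuffer) Flush() {
	if len(c.tasks) == 0 {
		return
	}
	c.execute(c.tasks)
	c.tasks, c.size = nil, 0
}

func main() {
	var batches [][]string
	buf := &chunkBuffer{maxBytes: 10, execute: func(docs []string) {
		batches = append(batches, append([]string(nil), docs...))
	}}
	for _, doc := range []string{"aaaa", "bbbb", "cccc"} {
		buf.Add(doc) // 12 bytes total crosses the 10-byte threshold
	}
	fmt.Println(len(batches), len(batches[0])) // 1 3
}
```

This is why go-stash issues far fewer es requests than one-write-per-message: three documents above become a single batch write.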
es.NewIndex() creates the index to write to; the Index struct looks like this:
type Index struct {
    client       *elastic.Client
    indexFormat  IndexFormat
    indices      map[string]lang.PlaceholderType
    lock         sync.RWMutex
    singleFlight syncx.SingleFlight
}
Then the writer, index, and filters are combined into a MessageHandler:
type MessageHandler struct {
    writer  *es.Writer
    indexer *es.Index
    filters []filter.FilterFunc
}

func NewHandler(writer *es.Writer, indexer *es.Index) *MessageHandler {
    return &MessageHandler{
        writer:  writer,
        indexer: indexer,
    }
}
Structurally, MessageHandler connects to the downstream es, covering everything from data processing to data writing; on the upstream Kafka side, it implements go-queue's ConsumeHandler interface, so the handler runs as part of consumption and the result lands in es.
func (mh *MessageHandler) Consume(_, val string) error {
    var m map[string]interface{}
    // deserialize the message from kafka
    if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
        return err
    }

    // resolve the configured target es index
    index := mh.indexer.GetIndex(m)

    // filter chain (map in, map out)
    for _, proc := range mh.filters {
        if m = proc(m); m == nil {
            return nil
        }
    }

    bs, err := jsoniter.Marshal(m)
    if err != nil {
        return err
    }

    // write to es
    return mh.writer.Write(index, string(bs))
}
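The core of Consume (decode, run the filter chain, re-encode) can be exercised on its own. A stripped-down, runnable sketch using the standard library's encoding/json in place of jsoniter, with the es write left out:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type FilterFunc func(map[string]interface{}) map[string]interface{}

// consume is a simplified sketch of MessageHandler.Consume: decode the
// Kafka payload, run the filter chain (nil aborts the write), re-encode
// the surviving document for es. encoding/json stands in for jsoniter.
func consume(val string, filters []FilterFunc) (string, error) {
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(val), &m); err != nil {
		return "", err
	}
	for _, proc := range filters {
		if m = proc(m); m == nil {
			return "", nil // dropped by a filter, nothing to write
		}
	}
	bs, err := json.Marshal(m)
	return string(bs), err
}

func main() {
	// a toy drop filter in the spirit of the YAML's "status contains 503" rule
	drop503 := func(m map[string]interface{}) map[string]interface{} {
		if m["status"] == float64(503) { // json numbers decode as float64
			return nil
		}
		return m
	}
	out, _ := consume(`{"status":200,"type":"app"}`, []FilterFunc{drop503})
	fmt.Println(out) // {"status":200,"type":"app"}
}
```

Note that returning nil from consume without an error mirrors go-stash: a dropped event is not a failure, so the Kafka offset still advances.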
Back in main, Kafka queues are started per the configuration with the consume handler passed in, added to the service group, and the group is started with group.Start():
for _, k := range toKqConf(processor.Input.Kafka) {
    group.Add(kq.MustNewQueue(k, handle))
}
At this point the processing pipeline and its upstream and downstream connections are wired together: the service pulls data from Kafka, processes it, and writes it into es. Every service added to the group implements Start(); the Kafka queue's startup logic is as follows.
That is: start the consumer goroutines —> pull messages from Kafka into q.channel —> when the producers finish, close the channel so the consumers can drain the tail and wrap up.
func (q *kafkaQueue) Start() {
    q.startConsumers()
    q.startProducers()
    q.producerRoutines.Wait()
    close(q.channel)
    q.consumerRoutines.Wait()
}
q.startConsumers()
  |- [q.consumeOne(key, value) for msg in q.channel]
      |- q.handler.Consume(key, value)
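The producer/channel/consumer handoff above can be sketched as a small, self-contained program. The names mirror the article's kafkaQueue fields, but this is an illustrative model, not the real kq code:

```go
package main

import (
	"fmt"
	"sync"
)

// run models the kafkaQueue Start flow: producers pull messages into a
// channel, consumers drain it, and shutdown closes the channel so the
// consumers can finish the remaining messages.
func run(msgs []string) []string {
	channel := make(chan string, 8)
	results := make(chan string, len(msgs))
	var producerRoutines, consumerRoutines sync.WaitGroup

	// q.startConsumers(): drain q.channel, invoking the handler per message
	consumerRoutines.Add(1)
	go func() {
		defer consumerRoutines.Done()
		for msg := range channel {
			results <- "handled:" + msg // stands in for q.handler.Consume
		}
	}()

	// q.startProducers(): pull messages from Kafka into q.channel
	producerRoutines.Add(1)
	go func() {
		defer producerRoutines.Done()
		for _, msg := range msgs {
			channel <- msg
		}
	}()

	producerRoutines.Wait()
	close(channel) // signal consumers that no more messages are coming
	consumerRoutines.Wait()
	close(results)

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(run([]string{"m1", "m2"})) // [handled:m1 handled:m2]
}
```

Closing the channel only after the producers finish is what makes the shutdown graceful: in-flight messages are still consumed before consumerRoutines.Wait() returns.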
At this point the whole flow is connected end to end; the official data flow diagram in the project repository illustrates the same picture.
References:
https://github.com/kevwan/go-stash
https://mp.weixin.qq.com/s/UeeSZi_-ZiiHf3P4tmyszw