前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用 go-mysql-elasticsearch 把 MySQL 中的业务日志导入 Elasticsearch

使用 go-mysql-elasticsearch 把 MySQL 中的业务日志导入 Elasticsearch

作者头像
崔秀龙
发布2019-07-23 15:18:42
2.8K0
发布2019-07-23 15:18:42
举报
文章被收录于专栏:伪架构师

前言

相当一部分应用的日志是保存在数据库之中的,这些陈旧又稳定的应用在支撑着业务的运行。它们产生的日志通常来说也是有其价值的,但是如果能够融入到目前较为通用的 Elasticsearch 当中的话,可能有助于降低运维工作量,防止信息孤岛,并且进一步挖掘既有应用和业务的商业价值。

go-mysql-elasticsearch 就是这样一个项目,它可以从 MySQL 的数据表中读取指定数据表的数据,发送到 ElasticSearch 之中。它会使用 mysqldump 命令处理现有存量数据,并借助 binlog 的方式跟踪增量数据,从而保证 Elasticsearch 的数据和 MySQL 数据库中的数据保持同步。下面会简单讲一下这一项目的配置,并试验一个简单例子,最后根据实际情况进行一些改进。

条件和假设

  • 目前该工具支持 MySQL 和 ES 的版本都是 5.x。
  • MySQL 服务器需要开启 row 模式的 binlog。
  • 因为要使用 mysqldump 命令,因此该进程的所在的服务器需要部署这一工具。
  • 这一工具使用 GoLang 开发,需要 Go 1.9+ 的环境进行构建。
  • 可用的 MySQL、Elasticsearch 以及 Kibana 实例。

另外为了进行演示,这里做一点假设:

业务日志表

代码语言:javascript
复制
CREATE TABLE `biz_log` (  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,  `receive_content` text,  `send_content` text,  `fd_receive_content` text,  `fd_send_content` text,  `log_time` datetime DEFAULT NULL,  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4;

四个 text 字段使用 JSON 格式存储了几个不同的日志种类。

工具构建

  1. go get github.com/siddontang/go-mysql-elasticsearch
  2. cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
  3. make

转换配置

执行 go-mysql-elasticsearch --help,会看到一系列的参数,最主要的参数就是 -config,这个参数用于设置转换过程所需的参数配置文件,在源码的 /etc/river.toml 中包含了一份配置样本。作者在这一配置样本中提供了非常详尽的注释,可以对转换过程做出很多定制。但是由于工具本身具备很好的适应能力,加上 ES 的强大功能,只需要一点简单的设置,就能够顺利完成常见任务了。

一个简单的配置:

代码语言:javascript
复制
# MySQL 的相关配置
# 指定用户必须具备复制权限
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = "Flzx3000c"
my_charset = "utf8mb4"# ES 相关配置
es_addr = "127.0.0.1:9200"
es_user = ""
es_pass = ""# 数据源配置
# 以 Slave 模式工作
server_id = 10001
# mysql/mariadb
flavor = "mysql"# mysqldump 路径,如果为空或者未设置,会跳过这一环节。
mysqldump = "mysqldump"
bulk_size = 128
flush_bulk_time = "200ms"
skip_no_pk_table = false[[source]]
# 数据库名称
schema = "biz"
# 数据表同步范围,支持通配符
tables = ["biz_log"]# 规则定义
[[rule]]
# 数据库名称
schema = "biz"
# 规则对应的数据表,支持通配符
table = "biz_log"
# 目标 ES 索引
index = "biz"
# 该规则在 ES 中生成的文档类型
type = "log_db"

同步

配置文件完成之后,就可以执行 ./go-mysql-elasticsearch -config=./river.toml,日志中会显示首先执行 mysqldump 导出存量数据,然后开始守护进程阶段,跟踪 binlog 并进行同步。

此时打开 Kibana,执行 GET _search,会看到数据库记录已经进入了 ES 中,并且按照我们定义的规则进行了索引。在守护进行运行期间,如果有新的数据插入,也会同步到 ES 之中。

但是这里我们会发现一个小问题,前面提到的 JSON 字段被作为单一的字符串存入了 ES 索引。这样就根据 JSON 中的特定字段进行搜索的需要就比较费劲了,而我们也知道,如果直接向 ES 提交文档,其中的 JSON 是会被映射为 Object 类型的。如果对 ES 索引进行数据类型的定义,会发现直接将 JSON 字段映射到 Object 类型后,同步过程会失败,返回错误认为将无效内容映射到了这一类型。因此可以推测是字符串并没有使用原有格式提交给 ES。

经过对代码的阅读跟踪,发现在 elastic/client.go 中对数据进行了一次 Json 编码:

代码语言:javascript
复制
    default:        //for create and index
       data, err = json.Marshal(r.Data)        if err != nil {            return errors.Trace(err)
       }

下面就尝试进行一点改动,使之支持嵌套在字段内容中的 JSON 内容。

JSON

这里我想到了一个简单粗暴的办法就是,对数据报文进行一次检查,如果该字段内容是有效 JSON 的话,就使用 github.com/buger/jsonparserset 方法,将压缩后的 JSON 字符串重新赋值给编码后的 byte[]

首先给 BulkRequest 定义一个新方法,用于数据编码

代码语言:javascript
复制
func (b *BulkRequest) encodeData() []byte {
    jsonResults,_ := json.Marshal(b.Data)     // 判断是否有效的 JSON 数据
    isJson := func(s string) bool {         var js map[string]interface{}         return json.Unmarshal([]byte(s), &js) == nil    }     for key, value := range b.Data {
        stringValue, ok := value.(string)         // 如果字段内容是字符串并且是 JSON 格式
        if ok && isJson(stringValue) {             // 设置编码后内容该字段的值为原文
            jsonResults,_ = jsonparser.Set(jsonResults, []byte(stringValue), key)
           }
    }     return jsonResults
}

然后将原有的 data, err = json.Marshal(r.Data) 替换为 data = r.encodeData(),再次构建运行。会看到 ES 成功的将 JSON 字段进行了解析,生成了 Object 类型的映射关系。

补充说明

这里引用go-mysql-elasticsearch功能及性能验证 一文的性能测试结果:

代码语言:javascript
复制
1.全量同步
支持:需要安装mysqldump(mysql自带),同步11.5w数据,耗时3分13秒。
全量基于mysqldump,需要将工具和mysql安装在同一个节点,其它方式尚未找到。
2.增量同步
支持。
增量插入20W数据,耗时8分钟。
删除20w条数据,耗时6分。
更新20w条数据,12分钟。

这一工具还有一些其它亮点,例如多表聚合、字段过滤、自定义字段映射等。

相关链接

  1. 项目地址:https://github.com/siddontang/go-mysql-elasticsearch
  2. 配置文件样本:https://github.com/siddontang/go-mysql-elasticsearch/blob/master/etc/river.toml
  3. Fork 地址:https://github.com/fleeto/go-mysql-elasticsearch
  4. 作者原版教程:https://www.jianshu.com/p/96c7858b580f
  5. go-mysql-elasticsearch 功能及性能验证 :https://my.oschina.net/u/2282993/blog/1821930
  6. JsonParser:https://github.com/buger/jsonparser
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-11-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 伪架构师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 条件和假设
    • 业务日志表
    • 工具构建
    • 转换配置
    • 同步
    • JSON
    • 补充说明
    • 相关链接
    相关产品与服务
    Elasticsearch Service
    腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档