前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ElasticSearch 使用 Logstash 从 MySQL 中同步数据

ElasticSearch 使用 Logstash 从 MySQL 中同步数据

作者头像
皇上得了花柳病
修改2020-05-06 17:59:26
3.5K0
修改2020-05-06 17:59:26
举报
文章被收录于专栏:小辰的技术分享

目的是希望将现有的数据导入到 ElasticSearch 中,研究了好几种,除了写代码的方式,最简便的就是使用 Logstash 来导入数据到 ElasticSearch 中了。

因为现有的数据在 MySQL 数据库中,所以希望采用 logstash-input-jdbc 插件来导入数据。

安装 ElasticSearch 和 Logstash

首先需要安装 ElasticSearch 和 Logstash 环境,我选择的版本是 6.3.0

ELK 都是 Elastic 公司的产品,所以安装包可以到 http://www.elastic.co/downloads/elasticsearch 下载,老版本的归档在 https://www.elastic.co/downloads/past-releases 页面选择下载。

代码语言:javascript
复制
liuqianfei@Master:~$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.tar.gz
liuqianfei@Master:~$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.tar.gz
liuqianfei@Master:~$ tar -zxvf elasticsearch-6.3.0.tar.gz
liuqianfei@Master:~$ tar -zxvf logstash-6.3.0.tar.gz
liuqianfei@Master:~$ ls -lh
总用量 228M
drwxrwxr-x  2 liuqianfei liuqianfei 4.0K 11月 21 09:56 connector
drwxr-xr-x  9 liuqianfei liuqianfei 4.0K 11月 21 09:24 elasticsearch-6.3.0
-rw-r--r--  1 liuqianfei liuqianfei  88M 11月 21 09:13 elasticsearch-6.3.0.tar.gz
drwxrwxr-x 14 liuqianfei liuqianfei 4.0K 11月 21 10:15 logstash-6.3.0
-rw-r--r--  1 liuqianfei liuqianfei 141M 11月 21 09:13 logstash-6.3.0.tar.gz

在安装上都很简单,基本上就是解压即用,ElasticSearch 的安装可以参考 ElasticSearch 6.0.0 安装配置,注意配置 IP 和修改系统参数。

直接解压 logstash 到指定目录就可以了,然后执行 ./logstash -e

代码语言:javascript
复制
liuqianfei@Master:~/logstash-6.3.0$ 
liuqianfei@Master:~/logstash-6.3.0$ ./bin/logstash -e

输入 hello 输出如下则表示安装成功:

安装 logstash-input-jdbc 插件

现在使用 Logstash 比较幸福的是,logstash-6.1.1 以后已经默认支持 logstash-input-jdbc 插件,不需要再单独安装了。

如果你有不得已的原因必须要使用老版本的 Logstash,那么可以这么安装 logstash-input-jdbc 插件:

代码语言:javascript
复制
liuqianfei@Master:~/logstash-6.3.0$ ./bin/logstash-plugin install logstash-input-jdbc

需要注意网络配置,因为这是在线安装的。

在线安装网络问题

建议大家在使用 Logstash 的时候使用最新版本,如果必须用老版本在先安装 logstash-input-jdbc 插件。

本节从网上摘录了一段配置,没有经过充分验证。

logstash-input-jdbc 插件是 logstash 的一个插件,使用 ruby 语言开发。下载插件过程中最大的坑是下载插件相关的依赖的时候下不动,因为国内网络的原因,访问不到亚马逊的服务器。解决办法,改成国内的 ruby 仓库镜像。此镜像托管于淘宝的阿里云服务器上。

如果没有安装 gem 的话先安装 gem,这是 ruby 的管理工具包。

代码语言:javascript
复制
yum install gem
gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/
gem sources -l

*** CURRENT SOURCES ***

https://ruby.taobao.org

# 请确保只有 ruby.taobao.org
# 如果 还是显示 https://rubygems.org/ 进入 home的 .gemrc 文件
vim ~/.gemrc 
# 手动删除 https://rubygems.org/

修改 Gemfile 的数据源地址。步骤:

进入 logstash 根目录,vi Gemfile 修改 Gemfile 文件,修改 source 的值为 "https://ruby.taobao.org"

还是在 logstash 根目录,修改 lock 文件 Gemfile.lock,找到 remote: https://rubygems.org/ 修改为 remote: https://ruby.taobao.org

或者直接替换源这样你不用改你的 Gemfile 的 source。

代码语言:javascript
复制
sudo gem install bundler
$ bundle config mirror.https://rubygems.org https://ruby.taobao.org

然后就可以使用 ./logstash-plugin install logstash-input-jdbc 命令来安装 jdbc 插件了。

JDBC

logstash-input-jdbc 运行任务需要对应数据库的 JDBC 驱动文件。

我们在 home 目录新建目录 connector,把 MySQL 的驱动文件放在里面。

代码语言:javascript
复制
liuqianfei@Master:~$ mkdir connector
liuqianfei@Master:~$ cd connector
liuqianfei@Master:~$ ls -lah
liuqianfei@Master:~/connector$ ls -lah
总用量 836K
drwxrwxr-x 2 liuqianfei liuqianfei 4.0K 11月 21 09:56 .
drwxr-xr-x 8 liuqianfei liuqianfei 4.0K 11月 21 19:19 ..
-rw-rw-r-- 1 liuqianfei liuqianfei 827K 11月 21 09:56 mysql-connector-java-5.1.24-bin.jar
liuqianfei@Master:~/connector$

Logstash 导入导出脚本

我在 logstash 根目录新建了一个目录 script,专门存放 logstash 导入导出脚本配置。

代码语言:javascript
复制
liuqianfei@Master:~/logstash-6.3.0$ cd script/
liuqianfei@Master:~/logstash-6.3.0/script$ ls -lah
总用量 12K
drwxrwxr-x  2 liuqianfei liuqianfei 4.0K 11月 21 11:33 .
drwxrwxr-x 14 liuqianfei liuqianfei 4.0K 11月 21 19:19 ..
-rw-rw-r--  1 liuqianfei liuqianfei  444 11月 21 10:34 mysql.conf
liuqianfei@Master:~/logstash-6.3.0/script$

我的脚本内容很简单,从远程 MySQL 库 test_data_100w 导入表 test1_text 的全部数据到 ElasticSearch,任务只执行一次。

代码语言:javascript
复制
input {
  jdbc {
    jdbc_driver_library => "/home/liuqianfei/connector/mysql-connector-java-5.1.24-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.190.10.119:3306/test_data_100w?serverTimezone=GMT%2B8"
    jdbc_user => "root"
    jdbc_password => "root"
    statement => "SELECT * from test1_text"
  }
}

output {
        elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

执行 Logstash 导入任务

使用命令 ./bin/logstash -f ./script/mysql.conf 执行导入脚本。

代码语言:javascript
复制
liuqianfei@Master:~/logstash-6.3.0$ ./bin/logstash -f ./script/mysql.conf
Sending Logstash's logs to /home/liuqianfei/logstash-6.3.0/logs which is now configured via log4j2.properties
[2018-11-21T10:55:37,328][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-11-21T10:55:39,760][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2018-11-21T10:55:47,803][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-11-21T10:55:49,717][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-11-21T10:55:49,802][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-21T10:55:50,643][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2018-11-21T10:55:50,861][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-11-21T10:55:50,879][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-11-21T10:55:50,939][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-11-21T10:55:51,040][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-11-21T10:55:51,273][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2018-11-21T10:55:52,344][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x349c342d run>"}
[2018-11-21T10:55:52,688][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-11-21T10:55:54,146][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-11-21T10:55:59,368][INFO ][logstash.inputs.jdbc     ] (2.011717s) SELECT * from test1_text
[2018-11-21T10:57:18,337][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x349c342d run>"}

等待几分钟任务执行完毕。

这时候查询 ElasticSearch 索引,发现已经有数据了。

代码语言:javascript
复制
D:\
λ curl -X GET http://192.190.10.170:9200/_cat/indices?v
health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   logstash-2018.11.21 SFS4m0oWSh6O30vEE3INhg   5   1     100000            0     66.mb         66.mb

D:\
λ

如果执行脚本的时候报错:

代码语言:javascript
复制
Java::JavaSql::SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone. 

在 JDBC 链接的 url 后要带入时区参数 ?serverTimezone=GMT%2B8

注意 MySQL 要支持远程连接才行(如果是本地的 mysql 可以不管),不然后报拒绝访问的异常:

代码语言:javascript
复制
 is not allowed to connect to this MySql server

脚本说明

最后附一个从 MySQL 定时增量导入数据的脚本和参数说明,仅供参考。

代码语言:javascript
复制
#-----------------------------------start-----------------------------------
#输入部分
input {
  jdbc {
    #连接MySQL驱动,需要自己下载
    jdbc_driver_library => "/es/mysql-connector-java-5.1.31.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.112.29.30:3306/mstore"
    #连接数据库账号信息
    jdbc_user => "MySQL_admin"
    jdbc_password => "password"
    #分页
    jdbc_paging_enabled => true
    #分页大小
    jdbc_page_size => 100000
    #流式获取数据,每次取10000.
    jdbc_fetch_size => 10000
    #Maximum number of times to try connecting to database
    connection_retry_attempts => 3
    #Number of seconds to sleep between connection attempts
    connection_retry_attempts_wait_time => 1
    #Connection pool configuration. The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5)
    jdbc_pool_timeout => 5
    #Whether to force the lowercasing of identifier fields
    lowercase_column_names => true
    #Whether to save state or not in last_run_metadata_path
    #保存上次运行记录,增量提取数据时使用
    record_last_run = > true
    #"* * * * *"为每分钟执行一次
    schedule => "* * * * *"
    #Use an incremental column value rather than a timestamp
    use_column_value => true
    #sql_last_value
    #The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if use_column_value is true and tracking_column is set. It is updated accordingly after subsequent queries are run.
    tracking_column => "id"
    #查询语句
    statement => "SELECT id,package_name,name,sub_name,editor_comment,high_quality,sub_category,tag,update_time FROM tbl_app WHERE id > :sql_last_value"
  }
}

#过滤部分
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  date{
    match => ["update_time","yyy-MM-dd HH:mm:ss"]
  }
}

#输出到elastsicearch
output {
  elasticsearch {
    #elasticsearch集群地址,不用列出所有节点,默认端口号也可省略
    hosts => ["10.127.92.181:9200", "10.127.92.212:9200", "10.127.92.111:9200"]
    #索引值,查询的时候会用到;需要先在elasticsearch中创建对应的mapping,也可以采用默认的mapping
    index => "store"
    #指定插入elasticsearch文档ID,对应input中sql字段id
    document_id => "%{id}"
  }
}

#------------------------------------end------------------------------------

使用时请去掉此文件中的注释,不然会报错。logstash 会把执行记录默认存在账户根目录下: /root/.logstash_jdbc_last_run,如果需要重新加载数据到 elasticsearch,需要删除这个文件。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装 ElasticSearch 和 Logstash
  • 安装 logstash-input-jdbc 插件
    • 在线安装网络问题
    • JDBC
    • Logstash 导入导出脚本
    • 执行 Logstash 导入任务
    • 脚本说明
    相关产品与服务
    云数据库 SQL Server
    腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档