前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >怎样将 MySQL 数据表导入到 Elasticsearch

怎样将 MySQL 数据表导入到 Elasticsearch

作者头像
netkiller old
发布于 2018-03-05 10:36:00
发布于 2018-03-05 10:36:00
5.2K10
代码可运行
举报
文章被收录于专栏:NetkillerNetkiller
运行总次数:0
代码可运行

本文节选自《Netkiller Database 手札》

MySQL 导入 Elasticsearch 的方法有很多,通常是使用ETL工具,但我觉得太麻烦。于是想到 logstash 。

23.8. Migrating MySQL Data into Elasticsearch using logstash

23.8.1. 安装 logstash

安装 JDBC 驱动 和 Logstash

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh	 | bash			
curl -s https://raw.githubusercontent.com/oscm/shell/master/log/kibana/logstash-5.x.sh | bash			

mysql 驱动文件位置在 /usr/share/java/mysql-connector-java.jar

23.8.2. 配置 logstash

创建配置文件 /etc/logstash/conf.d/jdbc-mysql.conf

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id          | int(11)      | NO   |     | 0       |       |
| title       | mediumtext   | NO   |     | NULL    |       |
| description | mediumtext   | YES  |     | NULL    |       |
| author      | varchar(100) | YES  |     | NULL    |       |
| source      | varchar(100) | YES  |     | NULL    |       |
| ctime       | datetime     | NO   |     | NULL    |       |
| content     | longtext     | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)			
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        
    }
}			

23.8.3. 启动 Logstash

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			root@netkiller /var/log/logstash % systemctl restart logstash

root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
 Main PID: 10434 (java)
   CGroup: /system.slice/logstash.service
           └─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...

Jul 31 09:35:00 VM_3_2_centos systemd[1]: Started logstash.
Jul 31 09:35:00 VM_3_2_centos systemd[1]: Starting logstash...
			
root@netkiller /var/log/logstash % cat logstash-plain.log 
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x453a18e9>}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x66df34ae>]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline        ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc     ] (0.006000s) select * from article	

23.8.4. 验证

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			% curl -XGET 'http://localhost:9200/_all/_search?pretty'			

23.8.5. 配置模板

23.8.5.1. 全量导入

适合数据没有改变的归档数据或者只能增加没有修改的数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        
    }
}				
23.8.5.2. 多表导入

多张数据表导入到 Elasticsearch

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				# multiple inputs on logstash jdbc

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
    type => "article"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from comment"
    type => "comment"
  } 
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "%{type}"
        document_id => "%{id}"
        
    }
}				

需要在每一个jdbc配置项中加入 type 配置,然后 elasticsearch 配置项中加入 document_type => "%{type}"

23.8.5.3. 通过 ID 主键字段增量复制数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # ... other configuration bits
  }
}				

tracking_column_type => "numeric" 可以声明 id 字段的数据类型, 如果不指定将会默认为日期

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc     ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'				

如果复制不对称可以加入 clean_run => true 配置项,清楚数据

23.8.5.4. 通过日期字段增量复制数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
    use_column_value => true
    tracking_column => "create_date"
    # ... other configuration bits
  }
}				

如果复制不对称可以加入 clean_run => true 配置项,清楚数据

23.8.5.5. 指定SQL文件

statement_filepath 指定 SQL 文件,有时SQL太复杂写入 statement 配置项维护部方便,可以将 SQL 写入一个文本文件,然后使用 statement_filepath 配置项引用该文件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
    jdbc {
        jdbc_driver_library => "/path/to/driver.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_url => "jdbc://postgresql"
        jdbc_user => "neo"
        jdbc_password => "password"
        statement_filepath => "query.sql"
    }
}				
23.8.5.6. 参数传递

将需要复制的条件参数写入 parameters 配置项

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}				
23.8.5.7. 控制返回JDBC数据量
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	jdbc_fetch_size => 1000  #jdbc获取数据的数量大小
	jdbc_page_size => 1000 #jdbc一页的大小,
	jdbc_paging_enabled => true  #和jdbc_page_size组合,将statement的查询分解成多个查询,相当于: SELECT * FROM table LIMIT 1000 OFFSET 4000 				

23.8.6. example

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次
    statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"  # 操作执行的动作,可选值有["index", "delete", "create", "update"]
        doc_as_upsert => true  #支持update模式
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-07-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netkiller 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
1 条评论
热度
最新
怎么联系你
怎么联系你
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
TienChin 活动管理-添加活动页面
直接将原有的 index.vue 的全部内容替换成下面的,这里先替换,我只是补齐文档,后面新模块开发我会一步一步进行记录起来:
程序员NEO
2023/10/12
6650
猿实战08——属性库实现之属性关系绑定
属性和属性值,看上去很不起眼,数据粒度也很小,但是正式因为数据粒度小,灵活多变,组织得当可以强有力的区分千变万化的商品。
山旮旯的胖子
2020/09/05
8800
猿实战08——属性库实现之属性关系绑定
TienChin 活动管理-活动列表展示
程序员NEO
2023/10/12
5510
TienChin 活动管理-活动列表展示
vue2.0+Element-ui实战案例
我们将会选择使用一些 vue 周边的库vue-cli, vue-router,axios,moment,Element-ui搭建一个前端项目案例,后端数据接口,会使用json-server快速搭建一个本地的服务,方便对数据的增删改查,
小周sir
2019/09/23
2.3K0
vue2.0+Element-ui实战案例
基于HTML+CSS+JavaScript角色后台管理系统设计毕业论文源码
✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 💂 作者主页: 【主页——🚀获取更多优质源码】 🎓 web前端期末大作业: 【📚毕设项目精品实战案例 (1000套) 】 🧡 程序员有趣的告白方式:【💌HTML七夕情人节表白网页制作 (110套) 】 🌎超炫酷的Echarts大屏可视化源码:【🔰 echarts大屏展示大数据平台可视化(150套) 】 🎁 免费且实用的WEB前端学习指南: 【📂web前端零基础到高级学习视频教程 120G干货分享】 🥇 关于作者: 历任研发工程师
IT司马青衫
2022/08/16
1.2K0
基于HTML+CSS+JavaScript角色后台管理系统设计毕业论文源码
ElementUI Dialog 对话框,组件之间传值
Dialog 组件的内容可以是任意的,甚至可以是表格或表单,下面是应用了 Element Table 和 Form 组件的两个样例。
py3study
2021/01/29
4.9K0
手把手教你实现一个Vue无限级联树形表格(增删改)
平时我们可能在做项目时,会遇到一个业务逻辑。实现一个无限级联树形表格,什么叫做无限级联树形表格呢?就是下图所展示的内容,有一个祖元素,然后下面可能有很多子孙元素,你可以实现添加、编辑、删除这样几个功能。
马克社区
2022/05/11
1.6K0
el-table 多表格弹窗嵌套数据显示异常错乱问题
使用vue+element开发报表功能时,需要列表上某列的超链接按钮弹窗展示,在弹窗的el-table列表某列中再次使用超链接按钮点开弹窗,以此类推多表格弹窗嵌套,本文以弹窗两次为例 最终效果如下示例页面
GoodTime
2024/03/05
3230
el-table 多表格弹窗嵌套数据显示异常错乱问题
vue的修饰符!sync和el-dialog弹窗组件
父组件 index.vue: <template> <info :value="myValue" @valueChanged="e => myValue = e"></info> </template> <script> inport info from './info.vue'; export default { components: { info, }, data() { return {
leader755
2022/03/09
7830
微服务项目:尚融宝(42)(核心业务流程:借款额度审批(2))
创建 src/views/core/borrow-info/detail.vue 
一个风轻云淡
2022/11/15
3870
微服务项目:尚融宝(42)(核心业务流程:借款额度审批(2))
实现表格行的拖拽以及分页
在做一些后台管理系统时,表格的数据信息展示是很常见的需求,而对应的都是一些增删改查的操作
itclanCoder
2021/12/06
3.1K0
实现表格行的拖拽以及分页
猿实战13——实现你没听说过的前台类目
上几个章节,猿人君教会了你实现了属性/属性值和后台类目的绑定关系,今天,猿人君就带你来实现前台类目。
山旮旯的胖子
2020/09/23
6720
猿实战13——实现你没听说过的前台类目
Vue电商实践项目(二)
1.实现后台首页的基本布局 2.实现左侧菜单栏 3.实现用户列表展示 4.实现添加用户
用户6808043
2022/02/24
5.1K0
vue表格分页以及增删改查的实际应用
效果 1:表格以及分页 2:增加一条数据 3:删除一条数据 4:修改一条数据 5:查询一条数据 实例: <template> <div class="tab-container"> <d
王小婷
2021/03/17
1.9K0
TienChin-课程管理-添加课程页面
这个 index.vue 是 course 文件夹下面的 index.vue 别弄错了。
程序员NEO
2023/10/12
2200
Vue + Element ui 实现动态表单,包括新增行/删除行/动态表单验证/提交功能
最近通过Vue + Element ui实现了动态表单功能,该功能还包括了动态表单新增行、删除行、动态表单验证、动态表单提交功能,趁热打铁,将开发心得记录下来,方便以后再遇到类似功能时,直接拿来应用。
朱季谦
2023/07/21
6.4K0
Vue + Element ui 实现动态表单,包括新增行/删除行/动态表单验证/提交功能
vue3+element-plus 表格table实现树状结构父子级互不影响
用户5899361
2024/01/31
1.1K0
element ui 图片上传封装多张或单张
最近写了一个后台管理项目,发现每个后台项目都离不开上传图片,决定把上传图片做个封装,话不多说直接上代码!
前端小白@阿强
2020/08/11
2.4K0
手把手教你实现一个Vue无限级联树形表格(增删改)
平时我们可能在做项目时,会遇到一个业务逻辑。实现一个无限级联树形表格,什么叫做无限级联树形表格呢?就是下图所展示的内容,有一个祖元素,然后下面可能有很多子孙元素,你可以实现添加、编辑、删除这样几个功能。
Vam的金豆之路
2021/12/01
5880
手把手教你实现一个Vue无限级联树形表格(增删改)
不需要web服务器,如何构建一个可以内部跨域的http服务(Vue+Flask)
前端把需要测试的接口地址,报文通过axios 发送给后端Flask服务,Flask服务通过 requests 模块实现测试
山河已无恙
2023/03/02
8560
推荐阅读
相关推荐
TienChin 活动管理-添加活动页面
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验