根据业务需求,存在以下场景:
如果是第一种场景,数据迁移过程中可以停止写入,可以采用诸如elasticsearch-dump、logstash、reindex、snapshot等方式进行数据迁移。实际上这几种工具大体上可以分为两类:
如果是第二种场景,数据迁移过程中旧集群不能停止写入,需要根据实际的业务场景解决数据一致性的问题:
离线迁移需要先停止老集群的写操作,将数据迁移完毕后在新集群上进行读写操作。适合于业务可以停服的场景。
离线迁移大概有以下几种方式:
下面介绍一下在旧集群可以停止写入的情况下进行数据迁移的几种工具的用法。
适合数据量不大,迁移索引个数不多的场景
elasticsearch-dump是一款开源的ES数据迁移工具,github地址: https://github.com/taskrabbit/elasticsearch-dump
elasticsearch-dump使用node.js开发,可使用npm包管理工具直接安装:
npm install elasticdump -g
主要参数说明:
--input: 源地址,可为ES集群URL、文件或stdin,可指定索引,格式为:{protocol}://{host}:{port}/{index}
--input-index: 源ES集群中的索引
--output: 目标地址,可为ES集群地址URL、文件或stdout,可指定索引,格式为:{protocol}://{host}:{port}/{index}
--output-index: 目标ES集群的索引
--type: 迁移类型,默认为data,表明只迁移数据,可选settings, analyzer, data, mapping, alias
--limit:每次向目标ES集群写入数据的条数,不可设置的过大,以免bulk队列写满
以下操作通过elasticdump命令将集群172.16.0.39中的companydatabase索引迁移至集群172.16.0.20。注意第一条命令先将索引的settings先迁移,如果直接迁移mapping或者data将失去原有集群中索引的配置信息如分片数量和副本数量等,当然也可以直接在目标集群中将索引创建完毕后再同步mapping与data。
elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=settings
elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=mapping
elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=data
以下操作通过elasticdump命令将将集群172.16.0.39中的所有索引迁移至集群172.16.0.20。 注意此操作并不能迁移索引的配置如分片数量和副本数量,必须对每个索引单独进行配置的迁移,或者直接在目标集群中将索引创建完毕后再迁移数据。
elasticdump --input=http://172.16.0.39:9200 --output=http://172.16.0.20:9200
logstash支持从一个ES集群中读取数据然后写入到另一个ES集群,因此可以使用logstash进行数据迁移,具体的配置文件如下:
input {
elasticsearch {
hosts => ["http://x.x.x.1:9200"]
index => "*"
docinfo => true
}
}
output {
elasticsearch {
hosts => ["http://x.x.x.2:9200"]
index => "%{[@metadata][_index]}"
}
}
上述配置文件将源ES集群的所有索引同步到目标集群中,当然可以设置只同步指定的索引,logstash的更多功能可查阅logstash官方文档 logstash 官方文档.
reindex是Elasticsearch提供的一个api接口,可以把数据从一个集群迁移到另外一个集群。
需要在目标ES集群中配置该参数,指明能够reindex的远程集群的白名单
以下操作表示从源ES集群中查询名为test1的索引,查询条件为title字段为elasticsearch,将结果写入当前集群的test2索引
POST _reindex
{
"source": {
"remote": {
"host": "http://x.x.x.1:9200"
},
"index": "test1",
"query": {
"match": {
"title": "elasticsearch"
}
}
},
"dest": {
"index": "test2"
}
}
适用数据量大的场景
snapshot api是Elasticsearch用于对数据进行备份和恢复的一组api接口,可以通过snapshot api进行跨集群的数据迁移,原理就是从源ES集群创建数据快照,然后在目标ES集群中进行恢复。需要注意ES的版本问题:
目标ES集群的主版本号(如5.6.4中的5为主版本号)要大于等于源ES集群的主版本号;
1.x版本的集群创建的快照不能在5.x版本中恢复;
创建快照前必须先创建repository仓库,一个repository仓库可以包含多份快照文件,repository主要有一下几种类型
fs: 共享文件系统,将快照文件存放于文件系统中
url: 指定文件系统的URL路径,支持协议:http,https,ftp,file,jar
s3: AWS S3对象存储,快照存放于S3中,以插件形式支持
hdfs: 快照存放于hdfs中,以插件形式支持
cos: 快照存放于腾讯云COS对象存储中,以插件形式支持
如果需要从自建ES集群迁移至腾讯云的ES集群,可以直接使用fs类型仓库,注意需要在Elasticsearch配置文件elasticsearch.yml设置仓库路径:
path.repo: ["/usr/local/services/test"]
之后调用snapshot api创建repository:
curl -XPUT http://172.16.0.39:9200/_snapshot/my_backup -H 'Content-Type: application/json' -d '{
"type": "fs",
"settings": {
"location": "/usr/local/services/test"
"compress": true
}
}'
如果需要从其它云厂商的ES集群迁移至腾讯云ES集群,或者腾讯云内部的ES集群迁移,可以使用对应云厂商他提供的仓库类型,如AWS的S3, 阿里云的OSS,腾讯云的COS等
curl -XPUT http://172.16.0.39:9200/_snapshot/my_s3_repository
{
"type": "s3",
"settings": {
"bucket": "my_bucket_name",
"region": "us-west"
}
}
调用snapshot api在创建好的仓库中创建快照
curl -XPUT http://172.16.0.39:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true
创建快照可以指定索引,也可以指定快照中包含哪些内容,具体的api接口参数可以查阅官方文档
目标ES集群中创建仓库和在源ES集群中创建仓库类似,用户可在腾讯云上创建COS对象bucket, 将仓库将在COS的某个bucket下。
把源ES集群创建好的snapshot上传至目标ES集群创建好的仓库中
curl -XPUT http://172.16.0.20:9200/_snapshot/my_backup/snapshot_1/_restore
curl http://172.16.0.20:9200/_snapshot/_status
如果旧集群不能停止写入,此时进行在线数据迁移,需要保证新旧集群的数据一致性。目前看来,除了官方提供的CCR功能,没有成熟的可以严格保证数据一致性的在线数据迁移方法。此时可以从业务场景出发,根据业务写入数据的特点选择合适的数据迁移方案。
一般来说,业务写入数据的特点有以下几种:
在日志或者APM的场景中,数据都是时序数据,一般索引也都是按天创建的,当天的数据只会写入当前的索引中。此时,可以先把存量的不再写入的索引数据一次性同步到新集群中,然后使用logstash或者其它工具增量同步当天的索引,待数据追平后,把业务对ES的访问切换到新集群中。
具体的实现方案为:
add only的数据写入方式,可以按照数据写入的顺序(根据_doc进行排序,如果有时间戳字段也可以根据时间戳排序)批量从旧集群中拉取数据,然后再批量写入新集群中;可以通过写程序,使用用scroll api 或者search_after参数批量拉取增量数据,再使用bulk api批量写入。
使用scroll拉取增量数据:
POST {my\_index}/_search?scroll=1m
{
"size":"100",
"query": {
"range": {
"timestamp": {
"gte": "now-1m",
"lt": "now/m"
}
}
}
}
POST _search/scroll
{
"scroll": "1m",
"scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAHCbaFndPR3J4bDJtVDh1bnNRaW5yYWZBWncAAAAAABwm2RZ3T0dyeGwybVQ4dW5zUWlucmFmQVp3AAAAAAAcJtwWd09HcnhsMm1UOHVuc1FpbnJhZkFadwAAAAAAHCbbFndPR3J4bDJtVDh1bnNRaW5yYWZBWncAAAAAABwm3RZ3T0dyeGwybVQ4dW5zUWlucmFmQVp3"
}
上述操作可以每分钟执行一次,拉起前一分钟新产生的数据,所以数据在旧集群和新集群的同步延迟为一分钟。
使用search_after批量拉取增量数据:
POST {my\_index}/_search
{
"size":100,
"query": {
"match_all": {}
},
"search_after": [
1569556667000
],
"sort": "timestamp"
}
上述操作可以根据需要自定义事件间隔执行,每次执行时修改search_after参数的值,获取指定值之后的多条数据;search_after实际上相当于一个游标,每执行一次向前推进,从而获取到最新的数据。
使用scroll和search_after的区别是:
另外,如果不想通过写程序迁移旧集群的增量数据到新集群的话,可以使用logstash结合scroll进行增量数据的迁移,可参考的配置文件如下:
input {
elasticsearch {
hosts => "x.x.x.1:9200"
index => "my_index"
query => '{"query":{"range":{"timestamp":{"gte":"now-1m","lt":"now/m"}}}}'
size => 100
scroll => "1m"
docinfo => true
schedule => "*/1 * * * *" #定时任务,每分钟执行一次
}
}
output {
elasticsearch {
hosts => "x.x.x.2:9200"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
}
}
使用过程中可以根据实际业务的需求调整定时任务参数schedule以及scroll相关的参数。
业务场景如果是写入ES时既有追加,又有存量数据的更新,此时比较重要的是怎么解决update操作的数据同步问题。对于新增的数据,可以采用上述介绍的增量迁移热索引的方式同步到新集群中。对于更新的数据,此时如果索引有类似于updateTime的字段用于标记数据更新的时间,则可以通过写程序或者logstash,使用scroll api根据updateTime字段批量拉取更新的增量数据,然后再写入到新的集群中。
可参考的logstash配置文件如下:
input {
elasticsearch {
hosts => "x.x.x.1:9200"
index => "my_index"
query => '{"query":{"range":{"updateTime":{"gte":"now-1m","lt":"now/m"}}}}'
size => 100
scroll => "1m"
docinfo => true
schedule => "*/1 * * * *" #定时任务,每分钟执行一次
}
}
output {
elasticsearch {
hosts => "x.x.x.2:9200"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
}
}
实际应用各种,同步新增(add)的数据和更新(update)的数据可以同时进行。但是如果索引中没有类似updateTime之类的字段可以标识出哪些数据是更新过的,目前看来并没有较好的同步方式,可以采用CCR来保证旧集群和新集群的数据一致性。
如果业务写入ES时既有新增(add)数据,又有更新(update)和删除(delete)数据,可以采用6.5之后商业版X-pack插件中的CCR功能进行数据迁移。但是使用CCR有一些限制,必须要注意:
具体的使用方式如下:
PUT /_cluster/settings
{
"persistent" : {
"cluster" : {
"remote" : {
"leader" : {
"seeds" : [
"x.x.x.1:9300"
]
}
}
}
}
}
PUT my_leader_indx
{
"settings":{
"index.soft_deletes.enabled": true
}
}
PUT my_follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "leader",
"leader_index" : "my_leader_indx"
}
GET my_follower_index/_ccr/stats
如果业务是通过中间件如kafka把数据写入到ES, 则可以使用如下图中的方式,使用logstash消费kafka的数据到新集群中,在旧集群和新集群数据完全追平之后,可以切换到新集群进行业务的查询,之后再对旧的集群下线处理。
使用中间件进行同步双写的优点是:
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有