5.X版本后新增Reindex。Reindex可以直接在Elasticsearch集群里面对数据进行重建,如果你的mapping因为修改而需要重建,又或者索引设置修改需要重建的时候,借助Reindex可以很方便的异步进行重建,并且支持跨集群间的数据迁移。
官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.3/docs-reindex.html
在我们开发的过程中,我们有很多时候需要用到 Reindex 接口。它可以帮我们把数据从一个 index 到另外一个 index 进行重新reindex。这个对于特别适用于我们在修改我们数据的 mapping 后,需要重新把数据从现有的 index 转到新的 index 建立新的索引,这是因为我们不能修改现有的 index 的 mapping
为了能够使用 reindex 接口,我们必须满足一下的条件:
_reindex
操作之前设置目标索引,包括设置映射,分片数,副本等。POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
- 通过加入 query 到 source 中
- 通过定义 max_docs 参数
POST _reindex
{
"max_docs": 100,
"source": {
"index": "twitter2",
"query": {
"match": {
"city": "北京"
}
}
},
"dest": {
"index": "twitter3"
}
}
这里,我们定义最多不超过100个文档,同时,我们只拷贝来自“北京”的 twitter 记录。
设置 op_type to create 将导致 _reindex 仅在目标索引中创建缺少的文档。 所有现有文档都会导致版本冲突,比如:
POST _reindex
{
"source": {
"index": "twitter2"
},
"dest": {
"index": "twitter3",
"op_type": "create"
}
}
结果:
{
"took":2,
"timed_out":false,
"total":1,
"updated":0,
"created":0,
"deleted":0,
"batches":1,
"version_conflicts":1,
"noops":0,
"retries":{
"bulk":0,
"search":0
},
"throttled_millis":0,
"requests_per_second":-1,
"throttled_until_millis":0,
"failures":[
{
"index":"twitter3",
"type":"_doc",
"id":"1",
"cause":{
"type":"version_conflict_engine_exception",
"reason":"[1]: version conflict, document already exists (current version [5])",
"index_uuid":"ffz2LNIIQqqDx211R5f4fQ",
"shard":"0",
"index":"twitter3"
},
"status":409
}
]
}
它表明我们之前的文档 id 为1的有版本上的冲突。
默认情况下,版本冲突会中止 _reindex 进程。 “conflict” 请求 body 参数可用于指示 _reindex 继续处理版本冲突的下一个文档。 请务必注意,其他错误类型的处理不受 “conflict” 参数的影响。 当 “conflict”:在请求正文中设置 “proceed” 时, _reindex 进程将继续发生版本冲突并返回遇到的版本冲突计数:
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
将version_type设置为internal,则Elasticsearch强制性的将文档转储到目标索引中,覆盖具有相同类型和ID的任何内容:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "internal"
}
}
将version_type设置为external,则将导致Elasticsearch从源文件中保留版本,创建缺失的所有文档,并更新在目标索引中比源索引中版本更老的所有文档:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "external"
}
}
source中的index和type都可以是一个列表,允许您在一个请求中从大量的来源进行复制。下面将从twitter和blog索引中的tweet和post类型中复制文档。它也包含twitter索引中post类型以及blog索引中的tweet类型。
POST _reindex
{
"source": {
"index": ["twitter", "blog"],
"type": ["tweet", "post"]
},
"dest": {
"index": "all_together"
}
}
还可以通过设置大小限制处理的文档的数量。下面只会将单个文档从twitter复制到new_twitter:
POST _reindex
{
"size": 1,
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
如果你想要从twitter索引获得一个特定的文档集合你需要排序。排序使滚动效率更低,但在某些情况下它是值得的。如果可能,更喜欢更多的选择性查询size和sort。这将从twitter复制10000个文档到new_twitter:
POST _reindex
{
"size": 10000,
"source": {
"index": "twitter",
"sort": { "date": "desc" }
},
"dest": {
"index": "new_twitter"
}
}
source部分支持搜索请求中支持的所有元素。例如,只使用原始文档的一部分字段,使用源过滤如下所示:
POST _reindex
{
"source": {
"index": "twitter",
"_source": ["user", "tweet"]
},
"dest": {
"index": "new_twitter"
}
}
默认情况下,_reindex
批量滚动处理大小为1000
.您可以在source
元素中指定size
字段来更改批量处理大小:
POST _reindex
{
"source": {
"index": "source",
"size": 100 #batch size 这里在机器资源允许的条件下,搞大点
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
reindex是耗费性能的。借助:scroll+bulk实现。
优化建议:重索引下建议这个size设置大点
Reindex也可以使用[Ingest Node]功能来指定pipeline
, 就像这样:
POST _reindex
{
"source": {
"index": "source"
},
"dest": {
"index": "dest",
"pipeline": "some_ingest_pipeline"
}
}
重新索引大量文档可能会使你的群集泛滥甚至崩溃。requests_per_second 限制索引操作速率。
POST _reindex?requests_per_second=500
{
"source": {
"index": "blogs",
"size": 500
},
"dest": {
"index": "blogs_fixed"
}
}
Reindex支持从远程Elasticsearch群集重建索引:
POST _reindex
{
"source": {
"remote": {
"host": "http://remote_cluster_node1:9200",
"username": "user",
"password": "pass"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
host参数必须包含scheme,host和port(例如 https://remote_cluster_node1:9200)。用户名和密码参数是可选的,当它们存在时,索引将使用基本认证连接到远程Elasticsearch节点。使用基本认证时请务必使用https,密码将以纯文本格式发送。
必须在elasticsearch.yaml中使用reindex.remote.whitelist属性将远程主机明确列入白名单。它可以设置为允许的远程host和port组合的逗号分隔列表(例如otherhost:9200,another:9200,127.0.10.*:9200,localhost:*)。白名单忽略了scheme ——仅使用主机和端口。
来自远程服务器的重新索引使用默认为最大大小为100mb的堆栈缓冲区。如果远程索引包含非常大的文档,则需要使用较小的批量大小。下面的示例设置非常非常小的批量大小10。
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200"
},
"index": "source",
"size": 10,
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
也可以使用socket_timeout字段在远程连接上设置socket的读取超时,并使用connect_timeout字段设置连接超时。两者默认为三十秒。此示例将套接字读取超时设置为一分钟,并将连接超时设置为十秒:
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"socket_timeout": "1m",
"connect_timeout": "10s"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
reindex支持修改文档的脚本,脚本允许修改文档的元数据。此示例修改了源文档的版本:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "external"
},
"script": {
"inline": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}
}
POST test/test/1?refresh
{
"text": "words words",
"flag": "foo"
}
但是你不喜欢这个flag名称,而是要用tag替换它。 _reindex可以为您创建其他索引:
POST _reindex
{
"source": {
"index": "test"
},
"dest": {
"index": "test2"
},
"script": {
"inline": "ctx._source.tag = ctx._source.remove(\"flag\")"
}
}
现在你可以得到新的文件:
GET test2/test/1
{
"found": true,
"_id": "1",
"_index": "test2",
"_type": "test",
"_version": 1,
"_source": {
"text": "words words",
"tag": "foo"
}
}
可以设置ctx.op来更改在目标索引上执行的操作:
新索引需要加一个scope字段,并在reindex过程中给一个默认值。
PUT _ingest/pipeline/defaultvalue
{
"description": "set default scope value",
"processors": [
{
"set": {
"field": "scope",
"value": "suifang",
"override": false
}
}
]
}
POST _reindex
{
"source": {
"index": "sphinx-zhuanti-20.06.29-144128"
},
"dest": {
"index": "sphinx-zhuanti-21.01.29-190212",
"pipeline": "defaultvalue"
}
}
POST _reindex
{
"source": {
"index": "sphinx-zhuanti-20.06.29-144128"
},
"dest": {
"index": "sphinx-zhuanti-21.01.29-190921"
},
"script": {
"source": "ctx._source.scope = 'suifang';"
}
}
默认情况下,如果_reindex看到具有路由的文档,则路由将被保留,除非脚本被更改。您可以根据dest请求设置routing来更改:
例如,您可以使用以下请求将source索引的所有公司名称为cat的文档复制到路由设置为cat的dest索引。
POST _reindex
{
"source": {
"index": "source",
"query": {
"match": {
"company": "cat"
}
}
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
重建索引支持滚动切片,您可以相对轻松地手动并行化处理:
POST _reindex
{
"source": {
"index": "twitter",
"slice": {
"id": 0,
"max": 2
}
},
"dest": {
"index": "new_twitter"
}
}
POST _reindex
{
"source": {
"index": "twitter",
"slice": {
"id": 1,
"max": 2
}
},
"dest": {
"index": "new_twitter"
}
}
您可以通过以下方式验证:
GET _refresh
POST new_twitter/_search?size=0&filter_path=hits.total
其结果一个合理的total像这样:
{
"hits": {
"total": 120
}
}
你还可以让重建索引使用切片的_uid
来自动并行的滚动切片。
POST _reindex?slices=5&refresh
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
您可以通过以下方式验证:
POST new_twitter/_search?size=0&filter_path=hits.total
其结果一个合理的total像这样:
{
"hits": {
"total": 120
}
}
在这一点上,我们围绕要使用的slices
数量提供了一些建议(比如手动并行化时,切片API中的max
参数):
500
就能造成相当大的CPU抖动。slices
数量线性扩展。reindexing
的集群。虽然这个不在我们的 reindex 介绍范围,但是在有些情况下,我们可以可以通过 _update_by_query API 来让我们轻松地更新一个字段的值:
POST blogs_fixed/_update_by_query
{
"query": {
"match": {
"category.keyword": ""
}
},
"script": {
"source": """ ctx._source['category'] = "None" """
}
}
在上面,把 category.keyword 项为空的所有文档的 category 通过脚本设置为默认的 "None" 字符串。它和 reindex 的作用相似。
假设我们要向 twitter_new 索引的 mapping 添加一个多字段(multi-field)
PUT new_new/_mapping
{
"properties": {
"content": {
"type": "text",
"fields": {
"english": {
"type": "text",
"analyzer": "english"
}
}
}
}
}
在上面我们为 content 字段添加了一个新的 english 字段,并且它的 analyzer 为 english。
由于 mapping 已经发生改变,但是索引中已经有的文档没有这个新的字段 english,如果这个时候我们进行如下的搜索,将不会找到任何的结果:
GET twitter_new/_search
{
"query": {
"match": {
"content.english": "performance tips"
}
}
}
那么我们该如何使得索引中现有的文档都有 content.english 这个字段呢?运行 _update_by_query 以拥有现有文档选择新的 “content.english” 字段:
POST twitter_new/_update_by_query
当我们完成上面的请求后,然后再执行如下的操作,将会在twitter_new 索引中搜索到想要的文档:
GET twitter_new/_search
{
"query": {
"match": {
"content.english": "performance tips"
}
}
}
reindex的核心做跨索引、跨集群的数据迁移。 慢的原因及优化思路无非包括:
默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。
POST _reindex
{
"source": {
"index": "source",
"size": 5000
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0。
主要原因在于:复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。 相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。
PUT /my_logs/_settings
{
"number_of_replicas": 0
}
如:
PUT /regroupmembers-20.11.12-151612/_settings
{
"number_of_replicas": 0
}
如果你的搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。默认值是1s,在做reindex时可以将每个索引的refresh_interval到30s或禁用(-1)。
如果正在进行大量数据导入,reindex就是此场景,先将此值设置为-1来禁用刷新。
设置方法:
PUT /index_name/_settings
{ "refresh_interval": -1 }
还原方法:
PUT /index_name/_settings
{ "refresh_interval": "30s" }
1.设置Refresh:
PUT /regroupmembers-20.11.23-000000/_settings
{
"refresh_interval": "30s"
}
2.设置Batch_size:
POST _reindex
{
"source": {
"index": "regroupmembers-20.05.28-142940",
"size": 4000
},
"dest": {
"index": "regroupmembers-20.11.23-000000"
}
}
3.设置副本分片:0
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。