This article is based on the Elasticsearch 7.14 source code.
In Elasticsearch 7.14, the Reindex API is a core data-management tool for efficiently copying documents across indices, transforming documents, or migrating indices. It performs large-scale data migration through background parallel processing while offering a rich set of control options.
In the simplest case, only the source index and the destination index need to be specified; all other parameters take their default values.
POST _reindex
{
"source": {
"index": "my-index-000001"
},
"dest": {
"index": "my-new-index-000001"
}
}
Manual slicing: specify a slice ID and the total number of slices for each request; these serve as the task-slicing parameters of the reindex requests.
POST _reindex
{
"source": {
"index": "my-index-000001",
"slice": {
"id": 0,
"max": 2
}
},
"dest": {
"index": "my-new-index-000001"
}
}
POST _reindex
{
"source": {
"index": "my-index-000001",
"slice": {
"id": 1,
"max": 2
}
},
"dest": {
"index": "my-new-index-000001"
}
}
Automatic slicing: use a sliced scroll to slice the task on _id so the slices run in parallel, and use the slices query parameter to specify the total number of slices.
POST _reindex?slices=5&refresh
{
"source": {
"index": "my-index-000001"
},
"dest": {
"index": "my-new-index-000001"
}
}
By default, reindex preserves a document's routing. The dest.routing setting can change this behavior; setting it to =cat, as below, forces the routing of every copied document to cat.
POST _reindex
{
"source": {
"index": "source",
"query": {
"match": {
"company": "cat"
}
}
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
# Specify the batch size (source.size) used when scrolling the source, combined with document routing.
POST _reindex
{
"source": {
"index": "source",
"size": 100
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
If the documents fetched during reindex need to be transformed before being written to the destination index, an ingest pipeline can be specified to process the data.
POST _reindex
{
"source": {
"index": "source"
},
"dest": {
"index": "dest",
"pipeline": "some_ingest_pipeline"
}
}
When copying data, if only some fields of each document are needed, the fields can be filtered by specifying _source in the source section.
POST _reindex
{
"source": {
"index": "my-index-000001",
"_source": ["user.id", "_doc"]
},
"dest": {
"index": "my-new-index-000001"
}
}
When reindexing, if the latest template configuration needs to be applied, or documents need to be modified before being written to the destination index, a script can be used.
POST _reindex
{
"source": {
"index": "metricbeat-*"
},
"dest": {
"index": "metricbeat"
},
"script": {
"lang": "painless",
"source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
}
}
# Use a script to modify documents.
POST _reindex
{
"source": {
"index": "my-index-000001"
},
"dest": {
"index": "my-new-index-000001",
"version_type": "external"
},
"script": {
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}
}
Reindex from remote: a remote cluster can be specified in source.remote (host, username, and password) so that documents are copied from another cluster into the local destination index.
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"username": "user",
"password": "pass"
},
"index": "my-index-000001",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "my-new-index-000001"
}
}
Query parameters | Meaning |
---|---|
refresh | Default: false. Setting it to true refreshes the affected index shards as soon as the request completes. |
timeout | How long each indexing wait operation may take (automatic index creation, dynamic mapping updates, waiting for active shards). |
wait_for_active_shards | The number of shard copies that must be active before the operation proceeds. Default: 1, meaning only the primary shard needs to be active. |
wait_for_completion | Whether to block until the operation completes. Default: true. If set to false, a task ID is returned and the task runs in the background. |
requests_per_second | Throttle for the request, measured in sub-requests per second. Default: -1 (no throttle). |
require_alias | If set to true, the destination must be an index alias. Default: false. |
scroll | How long the scroll search context keeps a consistent view of the source index. |
slices | The number of slices the task is divided into. Default: 1, meaning the task is not split into subtasks. |
max_docs | The maximum number of documents to process. Defaults to all documents. |
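As an illustration of these parameters, the following sketch (index names follow the earlier examples, and <task_id> is a placeholder for the ID returned by the reindex call) runs the reindex in the background with a throttle of 500 sub-requests per second and automatic slicing, then polls the task through the task management API:
POST _reindex?wait_for_completion=false&requests_per_second=500&slices=auto
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}
# Check progress with the task ID returned by the call above
GET _tasks/<task_id>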
From the moment the reindex API is invoked to start a reindex task until the task finishes, the process roughly goes through the following stages:
1) The client submits a POST _reindex request.
2) After receiving and validating the client's request, Elasticsearch begins constructing the internal request based on the submitted statement.
The task creation and interaction flow is illustrated in the following diagram:
ReindexAction.java
public class ReindexAction extends ActionType<BulkByScrollResponse> {
public static final ReindexAction INSTANCE = new ReindexAction();
public static final String NAME = "indices:data/write/reindex";
private ReindexAction() {
super(NAME, BulkByScrollResponse::new);
}
}
ReindexAction defines the action type for the reindex operation in Elasticsearch; reindex requests are answered with a BulkByScrollResponse.
Its main responsibilities are registering the action name indices:data/write/reindex, exposing the singleton INSTANCE, and binding the response reader BulkByScrollResponse::new.
ReindexRequest.java
This class backs the reindex operation in Elasticsearch. It encapsulates source and destination index information, query conditions, version control, routing, remote cluster information, and so on, and provides methods for construction, validation, serialization, and deserialization. It also supports slicing, document count limits, and script processing to ensure the reindex task runs correctly.
public ActionRequestValidationException validate() {
ActionRequestValidationException e = super.validate();
if (getSearchRequest().indices() == null || getSearchRequest().indices().length == 0) {
e = addValidationError("use _all if you really want to copy from all existing indexes", e);
}
if (getSearchRequest().source().fetchSource() != null && getSearchRequest().source().fetchSource().fetchSource() == false) {
e = addValidationError("_source:false is not supported in this context", e);
}
/*
* Note that we don't call index's validator - it won't work because
* we'll be filling in portions of it as we receive the docs. But we can
* validate some things so we do that below.
*/
if (destination.index() == null) {
e = addValidationError("index must be specified", e);
return e;
}
if (false == routingIsValid()) {
e = addValidationError("routing must be unset, [keep], [discard] or [=<some new value>]", e);
}
if (destination.versionType() == INTERNAL) {
if (destination.version() != Versions.MATCH_ANY && destination.version() != Versions.MATCH_DELETED) {
e = addValidationError("unsupported version for internal versioning [" + destination.version() + ']', e);
}
}
if (getRemoteInfo() != null) {
if (getSearchRequest().source().query() != null) {
e = addValidationError("reindex from remote sources should use RemoteInfo's query instead of source's query", e);
}
if (getSlices() == AbstractBulkByScrollRequest.AUTO_SLICES || getSlices() > 1) {
e = addValidationError("reindex from remote sources doesn't support slices > 1 but was [" + getSlices() + "]", e);
}
}
return e;
}
This method validates the ReindexRequest, mainly checking the following (a request that violates the routing rule is shown after this list):
- a source index must be specified (use _all to copy from all existing indices);
- _source: false is not supported in this context;
- a destination index must be specified;
- dest.routing must be unset, keep, discard, or =<some new value>;
- with internal versioning, an explicit destination version is not supported;
- when reindexing from a remote cluster, the query must be carried in RemoteInfo rather than in the source, and slices greater than 1 are not supported.
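For example, a request whose dest.routing is neither keep, discard, nor an =<value> form would be rejected by this validator (index names here are illustrative):
POST _reindex
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001",
    "routing": "cat"
  }
}
# Expected to fail validation with: "routing must be unset, [keep], [discard] or [=<some new value>]"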
BulkByScrollTask.java
BulkByScrollTask represents a task of the reindex family. A task can act either as the leader that coordinates slice subtasks or as a worker that executes the scroll-and-bulk work; its main functions are tracking per-task status and aggregating the status of slice subtasks.
public TaskInfo taskInfoGivenSubtaskInfo(String localNodeId, List<TaskInfo> sliceInfo) {
if (isLeader() == false) {
throw new IllegalStateException("This task is not set to be a leader of other slice subtasks");
}
List<BulkByScrollTask.StatusOrException> sliceStatuses = Arrays.asList(
new BulkByScrollTask.StatusOrException[leaderState.getSlices()]
);
for (TaskInfo t : sliceInfo) {
BulkByScrollTask.Status status = (BulkByScrollTask.Status) t.status();
sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status));
}
Status status = leaderState.getStatus(sliceStatuses);
return taskInfo(localNodeId, getDescription(), status);
}
This method builds a TaskInfo object for the task. It first checks that the current task is the leader of the slice subtasks and throws an exception otherwise; it then initializes a status list sized by the number of slices and fills in each subtask's status from the given subtask info; finally it asks the leader state to produce the aggregated status and returns the complete task info.
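The aggregated status assembled here is what the task management API reports for a sliced reindex. A sketch of how it can be queried (the task ID is a placeholder):
# List running reindex tasks with their detailed status
GET _tasks?detailed=true&actions=*reindex

# Or fetch a single task, including the per-slice statuses aggregated by the leader
GET _tasks/<task_id>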
public void setWorkerCount(int slices) {
if (isLeader()) {
throw new IllegalStateException("This task is already a leader for other slice subtasks");
}
if (isWorker()) {
throw new IllegalStateException("This task is already a worker");
}
leaderState = new LeaderBulkByScrollTaskState(this, slices);
}
This method turns the current task into a leader task that manages the given number of slice subtasks. It first checks whether the task is already a leader or a worker and throws an exception if so; otherwise it creates a new LeaderBulkByScrollTaskState instance as the leader state.
public void setWorker(float requestsPerSecond, @Nullable Integer sliceId) {
if (isWorker()) {
throw new IllegalStateException("This task is already a worker");
}
if (isLeader()) {
throw new IllegalStateException("This task is already a leader for other slice subtasks");
}
workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond);
if (isCancelled()) {
workerState.handleCancel();
}
}
This code turns the current task into a worker task that executes the search requests. It first checks whether the task is already a worker or a leader and throws an exception if so; otherwise it creates a new WorkerBulkByScrollTaskState instance, and if the task has already been cancelled it calls handleCancel() to process the cancellation.
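Cancellation can be triggered through the task management API; the isCancelled() check at the end of setWorker covers the case where the cancel arrives before the worker state has been created. A sketch of the call (the task ID is a placeholder):
# Cancel a running reindex task
POST _tasks/<task_id>/_cancel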
WorkerBulkByScrollTaskState.java
This class tracks and manages the execution state of a bulk-by-scroll worker, including document-processing counters, batches, retries, and version conflicts. It supports task cancellation, throttling, and status statistics, and uses DelayedPrepareBulkRequest to delay the scheduling of requests and to re-throttle them.
public void delayPrepareBulkRequest(
ThreadPool threadPool,
long lastBatchStartTimeNS,
int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable
) {
// Synchronize so we are less likely to schedule the same request twice.
synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
try {
delayedPrepareBulkRequestReference.set(
new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), delay, new RunOnce(prepareBulkRequestRunnable))
);
} catch (EsRejectedExecutionException e) {
prepareBulkRequestRunnable.onRejection(e);
}
}
}
This method delays preparation of the next bulk request in order to throttle the task. It computes the wait time for the current batch, wraps the runnable in a DelayedPrepareBulkRequest, and schedules it; if the thread pool rejects the execution, the rejection callback is invoked. A synchronized block makes it less likely that the same request is scheduled twice.
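The wait time computed here comes from the requests_per_second throttle: roughly, each batch should take lastBatchSize / requestsPerSecond seconds, and the delay pads out whatever time the previous batch did not use. For example, with a batch size of 1000 and requests_per_second=500, batches end up spaced about two seconds apart. A usage sketch (index names are illustrative):
POST _reindex?requests_per_second=500
{
  "source": {
    "index": "my-index-000001",
    "size": 1000
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}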
class DelayedPrepareBulkRequest {
private final ThreadPool threadPool;
private final Runnable command;
private final float requestsPerSecond;
private final Scheduler.ScheduledCancellable scheduled;
DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) {
this.threadPool = threadPool;
this.requestsPerSecond = requestsPerSecond;
this.command = command;
this.scheduled = threadPool.schedule(() -> {
throttledNanos.addAndGet(delay.nanos());
command.run();
}, delay, ThreadPool.Names.GENERIC);
}
DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
if (newRequestsPerSecond < requestsPerSecond) {
/* The user is attempting to slow the request down. We'll let the
* change in throttle take effect the next time we delay
* prepareBulkRequest. We can't just reschedule the request further
* out in the future because the bulk context might time out. */
logger.debug(
"[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]",
task.getId(),
newRequestsPerSecond,
requestsPerSecond
);
return this;
}
long remainingDelay = scheduled.getDelay(TimeUnit.NANOSECONDS);
// Actually reschedule the task
if (scheduled == null || false == scheduled.cancel()) {
// Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId());
return this;
}
/* Strangely enough getting here doesn't mean that you actually
* cancelled the request, just that you probably did. If you stress
* test it you'll find that requests sneak through. So each request
* is given a runOnce boolean to prevent that. */
TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
logger.debug("[{}]: rescheduling for [{}] in the future", task.getId(), newDelay);
return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
}
/**
* Scale back remaining delay to fit the new delay.
*/
TimeValue newDelay(long remainingDelay, float newRequestsPerSecond) {
if (remainingDelay < 0) {
return timeValueNanos(0);
}
return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond));
}
}
DelayedPrepareBulkRequest wraps the delayed execution of a bulk request and supports throttling via requestsPerSecond. The constructor schedules the command on the thread pool after the given delay. The rethrottle method adjusts the rate dynamically: if the new rate is slower, the change only takes effect the next time a request is delayed; if it is faster, the currently scheduled task is cancelled, a new delay is computed from the remaining delay and the new rate, and the task is rescheduled. The newDelay method scales the remaining delay proportionally to the new rate.
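Rethrottling is exposed through the _rethrottle endpoint of the reindex API. Following newDelay's scaling (remainingDelay * requestsPerSecond / newRequestsPerSecond), a batch that still had 10 seconds to wait at 100 requests per second would wait only about 2 seconds after being rethrottled to 500. A sketch of the call (the task ID is a placeholder):
# Speed up a running reindex task; per the logic above, speeding up takes effect
# immediately, while slowing down only applies from the next batch onward
POST _reindex/<task_id>/_rethrottle?requests_per_second=500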
Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. In case of infringement, please contact cloudcommunity@tencent.com for removal.