前言:Elasticsearch是一个开源的分布式搜索和分析引擎提供了良好的数据插入能力并提供了灵活的数据更新方式。随之而来的便是大量更新操作引起的doc.deleted文档。同时很多用户在使用elasticsearch时由于种种原因需要对elasticsearch的索引数据进行删除。同样会产生大量的doc.deleted文档。
bulk:批量插入更新方式。
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
update:根据id,对指定数据进行精确更新。
POST /<index>/_update/<_id>
update_by_query:根据指定的查询条件,对数据进行匹配更新。
POST my-index-000001/_update_by_query?conflicts=proceed
当客户端发起更新操作时,elasticsearch首先会根据更新条件(例如:update api传入的_id,或update_by_query传入的match语句)找到相应的文档。elasticsearch使用文档的唯一标识符(_id)来定位文档。当找到要更新的文档后,elasticsearch首先会将原有的旧文档标记为删除状态。然后再将会将新文档插入到索引中。新文档具有相同的唯一标识符(_id),以此来实现文档的更新操作。
@Override
protected void searchToString(StringBuilder b) {
super.searchToString(b);
if (script != null) {
b.append(" updated with ").append(script);
}
}
此外,elasticsearch在客户端执行更新操作时还提供 脚本更新的方式。
发起update_by_query操作是会通过该类进行实现。
public class UpdateByQueryAction extends ActionType<BulkByScrollResponse> {
public static final UpdateByQueryAction INSTANCE = new UpdateByQueryAction();
public static final String NAME = "indices:data/write/update/byquery";
private UpdateByQueryAction() {
super(NAME, BulkByScrollResponse::new);
}
}
在该类中,定义了一个静态的 INSTANCE 对象,用于表示 UpdateByQueryAction 的单例实例。同时,定义了一个常量 NAME ,表示该动作的名称为"indices:data/write/update/byquery"。 在构造函数中,调用了父类的构造函数 super(NAME, BulkByScrollResponse::new) 。这里传入了动作的名称和一个回调函数,用于创建 BulkByScrollResponse 对象。
BulkByScrollResponse则为执行更新操作后返回的对象。包括执行耗时,执行状态等信息。
在BulkByScrollResponse中通过该构造方法批量滚动操作对象。
接收以下参数:
- took :表示操作的耗时。
- status :表示批量滚动任务的状态,这是一个 BulkByScrollTask.Status 对象。
- bulkFailures :表示批量操作失败的列表,这是一个 List<Failure> 对象。
- searchFailures :表示搜索失败的列表,这是一个 List<ScrollableHitSource.SearchFailure> 对象。
- timedOut :表示操作是否超时。
private TimeValue took;
private BulkByScrollTask.Status status;
private List<Failure> bulkFailures;
private List<ScrollableHitSource.SearchFailure> searchFailures;
private boolean timedOut;
private static final String TOOK_FIELD = "took";
private static final String TIMED_OUT_FIELD = "timed_out";
private static final String FAILURES_FIELD = "failures";
public BulkByScrollResponse(
TimeValue took,
BulkByScrollTask.Status status,
List<Failure> bulkFailures,
List<ScrollableHitSource.SearchFailure> searchFailures,
boolean timedOut
) {
this.took = took;
this.status = requireNonNull(status, "Null status not supported");
this.bulkFailures = bulkFailures;
this.searchFailures = searchFailures;
this.timedOut = timedOut;
}
public BulkByScrollResponse(Iterable<BulkByScrollResponse> toMerge, @Nullable String reasonCancelled) {
long mergedTook = 0;
List<BulkByScrollTask.StatusOrException> statuses = new ArrayList<>();
bulkFailures = new ArrayList<>();
searchFailures = new ArrayList<>();
for (BulkByScrollResponse response : toMerge) {
mergedTook = max(mergedTook, response.getTook().nanos());
statuses.add(new BulkByScrollTask.StatusOrException(response.status));
bulkFailures.addAll(response.getBulkFailures());
searchFailures.addAll(response.getSearchFailures());
timedOut |= response.isTimedOut();
}
took = timeValueNanos(mergedTook);
status = new BulkByScrollTask.Status(statuses, reasonCancelled);
}
具体的更新步骤则为:
优点:能够立刻释放磁盘空间。
缺点:会删除整个索引的全部数据。无法满足只删除部分数据的需求。
优点:操作灵活,能够根据传入的条件对指定的数据进行删除。
缺点:标记删除过程较久,磁盘空间释放较慢。在磁盘空间较为充裕时可以使用该方式进行数据删除操作。
当执行删除操作时,elasticsearch会根据我们传入的条件(例如:delete api传入的_id,或delete_by_query传入的match语句)来找到我们需要进行删除操作的文档。然后标记要删除的文档为已删除状态,在对文档完成标记后并不会立即从磁盘上删除它们。这是为了提高性能和避免数据丢失。标记为已删除的文档仍然存在于索引中,但在搜索和查询时会被过滤掉。 后续elasticsearch会自动对已经标记为删除的文档进行段合并。
public class DeleteByQueryAction extends ActionType<BulkByScrollResponse> {
public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction();
public static final String NAME = "indices:data/write/delete/byquery";
private DeleteByQueryAction() {
super(NAME, BulkByScrollResponse::new);
}
}
elasticsearch在进行删除动作时,也是通过定义了一个静态的 `INSTANCE` 对象,用于表示"DeleteByQueryAction"的单例实例。同时,定义了一个常量 `NAME` ,表示该动作的名称为"indices:data/write/delete/byquery"。
在构造函数中,调用了父类的构造函数 `super(NAME, BulkByScrollResponse::new)` 。这里传入了动作的名称和一个回调函数,用于创建"BulkByScrollResponse"对象。
关于BulkByScrollResponse返回的大致信息则与前面赘述的基本一致。
值的注意的是update与delete操作都会产生大量的doc.deleted。我们会发现一个有趣的现象。
在大量执行update操作时,我们elasticsearch集群的磁盘使用率会出现一定程度的膨胀,在一定时间之后磁盘使用率才会出现下降并与对数据进行update操作前的磁盘使用率趋于一致。
同样的,很多时候我们在通过delete_by_query 删除数据时,观察集群的磁盘使用率,发现磁盘使用率并不会立刻出现下降,而是极为缓慢的逐渐下降趋势。
这是因为在elasticsearch中当文档被标记为删除状态后,elasticsearch会有一个merge操作(也称为段合并)。
关于更新操作后,产生的doc.deleted文档elasticsearch会如何进行merge,以及merge过程中可能会出现哪些问题则会在后续文章进行讨论。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。