前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >elasticsearch的merge机制

elasticsearch的merge机制

原创
作者头像
空洞的盒子
发布2023-11-05 10:00:09
1K3
发布2023-11-05 10:00:09
举报
文章被收录于专栏:JD的专栏

前言:elasticsearch在进行密集的update,update_by_query,delete_by_query操作时会产生大量的doc.deleted文档。而elasticsearch又是如何处理这些doc.deleted文档的呢?

一.什么是elasticsearch的merge

1.数据在elasticsearch中如何进行存储

在elasticsearch中,客户端写入的每一条数据都会保存在索引的shard中,每一个shard都是一个lucene索引,每一个lucene索引都会被分解为多个segement。segement则是存储索引数据的最基本单元。

2.如何对索引进行merge

代码语言:javascript
复制
POST /my-index-000001/_forcemerge
POST /<target>/_forcemerge
POST /_forcemerge

该操作可以强制对elasticsearch中的index,datastream进行数据合并。

3.merge操作的作用

当我们使用merge API对索引的分片中的segement发起强制合并,elasticsearch通过merge操作会将索引分片上的多个segement合并到一个segement中。并对已经标记为deleted状态的文档进行删除。并释放这些已经标记为删除状态文档所占用的磁盘空间。

一般来说elasticsearch自身会自动对索引进行merge。但是在update场景与delete_by_query场景下,自动merge的效果缓慢。往往需要较长时间,这些被标记为删除状态的文档才会被elasticsearch进行merge并释放磁盘空间。所以我们可以通过对索引进行多轮次手动merge来加快索引merge的进度。

4.merge操作的原理

forcemerge的原理是将多个小的索引段(index segment)合并为一个更大的段,以减少磁盘空间的使用和提高搜索性能。当索引被更新时,新的文档会被添加到新的段中,而旧的段则会被标记为删除。这样会导致索引中存在多个小的段,而每个段都会占用一定的磁盘空间和系统资源。forcemerge操作可以通过将这些小的段合并为一个或少量的更大段来优化索引。

代码语言:javascript
复制
 public ForceMergeRequest(String... indices) {
        super(indices);
        forceMergeUUID = UUIDs.randomBase64UUID();
    }

    public ForceMergeRequest(StreamInput in) throws IOException {
        super(in);
        maxNumSegments = in.readInt();
        onlyExpungeDeletes = in.readBoolean();
        flush = in.readBoolean();
        if (in.getTransportVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
            forceMergeUUID = in.readString();
        } else {
            forceMergeUUID = in.readOptionalString();
            assert forceMergeUUID != null : "optional was just used as a BwC measure";
        }
    }

通过该段代码构造索引合并的请求,并判断需要合并的索引,如果没有指定具体提的索引则会对全部的索引进行forcemerge操作。

代码语言:javascript
复制
 @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeInt(maxNumSegments);
        out.writeBoolean(onlyExpungeDeletes);
        out.writeBoolean(flush);
        if (out.getTransportVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
            out.writeString(forceMergeUUID);
        } else {
            out.writeOptionalString(forceMergeUUID);
        }
    }

在合并时则会对文档的UUID进行判断。判断当前文档的UUID是否在FORCE_MERGE_UUID_SIMPLE_VERSION之后。如果是,则将forceMergeUUID(合并操作的UUID)以字符串的形式写入到输出流中。否则,将forceMergeUUID以可选字符串的形式写入到输出流中。

代码语言:javascript
复制
public ActionRequestValidationException validate() {
        ActionRequestValidationException validationError = super.validate();
        if (onlyExpungeDeletes && maxNumSegments != Defaults.MAX_NUM_SEGMENTS) {
            validationError = addValidationError(
                "cannot set only_expunge_deletes and max_num_segments at the same time, those two " + "parameters are mutually exclusive",
                validationError
            );
        }
        return validationError;
    }

在进行forcemerge合并时,我们可以指定segments的数量,在构造请求后会调用该方法进行参数的验证。判断请求是否合法。

代码语言:javascript
复制
public class ForceMergeAction extends ActionType<ForceMergeResponse> {

    public static final ForceMergeAction INSTANCE = new ForceMergeAction();
    public static final String NAME = "indices:admin/forcemerge";

    private ForceMergeAction() {
        super(NAME, ForceMergeResponse::new);
    }
}

ForceMergeAction通过继承ActionType<ForceMergeResponse>来实现ForceMerge操作。并通过ForceMergeResponse

获取到相应的返回信息。

代码语言:javascript
复制
private static final ConstructingObjectParser<ForceMergeResponse, Void> PARSER = new ConstructingObjectParser<>(
        "force_merge",
        true,
        arg -> {
            BaseBroadcastResponse response = (BaseBroadcastResponse) arg[0];
            return new ForceMergeResponse(
                response.getTotalShards(),
                response.getSuccessfulShards(),
                response.getFailedShards(),
                Arrays.asList(response.getShardFailures())
            );
        }
    );

在ForceMergeResponse的这段代码中定义了一个ConstructingObjectParser解析器。用来解析我们发起Forcemerge操作后的各中响应信息。

通过该解析器我们可以拿到以下返回信息。

response.getTotalShards() 返回总分片数, response.getSuccessfulShards() 返回成功分片数, response.getFailedShards() 返回失败分片数, Arrays.asList(response.getShardFailures()) 返回所有失败分片的详细信息

关于merge操作的参数优化

index.merge.scheduler.max_thread_count:单个分片上可以同时合并的最大线程数。

默认值Math.max(1, Math.min(4, <<node.processors, node.processors>> / 2))

在固态硬盘中,基于良好的硬件读写性能,我们可以适当调整该参数。如果存储介质为机械硬盘,则建议适当减小该参数值。

代码语言:javascript
复制
public static final Setting<Integer> MAX_THREAD_COUNT_SETTING = new Setting<>(
        "index.merge.scheduler.max_thread_count",
        (s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.allocatedProcessors(s) / 2))),
        (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"),
        Property.Dynamic,
        Property.IndexScope
    );
public static final Setting<Integer> MAX_MERGE_COUNT_SETTING = new Setting<>(
        "index.merge.scheduler.max_merge_count",
        (s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) + 5),
        (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_merge_count"),
        Property.Dynamic,
        Property.IndexScope
    );

在上述代码中我们可以看到elasticsearch在对索引进行merge时,提供了两种不同的合并调度器策略。

1. index.merge.scheduler.max_thread_count :定义了索引合并调度器的最大线程数。它使用 EsExecutors.allocatedProcessors(s) 方法获取可用的处理器数量,并根据其值计算线程数。线程数的计算公式为可用处理器数量除以2,结果取1和4之间的较小值。这样可以确保最少有一个线程,并且最多不超过4个线程。

2. index.merge.scheduler.max_merge_count :该设置项定义了索引合并调度器的最大合并数量。它使用 MAX_THREAD_COUNT_SETTING.get(s) 方法获取最大线程数,并在此基础上加上5。这样可以确保最少有一个合并任务,并且相对于最大线程数有一定的缓冲。

同时,这两个属性均为动态属性,可以进行热更新,并且作用于索引维度,我们可以根据索引存储介质的不同而对索引进行合适的参数值配置。

关于merge操作的使用建议:

一般存在更新+查询的场景,会产生很多的deleted docs以及零碎的段文件,,可以定期对索引进行forcemerge。当我们对索引进行多轮次的forcemerge时 ,可能会引发高io ,同时也会提高CPU等集群资源的负载。但是可以通过参数大大降低索引中的doc.deleted数量和索引分片的segments 数量,对索引的写入查询性能都会有一定提升。

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一.什么是elasticsearch的merge
    • 1.数据在elasticsearch中如何进行存储
      • 2.如何对索引进行merge
        • 3.merge操作的作用
          • 4.merge操作的原理
            • 关于merge操作的参数优化
            • 关于merge操作的使用建议:
        相关产品与服务
        Elasticsearch Service
        腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档