Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Elasticsearch 底层系列之分片恢复解析

Elasticsearch 底层系列之分片恢复解析

原创
作者头像
老生姜
修改于 2018-12-11 03:33:06
修改于 2018-12-11 03:33:06
10.1K0
举报

我们是基础架构部,腾讯云 CES/CTSDB 产品后台服务的支持团队,我们拥有专业的ES开发运维能力,为大家提供稳定、高性能的服务,欢迎有需求的童鞋接入,同时也欢迎各位交流 Elasticsearch、Lucene 相关的技术!

1. 前言

    在线上生产环境中,对于大规模的ES集群出现节点故障的场景比较多,例如,网络分区、机器故障、集群压力等等,都会导致节点故障。当外在环境恢复后,节点需要重新加入集群,那么当节点重新加入集群时,由于ES的自平衡策略,需要将某些分片恢复到新加入的节点上,那么ES的分片恢复流程是如何进行的呢?遇到分片恢复的坑该如何解决呢?(这里线上用户有碰到过,当恢复的并发调得较大时,会触发es的bug导致分布式死锁)?分片恢复的完整性、一致性如何保证呢?,本文将通过ES源码一窥究竟。注:ES分片恢复的场景有多种,本文只剖析最复杂的场景--peer recovery。

2. 分片恢复总体流程

    ES副本分片恢复主要涉及恢复的目标节点和源节点,目标节点即故障恢复的节点,源节点为提供恢复的节点。目标节点向源节点发送分片恢复请求,源节点接收到请求后主要分两阶段来处理。第一阶段,对需要恢复的shard创建snapshot,然后根据请求中的metadata对比如果 syncid 相同且 doc 数量相同则跳过,否则对比shard的segment文件差异,将有差异的segment文件发送给target node。第二阶段,为了保证target node数据的完整性,需要将本地的translog发送给target node,且对接收到的translog进行回放。整体流程如下图所示。

分片恢复流程
分片恢复流程

    以上为恢复的总体流程,具体实现细节,下面将结合源码进行解析。

3. 副本分片流程

3.1. 目标节点请求恢复

    本节,我们通过源码来剖析副本分片的详细恢复流程。ES根据metadata的变化来驱动各个模块工作,副本分片恢复的起始入口为IndicesClusterStateService.createOrUpdateShards,这里首先会判断本地节点是否在routingNodes中,如果在,说明本地节点有分片创建或更新的需求,否则跳过。逻辑如下:

代码语言:txt
AI代码解释
复制
private void createOrUpdateShards(final ClusterState state) {
    RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
    if (localRoutingNode == null) {
        return;
    }
    DiscoveryNodes nodes = state.nodes();
    RoutingTable routingTable = state.routingTable();
    for (final ShardRouting shardRouting : localRoutingNode) {
        ShardId shardId = shardRouting.shardId();
        if (failedShardsCache.containsKey(shardId) == false) {
            AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
            Shard shard = indexService.getShardOrNull(shardId.id());
            if (shard == null) { // shard不存在则需创建
                createShard(nodes, routingTable, shardRouting, state);
            } else { // 存在则更新
                updateShard(nodes, shardRouting, shard, routingTable, state);
            }
        }
    }
}

    副本分片恢复走的是createShard分支,在该方法中,首先获取shardRouting的类型,如果恢复类型为PEER,说明该分片需要从远端获取,则需要找到源节点,然后调用IndicesService.createShard:

代码语言:txt
AI代码解释
复制
private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
    DiscoveryNode sourceNode = null;
    if (shardRouting.recoverySource().getType() == Type.PEER)  {
        sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); // 如果恢复方式是peer,则会找到shard所在的源节点进行恢复
        if (sourceNode == null) {
            return;
        }
    }
        RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
        indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), repositoriesService, failedShardHandler);
        ... ...
}

private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
    DiscoveryNode sourceNode = null;
    if (!shardRouting.primary()) {
        ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
        if (primary.active()) {
            sourceNode = nodes.get(primary.currentNodeId()); // 找到primary shard所在节点
        }
    } else if (shardRouting.relocatingNodeId() != null) {
        sourceNode = nodes.get(shardRouting.relocatingNodeId()); // 找到搬迁的源节点
    } else {
         ... ...
    }
    return sourceNode;
}

    源节点的确定分两种情况,如果当前shard本身不是primary shard,则源节点为primary shard所在节点,否则,如果当前shard正在搬迁中(从其他节点搬迁到本节点),则源节点为数据搬迁的源头节点。得到源节点后调用IndicesService.createShard,在该方法中调用方法IndexShard.startRecovery开始恢复。对于恢复类型为PEER的任务,恢复动作的真正执行者为PeerRecoveryTargetService.doRecovery。在该方法中,首先获取shard的metadataSnapshot,该结构中包含shard的段信息,如syncid、checksum、doc数等,然后封装为 StartRecoveryRequest,通过RPC发送到源节点:

代码语言:txt
AI代码解释
复制
... ...
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
... ...
// 创建recovery quest 
request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
... ...
// 向源节点发送请求,请求恢复
cancellableThreads.execute(() -> responseHolder.set(
        transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                new FutureTransportResponseHandler<RecoveryResponse>() {
                    @Override
                    public RecoveryResponse newInstance() {
                        return new RecoveryResponse();
                    }
                }).txGet()));

    注意,请求的发送是异步的,但是这里会调用 PlainTransportFuture.txGet() 方法,等待对端的回复,否则将一直 阻塞 。至此,目标节点已将请求发送给源节点,源节点的执行逻辑随后详细分析。

3.2 源节点处理恢复请求

    源节点接收到请求后会调用恢复的入口函数recover:

代码语言:txt
AI代码解释
复制
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
    @Override
    public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
        RecoveryResponse response = recover(request);
        channel.sendResponse(response);
    }
}

    recover方法根据request得到shard并构造RecoverySourceHandler对象,然后调用handler.recoverToTarget进入恢复的执行体:

代码语言:txt
AI代码解释
复制
public RecoveryResponse recoverToTarget() throws IOException { // 恢复分为两阶段
    try (Translog.View translogView = shard.acquireTranslogView()) { 
        final IndexCommit phase1Snapshot;
        try {
            phase1Snapshot = shard.acquireIndexCommit(false);
        } catch (Exception e) {
            IOUtils.closeWhileHandlingException(translogView);
            throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
        }
        try {
            phase1(phase1Snapshot, translogView);  // 第一阶段,比较syncid和segment,然后得出有差异的部分,主动将数据推送给请求方
        } catch (Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
        } finally {
            try {
                shard.releaseIndexCommit(phase1Snapshot);
            } catch (IOException ex) {
                logger.warn("releasing snapshot caused exception", ex);
            }
        }
        // engine was just started at the end of phase 1
        if (shard.state() == IndexShardState.RELOCATED) {
            throw new IndexShardRelocatedException(request.shardId());
        }
        try {
            phase2(translogView.snapshot()); // 第二阶段,发送translog
        } catch (Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
        }
        finalizeRecovery();
    }
    return response;
}

    从上面的代码可以看出,恢复主要分两个阶段,第一阶段恢复segment文件,第二阶段发送translog。这里有个关键的地方,在恢复前,首先需要获取translogView及segment snapshot,translogView的作用是保证当前时间点到恢复结束时间段的translog不被删除,segment snapshot的作用是保证当前时间点之前的segment文件不被删除。接下来看看两阶段恢复的具体执行逻辑。phase1:

代码语言:txt
AI代码解释
复制
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
    final Store store = shard.store(); //拿到shard的存储信息
    recoverySourceMetadata = store.getMetadata(snapshot); // 拿到snapshot的metadata
    String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
            String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
            final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId);
            if (recoverWithSyncId) { // 如果syncid相等,再继续比较下文档数,如果都相同则不用恢复
    final long numDocsTarget = request.metadataSnapshot().getNumDocs();
    final long numDocsSource = recoverySourceMetadata.getNumDocs();
    if (numDocsTarget != numDocsSource) {
        throw new IllegalStateException("... ...");
    } 
} else {
	final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); // 找出target和source有差别的segment
	List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
	phase1Files.addAll(diff.different);
	phase1Files.addAll(diff.missing);
	... ...
	final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
        md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
	sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // 将需要恢复的文件发送到target node
	... ...
    }
    prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());

    从上面代码可以看出,phase1的具体逻辑是,首先拿到待恢复shard的metadataSnapshot从而得到recoverySourceSyncId,根据request拿到recoveryTargetSyncId,比较两边的syncid,如果相同再比较源和目标的文档数,如果也相同,说明在当前提交点之前源和目标的shard对应的segments都相同,因此不用恢复segment文件。如果两边的syncid不同,说明segment文件有差异,则需要找出所有有差异的文件进行恢复。通过比较recoverySourceMetadata和recoveryTargetSnapshot的差异性,可以找出所有有差别的segment文件。这块逻辑如下:

代码语言:txt
AI代码解释
复制
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
    final List<StoreFileMetaData> identical = new ArrayList<>();  // 相同的file 
    final List<StoreFileMetaData> different = new ArrayList<>();  // 不同的file
    final List<StoreFileMetaData> missing = new ArrayList<>();   // 缺失的file
    final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
    final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
    ... ...
    for (List<StoreFileMetaData> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
        identicalFiles.clear();
        boolean consistent = true;
        for (StoreFileMetaData meta : segmentFiles) {
            StoreFileMetaData storeFileMetaData = recoveryTargetSnapshot.get(meta.name());
            if (storeFileMetaData == null) {
                consistent = false;
                missing.add(meta); // 该segment在target node中不存在,则加入到missing
            } else if (storeFileMetaData.isSame(meta) == false) {
                consistent = false;
                different.add(meta); // 存在但不相同,则加入到different
            } else {
                identicalFiles.add(meta);  // 存在且相同
            }
        }
        if (consistent) {
            identical.addAll(identicalFiles);
        } else {
            // make sure all files are added - this can happen if only the deletes are different
            different.addAll(identicalFiles);
        }
    }
    RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical), Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
    return recoveryDiff;
}

    这里将所有的segment file分为三类:identical(相同)、different(不同)、missing(target缺失)。然后将different和missing的segment files作为第一阶段需要恢复的文件发送到target node。发送完segment files后,源节点还会向目标节点发送消息以通知目标节点清理临时文件,然后也会发送消息通知目标节点打开引擎准备接收translog,这里需要注意的是,这两次网络通信都会调用 PlainTransportFuture.txGet() 方法阻塞等待 对端回复。至此,第一阶段的恢复逻辑完毕。

    第二阶段的逻辑比较简单,只需将translog view到当前时间之间的所有translog发送给源节点即可。

3.3 目标节点开始恢复

  • 接收segment

    对应上一小节源节点恢复的第一阶段,源节点将所有有差异的segment发送给目标节点,目标节点接收到后会将segment文件落盘。segment files的写入函数为RecoveryTarget.writeFileChunk:

代码语言:txt
AI代码解释
复制
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException {
    final Store store = store();
    final String name = fileMetaData.name();
    ... ...
    if (position == 0) {
        indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
    } else {
        indexOutput = getOpenIndexOutput(name); // 加一层前缀,组成临时文件
    }
    ... ...
    while((scratch = iterator.next()) != null) { 
        indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // 写临时文件
    }
    ... ...
    store.directory().sync(Collections.singleton(temporaryFileName));  // 这里会调用fsync落盘
}
  • 打开引擎

    经过上面的过程,目标节点完成了追数据的第一步。接收完segment后,目标节点打开shard对应的引擎准备接收translog,注意,这里打开引擎后,正在恢复的shard便可进行写入、删除(操作包括primary shard同步的请求和translog中的操作命令)。打开引擎的逻辑如下:

代码语言:txt
AI代码解释
复制
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
    ... ...
    recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
    final EngineConfig.OpenMode openMode;
    if (indexExists == false) {
        openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
    } else if (skipTranslogRecovery) {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
    } else {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
    }
    final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
    // we disable deletes since we allow for operations to be executed against the shard while recovering
    // but we need to make sure we don't loose deletes until we are done recovering
    config.setEnableGcDeletes(false); // 恢复过程中不删除translog
    Engine newEngine = createNewEngine(config); // 创建engine
    ... ...
}
  • 接收并重放translog

    打开引擎后,便可以根据translog中的命令进行相应的回放动作,回放的逻辑和正常的写入、删除类似,这里需要根据translog还原出操作类型和操作数据,并根据操作数据构建相应的数据对象,然后再调用上一步打开的engine执行相应的操作,这块逻辑如下:

代码语言:txt
AI代码解释
复制
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
    switch (operation.opType()) { // 还原出操作类型及操作数据并调用engine执行相应的动作
        case INDEX:
            Translog.Index index = (Translog.Index) operation;           
            // ...  根据index构建engineIndex对象 ...
            maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
            index(engine, engineIndex); // 执行写入操作
            break;
        case DELETE:
            Translog.Delete delete = (Translog.Delete) operation;
            // ...  根据delete构建engineDelete对象 ...
            delete(engine, engineDelete); // 执行删除操作
            break;
        default:
            throw new IllegalStateException("No operation defined for [" + operation + "]");
    }
}

    通过上面的步骤,translog的重放完毕,此后需要做一些收尾的工作,包括,refresh让回放后的最新数据可见,打开translog gc:

代码语言:txt
AI代码解释
复制
public void finalizeRecovery() {
    recoveryState().setStage(RecoveryState.Stage.FINALIZE);
    Engine engine = getEngine();
    engine.refresh("recovery_finalization"); 
    engine.config().setEnableGcDeletes(true);
}

    到这里,replica shard恢复的两个阶段便完成了,由于此时shard还处于INITIALIZING状态,还需通知master节点启动已恢复的shard:

代码语言:txt
AI代码解释
复制
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
    @Override
    public void onRecoveryDone(RecoveryState state) {
        if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
            SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
            restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId());
        }
        shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
    }
}

    至此,shard recovery的所有流程都已完成。

4. 答疑解惑

    通过上述源码剖析后,本节将对文章开头抛出的几个问题进行答疑解惑,加深大家对分片恢复的理解。

  • 分布式死锁     通过上述源码的分析,大家注意3.1和3.2末尾处加粗的地方,可以看出,源节点和目标节点都有调用PlainTransportFuture.txGet()方法阻塞线程同步返回结果,这是导致死锁的关键点。具体问题描述及处理方法见https://cloud.tencent.com/developer/article/1370318,大家可以结合本文源码分析搞清楚死锁的原因。
  • 完整性     首先,phase1阶段,保证了存量的历史数据可以恢复到从分片。phase1阶段完成后,从分片引擎打开,可以正常处理index、delete请求,而translog覆盖完了整个phase1阶段,因此在phase1阶段中的index/delete操作都将被记录下来,在phase2阶段进行translog回放时,副本分片正常的index和delete操作和translog是并行执行的,这就保证了恢复开始之前的数据、恢复中的数据都会完整的写入到副本分片,保证了数据的完整性。如下图所示:
从分片恢复时序图
从分片恢复时序图
  • 一致性

    由于phase1阶段完成后,从分片便可正常处理写入操作,而此时从分片的写入和phase2阶段的translog回放时并行执行的,如果translog的回放慢于正常的写入操作,那么可能会导致老的数据后写入,造成数据不一致。ES为了保证数据的一致性在进行写入操作时,会比较当前写入的版本和lucene文档版本号,如果当前版本更小,说明是旧数据则不会将文档写入lucene。相关代码如下:

代码语言:txt
AI代码解释
复制
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.skipAsStale(false, index.version());
} 

5. 小结

    本文结合ES源码详细分析了副本分片恢复的具体流程,并通过对源码的理解对文章开头提出的问题进行答疑解惑。后面,我们也将推出更多ES相关的文章,欢迎大家多多关注和交流。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
ElasticSearch Recovery 分析
org.elasticsearch.indices.cluster.IndicesClusterStateService.clusterChanged 被触发后,会触发applyNewOrUpdatedShards 函数的调用,这里是我们整个分析的起点。大家可以跑进去看看,然后跟着文章打开对应的源码浏览。
用户2936994
2018/08/27
1.4K0
《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析
索引恢复(index.recovery)是ES数据恢复过程。待恢复的数据是客户端写入成功,但未执行刷盘(flush)的Lucene分段。例如,当节点异常重启时,写入磁盘的数据先到文件系统的缓冲,未必来得及刷盘,如果不通过某种方式将未刷盘的数据找回来,则会丢失一些数据,这是保持数据完整性的体现;另一方面,由于写入操作在多个分片副本上没有来得及全部执行,副分片需要同步成和主分片完全一致,这是数据副本一致性的体现。
HLee
2021/05/27
2.4K0
《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析
《Elasticsearch 源码解析与优化实战》第12章:allocation模型分析
本文主要分析allocation 模块的结构和原理,然后以集群启动过程为例分析 allocation 模块的工作过程
HLee
2021/05/27
1.1K1
《Elasticsearch 源码解析与优化实战》第12章:allocation模型分析
《Elasticsearch 源码解析与优化实战》第3章:集群启动流程
让我们从启动流程开始,先在宏观上看看整个集群是如何启动的,集群状态如何从Red变成Green,不涉及代码,然后分析其他模块的流程。
HLee
2021/06/02
1.5K0
《Elasticsearch 源码解析与优化实战》第3章:集群启动流程
ElasticSearch Bulk 源码解析
读这篇文章前,建议先看看ElasticSearch Rest/RPC 接口解析,有利于你把握ElasticSearch接受处理请求的脉络。对于RPC类的调用,我会在后文简单提及,只是endpoint不一样,内部处理逻辑还是一样的。这篇只会讲IndexRequest,其他如DeleteRequest,UpdateRequest之类的,我们暂时不涉及。
用户2936994
2018/08/27
8850
【腾讯云ES】Elasticsearch 分布式架构剖析及扩展性优化
Elasticsearch 是一个实时的分布式搜索分析引擎,简称 ES。一个集群由多个节点组成,节点的角色可以根据用户的使用场景自由配置,集群可以以节点为单位自由扩缩容,数据以索引、分片的形式散列在各个节点上。本文介绍 ES 分布式架构基础原理,剖析分布式元数据管理模型,并介绍腾讯云 ES 在分布式扩展性层面相关的优化,解析代码基于 8.5 版本。
黄华
2022/11/18
3.7K1
【腾讯云ES】Elasticsearch 分布式架构剖析及扩展性优化
《Elasticsearch 源码解析与优化实战》第8章:GET流程
ES的读取分为Get和Search两种操作,这两种读取操作有较大的差异,GET/MGET必须指定三元组:index、_type、_id。 也就是说,根据文档id从正排索引中获取内容。而Search不指定_id,根据关键词从倒排索引中获取内容。本章分析GET/MGET过程,下一章分析Search过程。
HLee
2021/06/11
9740
《Elasticsearch 源码解析与优化实战》第8章:GET流程
《Elasticsearch 源码解析与优化实战》第7章:写流程
本章分析ES写入单个和批量文档写请求的处理流程,仅限于ES内部实现,并不涉及Lucene内部处理。在ES中,写入单个文档的请求称为Index请求,批量写入的请求称为Bulk请求。写单个和多个文档使用相同的处理逻辑,请求被统一封装为BulkRequest。
HLee
2021/05/25
2.4K0
《Elasticsearch 源码解析与优化实战》第7章:写流程
Elasticsearch 最佳实践系列之分片恢复并发故障
大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 住的问题。
黄华
2018/12/10
6.9K0
Elasticsearch 最佳实践系列之分片恢复并发故障
Elasticsearch源码分析-写入解析
Elasticsearch(ES)是一个基于Lucene的近实时分布式存储及搜索分析系统,其应用场景广泛,可应用于日志分析、全文检索、结构化数据分析等多种场景,既可作为NoSQL数据库,也可作为搜索引擎。由于ES具备如此强悍的能力,因此吸引了很多公司争相使用,如维基百科、GitHub、Stack Overflow等。
技术姐
2018/11/07
6K0
Elasticsearch快照(snapshot)备份原理及分析
Snapshot是Elasticsearch提供的一种将集群数据备份至远程存储库的功能。例如将数据备份至S3,HDFS,共享文件系统等。
空洞的盒子
2024/08/02
2K3
ES系列(五):获取单条数据get处理过程实现
前面讲的都是些比较大的东西,即框架层面的东西。今天咱们来个轻松点的,只讲一个点:如题,get单条记录的es查询实现。
烂猪皮
2021/06/10
1.3K0
ES系列(五):获取单条数据get处理过程实现
《Elasticsearch 源码解析与优化实战》第17章:Shrink原理分析
官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shrink-index.html
HLee
2021/08/16
1.1K0
《Elasticsearch 源码解析与优化实战》第17章:Shrink原理分析
【ES三周年】深入理解 Elasticsearch 集群数据快照
之前我们生产 ES 集群因为数据分片过大,导致集群重启无法选举,具体可以看这篇文章。当系统分片数据量越来越大,给生产集群造成一定压力,同时也会影响数据检索和查询效率。为了减轻集群压力,缩小集群分片数,减少集群故障,需要考虑数据归档方案,将查询频率低的数据从集群中归档到一个集中区域。
leon 橙
2022/12/21
5.2K2
万文Elasticsearch巧妙的架构详解
本书作为 Elastic Stack 指南,关注于 Elasticsearch 在日志和数据分析场景的应用,并不打算对底层的 Lucene 原理或者 Java 编程做详细的介绍,但是 Elasticsearch 层面上的一些架构设计,对我们做性能调优,故障处理,具有非常重要的影响。
大数据老哥
2022/02/17
7670
万文Elasticsearch巧妙的架构详解
《Elasticsearch 源码解析与优化实战》第13章:Snapshot 模块分析
快照模块是ES备份、迁移数据的重要手段。它支持增量备份,支持多种类型的仓库存储。本章我们先来看看如何使用快照,以及它的一些细节特性,然后分析创建、删除及取消快照的实现原理。
HLee
2021/06/17
1.8K1
《Elasticsearch 源码解析与优化实战》第13章:Snapshot 模块分析
《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析
上述信息被持久化到磁盘,需要注意的是:持久化的 state 不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠gateway的recovery过程重建RoutingTable。 当读取某个文档时,根据路由算法确定目的分片后,从RoutingTable中查找分片位于哪个节点,然后将请求转发到目的节点。
HLee
2021/05/31
1.2K0
《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析
如何提高ElasticSearch 索引速度
这篇文章会讲述上面几个参数的原理,以及一些其他的思路。这些参数大体上是朝着两个方向优化的:
用户2936994
2018/08/27
1.7K0
Elasticsearch底层系列之Shard Allocation机制
    Elasticsearch由一些Elasticsearch进程(Node)组成集群,用来存放索引(Index)。为了存放数据量很大的索引,Elasticsearch将Index切分成多个分片(Shard),在这些Shard里存放一个个的文档(document)。通过这一批shard组成一个完整的index。并且,每个Shard可以设置一定数量的副本(Replica),写入的文档同步给副本Shard,副本Shard可以提供查询功能,分摊系统的读负载。在主Shard所在Node(ES进程)挂掉后,可以提升一个副本Shard为主Shard,文档继续写在新的主Shard上,来提升系统的容灾能力。
技术姐
2018/09/11
2.3K0
Elasticsearch底层系列之Shard Allocation机制
大数据ELK(十一):Elasticsearch架构原理
在Elasticsearch有两类节点,一类是Master,一类是DataNode。
Lansonli
2021/10/11
7180
大数据ELK(十一):Elasticsearch架构原理
推荐阅读
相关推荐
ElasticSearch Recovery 分析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档