前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊elasticsearch的PeerFinder

聊聊elasticsearch的PeerFinder

原创
作者头像
code4it
发布2019-04-29 21:37:40
6760
发布2019-04-29 21:37:40
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下elasticsearch的PeerFinder

PeersRequest

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java

代码语言:javascript
复制
public class PeersRequest extends TransportRequest {
    private final DiscoveryNode sourceNode;
    private final List<DiscoveryNode> knownPeers;
​
    public PeersRequest(DiscoveryNode sourceNode, List<DiscoveryNode> knownPeers) {
        assert knownPeers.contains(sourceNode) == false : "local node is not a peer";
        this.sourceNode = sourceNode;
        this.knownPeers = knownPeers;
    }
​
    public PeersRequest(StreamInput in) throws IOException {
        super(in);
        sourceNode = new DiscoveryNode(in);
        knownPeers = in.readList(DiscoveryNode::new);
    }
​
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        sourceNode.writeTo(out);
        out.writeList(knownPeers);
    }
​
    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }
​
    public DiscoveryNode getSourceNode() {
        return sourceNode;
    }
​
    @Override
    public String toString() {
        return "PeersRequest{" +
            "sourceNode=" + sourceNode +
            ", knownPeers=" + knownPeers +
            '}';
    }
​
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PeersRequest that = (PeersRequest) o;
        return Objects.equals(sourceNode, that.sourceNode) &&
            Objects.equals(knownPeers, that.knownPeers);
    }
​
    @Override
    public int hashCode() {
        return Objects.hash(sourceNode, knownPeers);
    }
}
  • PeersRequest有两个属性,分别是sourceNode以及knownPeers

PeersResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java

代码语言:javascript
复制
public class PeersResponse extends TransportResponse {
    private final Optional<DiscoveryNode> masterNode;
    private final List<DiscoveryNode> knownPeers;
    private final long term;
​
    public PeersResponse(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers, long term) {
        assert masterNode.isPresent() == false || knownPeers.isEmpty();
        this.masterNode = masterNode;
        this.knownPeers = knownPeers;
        this.term = term;
    }
​
    public PeersResponse(StreamInput in) throws IOException {
        masterNode = Optional.ofNullable(in.readOptionalWriteable(DiscoveryNode::new));
        knownPeers = in.readList(DiscoveryNode::new);
        term = in.readLong();
        assert masterNode.isPresent() == false || knownPeers.isEmpty();
    }
​
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalWriteable(masterNode.orElse(null));
        out.writeList(knownPeers);
        out.writeLong(term);
    }
​
    /**
     * @return the node that is currently leading, according to the responding node.
     */
    public Optional<DiscoveryNode> getMasterNode() {
        return masterNode;
    }
​
    /**
     * @return the collection of known peers of the responding node, or an empty collection if the responding node believes there
     * is currently a leader.
     */
    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }
​
    /**
     * @return the current term of the responding node. If the responding node is the leader then this is the term in which it is
     * currently leading.
     */
    public long getTerm() {
        return term;
    }
​
    @Override
    public String toString() {
        return "PeersResponse{" +
            "masterNode=" + masterNode +
            ", knownPeers=" + knownPeers +
            ", term=" + term +
            '}';
    }
​
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PeersResponse that = (PeersResponse) o;
        return term == that.term &&
            Objects.equals(masterNode, that.masterNode) &&
            Objects.equals(knownPeers, that.knownPeers);
    }
​
    @Override
    public int hashCode() {
        return Objects.hash(masterNode, knownPeers, term);
    }
}
  • PeersResponse有三个属性,分别是masterNode、knownPeers、term

PeerFinder

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

代码语言:javascript
复制
public abstract class PeerFinder {
​
    private static final Logger logger = LogManager.getLogger(PeerFinder.class);
​
    public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers";
​
    // the time between attempts to find all peers
    public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING =
        Setting.timeSetting("discovery.find_peers_interval",
            TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
​
    public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.request_peers_timeout",
            TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
​
    private final Settings settings;
​
    private final TimeValue findPeersInterval;
    private final TimeValue requestPeersTimeout;
​
    private final Object mutex = new Object();
    private final TransportService transportService;
    private final TransportAddressConnector transportAddressConnector;
    private final ConfiguredHostsResolver configuredHostsResolver;
​
    private volatile long currentTerm;
    private boolean active;
    private DiscoveryNodes lastAcceptedNodes;
    private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
    private Optional<DiscoveryNode> leader = Optional.empty();
    private volatile List<TransportAddress> lastResolvedAddresses = emptyList();
​
    public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
                      ConfiguredHostsResolver configuredHostsResolver) {
        this.settings = settings;
        findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
        requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
        this.transportService = transportService;
        this.transportAddressConnector = transportAddressConnector;
        this.configuredHostsResolver = configuredHostsResolver;
​
        transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
            PeersRequest::new,
            (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
​
        transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
            UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
    }
​
    public void activate(final DiscoveryNodes lastAcceptedNodes) {
        logger.trace("activating with {}", lastAcceptedNodes);
​
        synchronized (mutex) {
            assert assertInactiveWithNoKnownPeers();
            active = true;
            this.lastAcceptedNodes = lastAcceptedNodes;
            leader = Optional.empty();
            handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
        }
​
        onFoundPeersUpdated(); // trigger a check for a quorum already
    }
​
    public void deactivate(DiscoveryNode leader) {
        final boolean peersRemoved;
        synchronized (mutex) {
            logger.trace("deactivating and setting leader to {}", leader);
            active = false;
            peersRemoved = handleWakeUp();
            this.leader = Optional.of(leader);
            assert assertInactiveWithNoKnownPeers();
        }
        if (peersRemoved) {
            onFoundPeersUpdated();
        }
    }
​
    // exposed to subclasses for testing
    protected final boolean holdsLock() {
        return Thread.holdsLock(mutex);
    }
​
    private boolean assertInactiveWithNoKnownPeers() {
        assert holdsLock() : "PeerFinder mutex not held";
        assert active == false;
        assert peersByAddress.isEmpty() : peersByAddress.keySet();
        return true;
    }
​
    PeersResponse handlePeersRequest(PeersRequest peersRequest) {
        synchronized (mutex) {
            assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
            final List<DiscoveryNode> knownPeers;
            if (active) {
                assert leader.isPresent() == false : leader;
                if (peersRequest.getSourceNode().isMasterNode()) {
                    startProbe(peersRequest.getSourceNode().getAddress());
                }
                peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
                knownPeers = getFoundPeersUnderLock();
            } else {
                assert leader.isPresent() || lastAcceptedNodes == null;
                knownPeers = emptyList();
            }
            return new PeersResponse(leader, knownPeers, currentTerm);
        }
    }
​
    // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
    public Optional<DiscoveryNode> getLeader() {
        synchronized (mutex) {
            return leader;
        }
    }
​
    // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
    public long getCurrentTerm() {
        return currentTerm;
    }
​
    public void setCurrentTerm(long currentTerm) {
        this.currentTerm = currentTerm;
    }
​
    private DiscoveryNode getLocalNode() {
        final DiscoveryNode localNode = transportService.getLocalNode();
        assert localNode != null;
        return localNode;
    }
​
    protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);
​
    protected abstract void onFoundPeersUpdated();
​
    public List<TransportAddress> getLastResolvedAddresses() {
        return lastResolvedAddresses;
    }
​
    public Iterable<DiscoveryNode> getFoundPeers() {
        synchronized (mutex) {
            return getFoundPeersUnderLock();
        }
    }
​
    private List<DiscoveryNode> getFoundPeersUnderLock() {
        assert holdsLock() : "PeerFinder mutex not held";
        return peersByAddress.values().stream()
            .map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList());
    }
​
    private Peer createConnectingPeer(TransportAddress transportAddress) {
        Peer peer = new Peer(transportAddress);
        peer.establishConnection();
        return peer;
    }
​
    /**
     * @return whether any peers were removed due to disconnection
     */
    private boolean handleWakeUp() {
        assert holdsLock() : "PeerFinder mutex not held";
​
        final boolean peersRemoved = peersByAddress.values().removeIf(Peer::handleWakeUp);
​
        if (active == false) {
            logger.trace("not active");
            return peersRemoved;
        }
​
        logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
        for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) {
            startProbe(discoveryNodeObjectCursor.value.getAddress());
        }
​
        configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
            synchronized (mutex) {
                lastResolvedAddresses = providedAddresses;
                logger.trace("probing resolved transport addresses {}", providedAddresses);
                providedAddresses.forEach(this::startProbe);
            }
        });
​
        transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
            @Override
            public boolean isForceExecution() {
                return true;
            }
​
            @Override
            public void onFailure(Exception e) {
                assert false : e;
                logger.debug("unexpected exception in wakeup", e);
            }
​
            @Override
            protected void doRun() {
                synchronized (mutex) {
                    if (handleWakeUp() == false) {
                        return;
                    }
                }
                onFoundPeersUpdated();
            }
​
            @Override
            public String toString() {
                return "PeerFinder handling wakeup";
            }
        });
​
        return peersRemoved;
    }
​
    protected void startProbe(TransportAddress transportAddress) {
        assert holdsLock() : "PeerFinder mutex not held";
        if (active == false) {
            logger.trace("startProbe({}) not running", transportAddress);
            return;
        }
​
        if (transportAddress.equals(getLocalNode().getAddress())) {
            logger.trace("startProbe({}) not probing local node", transportAddress);
            return;
        }
​
        peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
    }
​
    private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<UnicastZenPing.UnicastPingRequest> {
        @Override
        public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
            final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
                Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList()));
            final PeersResponse peersResponse = handlePeersRequest(peersRequest);
            final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
            final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
            pingResponses.add(new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
                peersResponse.getMasterNode().orElse(null),
                clusterName, ClusterState.UNKNOWN_VERSION));
            peersResponse.getKnownPeers().forEach(dn -> pingResponses.add(
                new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID,
                    isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION)));
            channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0])));
        }
    }
​
    //......
}
  • PeerFinder的构造器注册了两个handler,一个是针对REQUEST_PEERS_ACTION_NAME,执行handlePeersRequest;一个是针对UnicastZenPing.ACTION_NAME,其handler为Zen1UnicastPingRequestHandler,该handler里头也调用了handlePeersRequest方法
  • handlePeersRequest主要是针对masterNode及peersRequest.getKnownPeers()挨个执行startProbe,然后通过getFoundPeersUnderLock设置knownPeers,最后通过leader、knownPeers及currentTerm构造PeersResponse返回
  • startProbe方法主要是执行createConnectingPeer,并将结果放入key为transportAddress的名为peersByAddress的map中;createConnectingPeer方法主要是创建Peer,然后调用Peer的establishConnection方法

Peer

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

代码语言:javascript
复制
    private class Peer {
        private final TransportAddress transportAddress;
        private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
        private volatile boolean peersRequestInFlight;
​
        Peer(TransportAddress transportAddress) {
            this.transportAddress = transportAddress;
        }
​
        @Nullable
        DiscoveryNode getDiscoveryNode() {
            return discoveryNode.get();
        }
​
        boolean handleWakeUp() {
            assert holdsLock() : "PeerFinder mutex not held";
​
            if (active == false) {
                return true;
            }
​
            final DiscoveryNode discoveryNode = getDiscoveryNode();
            // may be null if connection not yet established
​
            if (discoveryNode != null) {
                if (transportService.nodeConnected(discoveryNode)) {
                    if (peersRequestInFlight == false) {
                        requestPeers();
                    }
                } else {
                    logger.trace("{} no longer connected", this);
                    return true;
                }
            }
​
            return false;
        }
​
        void establishConnection() {
            assert holdsLock() : "PeerFinder mutex not held";
            assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
            assert active;
​
            logger.trace("{} attempting connection", this);
            transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
                @Override
                public void onResponse(DiscoveryNode remoteNode) {
                    assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
                    assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
                    synchronized (mutex) {
                        if (active == false) {
                            return;
                        }
​
                        assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
                        discoveryNode.set(remoteNode);
                        requestPeers();
                    }
​
                    assert holdsLock() == false : "PeerFinder mutex is held in error";
                    onFoundPeersUpdated();
                }
​
                @Override
                public void onFailure(Exception e) {
                    logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
                    synchronized (mutex) {
                        peersByAddress.remove(transportAddress);
                    }
                }
            });
        }
​
        private void requestPeers() {
            assert holdsLock() : "PeerFinder mutex not held";
            assert peersRequestInFlight == false : "PeersRequest already in flight";
            assert active;
​
            final DiscoveryNode discoveryNode = getDiscoveryNode();
            assert discoveryNode != null : "cannot request peers without first connecting";
​
            if (discoveryNode.equals(getLocalNode())) {
                logger.trace("{} not requesting peers from local node", this);
                return;
            }
​
            logger.trace("{} requesting peers", this);
            peersRequestInFlight = true;
​
            final List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();
​
            final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<PeersResponse>() {
​
                @Override
                public PeersResponse read(StreamInput in) throws IOException {
                    return new PeersResponse(in);
                }
​
                @Override
                public void handleResponse(PeersResponse response) {
                    logger.trace("{} received {}", Peer.this, response);
                    synchronized (mutex) {
                        if (active == false) {
                            return;
                        }
​
                        peersRequestInFlight = false;
​
                        response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
                        response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
                    }
​
                    if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
                        // Must not hold lock here to avoid deadlock
                        assert holdsLock() == false : "PeerFinder mutex is held in error";
                        onActiveMasterFound(discoveryNode, response.getTerm());
                    }
                }
​
                @Override
                public void handleException(TransportException exp) {
                    peersRequestInFlight = false;
                    logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
                }
​
                @Override
                public String executor() {
                    return Names.GENERIC;
                }
            };
            final String actionName;
            final TransportRequest transportRequest;
            final TransportResponseHandler<?> transportResponseHandler;
            if (isZen1Node(discoveryNode)) {
                actionName = UnicastZenPing.ACTION_NAME;
                transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings),
                    new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null,
                        ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION));
                transportResponseHandler = peersResponseHandler.wrap(ucResponse -> {
                    Optional<DiscoveryNode> optionalMasterNode = Arrays.stream(ucResponse.pingResponses)
                        .filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master()))
                        .map(ZenPing.PingResponse::node)
                        .findFirst();
                    List<DiscoveryNode> discoveredNodes = new ArrayList<>();
                    if (optionalMasterNode.isPresent() == false) {
                        Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull)
                            .forEach(discoveredNodes::add);
                        Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add);
                    }
                    return new PeersResponse(optionalMasterNode, discoveredNodes, 0L);
                }, UnicastZenPing.UnicastPingResponse::new);
            } else {
                actionName = REQUEST_PEERS_ACTION_NAME;
                transportRequest = new PeersRequest(getLocalNode(), knownNodes);
                transportResponseHandler = peersResponseHandler;
            }
            transportService.sendRequest(discoveryNode, actionName,
                transportRequest,
                TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
                transportResponseHandler);
        }
​
        @Override
        public String toString() {
            return "Peer{" +
                "transportAddress=" + transportAddress +
                ", discoveryNode=" + discoveryNode.get() +
                ", peersRequestInFlight=" + peersRequestInFlight +
                '}';
        }
    }
  • Peer的establishConnection方法主要是通过transportAddressConnector.connectToRemoteMasterNode请求masterNode,在结果成功返回时执行requestPeers及onFoundPeersUpdated方法;如果出现异常则从peersByAddress移除该transportAddress对应的Peer
  • requestPeers方法通过getFoundPeersUnderLock方法获取knownNodes构造PeersRequest,然后通过transportService.sendRequest向discoveryNode发送请求,请求成功返回则执行masterNode的startProbe以及knownPeers的startProbe方法,如果当前discoveryNode是masterNode,则触发onActiveMasterFound方法
  • 7.0版本的代码针对之前的版本做了不同的处理,requestPeers方法发送的request为UnicastZenPing.UnicastPingRequest

小结

  • PeerFinder的构造器注册了两个handler,一个是针对REQUEST_PEERS_ACTION_NAME,执行handlePeersRequest;一个是针对UnicastZenPing.ACTION_NAME,其handler为Zen1UnicastPingRequestHandler,该handler里头也调用了handlePeersRequest方法
  • handlePeersRequest主要是针对masterNode及peersRequest.getKnownPeers()挨个执行startProbe,然后通过getFoundPeersUnderLock设置knownPeers,最后通过leader、knownPeers及currentTerm构造PeersResponse返回;startProbe方法主要是执行createConnectingPeer,并将结果放入key为transportAddress的名为peersByAddress的map中;createConnectingPeer方法主要是创建Peer,然后调用Peer的establishConnection方法
  • Peer的establishConnection方法主要是通过transportAddressConnector.connectToRemoteMasterNode请求masterNode,在结果成功返回时执行requestPeers及onFoundPeersUpdated方法;如果出现异常则从peersByAddress移除该transportAddress对应的Peer;requestPeers方法通过getFoundPeersUnderLock方法获取knownNodes构造PeersRequest,然后通过transportService.sendRequest向discoveryNode发送请求,请求成功返回则执行masterNode的startProbe以及knownPeers的startProbe方法,如果当前discoveryNode是masterNode,则触发onActiveMasterFound方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • PeersRequest
  • PeersResponse
  • PeerFinder
  • Peer
  • 小结
  • doc
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档