Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊scalecube-cluster的GossipProtocol

聊聊scalecube-cluster的GossipProtocol

原创
作者头像
code4it
发布于 2019-05-06 11:58:18
发布于 2019-05-06 11:58:18
91200
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下scalecube-cluster的GossipProtocol

GossipProtocol

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Gossip Protocol component responsible for spreading information (gossips) over the cluster
 * members using infection-style dissemination algorithms. It provides reliable cross-cluster
 * broadcast.
 */
public interface GossipProtocol {/** Starts running gossip protocol. After started it begins to receive and send gossip messages */
  void start();/** Stops running gossip protocol and releases occupied resources. */
  void stop();/**
   * Spreads given message between cluster members.
   *
   * @return future result with gossip id once gossip fully spread.
   */
  Mono<String> spread(Message message);/** Listens for gossips from other cluster members. */
  Flux<Message> listen();
}
  • GossipProtocol接口定义了start、stop、spread、listen方法

GossipProtocolImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final class GossipProtocolImpl implements GossipProtocol {private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);// Qualifierspublic static final String GOSSIP_REQ = "sc/gossip/req";// Injectedprivate final Member localMember;
  private final Transport transport;
  private final GossipConfig config;// Local Stateprivate long currentPeriod = 0;
  private long gossipCounter = 0;
  private Map<String, GossipState> gossips = new HashMap<>();
  private Map<String, MonoSink<String>> futures = new HashMap<>();private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;// Disposablesprivate final Disposable.Composite actionsDisposables = Disposables.composite();// Subjectprivate final FluxProcessor<Message, Message> subject =
      DirectProcessor.<Message>create().serialize();private final FluxSink<Message> sink = subject.sink();// Scheduledprivate final Scheduler scheduler;/**
   * Creates new instance of gossip protocol with given memberId, transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config gossip protocol settings
   * @param scheduler scheduler
   */
  public GossipProtocolImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      GossipConfig config,
      Scheduler scheduler) {this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);// Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen()
                .publishOn(scheduler)
                .filter(this::isGossipReq)
                .subscribe(this::onGossipReq, this::onError)));
  }
​
  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSpreadGossip,
            config.getGossipInterval(),
            config.getGossipInterval(),
            TimeUnit.MILLISECONDS));
  }
​
  @Override
  public void stop() {
    // Stop accepting gossip requests and spreading gossips
    actionsDisposables.dispose();// Stop publishing events
    sink.complete();
  }
​
  @Override
  public Mono<String> spread(Message message) {
    return Mono.fromCallable(() -> message)
        .subscribeOn(scheduler)
        .flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));
  }
​
  @Override
  public Flux<Message> listen() {
    return subject.onBackpressureBuffer();
  }private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      remoteMembers.remove(member);
    }
    if (event.isAdded()) {
      remoteMembers.add(member);
    }
  }private void onGossipReq(Message message) {
    long period = this.currentPeriod;
    GossipRequest gossipRequest = message.data();
    for (Gossip gossip : gossipRequest.gossips()) {
      GossipState gossipState = gossips.get(gossip.gossipId());
      if (gossipState == null) { // new gossip
        gossipState = new GossipState(gossip, period);
        gossips.put(gossip.gossipId(), gossipState);
        sink.next(gossip.message());
      }
      gossipState.addToInfected(gossipRequest.from());
    }
  }private boolean isGossipReq(Message message) {
    return GOSSIP_REQ.equals(message.qualifier());
  }private String createAndPutGossip(Message message) {
    long period = this.currentPeriod;
    Gossip gossip = new Gossip(generateGossipId(), message);
    GossipState gossipState = new GossipState(gossip, period);
    gossips.put(gossip.gossipId(), gossipState);
    return gossip.gossipId();
  }//......}
  • GossipProtocolImpl实现了GossipProtocol接口,它维护了名为gossips的gossipId与GossipState的map,以及remoteMembers列表
  • 它的构造器订阅了membershipProcessor,触发onMemberEvent方法,该方法根据MembershipEvent来对remoteMembers进行添加或移除member;订阅了transport.listen(),过滤出GossipReq,触发onGossipReq方法,该方法合并GossipRequest的gossips到本地的gossips,对于新的gossip的message则发送到sink,并维护该gossip的gossipState,将请求的memberId添加到infected中;spread方法则将message放入到本地的gossips中
  • start方法每隔gossipInterval执行doSpreadGossip方法;spread方法则通过createAndPutGossip创建Gossip并放入gossips中

doSpreadGossip

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final class GossipProtocolImpl implements GossipProtocol {//......private List<Member> remoteMembers = new ArrayList<>();private int remoteMembersIndex = -1;private void doSpreadGossip() {
    // Increment period
    long period = currentPeriod++;// Check any gossips exists
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }try {
      // Spread gossips to randomly selected member(s)
      selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));// Sweep gossips
      sweepGossips(period);
    } catch (Exception ex) {
      LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);
    }
  }private void spreadGossipsTo(long period, Member member) {
    // Select gossips to send
    List<Gossip> gossips = selectGossipsToSend(period, member);
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }// Send gossip request
    Address address = member.address();
​
    gossips
        .stream()
        .map(this::buildGossipRequestMessage)
        .forEach(
            message ->
                transport
                    .send(address, message)
                    .subscribe(
                        null,
                        ex ->
                            LOGGER.debug(
                                "Failed to send GossipReq[{}]: {} to {}, cause: {}",
                                period,
                                message,
                                address,
                                ex.toString())));
  }private List<Gossip> selectGossipsToSend(long period, Member member) {
    int periodsToSpread =
        ClusterMath.gossipPeriodsToSpread(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    return gossips
        .values()
        .stream()
        .filter(
            gossipState -> gossipState.infectionPeriod() + periodsToSpread >= period) // max rounds
        .filter(gossipState -> !gossipState.isInfected(member.id())) // already infected
        .map(GossipState::gossip)
        .collect(Collectors.toList());
  }private List<Member> selectGossipMembers() {
    int gossipFanout = config.getGossipFanout();
    if (remoteMembers.size() < gossipFanout) { // select all
      return remoteMembers;
    } else { // select random members
      // Shuffle members initially and once reached top bound
      if (remoteMembersIndex < 0 || remoteMembersIndex + gossipFanout > remoteMembers.size()) {
        Collections.shuffle(remoteMembers);
        remoteMembersIndex = 0;
      }// Select members
      List<Member> selectedMembers =
          gossipFanout == 1
              ? Collections.singletonList(remoteMembers.get(remoteMembersIndex))
              : remoteMembers.subList(remoteMembersIndex, remoteMembersIndex + gossipFanout);// Increment index and return result
      remoteMembersIndex += gossipFanout;
      return selectedMembers;
    }
  }private Message buildGossipRequestMessage(Gossip gossip) {
    GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id());
    return Message.withData(gossipRequest)
        .qualifier(GOSSIP_REQ)
        .sender(localMember.address())
        .build();
  }private void sweepGossips(long period) {
    // Select gossips to sweep
    int periodsToSweep =
        ClusterMath.gossipPeriodsToSweep(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    Set<GossipState> gossipsToRemove =
        gossips
            .values()
            .stream()
            .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
            .collect(Collectors.toSet());// Check if anything selected
    if (gossipsToRemove.isEmpty()) {
      return; // nothing to sweep
    }// Sweep gossips
    LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
    for (GossipState gossipState : gossipsToRemove) {
      gossips.remove(gossipState.gossip().gossipId());
      MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId());
      if (sink != null) {
        sink.success(gossipState.gossip().gossipId());
      }
    }
  }//......}
  • doSpreadGossip方法首先递增currentPeriod,然后执行selectGossipMembers,遍历该member执行spreadGossipsTo,最后执行sweepGossips
  • selectGossipMembers方法会根据gossipFanout配置随机选择gossipFanout个member,这里维护了remoteMembersIndex,具体是对remoteMembers进行subList,当remoteMembersIndex小于0或remoteMembersIndex + gossipFanout > remoteMembers.size()时会Collections.shuffle(remoteMembers)并重置remoteMembersIndex为0,之后对remoteMembersIndex加上gossipFanout
  • spreadGossipsTo方法首先执行selectGossipsToSend获取要发送的gossips,然后通过buildGossipRequestMessage构造GOSSIP_REQ消息,最后通过transport.send方法发送
  • sweepGossips方法则选取periodsToSweep,然后从gossips移除period > gossipState.infectionPeriod() + periodsToSweep的gossipState

小结

  • GossipProtocol接口定义了start、stop、spread、listen方法;GossipProtocolImpl实现了GossipProtocol接口,它维护了名为gossips的gossipId与GossipState的map,以及remoteMembers列表
  • GossipProtocolImpl的构造器订阅了membershipProcessor,触发onMemberEvent方法,该方法根据MembershipEvent来对remoteMembers进行添加或移除member;订阅了transport.listen(),过滤出GossipReq,触发onGossipReq方法,该方法合并GossipRequest的gossips到本地的gossips,对于新的gossip的message则发送到sink,并维护该gossip的gossipState,将请求的memberId添加到infected中;spread方法则将message放入到本地的gossips中
  • GossipProtocolImpl的start方法每隔gossipInterval执行doSpreadGossip方法;spread方法则通过createAndPutGossip创建Gossip并放入gossips中;doSpreadGossip方法首先递增currentPeriod,然后执行selectGossipMembers,遍历该member执行spreadGossipsTo,最后执行sweepGossips

这里GossipProtocolImpl注册了onMemberEvent及onGossipReq,其中onMemberEvent用于监听MembershipEvent,并根据该event来维护remoteMembers列表;onGossipReq则是监听其他member的doSpreadGossip方法发送过来的GossipReq消息,合并该消息的gossips到本地的gossips;而doSpreadGossip方法则是每隔gossipInterval执行,根据gossipFanout配置随机选择gossipFanout个member,然后针对每个member选择要发送的gossips进行spread(onGossipReq及spread方法会更改gossips,而每隔gossipInterval触发的doSpreadGossip则从gossips选择待spread的消息进行发送)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊scalecube-cluster的GossipProtocol
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java
code4it
2019/05/15
4390
聊聊scalecube-cluster的FailureDetector
本文主要研究一下scalecube-cluster的FailureDetector
code4it
2019/05/05
5030
聊聊scalecube-cluster的FailureDetector
聊聊scalecube-cluster的MembershipProtocol
本文主要研究一下scalecube-cluster的MembershipProtocol
code4it
2019/05/07
6230
聊聊scalecube-cluster的MembershipProtocol
聊聊scalecube-cluster的MembershipProtocol
本文主要研究一下scalecube-cluster的MembershipProtocol
code4it
2019/05/15
9720
聊聊apache gossip的ActiveGossiper
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
code4it
2019/05/04
4580
聊聊apache gossip的ActiveGossiper
聊聊Cassandra的FailureDetector
cassandra-3.11.4/src/java/org/apache/cassandra/gms/IFailureDetector.java
code4it
2019/05/01
1.7K0
聊聊Cassandra的FailureDetector
聊聊elasticsearch的SeedHostsResolver
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
code4it
2019/04/26
8140
聊聊elasticsearch的SeedHostsResolver
聊聊elasticsearch的SeedHostsProvider
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsProvider.java
code4it
2019/04/27
5750
聊聊elasticsearch的SeedHostsProvider
聊聊nacos的ServerListManager
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
code4it
2019/09/30
9890
聊聊nacos的ServerListManager
聊聊Elasticsearch的MonitorService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/MonitorService.java
code4it
2019/05/26
7380
聊聊Elasticsearch的MonitorService
通过memberlist库实现gossip管理集群以及集群数据交互
memberlist库的简单用法如下,注意下面使用for循环来执行list.Join,原因是一开始各节点都没有runing,直接执行Join会出现连接拒绝的错误。
charlieroro
2022/09/08
8500
聊聊artemis的FederatedQueue
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java
code4it
2020/02/14
3770
聊聊artemis的FederatedQueue
Akka(10): 分布式运算:集群-Cluster
   Akka-Cluster可以在一部物理机或一组网络连接的服务器上搭建部署。用Akka开发同一版本的分布式程序可以在任何硬件环境中运行,这样我们就可以确定以Akka分布式程序作为标准的编程方式了。
用户1150956
2018/01/05
1.9K0
聊聊chronos的DeleteBgWorker
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/workers/DeleteBgWorker.java
code4it
2020/01/07
5620
聊聊chronos的DeleteBgWorker
FluxInterval实例及解析
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java
code4it
2018/09/17
1.2K0
聊聊kingbus的starRaft
starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,之后通过storage.NewDiskStorage创建DiskStorage,之后根据logExist及cfg.NewCluster做不同处理;若二者都为false则更新membership.RaftCluster的id为存在的cluster的id,然后执行startEtcdRaftNode;若cfg.NewCluster为true则使用cl.MemberIDs()来执行startEtcdRaftNode;若logExist为true则执行restartEtcdNode;最后创建rafthttp.Transport,执行tr.Start()、tr.AddRemote、tr.AddPeer
code4it
2020/06/13
4610
聊聊kingbus的starRaft
聊聊DebeziumEngine
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java
code4it
2020/05/16
7150
聊聊DebeziumEngine
聊聊storm的WindowedBoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
code4it
2018/10/25
5140
聊聊storm的WindowedBoltExecutor
ONOS集群选举分析
首先简单介绍下自己,之前是做 floodlight 控制器开发的,鉴于 ODL 和 onos 的如火如荼的发展,如果不对了解点就感觉自己 OUT 了,因此忙里偷闲,看了点 onos的源码收获颇丰,不敢
SDNLAB
2018/03/30
1.2K0
ONOS集群选举分析
聊聊hazelcast的PhiAccrualFailureDetector
本文主要研究一下hazelcast的PhiAccrualFailureDetector
code4it
2019/04/30
7830
聊聊hazelcast的PhiAccrualFailureDetector
相关推荐
聊聊scalecube-cluster的GossipProtocol
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验