本文主要研究一下scalecube-cluster的FailureDetector
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetector.java
/**
* Failure Detector component responsible for monitoring availability of other members in the
* cluster. This interface is supposed to be used internally as part cluster membership protocol. It
* doesn't specify that particular node is failed, but just provide information that either it is
* suspected or trusted at current moment of time. So it is up to cluster membership or other top
* level component to define when suspected member is actually failed.
*/
public interface FailureDetector {
/**
* Starts running failure detection algorithm. After started it begins to receive and send ping
* messages.
*/
void start();
/** Stops running failure detection algorithm and releases occupied resources. */
void stop();
/** Listens for results of ping checks (ALIVE/SUSPECT) done periodically by failure detector. */
Flux<FailureDetectorEvent> listen();
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {
private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);
// Qualifiers
public static final String PING = "sc/fdetector/ping";
public static final String PING_REQ = "sc/fdetector/pingReq";
public static final String PING_ACK = "sc/fdetector/pingAck";
// Injected
private final Member localMember;
private final Transport transport;
private final FailureDetectorConfig config;
private final CorrelationIdGenerator cidGenerator;
// State
private long currentPeriod = 0;
private List<Member> pingMembers = new ArrayList<>();
private int pingMemberIndex = 0; // index for sequential ping member selection
// Disposables
private final Disposable.Composite actionsDisposables = Disposables.composite();
// Subject
private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
DirectProcessor.<FailureDetectorEvent>create().serialize();
private final FluxSink<FailureDetectorEvent> sink = subject.sink();
// Scheduled
private final Scheduler scheduler;
/**
* Creates new instance of failure detector with given transport and settings.
*
* @param localMember local cluster member
* @param transport cluster transport
* @param membershipProcessor membership event processor
* @param config failure detector settings
* @param scheduler scheduler
* @param cidGenerator correlationId generator
*/
public FailureDetectorImpl(
Member localMember,
Transport transport,
Flux<MembershipEvent> membershipProcessor,
FailureDetectorConfig config,
Scheduler scheduler,
CorrelationIdGenerator cidGenerator) {
this.localMember = Objects.requireNonNull(localMember);
this.transport = Objects.requireNonNull(transport);
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);
this.cidGenerator = Objects.requireNonNull(cidGenerator);
// Subscribe
actionsDisposables.addAll(
Arrays.asList(
membershipProcessor //
.publishOn(scheduler)
.subscribe(this::onMemberEvent, this::onError),
transport
.listen() //
.publishOn(scheduler)
.subscribe(this::onMessage, this::onError)));
}
@Override
public void start() {
actionsDisposables.add(
scheduler.schedulePeriodically(
this::doPing,
config.getPingInterval(),
config.getPingInterval(),
TimeUnit.MILLISECONDS));
}
@Override
public void stop() {
// Stop accepting requests and sending pings
actionsDisposables.dispose();
// Stop publishing events
sink.complete();
}
@Override
public Flux<FailureDetectorEvent> listen() {
return subject.onBackpressureBuffer();
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {
//......
private void onMemberEvent(MembershipEvent event) {
Member member = event.member();
if (event.isRemoved()) {
pingMembers.remove(member);
}
if (event.isAdded()) {
// insert member into random positions
int size = pingMembers.size();
int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
pingMembers.add(index, member);
}
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {
//......
private void onMessage(Message message) {
if (isPing(message)) {
onPing(message);
} else if (isPingReq(message)) {
onPingReq(message);
} else if (isTransitPingAck(message)) {
onTransitPingAck(message);
}
}
private boolean isPing(Message message) {
return PING.equals(message.qualifier());
}
private boolean isPingReq(Message message) {
return PING_REQ.equals(message.qualifier());
}
private boolean isTransitPingAck(Message message) {
return PING_ACK.equals(message.qualifier())
&& message.<PingData>data().getOriginalIssuer() != null;
}
/** Listens to PING message and answers with ACK. */
private void onPing(Message message) {
long period = this.currentPeriod;
LOGGER.trace("Received Ping[{}]", period);
PingData data = message.data();
if (!data.getTo().id().equals(localMember.id())) {
LOGGER.warn(
"Received Ping[{}] to {}, but local member is {}", period, data.getTo(), localMember);
return;
}
String correlationId = message.correlationId();
Message ackMessage =
Message.withData(data)
.qualifier(PING_ACK)
.correlationId(correlationId)
.sender(localMember.address())
.build();
Address address = data.getFrom().address();
LOGGER.trace("Send PingAck[{}] to {}", period, address);
transport
.send(address, ackMessage)
.subscribe(
null,
ex ->
LOGGER.debug(
"Failed to send PingAck[{}] to {}, cause: {}", period, address, ex.toString()));
}
/** Listens to PING_REQ message and sends PING to requested cluster member. */
private void onPingReq(Message message) {
long period = this.currentPeriod;
LOGGER.trace("Received PingReq[{}]", period);
PingData data = message.data();
Member target = data.getTo();
Member originalIssuer = data.getFrom();
String correlationId = message.correlationId();
PingData pingReqData = new PingData(localMember, target, originalIssuer);
Message pingMessage =
Message.withData(pingReqData)
.qualifier(PING)
.correlationId(correlationId)
.sender(localMember.address())
.build();
Address address = target.address();
LOGGER.trace("Send transit Ping[{}] to {}", period, address);
transport
.send(address, pingMessage)
.subscribe(
null,
ex ->
LOGGER.debug(
"Failed to send transit Ping[{}] to {}, cause: {}",
period,
address,
ex.toString()));
}
/**
* Listens to ACK with message containing ORIGINAL_ISSUER then converts message to plain ACK and
* sends it to ORIGINAL_ISSUER.
*/
private void onTransitPingAck(Message message) {
long period = this.currentPeriod;
LOGGER.trace("Received transit PingAck[{}]", period);
PingData data = message.data();
Member target = data.getOriginalIssuer();
String correlationId = message.correlationId();
PingData originalAckData = new PingData(target, data.getTo());
Message originalAckMessage =
Message.withData(originalAckData)
.qualifier(PING_ACK)
.correlationId(correlationId)
.sender(localMember.address())
.build();
Address address = target.address();
LOGGER.trace("Resend transit PingAck[{}] to {}", period, address);
transport
.send(address, originalAckMessage)
.subscribe(
null,
ex ->
LOGGER.debug(
"Failed to resend transit PingAck[{}] to {}, cause: {}",
period,
address,
ex.toString()));
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
public final class FailureDetectorImpl implements FailureDetector {
//......
private void doPing() {
// Increment period counter
long period = currentPeriod++;
// Select ping member
Member pingMember = selectPingMember();
if (pingMember == null) {
return;
}
// Send ping
String cid = cidGenerator.nextCid();
PingData pingData = new PingData(localMember, pingMember);
Message pingMsg =
Message.withData(pingData)
.qualifier(PING)
.correlationId(cid)
.sender(localMember.address())
.build();
LOGGER.trace("Send Ping[{}] to {}", period, pingMember);
Address address = pingMember.address();
transport
.requestResponse(pingMsg, address)
.timeout(Duration.ofMillis(config.getPingTimeout()), scheduler)
.publishOn(scheduler)
.subscribe(
message -> {
LOGGER.trace("Received PingAck[{}] from {}", period, pingMember);
publishPingResult(period, pingMember, MemberStatus.ALIVE);
},
ex -> {
LOGGER.debug(
"Failed to get PingAck[{}] from {} within {} ms",
period,
pingMember,
config.getPingTimeout());
final int timeLeft = config.getPingInterval() - config.getPingTimeout();
final List<Member> pingReqMembers = selectPingReqMembers(pingMember);
if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
LOGGER.trace("No PingReq[{}] occurred", period);
publishPingResult(period, pingMember, MemberStatus.SUSPECT);
} else {
doPingReq(currentPeriod, pingMember, pingReqMembers, cid);
}
});
}
private Member selectPingMember() {
if (pingMembers.isEmpty()) {
return null;
}
if (pingMemberIndex >= pingMembers.size()) {
pingMemberIndex = 0;
Collections.shuffle(pingMembers);
}
return pingMembers.get(pingMemberIndex++);
}
private List<Member> selectPingReqMembers(Member pingMember) {
if (config.getPingReqMembers() <= 0) {
return Collections.emptyList();
}
List<Member> candidates = new ArrayList<>(pingMembers);
candidates.remove(pingMember);
if (candidates.isEmpty()) {
return Collections.emptyList();
}
Collections.shuffle(candidates);
boolean selectAll = candidates.size() < config.getPingReqMembers();
return selectAll ? candidates : candidates.subList(0, config.getPingReqMembers());
}
private void doPingReq(
long period, final Member pingMember, final List<Member> pingReqMembers, String cid) {
Message pingReqMsg =
Message.withData(new PingData(localMember, pingMember))
.qualifier(PING_REQ)
.correlationId(cid)
.sender(localMember.address())
.build();
LOGGER.trace("Send PingReq[{}] to {} for {}", period, pingReqMembers, pingMember);
Duration timeout = Duration.ofMillis(config.getPingInterval() - config.getPingTimeout());
pingReqMembers.forEach(
member ->
transport
.requestResponse(pingReqMsg, member.address())
.timeout(timeout, scheduler)
.publishOn(scheduler)
.subscribe(
message -> {
LOGGER.trace(
"Received transit PingAck[{}] from {} to {}",
period,
message.sender(),
pingMember);
publishPingResult(period, pingMember, MemberStatus.ALIVE);
},
throwable -> {
LOGGER.trace(
"Timeout getting transit PingAck[{}] from {} to {} within {} ms",
period,
pingReqMembers,
pingMember,
timeout);
publishPingResult(period, pingMember, MemberStatus.SUSPECT);
}));
}
private void publishPingResult(long period, Member member, MemberStatus status) {
LOGGER.debug("Member {} detected as {} at [{}]", member, status, period);
sink.next(new FailureDetectorEvent(member, status));
}
//......
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。