根据网上给的LeaderLatch的示例代码写的业务代码,这里面用到的serverId是从配置中读取的(每个新部署的实例自动生成,之后一直不变)
@PostConstruct
public void setUp() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(zkConnectString)
.retryPolicy(retryPolicy)
.sessionTimeoutMs(60000)
.connectionTimeoutMs(3000)
.namespace(Constants.GLOBAL_NAME_SPACE)
.build();
client.start();
leaderLatch = new LeaderLatch(client, QUORUM_PATH, serverId);
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
log.info("Currently run as leader");
}
@Override
public void notLeader() {
log.info("Currently run as slave");
}
});
leaderLatch.start();
}
这段代码达到的预期效果应该是当前实例在运行为leader的时候,日志打印Currently run as leader;当丢失leader的时候,日志打印Currently run as leader。 多实例运行时,刚开始选主是没问题的,只有一个为leader。但是丢失主的实例不能切换Slave方式运行;我这里测试丢失主的方式有两种,一是断开实例与zk之间的连接,二是删除zk上面的该实例锁住的数据(例如leaderlatch路径为/test,那么每个实例会在/test这个路径下生成一个临时节点,将这个临时节点手动删除)。 而且这么写代码,很难做主从切换缓冲时间来防止主从来回切换。
所以,将代码中的listener去掉,修改为主动轮询监听:
@PostConstruct
public void setUp() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(zkConnectString)
.retryPolicy(retryPolicy)
.sessionTimeoutMs(60000)
.connectionTimeoutMs(3000)
.namespace(Constants.GLOBAL_NAME_SPACE)
.build();
client.start();
leaderLatch = new LeaderLatch(client, QUORUM_PATH, serverId);
leaderLatch.start();
}
@Scheduled(fixedDelay = 3000)
public void checkLeader() throws Exception {
//首先利用serverId检查自己是否还存在于leaderlatch选举结果集中
//考虑网络阻塞,zk数据异常丢失等情况
boolean isExist = false;
Collection participants = leaderLatch.getParticipants();
for (Participant participant : participants) {
if (serverId.equals(participant.getId())) {
isExist = true;
break;
}
}
//如果不存在,则重新加入选举
if (!isExist) {
log.info("Current server does not exist on zk, reset leaderlatch");
leaderLatch.close();
leaderLatch = new LeaderLatch(client, QUORUM_PATH, serverId);
leaderLatch.start();
log.info("Successfully reset leaderlatch");
}
//查看当前leader是否是自己
//注意,不能用leaderLatch.hasLeadership()因为有zk数据丢失的不确定性
//利用serverId对比确认是否主为自己
Participant leader = leaderLatch.getLeader();
boolean hashLeaderShip = serverId.equals(leader.getId());
if (log.isInfoEnabled()) {
log.info("Current Participant: {}", JSON.toJSONString(participants));
log.info("Current Leader: {}", leader);
}
//主从切换缓冲
if(hashLeaderShip) {
isLeaderCount++;
isSlaveCount = 0;
} else {
isLeaderCount = 0;
isSlaveCount ++;
}
if (isLeaderCount > 3 && !isLeader) {
log.info("Currently run as leader");
}
if (isSlaveCount > 3 && isLeader) {
log.info("Currently run as slave");
}
}