前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊claudb的MasterReplication

聊聊claudb的MasterReplication

原创
作者头像
code4it
修改于 2020-08-24 02:03:44
修改于 2020-08-24 02:03:44
15100
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下claudb的MasterReplication

MasterReplication

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MasterReplication implements Runnable {private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class);private static final String SELECT_COMMAND = "SELECT";
  private static final String PING_COMMAND = "PING";
  private static final int TASK_DELAY = 2;private final DBServerContext server;
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();public MasterReplication(DBServerContext server) {
    this.server = server;
  }public void start() {
    executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS);
  }public void stop() {
    executor.shutdown();
  }public void addSlave(String id) {
    getServerState().addSlave(id);
    LOGGER.info("new slave: {}", id);
  }public void removeSlave(String id) {
    getServerState().removeSlave(id);
    LOGGER.info("slave revomed: {}", id);
  }
​
  @Override
  public void run() {
    List<RedisToken> commands = createCommands();for (SafeString slave : getServerState().getSlaves()) {
      for (RedisToken command : commands) {
        server.publish(slave.toString(), command);
      }
    }
  }private List<RedisToken> createCommands() {
    List<RedisToken> commands = new LinkedList<>();
    commands.add(pingCommand());
    commands.addAll(commandsToReplicate());
    return commands;
  }private List<RedisToken> commandsToReplicate() {
    List<RedisToken> commands = new LinkedList<>();for (RedisToken command : server.getCommandsToReplicate()) {
      command.accept(new AbstractRedisTokenVisitor<Void>() {
        @Override
        public Void array(ArrayRedisToken token) {
          commands.add(selectCommand(token));
          commands.add(command(token));
          return null;
        }
      });
    }
    return commands;
  }private RedisToken selectCommand(ArrayRedisToken token) {
    return array(string(SELECT_COMMAND),
        token.getValue().stream().findFirst().orElse(string("0")));
  }private RedisToken pingCommand() {
    return array(string(PING_COMMAND));
  }private RedisToken command(ArrayRedisToken token) {
    return array(token.getValue().stream().skip(1).collect(toList()));
  }private DBServerState getServerState() {
    return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state"));
  }private Option<DBServerState> serverState() {
    return server.getValue("state");
  }
}
  • MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ClauDB extends RespServerContext implements DBServerContext {//......
​
  @Override
  public ImmutableList<RedisToken> getCommandsToReplicate() {
    return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> {
      observable.onNext(getState().getCommandsToReplicate());
      observable.onComplete();
    })).blockingFirst();
  }//......}
  • getCommandsToReplicate方法执行的是getState().getCommandsToReplicate()

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DBServerState {private static final int RDB_VERSION = 6;private static final SafeString SLAVES = safeString("slaves");
  private static final DatabaseKey SLAVES_KEY = safeKey("slaves");
  private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts");private boolean master = true;private final List<Database> databases = new ArrayList<>();
  private final Database admin;
  private final DatabaseFactory factory;private final Queue<RedisToken> queue = new LinkedList<>();public void append(RedisToken command) {
    queue.offer(command);
  }//......public ImmutableList<RedisToken> getCommandsToReplicate() {
    ImmutableList<RedisToken> list = ImmutableList.from(queue);
    queue.clear();
    return list;
  }//......}
  • getCommandsToReplicate方法会根据queue创建ImmutableList,然后清空queue;而append方法会添加command到queue

executeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ClauDB extends RespServerContext implements DBServerContext {//......protected RedisToken executeCommand(RespCommand command, Request request) {
    if (!isReadOnly(request.getCommand())) {
      try {
        RedisToken response = command.execute(request);
        replication(request);
        notification(request);
        return response;
      } catch (RuntimeException e) {
        LOGGER.error("error executing command: " + request, e);
        return error("error executing command: " + request);
      }
    } else {
      return error("READONLY You can't write against a read only slave");
    }
  }private void replication(Request request) {
    if (!isReadOnlyCommand(request.getCommand())) {
      RedisToken array = requestToArray(request);
      if (hasSlaves()) {
        getState().append(array);
      }
      persistence.ifPresent(manager -> manager.append(array));
    }
  }
​
  @Override
  public void publish(String sourceKey, RedisToken message) {
    Session session = getSession(sourceKey);
    if (session != null) {
      session.publish(message);
    }
  }//......}
  • executeCommand方法除了执行command.execute,还会执行replication方法,它会在有slaves的条件将非readOnlyCommand追加到state;publish方法执行的是session.publish(message)传输给slave

小结

MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊claudb的server command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/server/SelectCommand.java
code4it
2020/08/24
3130
聊聊claudb的server command
聊聊claudb的set command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/set/SetAddCommand.java
code4it
2020/08/27
3110
聊聊claudb的set command
聊聊claudb的zset command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetAddCommand.java
code4it
2020/08/28
2000
聊聊claudb的zset command
聊聊claudb的DatabaseCleaner
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/data/DatabaseCleaner.java
code4it
2020/08/20
3430
聊聊claudb的DatabaseCleaner
聊聊claudb的SlaveReplication
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/SlaveReplication.java
code4it
2020/08/22
1800
聊聊claudb的SlaveReplication
聊聊claudb的transaction command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/TransactionState.java
code4it
2020/08/30
3850
聊聊claudb的transaction command
聊聊claudb的pubsub command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PublishCommand.java
code4it
2020/08/29
4550
聊聊claudb的pubsub command
聊聊claudb的hash command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/hash/HashSetCommand.java
code4it
2020/09/01
2480
聊聊claudb的hash command
聊聊claudb的keys command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/key/KeysCommand.java
code4it
2020/08/25
5990
聊聊claudb的keys command
聊聊claudb的scripting command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/scripting/AbstractEvalCommand.java
code4it
2020/08/31
4930
聊聊claudb的scripting command
聊聊claudb的list command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/list/LeftPushCommand.java
code4it
2020/08/26
2770
聊聊claudb的list command
聊聊claudb的NotificationManager
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/event/NotificationManager.java
code4it
2020/09/03
1720
聊聊claudb的NotificationManager
聊聊claudb的string command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/string/GetCommand.java
code4it
2020/09/02
3380
聊聊claudb的string command
聊聊claudb的scripting command
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/scripting/AbstractEvalCommand.java
code4it
2020/09/04
3580
聊聊RespServer
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/Resp.java
code4it
2020/08/14
2640
聊聊RespServer
聊聊claudb的Database
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/data/Database.java
code4it
2020/08/23
6860
聊聊claudb的Database
聊聊claudb的importRDB
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java
code4it
2020/08/19
3230
聊聊claudb的importRDB
聊聊claudb的exportRDB
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java
code4it
2020/08/18
3730
聊聊claudb的exportRDB
相关推荐
聊聊claudb的server command
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验