本文主要研究一下claudb的MasterReplication
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java
public class MasterReplication implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class);
private static final String SELECT_COMMAND = "SELECT";
private static final String PING_COMMAND = "PING";
private static final int TASK_DELAY = 2;
private final DBServerContext server;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
public MasterReplication(DBServerContext server) {
this.server = server;
}
public void start() {
executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS);
}
public void stop() {
executor.shutdown();
}
public void addSlave(String id) {
getServerState().addSlave(id);
LOGGER.info("new slave: {}", id);
}
public void removeSlave(String id) {
getServerState().removeSlave(id);
LOGGER.info("slave revomed: {}", id);
}
@Override
public void run() {
List<RedisToken> commands = createCommands();
for (SafeString slave : getServerState().getSlaves()) {
for (RedisToken command : commands) {
server.publish(slave.toString(), command);
}
}
}
private List<RedisToken> createCommands() {
List<RedisToken> commands = new LinkedList<>();
commands.add(pingCommand());
commands.addAll(commandsToReplicate());
return commands;
}
private List<RedisToken> commandsToReplicate() {
List<RedisToken> commands = new LinkedList<>();
for (RedisToken command : server.getCommandsToReplicate()) {
command.accept(new AbstractRedisTokenVisitor<Void>() {
@Override
public Void array(ArrayRedisToken token) {
commands.add(selectCommand(token));
commands.add(command(token));
return null;
}
});
}
return commands;
}
private RedisToken selectCommand(ArrayRedisToken token) {
return array(string(SELECT_COMMAND),
token.getValue().stream().findFirst().orElse(string("0")));
}
private RedisToken pingCommand() {
return array(string(PING_COMMAND));
}
private RedisToken command(ArrayRedisToken token) {
return array(token.getValue().stream().skip(1).collect(toList()));
}
private DBServerState getServerState() {
return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state"));
}
private Option<DBServerState> serverState() {
return server.getValue("state");
}
}
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext {
//......
@Override
public ImmutableList<RedisToken> getCommandsToReplicate() {
return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> {
observable.onNext(getState().getCommandsToReplicate());
observable.onComplete();
})).blockingFirst();
}
//......
}
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java
public class DBServerState {
private static final int RDB_VERSION = 6;
private static final SafeString SLAVES = safeString("slaves");
private static final DatabaseKey SLAVES_KEY = safeKey("slaves");
private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts");
private boolean master = true;
private final List<Database> databases = new ArrayList<>();
private final Database admin;
private final DatabaseFactory factory;
private final Queue<RedisToken> queue = new LinkedList<>();
public void append(RedisToken command) {
queue.offer(command);
}
//......
public ImmutableList<RedisToken> getCommandsToReplicate() {
ImmutableList<RedisToken> list = ImmutableList.from(queue);
queue.clear();
return list;
}
//......
}
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext {
//......
protected RedisToken executeCommand(RespCommand command, Request request) {
if (!isReadOnly(request.getCommand())) {
try {
RedisToken response = command.execute(request);
replication(request);
notification(request);
return response;
} catch (RuntimeException e) {
LOGGER.error("error executing command: " + request, e);
return error("error executing command: " + request);
}
} else {
return error("READONLY You can't write against a read only slave");
}
}
private void replication(Request request) {
if (!isReadOnlyCommand(request.getCommand())) {
RedisToken array = requestToArray(request);
if (hasSlaves()) {
getState().append(array);
}
persistence.ifPresent(manager -> manager.append(array));
}
}
@Override
public void publish(String sourceKey, RedisToken message) {
Session session = getSession(sourceKey);
if (session != null) {
session.publish(message);
}
}
//......
}
MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。