
在分布式系统中,多个节点需要就某个值达成一致,就像一群将军需要决定是进攻还是撤退。但传统的分布式共识算法(如 Paxos)复杂难懂,实现困难。
Raft 共识算法的出现,就像给分布式系统提供了一套 "民主决策机制":只要集群中超过半数的节点仍正常运行并能相互通信,即使部分节点崩溃或网络出现故障(Raft 只处理这类非拜占庭故障,不防御恶意节点),剩余节点仍能就某个值达成一致。它广泛应用于分布式数据库、分布式文件系统、分布式协调服务等领域,是保障系统高可用和数据一致性的关键技术。
Raft 采用 "领导者 - 跟随者" 模型,将共识过程分解为三个相对独立的子问题:领导者选举(Leader Election)、日志复制(Log Replication)和安全性(Safety)。
整个过程就像一个公司的董事会决策:先选出一位董事长(领导者选举),由董事长提出议案并分发给各位董事(日志复制),议案获得过半数董事确认后才正式生效(提交),并且之后上任的董事长也不能推翻已经生效的决议(安全性)。
以下是一个简化版的 Raft 共识算法 Java 实现(注:示例省略了真实的网络通信和持久化等机制;实际应用中必须补全这些部分,例如在响应任何请求之前将 currentTerm、votedFor 和日志持久化,才能保证安全性):
import java.util.*;
import java.util.concurrent.*;
/**
 * Role a Raft node is currently playing: a FOLLOWER waits for its election timeout,
 * a CANDIDATE is requesting votes, and a LEADER accepts client commands and
 * replicates its log. Declaration order is significant only for {@code values()}.
 */
enum NodeState { FOLLOWER, CANDIDATE, LEADER }
/**
 * One replicated log entry: a client command plus the term of the leader that created it.
 *
 * <p>The fields are final because the same {@code LogEntry} objects are copied by
 * reference from the leader's log into followers' logs. Mutating one would silently
 * rewrite history on several nodes. The {@code command} object itself is not copied.
 */
class LogEntry {
    /** Term in which the entry was created (0 for the sentinel entry). */
    final int term;
    /** Opaque client command; {@code null} for the sentinel entry. */
    final Object command;

    public LogEntry(int term, Object command) {
        this.term = term;
        this.command = command;
    }

    @Override
    public String toString() {
        return "LogEntry{term=" + term + ", command=" + command + "}";
    }
}
/**
 * RequestVote RPC arguments sent by a candidate.
 * Immutable: fields are final so a request cannot change while "in flight".
 */
class VoteRequest {
    /** Candidate's term. */
    final int term;
    /** Id of the candidate requesting the vote. */
    final int candidateId;
    /** Index of the candidate's last log entry. */
    final int lastLogIndex;
    /** Term of the candidate's last log entry. */
    final int lastLogTerm;

    public VoteRequest(int term, int candidateId, int lastLogIndex, int lastLogTerm) {
        this.term = term;
        this.candidateId = candidateId;
        this.lastLogIndex = lastLogIndex;
        this.lastLogTerm = lastLogTerm;
    }
}
/**
 * RequestVote RPC reply. Immutable.
 */
class VoteResponse {
    /** Responder's current term, so a stale candidate can step down. */
    final int term;
    /** True if the responder granted its vote. */
    final boolean voteGranted;

    public VoteResponse(int term, boolean voteGranted) {
        this.term = term;
        this.voteGranted = voteGranted;
    }
}
/**
 * AppendEntries RPC arguments (log replication, and a heartbeat when {@code entries} is empty).
 *
 * <p>Immutable: {@code entries} is a defensive, unmodifiable snapshot, so later changes
 * to the caller's list cannot alter a request that is already "in flight".
 */
class AppendEntriesRequest {
    /** Leader's term. */
    final int term;
    /** Id of the sending leader. */
    final int leaderId;
    /** Index of the log entry immediately preceding {@code entries}. */
    final int prevLogIndex;
    /** Term of the entry at {@code prevLogIndex}. */
    final int prevLogTerm;
    /** Entries to append; never null (empty for a heartbeat). */
    final List<LogEntry> entries;
    /** Leader's commit index. */
    final int leaderCommit;

    /**
     * @param entries entries to replicate; {@code null} is treated as an empty list
     */
    public AppendEntriesRequest(int term, int leaderId, int prevLogIndex, int prevLogTerm,
                                List<LogEntry> entries, int leaderCommit) {
        this.term = term;
        this.leaderId = leaderId;
        this.prevLogIndex = prevLogIndex;
        this.prevLogTerm = prevLogTerm;
        this.entries = entries == null
                ? Collections.<LogEntry>emptyList()
                : Collections.unmodifiableList(new ArrayList<>(entries));
        this.leaderCommit = leaderCommit;
    }
}
/**
 * AppendEntries RPC reply. Immutable.
 */
class AppendEntriesResponse {
    /** Responder's current term, so a stale leader can step down. */
    final int term;
    /** True if the follower's log matched at {@code prevLogIndex} and the entries were accepted. */
    final boolean success;
    /** On success, the index of the last entry the request made the follower's log agree on; 0 otherwise. */
    final int matchIndex;

    public AppendEntriesResponse(int term, boolean success, int matchIndex) {
        this.term = term;
        this.success = success;
        this.matchIndex = matchIndex;
    }
}
// Raft node
/**
 * One member of a Raft cluster, simulated inside a single JVM.
 *
 * <p>Nodes reach each other through the in-memory {@link #CLUSTER} registry. An RPC is
 * a task scheduled on the sender's executor after a random 10-50 ms "network delay". The
 * task calls the receiver's handler directly and then feeds the reply back into the
 * sender. A peer that is not registered (not created yet, or already shut down) never
 * answers, which models a crashed node or a lost message.
 *
 * <p>Thread-safety: all Raft state is guarded by the node's own monitor. An RPC task
 * never holds its own node's lock while calling into another node, so cross-node calls
 * cannot deadlock.
 *
 * <p>Simplifications: nothing is persisted, and membership is fixed. A real
 * implementation must persist {@code currentTerm}, {@code votedFor} and the log before
 * answering any RPC.
 */
class RaftNode {
    /** In-memory "network": every live node, keyed by its id. */
    private static final Map<Integer, RaftNode> CLUSTER = new ConcurrentHashMap<>();

    private final int nodeId;
    private NodeState state;
    private int currentTerm;
    /** Candidate this node voted for in {@code currentTerm}, or null. */
    private Integer votedFor;
    /** The log; index 0 holds a term-0 sentinel so real entries start at index 1. */
    private final List<LogEntry> log;
    private int commitIndex;
    private int lastApplied;
    /** Leader only: index of the next entry to send to each peer. */
    private final Map<Integer, Integer> nextIndex;
    /** Leader only: highest index known to be replicated on each peer. */
    private final Map<Integer, Integer> matchIndex;
    /** Ids of the other cluster members (never contains this node's id). */
    private final List<Integer> peerIds;
    /** Nodes (including this one) that granted a vote in the current election. */
    private final Set<Integer> votesGranted;
    private final ScheduledExecutorService executorService;
    private ScheduledFuture<?> electionTimer;
    private ScheduledFuture<?> heartbeatTimer;

    /**
     * Creates a follower, registers it in the cluster and arms its election timer.
     *
     * @param nodeId  unique id of this node
     * @param peerIds ids of the cluster members. This node's own id and duplicates are
     *                ignored, so the full membership list may be passed.
     */
    public RaftNode(int nodeId, List<Integer> peerIds) {
        this.nodeId = nodeId;
        // Treating ourselves as a peer would inflate the majority threshold.
        Set<Integer> others = new LinkedHashSet<>(peerIds);
        others.remove(nodeId);
        this.peerIds = Collections.unmodifiableList(new ArrayList<>(others));
        this.state = NodeState.FOLLOWER;
        this.currentTerm = 0;
        this.votedFor = null;
        this.log = new ArrayList<>();
        this.log.add(new LogEntry(0, null)); // sentinel entry
        this.commitIndex = 0;
        this.lastApplied = 0;
        this.nextIndex = new HashMap<>();
        this.matchIndex = new HashMap<>();
        this.votesGranted = new HashSet<>();
        this.executorService = Executors.newScheduledThreadPool(1);
        // Register only after every field is initialised; the map publishes the node safely.
        CLUSTER.put(nodeId, this);
        synchronized (this) {
            resetElectionTimer();
        }
    }

    /** Number of votes (or replicas) that form a majority of the whole cluster. */
    private int majority() {
        return (peerIds.size() + 1) / 2 + 1;
    }

    /**
     * (Re)arms the randomized 150-300 ms election timeout. Caller must hold the lock.
     */
    private void resetElectionTimer() {
        if (electionTimer != null) {
            electionTimer.cancel(false);
        }
        if (executorService.isShutdown()) {
            return; // stopped node: scheduling would throw RejectedExecutionException
        }
        long timeout = ThreadLocalRandom.current().nextLong(150, 301);
        electionTimer = executorService.schedule(this::startElection, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs {@code rpc} on this node's executor after a simulated 10-50 ms network delay.
     * Caller must hold the lock, so the shutdown check cannot race with {@link #shutdown()}.
     */
    private void sendWithNetworkDelay(Runnable rpc) {
        if (executorService.isShutdown()) {
            return;
        }
        executorService.schedule(rpc, ThreadLocalRandom.current().nextLong(10, 51), TimeUnit.MILLISECONDS);
    }

    /**
     * Reverts to follower and adopts {@code term} if it is newer. Caller must hold the lock.
     *
     * <p>{@code votedFor} is cleared only when the term advances. A vote already cast in
     * the current term must stand, or the node could vote twice in one term.
     */
    private void stepDown(int term) {
        boolean wasLeader = state == NodeState.LEADER;
        if (term > currentTerm) {
            currentTerm = term;
            votedFor = null;
        }
        state = NodeState.FOLLOWER;
        if (heartbeatTimer != null) {
            heartbeatTimer.cancel(false);
            heartbeatTimer = null;
        }
        if (wasLeader) {
            // Leaders run no election timer, but a follower must always have one.
            resetElectionTimer();
        }
    }

    /** Election timeout fired: become a candidate for the next term and request votes. */
    private synchronized void startElection() {
        if (state == NodeState.LEADER || executorService.isShutdown()) {
            return;
        }
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = nodeId;
        votesGranted.clear();
        votesGranted.add(nodeId); // vote for ourselves
        System.out.println("Node " + nodeId + " 开始选举,任期: " + currentTerm);
        if (votesGranted.size() >= majority()) {
            becomeLeader(); // single-node cluster: our own vote is a majority
            return;
        }
        for (int peerId : peerIds) {
            sendVoteRequest(peerId);
        }
        // Retry with a new term if this election does not finish in time.
        resetElectionTimer();
    }

    /** Sends a RequestVote RPC for the current term to {@code peerId}. Caller must hold the lock. */
    private void sendVoteRequest(int peerId) {
        int lastLogIndex = log.size() - 1;
        int lastLogTerm = lastLogIndex > 0 ? log.get(lastLogIndex).term : 0;
        VoteRequest request = new VoteRequest(currentTerm, nodeId, lastLogIndex, lastLogTerm);
        sendWithNetworkDelay(() -> {
            RaftNode peer = CLUSTER.get(peerId);
            if (peer == null) {
                return; // peer is down: the request is lost
            }
            VoteResponse response = peer.handleVoteRequest(request);
            handleVoteResponse(peerId, request.term, response);
        });
    }

    /** RequestVote receiver: grants at most one vote per term, and only to an up-to-date log. */
    private synchronized VoteResponse handleVoteRequest(VoteRequest request) {
        if (request.term < currentTerm) {
            return new VoteResponse(currentTerm, false);
        }
        if (request.term > currentTerm) {
            stepDown(request.term);
        }
        int lastLogIndex = log.size() - 1;
        int lastLogTerm = lastLogIndex > 0 ? log.get(lastLogIndex).term : 0;
        boolean logUpToDate = request.lastLogTerm > lastLogTerm
                || (request.lastLogTerm == lastLogTerm && request.lastLogIndex >= lastLogIndex);
        boolean canVote = votedFor == null || votedFor == request.candidateId;
        if (canVote && logUpToDate) {
            votedFor = request.candidateId;
            resetElectionTimer();
            return new VoteResponse(currentTerm, true);
        }
        return new VoteResponse(currentTerm, false);
    }

    /**
     * Counts a vote reply. Replies to an election this node is no longer running
     * are ignored.
     *
     * @param electionTerm term of the RequestVote this reply answers
     */
    private synchronized void handleVoteResponse(int peerId, int electionTerm, VoteResponse response) {
        if (response.term > currentTerm) {
            stepDown(response.term);
            return;
        }
        if (state != NodeState.CANDIDATE || currentTerm != electionTerm || !response.voteGranted) {
            return;
        }
        votesGranted.add(peerId); // a Set, so a duplicated reply cannot be counted twice
        if (votesGranted.size() >= majority()) {
            becomeLeader();
        }
    }

    /** Won the election: initialise replication state and start heartbeating. */
    private synchronized void becomeLeader() {
        state = NodeState.LEADER;
        System.out.println("Node " + nodeId + " 成为领导者,任期: " + currentTerm);
        for (int peerId : peerIds) {
            nextIndex.put(peerId, log.size());
            matchIndex.put(peerId, 0);
        }
        if (electionTimer != null) {
            electionTimer.cancel(false);
        }
        sendHeartbeats();
    }

    /** Leader only: sends AppendEntries to every peer and reschedules itself every 50 ms. */
    private synchronized void sendHeartbeats() {
        if (state != NodeState.LEADER || executorService.isShutdown()) {
            return;
        }
        for (int peerId : peerIds) {
            sendAppendEntries(peerId);
        }
        if (heartbeatTimer != null) {
            heartbeatTimer.cancel(false);
        }
        heartbeatTimer = executorService.schedule(this::sendHeartbeats, 50, TimeUnit.MILLISECONDS);
    }

    /** Sends {@code peerId} every entry from its nextIndex onward. Caller must hold the lock. */
    private void sendAppendEntries(int peerId) {
        int nextIdx = nextIndex.get(peerId);
        int prevLogIndex = nextIdx - 1;
        int prevLogTerm = prevLogIndex > 0 ? log.get(prevLogIndex).term : 0;
        List<LogEntry> entries = new ArrayList<>(log.subList(nextIdx, log.size()));
        AppendEntriesRequest request = new AppendEntriesRequest(
                currentTerm, nodeId, prevLogIndex, prevLogTerm, entries, commitIndex);
        sendWithNetworkDelay(() -> {
            RaftNode peer = CLUSTER.get(peerId);
            if (peer == null) {
                return; // peer is down: the request is lost
            }
            AppendEntriesResponse response = peer.handleAppendEntries(request);
            handleAppendEntriesResponse(peerId, request.term, response);
        });
    }

    /**
     * AppendEntries receiver: checks log consistency, truncates conflicting entries,
     * appends new ones and advances commitIndex.
     *
     * @return on success, the index of the last entry covered by this request
     */
    private synchronized AppendEntriesResponse handleAppendEntries(AppendEntriesRequest request) {
        if (request.term < currentTerm) {
            return new AppendEntriesResponse(currentTerm, false, 0);
        }
        // A legitimate leader exists for request.term, so follow it. votedFor survives if the term is unchanged.
        stepDown(request.term);
        resetElectionTimer();
        if (request.prevLogIndex >= log.size()
                || (request.prevLogIndex > 0 && log.get(request.prevLogIndex).term != request.prevLogTerm)) {
            return new AppendEntriesResponse(currentTerm, false, 0);
        }
        List<LogEntry> entries = request.entries;
        for (int i = 0; i < entries.size(); i++) {
            int idx = request.prevLogIndex + 1 + i;
            if (idx < log.size() && log.get(idx).term == entries.get(i).term) {
                continue; // already have this entry
            }
            // End of our log or a conflict: drop everything from idx and append the rest.
            log.subList(idx, log.size()).clear();
            log.addAll(entries.subList(i, entries.size()));
            break;
        }
        // Entries past lastNewIndex were not verified by this request, so never report
        // them as matched or commit them.
        int lastNewIndex = request.prevLogIndex + entries.size();
        if (request.leaderCommit > commitIndex) {
            commitIndex = Math.max(commitIndex, Math.min(request.leaderCommit, lastNewIndex));
            applyCommittedEntries();
        }
        return new AppendEntriesResponse(currentTerm, true, lastNewIndex);
    }

    /**
     * Leader side of AppendEntries: advances or backs up the peer's indices.
     *
     * @param requestTerm term of the request this reply answers
     */
    private synchronized void handleAppendEntriesResponse(int peerId, int requestTerm,
                                                          AppendEntriesResponse response) {
        if (response.term > currentTerm) {
            stepDown(response.term);
            return;
        }
        if (state != NodeState.LEADER || currentTerm != requestTerm) {
            return; // reply to an RPC from a leadership term that is over
        }
        if (response.success) {
            // Replies can arrive out of order; never move matchIndex backwards.
            int matched = Math.max(matchIndex.get(peerId), response.matchIndex);
            matchIndex.put(peerId, matched);
            nextIndex.put(peerId, matched + 1);
            updateCommitIndex();
        } else {
            // Log mismatch: back up one entry, but never below what is known to match
            // (which is always >= 1, past the sentinel), then retry.
            int backedUp = Math.max(matchIndex.get(peerId) + 1, nextIndex.get(peerId) - 1);
            nextIndex.put(peerId, backedUp);
            sendAppendEntries(peerId);
        }
    }

    /**
     * Leader only: commits the highest current-term entry stored on a majority.
     * Caller must hold the lock.
     */
    private void updateCommitIndex() {
        int newCommitIndex = commitIndex;
        for (int n = commitIndex + 1; n < log.size(); n++) {
            // Earlier-term entries are committed only indirectly (Raft paper, section 5.4.2).
            if (log.get(n).term != currentTerm) {
                continue;
            }
            int replicas = 1; // the leader itself
            for (int peerId : peerIds) {
                if (matchIndex.get(peerId) >= n) {
                    replicas++;
                }
            }
            if (replicas >= majority()) {
                newCommitIndex = n;
            }
        }
        if (newCommitIndex > commitIndex) {
            commitIndex = newCommitIndex;
            applyCommittedEntries();
        }
    }

    /** Applies (here: prints) every committed but not yet applied entry, in order. Caller must hold the lock. */
    private void applyCommittedEntries() {
        while (lastApplied < commitIndex) {
            lastApplied++;
            LogEntry entry = log.get(lastApplied);
            System.out.println("Node " + nodeId + " 应用日志条目: " + entry.command);
        }
    }

    /**
     * Appends a client command to the leader's log and starts replicating it.
     * Non-leaders reject the command (and print a message).
     */
    public synchronized void handleClientRequest(Object command) {
        if (state != NodeState.LEADER) {
            System.out.println("Node " + nodeId + " 不是领导者,拒绝请求");
            return;
        }
        log.add(new LogEntry(currentTerm, command));
        System.out.println("Node " + nodeId + " 收到客户端请求: " + command);
        updateCommitIndex(); // commits immediately when the leader alone is a majority
        for (int peerId : peerIds) {
            sendAppendEntries(peerId);
        }
    }

    /**
     * Stops the node. It leaves the cluster (peers' RPCs to it are lost from now on),
     * its timers are cancelled, and pending tasks are dropped.
     */
    public synchronized void shutdown() {
        CLUSTER.remove(nodeId, this);
        if (electionTimer != null) {
            electionTimer.cancel(false);
        }
        if (heartbeatTimer != null) {
            heartbeatTimer.cancel(false);
        }
        // shutdownNow (not shutdown) so already-delayed RPC tasks do not run after stop.
        executorService.shutdownNow();
    }
}
// 主类
public class RaftDemo {
/**
 * Runs a five-node cluster for about six seconds.
 *
 * <p>It waits for an election, offers one client command, lets it replicate, and then
 * shuts every node down, even if the thread is interrupted.
 */
public static void main(String[] args) throws InterruptedException {
    // Create a 5-node Raft cluster
    List<Integer> clusterIds = Arrays.asList(1, 2, 3, 4, 5);
    List<RaftNode> nodes = new ArrayList<>();
    try {
        for (int id : clusterIds) {
            // A node's peers are the *other* members. Listing itself would inflate the
            // majority threshold.
            List<Integer> peers = new ArrayList<>(clusterIds);
            peers.remove(Integer.valueOf(id));
            nodes.add(new RaftNode(id, peers));
        }
        // Wait for a leader to be elected
        Thread.sleep(1000);
        // We cannot tell from outside which node leads, so offer the command to every
        // node. Followers reject it; only the current leader appends and replicates it.
        for (RaftNode node : nodes) {
            node.handleClientRequest("操作: 转账100元");
        }
        // Let replication run for a while before stopping
        Thread.sleep(5000);
    } finally {
        for (RaftNode node : nodes) {
            node.shutdown();
        }
    }
}
}

尽管 Raft 在分布式系统中表现出色,但它也面临着新的挑战:所有写请求都必须经过单一领导者,容易成为吞吐瓶颈;跨地域部署时,等待多数派确认会带来较高的写延迟;它只能容忍节点崩溃、网络故障等非拜占庭故障,无法应对恶意节点;集群成员变更的正确实现也相对复杂。
思考延伸: Raft 共识算法的出现,让分布式系统的实现变得更加简单和可靠。它的成功启示我们:在设计复杂系统时,通过合理的抽象和分解,可以将难题转化为易于理解和实现的子问题。随着分布式系统的不断发展,未来还会出现哪些更高效、更可靠的共识算法?这值得我们持续关注和探索。
Raft 共识算法就像分布式世界的 "民主决策机制",让多个节点在存在故障的情况下,仍能就某个值达成一致。它的出现,为分布式系统的可靠性和可用性提供了坚实的保障。
互动话题:你在开发中使用过 Raft 共识算法吗?遇到过哪些有趣的挑战?欢迎在评论区分享你的经验,一起探讨分布式系统的奥秘!