18.9.2 分布式一致性算法(Raft/Paxos)
1. Raft算法原理
1.1 Raft算法基础
/**
 * Console walk-through of the Raft consensus algorithm.
 *
 * <p>Concepts the demo is built around:
 * <ul>
 *   <li><b>Node states</b> - Leader (handles all client requests), Follower
 *       (passively receives log entries), Candidate (takes part in leader election).</li>
 *   <li><b>Core mechanisms</b> - leader election, log replication, safety guarantees.</li>
 *   <li><b>Key concepts</b> - term (a logical clock), log entry, commit index.</li>
 * </ul>
 */
public class RaftAlgorithmPrinciple {

    /** Writes one line of demo narration to standard output. */
    private static void say(String line) {
        System.out.println(line);
    }

    /**
     * Runs the three demo phases in order against a fresh five-node cluster:
     * leader election, log replication, leader failover.
     */
    public void demonstrateRaftAlgorithm() {
        say("=== Raft算法演示 ===");
        final RaftCluster raft = new RaftCluster(5);
        demonstrateLeaderElection(raft);
        demonstrateLogReplication(raft);
        demonstrateLeaderFailover(raft);
    }

    /** Phase 1: shows the initial cluster state, triggers an election, shows the result. */
    private void demonstrateLeaderElection(RaftCluster cluster) {
        say("--- 领导者选举演示 ---");

        say("1. 集群启动,所有节点为Follower:");
        cluster.showClusterState();

        say("\n2. 选举超时,节点开始选举:");
        cluster.startElection();

        say("\n3. 选举完成:");
        cluster.showClusterState();

        say("领导者选举完成\n");
    }

    /** Phase 2: submits two client commands and prints every node's log size. */
    private void demonstrateLogReplication(RaftCluster cluster) {
        say("--- 日志复制演示 ---");

        say("1. 客户端发送请求:");
        final String[] commands = {"SET key1 value1", "SET key2 value2"};
        for (String command : commands) {
            cluster.clientRequest(command);
        }

        say("\n2. 日志复制过程:");
        cluster.showLogStates();

        say("日志复制完成\n");
    }

    /** Phase 3: fails the current leader, triggers a new election, shows the result. */
    private void demonstrateLeaderFailover(RaftCluster cluster) {
        say("--- 领导者故障转移演示 ---");

        say("1. 当前领导者故障:");
        cluster.simulateLeaderFailure();

        say("\n2. 重新选举:");
        cluster.startElection();

        say("\n3. 新领导者产生:");
        cluster.showClusterState();

        say("故障转移完成\n");
    }
}
/**
 * In-memory simulation of a Raft cluster.
 *
 * <p>The cluster owns the nodes, tracks the current term and leader, and acts
 * as the single ballot box for elections: candidates call
 * {@link #requestVote(RaftNode, int)} once per reachable peer, and the cluster
 * answers on behalf of that peer.
 */
class RaftCluster {
    private final java.util.List<RaftNode> nodes;
    private int currentTerm = 0;
    private RaftNode leader = null;
    /** Candidate that was granted the votes of {@link #currentTerm}; null before any grant. */
    private RaftNode votedFor = null;

    /**
     * Creates a cluster of {@code nodeCount} nodes named {@code node-0}, {@code node-1}, ...
     *
     * @param nodeCount number of nodes to create; values below 1 yield an empty cluster
     */
    public RaftCluster(int nodeCount) {
        nodes = new java.util.ArrayList<>();
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new RaftNode("node-" + i, this));
        }
    }

    /** Prints the current term and the state of every node, marking the leader. */
    public void showClusterState() {
        System.out.println(" 集群状态 (Term: " + currentTerm + "):");
        for (RaftNode node : nodes) {
            String status = node.getState().toString();
            if (node == leader) {
                status += " (LEADER)";
            }
            System.out.println(" " + node.getId() + ": " + status);
        }
    }

    /**
     * Simulates an election timeout: the first node (in creation order) that is
     * currently a follower starts an election.
     *
     * <p>Nothing happens while a live leader exists. Choosing the first live
     * follower instead of always {@code node-0} lets a new election take place
     * after {@code node-0} has failed, and keeps the demo output deterministic.
     */
    public void startElection() {
        if (leader != null && leader.getState() == NodeState.LEADER) {
            return; // a healthy leader suppresses election timeouts
        }
        for (RaftNode node : nodes) {
            if (node.getState() == NodeState.FOLLOWER) {
                node.startElection();
                return;
            }
        }
        System.out.println(" 无可用节点发起选举");
    }

    /**
     * Forwards a client command to the leader, or reports failure when there is none.
     *
     * @param command the command to append to the replicated log
     */
    public void clientRequest(String command) {
        if (leader != null) {
            System.out.println(" 客户端请求: " + command);
            leader.handleClientRequest(command);
        } else {
            System.out.println(" 无可用领导者,请求失败");
        }
    }

    /** Prints the number of log entries held by each node. */
    public void showLogStates() {
        System.out.println(" 各节点日志状态:");
        for (RaftNode node : nodes) {
            System.out.println(" " + node.getId() + ": " + node.getLogSize() + " entries");
        }
    }

    /** Marks the current leader as {@link NodeState#FAILED} and clears the leader reference. */
    public void simulateLeaderFailure() {
        if (leader != null) {
            System.out.println(" 模拟领导者 " + leader.getId() + " 故障");
            leader.setState(NodeState.FAILED);
            leader = null;
        }
    }

    /**
     * Answers one vote request on behalf of a peer.
     *
     * <p>The first request for a term newer than {@link #currentTerm} advances
     * the term and binds that term's votes to the requesting candidate. Further
     * requests for the same term are granted only to that same candidate, so a
     * candidate can collect one vote per peer while a competing candidate in
     * the same term is refused. Requests for older terms are always refused.
     *
     * @param candidate the node asking for a vote
     * @param term      the term the candidate is campaigning in
     * @return {@code true} if the vote is granted
     */
    public boolean requestVote(RaftNode candidate, int term) {
        if (term > currentTerm) {
            currentTerm = term;
            votedFor = candidate;
            return true;
        }
        // Same term: every peer repeats the decision already made for this term.
        return term == currentTerm && votedFor != null && candidate == votedFor;
    }

    /**
     * Installs {@code newLeader} as leader for {@code term} and turns every
     * other non-failed node into a follower of that term.
     *
     * @param newLeader the node that won the election
     * @param term      the term in which it won
     */
    public void setLeader(RaftNode newLeader, int term) {
        this.leader = newLeader;
        this.currentTerm = term;
        this.votedFor = newLeader;
        for (RaftNode node : nodes) {
            if (node != newLeader && node.getState() != NodeState.FAILED) {
                node.setState(NodeState.FOLLOWER);
                node.setCurrentTerm(term);
            }
        }
    }

    /** Returns the cluster's node list, in creation order. */
    public java.util.List<RaftNode> getNodes() { return nodes; }

    /** Returns the highest term the cluster has seen. */
    public int getCurrentTerm() { return currentTerm; }
}
/** Lifecycle states a simulated Raft node can be in. */
enum NodeState {
    /** Initial state of every node; also the state a node returns to on a heartbeat or a lost election. */
    FOLLOWER,

    /** The node has started an election and is collecting votes. */
    CANDIDATE,

    /** The node won an election and handles client requests. */
    LEADER,

    /** The node is simulated as crashed; it is skipped for votes, heartbeats and log replication. */
    FAILED;
}
/**
 * One member of the simulated Raft cluster.
 *
 * <p>A node starts as a follower in term 0 with an empty log. It can run an
 * election, act as leader (heartbeats, accepting client commands, replicating
 * log entries) or act as a follower (accepting heartbeats and log entries).
 */
class RaftNode {
    private String id;
    private NodeState state = NodeState.FOLLOWER;
    private int currentTerm = 0;
    private String votedFor = null;
    private java.util.List<LogEntry> log = new java.util.ArrayList<>();
    private int commitIndex = 0;
    private RaftCluster cluster;

    /**
     * @param id      display name of this node
     * @param cluster the cluster this node belongs to
     */
    public RaftNode(String id, RaftCluster cluster) {
        this.id = id;
        this.cluster = cluster;
    }

    /** True when {@code other} is a different node that has not failed. */
    private boolean isLivePeer(RaftNode other) {
        return other != this && other.getState() != NodeState.FAILED;
    }

    /** Smallest number of nodes that forms a majority of the whole cluster. */
    private int quorum() {
        return cluster.getNodes().size() / 2 + 1;
    }

    /**
     * Becomes a candidate in a new term, votes for itself, asks the cluster for
     * one vote per live peer, and becomes leader when a majority is reached;
     * otherwise falls back to follower.
     */
    public void startElection() {
        System.out.println(" " + id + " 开始选举");
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = id;

        int ballots = 1; // own vote
        for (RaftNode peer : cluster.getNodes()) {
            if (!isLivePeer(peer)) {
                continue;
            }
            if (cluster.requestVote(this, currentTerm)) {
                ballots++;
            }
        }

        if (ballots < quorum()) {
            state = NodeState.FOLLOWER;
            return;
        }
        becomeLeader();
    }

    /** Switches to leader, registers with the cluster, then sends the first heartbeat. */
    private void becomeLeader() {
        System.out.println(" " + id + " 成为领导者 (Term: " + currentTerm + ")");
        state = NodeState.LEADER;
        cluster.setLeader(this, currentTerm);
        sendHeartbeat();
    }

    /** Delivers a heartbeat carrying this node's term to every live peer. */
    private void sendHeartbeat() {
        System.out.println(" " + id + " 发送心跳");
        for (RaftNode peer : cluster.getNodes()) {
            if (isLivePeer(peer)) {
                peer.receiveHeartbeat(currentTerm);
            }
        }
    }

    /**
     * Accepts a heartbeat whose term is not older than this node's term:
     * adopts the term, becomes a follower and clears its vote.
     * Heartbeats from older terms are ignored.
     *
     * @param term the sender's term
     */
    public void receiveHeartbeat(int term) {
        if (term < currentTerm) {
            return;
        }
        currentTerm = term;
        state = NodeState.FOLLOWER;
        votedFor = null;
    }

    /**
     * As leader, appends {@code command} to the local log and replicates it to
     * the peers. Does nothing when this node is not the leader.
     *
     * @param command the client command to record
     */
    public void handleClientRequest(String command) {
        if (state != NodeState.LEADER) {
            return;
        }
        int nextIndex = log.size();
        LogEntry appended = new LogEntry(currentTerm, nextIndex, command);
        log.add(appended);
        System.out.println(" " + id + " 接收请求,创建日志条目: " + command);
        replicateLog(appended);
    }

    /**
     * Offers {@code entry} to every live peer and records it as committed once
     * a majority of the cluster (this node included) holds it.
     */
    private void replicateLog(LogEntry entry) {
        System.out.println(" " + id + " 开始日志复制");

        int copies = 1; // this node's own copy
        for (RaftNode peer : cluster.getNodes()) {
            if (!isLivePeer(peer)) {
                continue;
            }
            if (peer.appendEntry(entry)) {
                copies++;
            }
        }

        if (copies >= quorum()) {
            commitIndex = entry.getIndex();
            System.out.println(" 日志条目已提交: " + entry.getCommand());
        }
    }

    /**
     * Appends {@code entry} to this node's log.
     *
     * @param entry the entry sent by the leader
     * @return always {@code true}
     */
    public boolean appendEntry(LogEntry entry) {
        log.add(entry);
        System.out.println(" " + id + " 接收日志条目: " + entry.getCommand());
        return true;
    }

    // Accessors

    public String getId() {
        return id;
    }

    public NodeState getState() {
        return state;
    }

    public void setState(NodeState state) {
        this.state = state;
    }

    public int getCurrentTerm() {
        return currentTerm;
    }

    public void setCurrentTerm(int term) {
        this.currentTerm = term;
    }

    public int getLogSize() {
        return log.size();
    }
}
// 日志条目
class LogEntry {
private int term;
private int index;
private String command;
public LogEntry(int term, int index, St
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经
查看9道真题和解析