ZooKeeper核心代码梳理

ZooKeeper核心代码梳理

1.辅助源码

1.1持久化源码

Leader和Follower中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中。org.apache.zookeeper.server.persistence包下的相关类都是持久化相关的代码。

image-20240209110653398

1)快照

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Contract for persisting a {@code DataTree} and its session table as a snapshot
 * and for reading such a snapshot back.
 */
public interface SnapShot {
    /**
     * Deserializes a snapshot into the given data tree and session map.
     *
     * @param dt       data tree to populate
     * @param sessions session map to populate
     * @return a long value (presumably the last zxid contained in the snapshot; confirm)
     * @throws IOException if reading the snapshot fails
     */
    long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException;
    /**
     * Serializes the given data tree and session map.
     *
     * @param dt       data tree to persist
     * @param sessions session map to persist
     * @param name     file argument of the serialization (presumably the target file; confirm)
     * @throws IOException if writing the snapshot fails
     */
    void serialize(DataTree dt, Map<Long, Integer> sessions,
        File name)
        throws IOException;
    /**
     * Finds the most recent snapshot file.
     *
     * @throws IOException if the lookup fails
     */
    File findMostRecentSnapshot() throws IOException;
    /**
     * Releases the resources held by this snapshot handler.
     *
     * @throws IOException if releasing fails
     */
    void close() throws IOException;
}

2)操作日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Contract for the transaction (operation) log: appending, reading, rolling,
 * truncating and committing log entries.
 */
public interface TxnLog {
    /** Sets the server statistics object. */
    void setServerStats(ServerStats serverStats);
    /** Rolls the log. */
    void rollLog() throws IOException;
    /**
     * Appends an entry made of a transaction header and a record.
     *
     * @return a boolean result (presumably whether the entry was appended; confirm)
     */
    boolean append(TxnHeader hdr, Record r) throws IOException;
    /** Reads the log for the given zxid and returns an iterator over the entries. */
    TxnIterator read(long zxid) throws IOException;
    /** Returns the last logged zxid. */
    long getLastLoggedZxid() throws IOException;
    /**
     * Truncates (deletes) log content relative to the given zxid.
     *
     * @return a boolean result (presumably whether truncation succeeded; confirm)
     */
    boolean truncate(long zxid) throws IOException;
    /** Returns the DbId. */
    long getDbId() throws IOException;
    /** Commits the log. */
    void commit() throws IOException;
    /** Returns the elapsed time of the transaction-log sync. */
    long getTxnLogSyncElapsedTime();
    /** Closes the log. */
    void close() throws IOException;
    /** Iterator interface used to read entries from the log. */
    public interface TxnIterator {
        /** Returns the header of the current entry. */
        TxnHeader getHeader();
        /** Returns the transaction record (payload) of the current entry. */
        Record getTxn();
        /**
         * Moves to the next entry.
         *
         * @return a boolean result (presumably {@code false} when no entry is left; confirm)
         */
        boolean next() throws IOException;
        /** Closes the iterator and releases its resources. */
        void close() throws IOException;
        /** Returns the storage size. */
        long getStorageSize() throws IOException;
    }
}

3)处理持久化的核心类

image-20240209111014031

1.2序列化源码

zookeeper-jute模块是ZooKeeper序列化相关的源码。

image-20240209111122965

1)序列化与反序列化

1
2
3
4
5
6
7
8
/**
 * A value that can write itself to an {@code OutputArchive} and read itself
 * back from an {@code InputArchive}.
 */
public interface Record {

    /**
     * Serializes this record into the given archive.
     *
     * @param archive destination archive
     * @param tag     tag passed along with the record
     * @throws IOException if writing fails
     */
    void serialize(OutputArchive archive, String tag) throws IOException;

    /**
     * Deserializes this record from the given archive.
     *
     * @param archive source archive
     * @param tag     tag passed along with the record
     * @throws IOException if reading fails
     */
    void deserialize(InputArchive archive, String tag) throws IOException;
}

2)迭代

1
2
3
4
5
6
/**
 * Minimal cursor abstraction used for iteration: check for the end with
 * {@link #done()} and advance with {@link #incr()}.
 */
public interface Index {

    /** Reports whether the iteration has finished. */
    boolean done();

    /** Advances to the next element. */
    void incr();
}

3)序列化支持的数据类型(反序列化同样)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Lists the data types that serialization supports. Every operation takes the
 * value to write (or the container being opened/closed) plus a {@code String}
 * tag, and may throw {@link IOException}.
 */
public interface OutputArchive {

    // Primitive values.
    void writeByte(byte b, String tag) throws IOException;

    void writeBool(boolean b, String tag) throws IOException;

    void writeInt(int i, String tag) throws IOException;

    void writeLong(long l, String tag) throws IOException;

    void writeFloat(float f, String tag) throws IOException;

    void writeDouble(double d, String tag) throws IOException;

    // Strings and raw byte buffers.
    void writeString(String s, String tag) throws IOException;

    void writeBuffer(byte[] buf, String tag) throws IOException;

    // Records, with explicit start/end markers.
    void writeRecord(Record r, String tag) throws IOException;

    void startRecord(Record r, String tag) throws IOException;

    void endRecord(Record r, String tag) throws IOException;

    // List-backed vectors.
    void startVector(List<?> v, String tag) throws IOException;

    void endVector(List<?> v, String tag) throws IOException;

    // TreeMap-backed maps.
    void startMap(TreeMap<?, ?> v, String tag) throws IOException;

    void endMap(TreeMap<?, ?> v, String tag) throws IOException;
}

2.ZK服务端初始化源码

image-20240209111718494

image-20240209112251104

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Parses the optional configuration argument, starts the data-directory purge
 * manager, and then runs either the quorum server or the standalone server.
 *
 * @param args command-line arguments; the configuration is parsed only when
 *             exactly one argument is given
 * @throws ConfigException      declared for configuration problems
 * @throws IOException          declared for I/O problems
 * @throws AdminServerException declared for admin-server problems
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean singleArg = args.length == 1;

    // Holds the ZooKeeper configuration.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (singleArg) {
        // Step 1: parse the arguments (zoo.cfg and myid).
        config.parse(args[0]);
    }

    // Step 2: start and schedule the purge task that deletes expired snapshots
    // (per the original note, this feature is off by default).
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (!singleArg || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
        return;
    }

    // Step 3: start in cluster mode.
    runFromConfig(config);
}

2.1解析配置参数

image-20240209112744457
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Copies recognised settings from the given properties into this configuration.
 *
 * <p>Each entry's key and value are trimmed and the key is matched against the
 * known setting names. The port and address values are collected in local
 * variables; the code that consumes them is elided in this listing.
 *
 * @param zkProp the properties to read
 * @throws IOException     declared by the method; the throwing call is not visible in this listing
 * @throws ConfigException if {@code peerType} is neither {@code observer} nor {@code participant}
 */
public void parseProperties(Properties zkProp)
        throws IOException, ConfigException {
    // Collected locally; their use is in the elided part of the method.
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    // Factory used to create the dataDir / dataLogDir File objects; built with warnForRelativePath().
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            // value was already trimmed above; the extra trim() is a no-op.
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")){
            secureClientPortAddress = value.trim();
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
            maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
            initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
            syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
            electionAlg = Integer.parseInt(value);
        } else if (key.equals("quorumListenOnAllIPs")) {
            quorumListenOnAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            // peerType is matched case-insensitively; any other value is rejected.
            if (value.toLowerCase().equals("observer")) {
                peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
                peerType = LearnerType.PARTICIPANT;
            } else
            {
                throw new ConfigException("Unrecognised peertype: " + value);
            }
        } else if (key.equals( "syncEnabled" )) {
            syncEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("dynamicConfigFile")){
            dynamicConfigFileStr = value;
        } else if (key.equals("autopurge.snapRetainCount")) {
            snapRetainCount = Integer.parseInt(value);
        } else if (key.equals("autopurge.purgeInterval")) {
            purgeInterval = Integer.parseInt(value);
        }
        // NOTE(review): the listing elides the rest of the method here (marker kept verbatim below).
        ......

    // backward compatibility - dynamic configuration in the same file as
    // static configuration params see writeDynamicConfig()
    if (dynamicConfigFileStr == null) {
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

image-20240209113254746

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Reads the server id from the {@code myid} file in the data directory.
 *
 * <p>If the file does not exist the method returns without touching
 * {@code serverId}. Otherwise the first line is read, surrounding whitespace is
 * removed, and the result is parsed as a {@code long}, stored in
 * {@code serverId} and put into the MDC under the key {@code "myid"}.
 *
 * @throws IOException              if the file cannot be read
 * @throws IllegalArgumentException if the first line is missing or not a number;
 *                                  the parse failure is attached as the cause
 */
private void setupMyId() throws IOException {
    // Resolve the myid file inside the ZooKeeper data directory.
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    // Tolerate stray whitespace around the id (e.g. a trailing space left by a
    // shell redirect). An empty file yields null, which is left as-is so the
    // parse below reports it with the usual message.
    if (myIdString != null) {
        myIdString = myIdString.trim();
    }
    try {
        // Store the parsed myid value as the server id.
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        // Keep the original exception as the cause so the stack trace is not lost.
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number", e);
    }
}

2.2过期快照删除

可以启动定时任务,对过期的快照执行删除,默认该功能是关闭的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Parses the optional configuration argument, starts the data-directory purge
 * manager, and then runs either the quorum server or the standalone server.
 *
 * @param args command-line arguments; the configuration is parsed only when
 *             exactly one argument is given
 * @throws ConfigException      declared for configuration problems
 * @throws IOException          declared for I/O problems
 * @throws AdminServerException declared for admin-server problems
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean singleArg = args.length == 1;

    // Holds the ZooKeeper configuration.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (singleArg) {
        // Step 1: parse the arguments (zoo.cfg and myid).
        config.parse(args[0]);
    }

    // Step 2: start and schedule the purge task that deletes expired snapshots.
    // Per the original notes: getSnapRetainCount() is 3, the minimum number of
    // snapshots to keep, and getPurgeInterval() defaults to 0, which means the
    // feature is off.
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (!singleArg || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
        return;
    }

    // Step 3: start in cluster mode.
    runFromConfig(config);
}

image-20240209114125079

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * Timer task that runs one purge pass over the configured directories and
 * logs, rather than propagates, any failure.
 */
static class PurgeTask extends TimerTask {
    private final File logsDir;
    private final File snapsDir;
    private final int snapRetainCount;

    /**
     * @param dataDir directory stored as the logs directory
     * @param snapDir directory stored as the snapshots directory
     * @param count   retain count handed to the purge call
     */
    public PurgeTask(File dataDir, File snapDir, int count) {
        this.logsDir = dataDir;
        this.snapsDir = snapDir;
        this.snapRetainCount = count;
    }

    /** Runs the purge, logging its start, its completion and any exception. */
    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // Clean up snapshots.
            PurgeTxnLog.purge(this.logsDir, this.snapsDir, this.snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}

/**
 * Looks up the {@code num} most recent snapshots and, if any exist, hands the
 * last element of that list to {@code purgeOlderSnapshots}.
 *
 * @param dataDir first directory argument of the {@code FileTxnSnapLog}
 * @param snapDir second directory argument of the {@code FileTxnSnapLog}
 * @param num     number of recent snapshots to look up; must be at least 3
 * @throws IOException              declared for I/O failures during the purge
 * @throws IllegalArgumentException if {@code num} is less than 3
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    List<File> recent = txnLog.findNRecentSnapshots(num);
    if (recent.isEmpty()) {
        // No snapshots found: nothing to purge.
        return;
    }

    File lastListed = recent.get(recent.size() - 1);
    purgeOlderSnapshots(txnLog, lastListed);
}

2.3初始化通信组件

image-20240209115151002

image-20240209115246357

3.ZK服务端加载数据源码

image-20240209115429746

1)ZK中的数据模型,是一棵树DataTree,每个节点叫做DataNode

2)ZK集群中的DataTree时刻保持状态同步

3)ZK集群中每个ZK节点中,数据在内存和磁盘中都有一份完整的数据

  • 内存数据:DataTree

  • 磁盘数据:快照文件 + 编辑日志

image-20240209115719215

3.1恢复快照数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Starts this peer: checks that its own id is part of the view, restores data,
 * starts the server connection factory and the admin server, prepares leader
 * election, and finally calls the superclass {@code start()}.
 *
 * @throws RuntimeException if {@code myid} is not a key of {@code getView()}
 */
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Cold-start data recovery.
    loadDataBase();
    // Start the server connection factory.
    startServerCnxnFactory();
    try {
        // Start the admin server; a failure is logged and printed but does not abort startup.
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the leader-election environment.
    startLeaderElection();
    // Per the original note this is what runs the election; presumably the
    // superclass start() launches the peer's own thread. TODO confirm.
    super.start();
}

/**
 * Restores the in-memory database from disk and then derives the epoch from
 * the last processed zxid. The remainder of the method is elided in this listing.
 */
private void loadDataBase() {
    try {
        // Load the on-disk data into memory and rebuild the DataTree.
        // ZooKeeper operations are either transactional or non-transactional.
        // Transactional, e.g. zk.create(): each one is assigned a globally unique 64-bit zxid
        // (high 32 bits: epoch, the id of a leader's term; low 32 bits: txid, the transaction id).
        // Non-transactional, e.g. zk.getData().
        // Recovery steps:
        // (1) restore most of the data from the snapshot file, which yields a lastProcessedZxid;
        // (2) replay the edit log up to its last entry, updating lastProcessedZxid along the way;
        // (3) the resulting DataTree and lastProcessedZxid mean recovery is complete.
        zkDb.loadDataBase();
        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        ......
    }
    ......
}

/**
 * Restores the data tree and the session table through {@code snapLog}, then
 * marks this database as initialized.
 *
 * @return the zxid returned by the restore call
 * @throws IOException if the restore fails
 */
public long loadDataBase() throws IOException {
    final long restoredZxid =
            snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return restoredZxid;
}

image-20240209120833635

image-20240209121647889

3.2恢复日志数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Replays the transaction log on top of an already restored data tree.
 *
 * <p>Reading starts at {@code dt.lastProcessedZxid + 1}. Every entry is applied
 * through {@code processTransaction} and then reported to the listener.
 *
 * @param dt       data tree to apply the transactions to
 * @param sessions session map passed to {@code processTransaction}
 * @param listener notified after each applied transaction
 * @return {@code dt.lastProcessedZxid} if the log has no entry to replay,
 *         otherwise the highest zxid recorded while replaying
 * @throws IOException if reading the log fails, or if applying a transaction
 *                     raises {@code KeeperException.NoNodeException}
 */
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Most data was already restored from the snapshot, so replay only needs to
    // begin right after the snapshot's zxid.
    TxnIterator logIter = txnLog.read(dt.lastProcessedZxid + 1);
    // Starts as the snapshot's zxid and is raised as log entries are replayed.
    long maxZxid = dt.lastProcessedZxid;
    try {
        // The iterator already points at the first valid txn when initialized,
        // so the body runs before the first call to next().
        do {
            // Header of the current transaction (carries the zxid).
            TxnHeader header = logIter.getHeader();
            if (header == null) {
                // empty logs
                return dt.lastProcessedZxid;
            }

            boolean behindHighest = header.getZxid() < maxZxid && maxZxid != 0;
            if (behindHighest) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        maxZxid, header.getZxid(), header.getType());
            } else {
                maxZxid = header.getZxid();
            }

            try {
                processTransaction(header, dt, sessions, logIter.getTxn());
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: " +
                        header.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(header, logIter.getTxn());
        } while (logIter.next());
    } finally {
        if (logIter != null) {
            logIter.close();
        }
    }
    return maxZxid;
}

4.ZK选举源码

image-20240209132602508

image-20240209132722112

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Starts this peer: checks that its own id is part of the view, restores data,
 * starts the server connection factory and the admin server, prepares leader
 * election, and finally calls the superclass {@code start()}.
 *
 * @throws RuntimeException if {@code myid} is not a key of {@code getView()}
 */
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Cold-start data recovery.
    loadDataBase();
    // Start the server connection factory.
    startServerCnxnFactory();
    try {
        // Start the admin server; a failure is logged and printed but does not abort startup.
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the leader-election environment.
    startLeaderElection();
    // Per the original note this is what runs the election; presumably the
    // superclass start() launches the peer's own thread. TODO confirm.
    super.start();
}

image-20240209133056306

image-20240209133412635

1
2
3
4
5
6
7
8
9
10
11
/**
 * Builds a {@code QuorumCnxManager} for this peer from its id, view, auth
 * server/learner, listen-on-all-IPs flag, connection thread count and SASL
 * flag, plus the product of {@code tickTime} and {@code syncLimit}.
 *
 * @return a newly constructed connection manager
 */
public QuorumCnxManager createCnxnManager() {
    return new QuorumCnxManager(
            this,
            getId(),
            getView(),
            authServer,
            authLearner,
            tickTime * syncLimit,
            getQuorumListenOnAllIPs(),
            quorumCnxnThreadsSize,
            isQuorumSaslAuthEnabled());
}

image-20240209133632795

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Creates the election object: clears the stop flag, stores the connection
 * manager and delegates the remaining setup to {@code starter}.
 *
 * @param self    the peer this election belongs to
 * @param manager the connection manager to use
 */
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    stop = false;
    this.manager = manager;
    starter(self, manager);
}

/**
 * Initializes election state: stores the peer, resets the proposed leader and
 * zxid to -1, creates the send and receive queues, and builds the messenger.
 *
 * @param self    the peer this election belongs to
 * @param manager the connection manager handed to the messenger
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    this.proposedLeader = -1;
    this.proposedZxid = -1;

    this.sendqueue = new LinkedBlockingQueue<ToSend>();
    this.recvqueue = new LinkedBlockingQueue<Notification>();
    // Built after the queues, matching the original statement order.
    this.messenger = new Messenger(manager);
}

/** Starts the messenger created during construction. */
public void start() {
    messenger.start();
}

image-20240209134138842