ZooKeeper核心代码梳理
1.辅助源码
1.1持久化源码
Leader和Follower中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中。org.apache.zookeeper.server.persistence包下的相关类都是持久化相关的代码。
1)快照
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface SnapShot { long deserialize (DataTree dt, Map<Long, Integer> sessions) throws IOException; void serialize (DataTree dt, Map<Long, Integer> sessions, File name) throws IOException; File findMostRecentSnapshot () throws IOException; void close () throws IOException; }
2)操作日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public interface TxnLog { void setServerStats (ServerStats serverStats) ; void rollLog () throws IOException; boolean append (TxnHeader hdr, Record r) throws IOException; TxnIterator read (long zxid) throws IOException; long getLastLoggedZxid () throws IOException; boolean truncate (long zxid) throws IOException; long getDbId () throws IOException; void commit () throws IOException; long getTxnLogSyncElapsedTime () ; void close () throws IOException; public interface TxnIterator { TxnHeader getHeader () ; Record getTxn () ; boolean next () throws IOException; void close () throws IOException; long getStorageSize () throws IOException; } }
3)处理持久化的核心类
1.2序列化源码
zookeeper-jute模块中是ZooKeeper序列化相关的源码。
1)序列化与反序列化
1 2 3 4 5 6 7 8 public interface Record { public void serialize (OutputArchive archive, String tag) throws IOException; public void deserialize (InputArchive archive, String tag) throws IOException; }
2)迭代
/**
 * Simple iteration cursor: callers loop while {@link #done()} is {@code false}
 * and call {@link #incr()} after handling each element.
 * Presumably handed out while deserializing vectors/maps — TODO confirm.
 */
public interface Index {

    /** Returns {@code true} once there are no more elements. */
    public boolean done();

    /** Advances the cursor to the next element. */
    public void incr();
}
3)序列化支持的数据类型(反序列化同样)
/**
 * Sink for jute serialization: one write method per supported primitive type,
 * plus start/end markers that bracket records, vectors and maps.
 * Every value carries a {@code tag} naming it within the enclosing structure.
 */
public interface OutputArchive {

    // ---- primitive values -------------------------------------------------
    public void writeByte(byte b, String tag) throws IOException;

    public void writeBool(boolean b, String tag) throws IOException;

    public void writeInt(int i, String tag) throws IOException;

    public void writeLong(long l, String tag) throws IOException;

    public void writeFloat(float f, String tag) throws IOException;

    public void writeDouble(double d, String tag) throws IOException;

    public void writeString(String s, String tag) throws IOException;

    public void writeBuffer(byte buf[], String tag) throws IOException;

    // ---- nested records ---------------------------------------------------
    public void writeRecord(Record r, String tag) throws IOException;

    public void startRecord(Record r, String tag) throws IOException;

    public void endRecord(Record r, String tag) throws IOException;

    // ---- containers: each start* call is paired with the matching end* call
    public void startVector(List<?> v, String tag) throws IOException;

    public void endVector(List<?> v, String tag) throws IOException;

    public void startMap(TreeMap<?, ?> v, String tag) throws IOException;

    public void endMap(TreeMap<?, ?> v, String tag) throws IOException;
}
2.ZK服务端初始化源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected void initializeAndRun (String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig (); if (args.length == 1 ) { config.parse(args[0 ]); } DatadirCleanupManager purgeMgr = new DatadirCleanupManager (config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode" ); ZooKeeperServerMain.main(args); } }
2.1解析配置参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public void parseProperties (Properties zkProp) throws IOException, ConfigException { int clientPort = 0 ; int secureClientPort = 0 ; String clientPortAddress = null ; String secureClientPortAddress = null ; VerifyingFileFactory vff = new VerifyingFileFactory .Builder(LOG).warnForRelativePath().build(); for (Entry<Object, Object> entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir" )) { dataDir = vff.create(value); } else if (key.equals("dataLogDir" )) { dataLogDir = vff.create(value); } else if (key.equals("clientPort" )) { clientPort = Integer.parseInt(value); } else if (key.equals("localSessionsEnabled" )) { localSessionsEnabled = Boolean.parseBoolean(value); } else if (key.equals("localSessionsUpgradingEnabled" )) { localSessionsUpgradingEnabled = Boolean.parseBoolean(value); } else if (key.equals("clientPortAddress" )) { clientPortAddress = value.trim(); } else if (key.equals("secureClientPort" )) { secureClientPort = Integer.parseInt(value); } else if (key.equals("secureClientPortAddress" )){ secureClientPortAddress = value.trim(); } else if (key.equals("tickTime" )) { tickTime = Integer.parseInt(value); } else if (key.equals("maxClientCnxns" )) { maxClientCnxns = Integer.parseInt(value); } else if (key.equals("minSessionTimeout" )) { minSessionTimeout = Integer.parseInt(value); } else if (key.equals("maxSessionTimeout" )) { maxSessionTimeout = Integer.parseInt(value); } else if (key.equals("initLimit" )) { initLimit = Integer.parseInt(value); } else if (key.equals("syncLimit" )) { syncLimit = Integer.parseInt(value); } else if (key.equals("electionAlg" )) { electionAlg = Integer.parseInt(value); } else if (key.equals("quorumListenOnAllIPs" )) { 
quorumListenOnAllIPs = Boolean.parseBoolean(value); } else if (key.equals("peerType" )) { if (value.toLowerCase().equals("observer" )) { peerType = LearnerType.OBSERVER; } else if (value.toLowerCase().equals("participant" )) { peerType = LearnerType.PARTICIPANT; } else { throw new ConfigException ("Unrecognised peertype: " + value); } } else if (key.equals( "syncEnabled" )) { syncEnabled = Boolean.parseBoolean(value); } else if (key.equals("dynamicConfigFile" )){ dynamicConfigFileStr = value; } else if (key.equals("autopurge.snapRetainCount" )) { snapRetainCount = Integer.parseInt(value); } else if (key.equals("autopurge.purgeInterval" )) { purgeInterval = Integer.parseInt(value); } ...... if (dynamicConfigFileStr == null ) { setupQuorumPeerConfig(zkProp, true ); if (isDistributed() && isReconfigEnabled()) { backupOldConfig(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void setupMyId () throws IOException { File myIdFile = new File (dataDir, "myid" ); if (!myIdFile.isFile()) { return ; } BufferedReader br = new BufferedReader (new FileReader (myIdFile)); String myIdString; try { myIdString = br.readLine(); } finally { br.close(); } try { serverId = Long.parseLong(myIdString); MDC.put("myid" , myIdString); } catch (NumberFormatException e) { throw new IllegalArgumentException ("serverid " + myIdString + " is not a number" ); } }
2.2过期快照删除
可以启动定时任务,删除过期的快照,默认该功能是关闭的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected void initializeAndRun (String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig (); if (args.length == 1 ) { config.parse(args[0 ]); } DatadirCleanupManager purgeMgr = new DatadirCleanupManager (config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode" ); ZooKeeperServerMain.main(args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static class PurgeTask extends TimerTask { private File logsDir; private File snapsDir; private int snapRetainCount; public PurgeTask (File dataDir, File snapDir, int count) { logsDir = dataDir; snapsDir = snapDir; snapRetainCount = count; } @Override public void run () { LOG.info("Purge task started." ); try { PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount); } catch (Exception e) { LOG.error("Error occurred while purging." , e); } LOG.info("Purge task completed." ); } } public static void purge (File dataDir, File snapDir, int num) throws IOException { if (num < 3 ) { throw new IllegalArgumentException (COUNT_ERR_MSG); } FileTxnSnapLog txnLog = new FileTxnSnapLog (dataDir, snapDir); List<File> snaps = txnLog.findNRecentSnapshots(num); int numSnaps = snaps.size(); if (numSnaps > 0 ) { purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1 )); } }
2.3初始化通信组件
3.ZK服务端加载数据源码
1)ZK中的数据模型是一棵树(DataTree),树中的每个节点叫做DataNode
2)ZK集群中的DataTree时刻保持状态同步
3)ZK集群中的每个ZK节点,在内存和磁盘中各保存一份完整的数据
内存数据:DataTree
磁盘数据:快照文件 + 事务日志(即上文的操作日志)
3.1恢复快照数据

/**
 * Starts this quorum peer. It loads the local database, starts the client
 * connection factory and the admin server, starts leader election, and
 * finally calls {@code super.start()}.
 *
 * @throws RuntimeException if this server's id is not in the current peer view
 */
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Restore the in-memory DataTree from disk (snapshot + transaction log).
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // A failure to start the admin server is reported but does not abort startup.
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    // NOTE(review): presumably starts this peer's own thread (its run() loop) — confirm.
    super.start();
}

/** Loads the database and reads the epoch of the last processed zxid. */
private void loadDataBase() {
    try {
        zkDb.loadDataBase();
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        // Epoch component of the last zxid applied to the tree.
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        // ...... (elided in this excerpt)
    }
    // ...... (rest of the method elided in this excerpt)
}

/**
 * Restores the data tree and the session table via {@code snapLog.restore},
 * then marks the database as initialized.
 *
 * @return the zxid returned by {@code snapLog.restore}
 * @throws IOException if restoring fails
 */
public long loadDataBase() throws IOException {
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}
3.2恢复日志数据

/**
 * Replays the transaction log on top of a tree restored from a snapshot. It
 * reads every transaction from {@code dt.lastProcessedZxid + 1} on, applies
 * each one to {@code dt}/{@code sessions}, and reports it to {@code listener}.
 *
 * @param dt       tree to apply transactions to
 * @param sessions session map updated by the transactions
 * @param listener notified after each transaction is applied
 * @return the highest zxid seen while replaying; returns {@code dt.lastProcessedZxid}
 *         as soon as the iterator yields a {@code null} header
 * @throws IOException if the log cannot be read, or wrapping a
 *         {@code KeeperException.NoNodeException} raised while applying a transaction
 */
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Start right after the last zxid already contained in the restored tree.
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            hdr = itr.getHeader();
            if (hdr == null) {
                // Nothing (more) to replay.
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                // Out-of-order zxid: only logged; the transaction is still applied below.
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr, dt, sessions, itr.getTxn());
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: "
                        + hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next()) {
                break;
            }
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
4.ZK选举源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public synchronized void start () { if (!getView().containsKey(myid)) { throw new RuntimeException ("My id " + myid + " not in the peer list" ); } loadDataBase(); startServerCnxnFactory(); try { adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer" , e); System.out.println(e); } startLeaderElection(); super .start(); }
1 2 3 4 5 6 7 8 9 10 11 public QuorumCnxManager createCnxnManager () { return new QuorumCnxManager (this , this .getId(), this .getView(), this .authServer, this .authLearner, this .tickTime * this .syncLimit, this .getQuorumListenOnAllIPs(), this .quorumCnxnThreadsSize, this .isQuorumSaslAuthEnabled()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public FastLeaderElection (QuorumPeer self, QuorumCnxManager manager) { this .stop = false ; this .manager = manager; starter(self, manager); } private void starter (QuorumPeer self, QuorumCnxManager manager) { this .self = self; proposedLeader = -1 ; proposedZxid = -1 ; sendqueue = new LinkedBlockingQueue <ToSend>(); recvqueue = new LinkedBlockingQueue <Notification>(); this .messenger = new Messenger (manager); } public void start () { this .messenger.start(); }