欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
在前面一篇已经分析过了FLE的原理流程以及通信结构,接下来便详细分析一下ZK集群在建立通信结构源码层面的操作流程,在进行接下来的源码通信流程前需要对这个流程有一个大概的认识,否则很容易分析到一半便开始失去思路。本篇只分析创建集群通信对的流程,具体的选举源码留到下篇再来分析。
本篇是对上一篇的补充,建议在看这次的源码分析时可以对照着上一篇的流程图来看,更容易跳出代码知道在流程中的作用,上一篇的链接:FLE(FastLeaderElection)算法集群选举通信原理及流程结构(类解读)
注:本篇基于ZK版本3.7分析的。
1.源码分析
本次的源码分析步骤和以前不太一样,由于ZK集群选举时涉及三种角色:Leader、Follower和Observer,三种不同角色的源码流程是不一样的,因此本次源码将会以Leader和Follower两种角色的源码流程分析,从开始选举流程通用的流程开始,当各个机器有不同的流程时再分开依次分析,Observer由于不参与选举,因此本次源码分析忽略。
本次源码分析假设ZK集群有三台机器:
A机器:myid=1,启动时间最早;
B机器:myid=3,启动时间在A之后;
C机器:myid=5,启动时间最后。
接下来将会以这三台机器为例开始逐步分析ZK的选举流程。构建集群内部通信对源码解析整体流程图:
2. QuorumPeer对象发起投票
在看过前面有一篇的启动组价及流程可以知道,ZK集群内的每台机器都会有一个QuorumPeer集群对象,此对象是个线程对象,用来监听本机的状态:1、选举流程状态;2、确认角色后的数据同步流程状态,起到一个统筹兼顾的作用。关键源码如下:
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider { // 集群对象会含有三种不同的角色对象,如果机器在选举时被表明了是什么角色时 // 对应的对象将会被初始化,代表着本机器的角色,执行相应的操作 public Follower follower; public Leader leader; public Observer observer; // 代表着本机的当前投票 volatile private Vote currentVote; @Override public void run() { // 源码中这里有个流程是用来注册JMX对象的,这里和选举流程无关因此忽略 try { // 正式开始ZK集群执行流程,这里会有三种情况: // 1、如果在选举流程,peerState将一直会是LOOLING,直到集群选举出 // Leader;2、当选出Leader后,本机器的peerState将会变成对应的状态 // 直到Leader宕机不得不选举出新的Leader;3、每一次新的轮询都代表 // 着本机的角色发生了改变,执行的作用也发生改变。 while (running) { switch (getPeerState()) { // 代表本机正在进行选举流程 case LOOKING: // 本机是否开启只读模式,有兴趣的可以去看下,本次只分析 // 普通正常的流程 if (Boolean.getBoolean("readonlymode.enabled")) { // 忽略 ... } else { try { // 选举前先把之前的投票清空,以免对选举流程产生误导 setBCVote(null); // 设置本次投票结果,lookForLeader()方法里面将会 // 一直轮询和集群内的机器进行通信,直到选举出新的 // Leader或者发生了异常情况 setCurrentVote( makeLEStrategy().lookForLeader()); } catch (Exception e) { // 如果发生了异常情况则设置本机的状态为选举中 // 以便进入下一次选举流程 setPeerState(ServerState.LOOKING); } } break; // 代表本机已经确认为Observer角色,正在集群内进行观察 case OBSERVING: // 选举流程中的Observer不起作用,因此这个流程暂不分析,等到 // 下一篇分析ZK集群的数据同步再来具体分析其作用 break; // 代表本机已经确认为Follower角色,正在跟随Leader case FOLLOWING: try { // 本机器的上一次轮询确定出了本机器为Follower角色 setFollower(makeFollower(logFactory)); // 开始执行Follower角色的工作:跟随Leader机器 follower.followLeader(); } finally { // 当本次Follower跟随的集群发生了异常时将会改变本机的 // 角色,重新设置成LOOKING状态选举出新Leader // 异常情况:1、Leader宕机,导致本集群不得不重新选举; // 2、本集群内其它的Follower宕机超过半数导致Leader // 投票数低于总数一半,进行重新选举。 // 如果是本机宕机程序直接死亡,不会进入到Finally块 follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; // 代表本机已经确认为Leader角色,正在领导集群内的各个机器 case LEADING: try { // 本机器的上一次轮询确定出了本机器为Leader角色 setLeader(makeLeader(logFactory)); // 开始执行Leader角色的工作:作为集群中心发送同步命令 leader.lead(); // 退出了lead()方法说明集群的Leader发生了变化,需要 // 选举出新的Leader setLeader(null); }finally { // 关闭当前Leader对象并设置状态LOOKING开始准备下一次 // 选举流程 if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } } } finally { // 执行到这说明本机器的ZK服务被关闭,将会关闭机器的对象并退出 } }}
3. FastLeaderElection选举发送通知
无论是刚刚启动或者是上一代的Leader退位开始选举新的Leader,各个机器在开始选举流程时的状态都是LOOKING,都会执行FastLeaderElection选举对象的公共流程。接下来便分析一下这个对象的公共流程,关键源码如下:
public class FastLeaderElection implements Election { // 本机器的集群对象 QuorumPeer self; // 选举流程时的逻辑迭代数,每调用一次lookForLeader进行选举时该值会+1 // 发送到其它机器上时对应Notification对象的electionEpoch属性 volatile long logicalclock; // 本机推崇将要当选leader的myid,对应Notification对象的leader,可以看成是 // 某个机器的id long proposedLeader; // 本机推崇将要当选leader的zxid,对应Notification对象的zxid long proposedZxid; // 本机推崇将要当选leader的epoch,对应Notification对象的peerEpoch long proposedEpoch; public Vote lookForLeader() throws InterruptedException { // 开始在集群内选举Leader,注册LeaderElection到JMX忽略 if (self.start_fle == 0) { // 记录FLE算法的开始时间 self.start_fle = System.currentTimeMillis(); } try { // 本集合key为leaderId,value为对应id的投票信息,集合将会记录 // 本次投票的各个机器投票情况 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); // 新加入的机器用来记录集群内其它机器的投票情况 HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); // 每次轮询其它机器发来消息的间隔时间,固定200毫秒执行一次 int notTimeout = finalizeWait; synchronized(this){ // 逻辑选举次数+1,代表本机器有一次执行了重新选举Leader的操作 logicalclock++; // 投票前先把本机器的投票信息投给自己,getInitId()为本机器的 // myid值,getInitLastLoggedZxid()为本机器的zxid值 // getPeerEpoch()为本机器的currentEpoch值 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } // 对集群内的各个机器发送消息通知,告诉他们我选举自己当选Leader // 此时各个机器的通信对已经创建完毕,因此可以将消息发送给集群内的 // 各个机器,结果为A->B、A->C通知A当选Leader,B->A,B->C通知B当选 // Leader,C->B、C->A通知C当选Leader,例子中的三台机器每台机器都 // 会向集群内其它两台机器发送当选本机器为Leqader的消息通知,当然 // 也会通知自己,但是通知自己不会经过网络通信 sendNotifications(); // 发完通知消息后开始轮询其它机器的消息 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 轮询集合内是否有其它机器发来的消息,在本次三台机器的集群中, // recvqueue.poll()方法一定可以轮询出三个响应消息,其中一个 // 消息通知为本系统在前面的sendNotifications()方法发出的,没 // 经过网络通信,而是直接放在了本机的集合中等待处理 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); // 后续收到通知处理流程这里暂不分析,等分析完本机器发送完通知后 // 再逐个分析 ... } } } synchronized void updateProposal(long leader, long zxid, long epoch){ // 更新本机器记录的Leader信息,投票前把这些信息改成本机器的,即先把票 // 投给自己 proposedLeader = leader; proposedZxid = zxid; proposedEpoch = epoch; } private void sendNotifications() { // 轮询配置文件中所配置的各个Server信息,并向每台机器发送通知 for (QuorumServer server : self.getVotingView().values()) { long sid = server.id; // 将本机器的信息封装,并发给myid为sid的机器 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader,// 第一次发送此值为本机器的myid proposedZxid,// 第一次发送此值为本机器的zxid logicalclock,// 第一次发送此值为本机器的logicalclock QuorumPeer.ServerState.LOOKING,// 本机器流程为LOOKING sid,// 目标机器的myid proposedEpoch);// 第一次发送此值为本机器的currentEpoch // 放入sendqueue集合中以便本选举对象的WorkerSender发送这些 // 通知消息给其它的机器 sendqueue.offer(notmsg); } }}
4. WorkerSender选择机器并发送通知
前面已经说过了FLE对象将会把投票信息放入到sendqueue集合中,而这个集合便是FLE对象和WorkerSender对象的通信集合。接下来看下WorkerSender在拿到这些消息对象执行了什么操作:
class WorkerSender extends ZooKeeperThread { // 集群连接管理对象,WorkerSender实际上是该对象的内部类 QuorumCnxManager manager; // 将要使用通信对发送消息的消息存储队列集合,通信对发送消息时将会从该集合中 // 取出消息对象并使用Socket通信发送给对应的机器 LinkedBlockingQueue<ToSend> sendqueue; public void run() { // 启动通信对的发送信息对象后本方法将会被执行,直到该对象被调用finish() // 方法销毁 while (!stop) { try { // 从消息队列集合中获取需要发送的消息对象,固定阻塞3s,如果 // 没有轮询到则返回null ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); // 如果为null说明暂时没有消息发送,继续轮回 if(m == null) continue; // 如果不为空则说明有需要发送的消息,调用process发送消息对象 process(m); } catch (InterruptedException e) { // 如果轮询集合发生异常则退出 break; } } } void process(ToSend m) { // 将需要发送的消息转换成Socket方便发送的ByteBuffer缓存对象 ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); // 通知连接管理对象需要发送requestBuffer对象中的信息 manager.toSend(m.sid, requestBuffer); } static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) { // 生成ByteBuffer对象并封装byte[]数组,这里需要特别说明下各个参数和 // 在FLE中参数的对应关系 byte requestBytes[] = new byte[40]; ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); requestBuffer.clear(); // 对应PeerQuorum中的peerState,此时值为LOOKING requestBuffer.putInt(state); // 对应PeerQuorum中的proposedLeader,刚开始选举为本机器的myid requestBuffer.putLong(leader); // 对应PeerQuorum中的proposedZxid,刚开始选举为本机器的zxid requestBuffer.putLong(zxid); // 对应PeerQuorum中的logicclock,代表本次选举的迭代数 requestBuffer.putLong(electionEpoch); // 对应PeerQuorum中的proposedEpoch,选举开始为本机器的currentEpoch requestBuffer.putLong(epoch); // 默认版本信息,接收到后会设置为接收消息的version属性 requestBuffer.putInt(Notification.CURRENTVERSION); return requestBuffer; }}
5. QuorumCnxManager连接机器并发送消息
在将需要发送的消息转换成Socket发送消息的对象ByteBuffer后,现在面临一个问题:那便是本机器还尚未和其它机器简历Socket长连接通信,而QuorumCnxManager的职责便是管理连接,它会帮我们解决这个问题。关键源码如下:
public class QuorumCnxManager { // 每次发送消息的数量,固定是一,确保消息可以有序安全的发送出去 static final int SEND_CAPACITY = 1; // QuorumCnxManager对象和外界对象进行交互消息交互的集合中介,往这个集合中 // 放入数据说明一个问题:RecvWorker已经收到了其它机器的消息并处理转换完成 public final ArrayBlockingQueue<Message> recvQueue; // 接收集合recvQueue的容量 static final int RECV_CAPACITY = 100; // 将要发送给某个机器的ByteBuffer集合,key为发送机器的sid,value为单个消息 // 元素的阻塞队列,确保每次只发送一条消息(ArrayBlockingQueue长度固定) final Map<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; // 保存和集群内另一台机器通信对的集合,key为另一台机器的myid,value则是 // 本机器与其通信的通信对 final ConcurrentHashMap<Long, SendWorker> senderWorkerMap; public void toSend(Long sid, ByteBuffer b) { // 如果要发送的myid等于本机器的id,不用发送,直接放入recvQueue集合中 // 需要注意的是recvQueue集合和前面在FLE对象中提到的recvqueue集合很像 // 这里做个简单说明:recvQueue集合是和FLE中的WorkerReceiver进行交互的 // recvqueue集合则是WorkerReceiver和真正的FLE对象交互的。交互对象需要 // 搞清楚,要不然看源码的时候很容易迷糊 if (self.getId() == sid) { b.position(0); // 直接添加到recvQueue集合中,相当于已经通过RecvWorker收到了消息 // 但是由于是发给自己的,因此忽略了RecvWorker这一步 addToRecvQueue(new Message(b.duplicate(), sid)); } else { // 如果集合中还没有sid的阻塞队列,则进行创建并放入到集合中 BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY)); // 再将需要发送的ByteBuffer对象消息放入到阻塞队列中 addToSendQueue(bq, b); // 真正开始根据sid去和对应的机器创建Socket长通信 connectOne(sid); } } public void addToRecvQueue(final Message msg) { // 将最新需要发送的消息添加到集合中 final boolean success = this.recvQueue.offer(msg); if (!success) { throw new RuntimeException("Could not insert into receive queue"); } } synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) { if (senderWorkerMap.get(sid) != null) { if (self.isMultiAddressEnabled() && electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) { senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable(); } return true; } // 如果和一台机器的myid为sid没有创建过通信对则准备创建 return initiateConnectionAsync(electionAddr, sid); } public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) { if (!inprogressConnections.add(sid)) { return true; } try { // todo connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid)); connectionThreadCnt.incrementAndGet(); } ... return true; } // 调用QuorumConnectionReqThread的run()方法 private class QuorumConnectionReqThread extends ZooKeeperThread { final MultipleAddresses electionAddr; final Long sid; QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) { super("QuorumConnectionReqThread-" + sid); this.electionAddr = electionAddr; this.sid = sid; } @Override public void run() { try { initiateConnection(electionAddr, sid); } finally { inprogressConnections.remove(sid); } } } public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) { // 创建Socket对象 Socket sock = null; try { LOG.debug("Opening channel to server {}", sid); if (self.isSslQuorum()) { sock = self.getX509Util().createSSLSocket(); } else { sock = SOCKET_FACTORY.get(); } // 在这个方法中设置timeout setSockOpts(sock); // 连接另外一台参与选举的机器,并且设置连接时间为5s sock.connect(electionAddr.getReachableOrOne(), cnxTO); ... } ... try { // todo startConnection(sock, sid); } ... } private boolean startConnection(Socket sock, Long sid) throws IOException { DataOutputStream dout = null; DataInputStream din = null; LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid); try { // Use BufferedOutputStream to reduce the number of IP packets. This is // important for x-DC scenarios. BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream()); // 连接上另一台myid为sid的机器后立马向其发送本机器的myid dout = new DataOutputStream(buf); ... dout.write(addr_bytes); dout.flush(); din = new DataInputStream(new BufferedInputStream(sock.getInputStream())); } ... // authenticate learner QuorumPeer.QuorumServer qps = self.getVotingView().get(sid); if (qps != null) { // TODO - investigate why reconfig makes qps null. authLearner.authenticate(sock, qps.hostname); } // If lost the challenge, then drop the new connection // 这里是创建集群内通信结构的关键点之一,即在上一篇中写过的complete事件 // 在本次分析源码的假设三台机器A、B、C中,A的sid最小为1,B的sid居中为3 // C的sid最大为5,因此在各个机器中,A将会由于sid小于其它的机器而无法主动 // 建立通信对,B只能主动对A建立通信对,而C可以主动向B和A建立通信对。A和B // 的被动连接将会在后续分析Listener类中讲解。 // 简而言之,sid大的->sid小的=大的建立通信对,sid小的->sid大的=关闭连接 if (sid > self.getId()) { LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid); // 当A->B、A->C和B->C这三种情况时会进入到这里面主动关闭本Socket // 解释:sid小的主动连接sid大的会主动关闭Socket连接。显示场景为: // B和C机器都已经启动了,而A是最后启动的,此时A机器执行到了这里, // A机器会主动的关闭连接 closeSocket(sock); // Otherwise proceed with the connection } else { LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid); // 当C->A、C->B和B->A这三种情况时会进入到这里面主动创建通信对 // 解释:sid大的主动连接sid小的将会在本机器中主动创建通信对 // 根据传入的Socket对象创建通信对,需要注意的是通信对里面的sid是 // 需要进行通信的机器sid,而不是本机器的 // 现实场景为:A和B机器已经创建了,C最后启动的,此时C机器由于sid比 // A和B要大,因此会执行到这里,主动创建和A、B的通信对 SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); // 获取以前选举通信时可能存在的通信对对象 SendWorker vsw = senderWorkerMap.get(sid); // 如果原来senderWorkerMap中有了sid对应的通信对,则拿出来主动销毁 // 因为通信对都是线程对象,可能存在以前选举时残留的数据,需要主动的 // 清空并关闭Socket连接,重新使用新的通信对对象 if (vsw != null) { vsw.finish(); } // 以sid为key,通信对为value放入到senderWorkerMap集合中 senderWorkerMap.put(sid, sw); // 如果消息发送集合中没有key为sid的阻塞队列则先创建放入集合中 // 再做一次确认,但在刚刚的流程中queueSendMap肯定已经被初始化并 // 放入了需要发送的数据的 queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY)); // 启动发送消息线程对象,开始监听queueSendMap对象的阻塞队列 sw.start(); // 启动接收消息线程对象,用来接收对应机器发送来的消息 rw.start(); return true; } return false; } }
经过了这个流程,在三台机器中的通信对情况如下图:
6 Listener监听Socket连接
在上面的四个流程中,经过在QuorumCnxManager对象建立连接后sid大的机器已经主动创建完了通信对,形成了上面图示的通信对情况,接下来要分析的是sid小的机器被动创建通信对的流程。关键源码如下:
public class Listener extends ZooKeeperThread { // 使用配置的electionPort端口+本机的地址创建的服务Socket,用来被动的和其它 // 机器进行交互(与其说被动的不如说给sid小的机器主动通信的机会) volatile ServerSocket ss = null; // 用负数来记录观察者的数量,并为其赋值负值来标明唯一性 private long observerCounter = -1; // 保存和集群内另一台机器通信对的集合,key为另一台机器的myid,value则是 // 本机器与其通信的通信对 final ConcurrentHashMap<Long, SendWorker> senderWorkerMap; // 将要发送给某个机器的ByteBuffer集合,key为发送机器的sid,value为单个消息 // 元素的阻塞队列,确保每次只发送一条消息(ArrayBlockingQueue长度固定) final Map<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; @Override public void run() { int numRetries = 0; InetSocketAddress addr; // IO失败可重试三次 while((!shutdown) && (numRetries < 3)){ try { // 创建服务Socket对象 ss = new ServerSocket(); ss.setReuseAddress(true); // 获取本机器在配置中所配置的端口或者地址 if (self.getQuorumListenOnAllIPs()) { int port = self.quorumPeers.get(self.getId()). electionAddr.getPort(); addr = new InetSocketAddress(port); } else { addr = self.quorumPeers.get(self.getId()) .electionAddr; } // 设置本listener的地址名称 setName(self.quorumPeers.get(self.getId()).electionAddr .toString()); // 将服务Socket对象绑定该地址 ss.bind(addr); while (!shutdown) { // 开始接收其它机器发送过来的连接请求,sid大的或者sid小的 // 都会发送连接请求,在前面分析过,sid小的对sid大的机器发 // 送连接之后会主动关闭连接,其对sid大的机器创建通信对的操 // 作便是放在这个流程中 Socket client = ss.accept(); // 设置timeout时间为tickTime*syncLimit setSockOpts(client); // 接收到其它机器的请求后开始处理 receiveConnection(client); // 重试次数重置为0 numRetries = 0; } }// 异常捕获忽略 ... } } public void receiveConnection(Socket sock) { // 从名字也可以看出来这个方法就是用来接收Socket连接并处理的 // 和刚刚分析过的initiateConnection方法作用类似,只是 // initiateConnection方法是让sid大的主动创建通信对,而这个方法 // 则是让sid小的被动创建通信对 Long sid = null; try { // 在上面的initiateConnection方法中说了,在判断sid的大小值并处理 // 之前,连上Socket的第一件事便是把本机器的myid发送出去。举个例子: // A->C,由于A的sid比C的小,因此A不会主动创建和C的通信对,但连接 // 之后A会立马把自己的myid发送给C,而C->A时C也会主动的把自己的myid // 发送给A,从而各自触发Listener监听 DataInputStream din = new DataInputStream(sock .getInputStream()); // 读取其它机器发送过来的myid sid = din.readLong(); // 第一次接收的sid可能是version版本号 if (sid < 0) { // 如果是版本号则再次读取sid sid = din.readLong(); // 判断是否有剩余的数组需要读取 int num_remaining_bytes = din.readInt(); // 如果接下来的数据长度为负数或者大于了最大缓存值2048字节 // 则说明有问题,需要关闭连接(值如果是0也是OK的) if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) { closeSocket(sock); return; } // 将读取到的长度实例化一个数组并将剩余的读取完 byte[] b = new byte[num_remaining_bytes]; int num_read = din.read(b); } // 如果这个sid等于观察者的id,则将其赋值为observerCounter,每次 // 有新的观察者,observerCounter都会减一,保持sid的特殊性以及 // 观察者sid的唯一性 if (sid == QuorumPeer.OBSERVER_ID) { sid = observerCounter--; } } catch (IOException e) { closeSocket(sock); return; } // 看到这里又是熟悉的感觉,在initiateConnection方法中也有类似的场景 // 但是需要注意的是initiateConnection方法第一个if判断语句条件是 // “sid > self.getId()”,和本方法中的if判断相反,原因就是本方法实际上 // 就是initiateConnection方法的被动实现。 // 依然是A、B、C三台机器,我们已经确认了经过在initiateConnection方法中 // 执行完后的逻辑,C将会有B和A的通信对,而B将会有A的通信对,所有sid大的 // 机器都会有sid小的机器通信对,但是小的sid机器没有大的sid机器通信对。 // 以上述情况是根本无法做到集群内的机器互相通信的,因此需要本方法来补充 // 下面的逻辑大致为:sid小的可以在本机被动的创建和sid大的机器通信对;而 // sid大的机器接收到sid小的机器连接请求后,如果本机器没有sid小的机器的 // 通信对,则会关闭本次的Socket对象并在本机建立和sid小的机器的通信对。 if (sid < self.getId()) { // 进入到这里的情况是A->B、A->C、B->C,即sid小的机器向sid大的机器 // 发送请求连接,现实场景可以理解成A机器在C机器后面启动,A机器在启 // 动的时候向C机器发送连接请求,但由于C没启动,无法达到,因此作废。 // 而等到A机器启动时就会向C机器发送请求,此时C机器在监听到了A的请求 // 后便会遍执行到了这里 // 从本机器的senderWorkerMap集合取出可能存在的通信对 SendWorker sw = senderWorkerMap.get(sid); // 如果原来存在的将原来的通信对销毁释放 if (sw != null) { // 销毁通信对 sw.finish(); } // 关闭A或者B机器(即sid小的机器)的连接请求Socket对象 closeSocket(sock); // 调用已经分析过的connectOne方法,开始在本机器上再次主动创建 // 和A、B机器的通信对(即sid小的机器) connectOne(sid); } else { // 进入到这里的情况是C->A、C->B、B->A,即sid大的机器向sid小的机器 // 发送连接请求,此时sid小的机器监听后将会执行到这里开始在本机器中 // 被动的创建和sid大的机器的通信对 // 显示场景为:A和B机器已经启动了,但是C机器最后启动的,此时C机器 // 会向A和B机器发送连接请求,A和B机器由于sid小于C机器,因此监听到 // 连接请求后会执行到这里被动的创建和C机器的通信对 // 使用sid大的机器信息和Socket通信对象创建通信对 SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, sid, sw); sw.setRecv(rw); // 如果本机器原来有sid对应机器的通信对则销毁 SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) { // 调用销毁方法 vsw.finish(); } // 将新的通信对放入到senderWorkerMap集合中以便通信对可以监听 // 集合的消息变化 senderWorkerMap.put(sid, sw); // 如果保存要发送消息集合不包含新请求进来的sid对应机器则创建 if (!queueSendMap.containsKey(sid)) { queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( SEND_CAPACITY)); } // 启动通信对发送消息线程对象,开始监听queueSendMap集合发送消息 sw.start(); // 启动通信对接收消息线程对象,开始监听其它机器的Socket消息并接收 rw.start(); return; } }}
至此,集群内的各个机器通信对建立情况如下图:
经过这些流程,三台机器已经建立了和集群内每台机器的通信对,已经可以互相发送接收选举消息了,接下来便开始分析选举流程。
参考文章
zookeeper3.7版本github源码注释分析## zk源码分析系列Zookeeper原理和源码学习系列\ Zookeeper学习系列\ Zookeeper源码系列
原文:https://juejin.cn/post/7100494637638352910