欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
经过上一篇文章的流程图,对于ZK新建连接的大致流程应该了解的差不多了,接下来开始进行详细的代码分析,同样是三步走,在进行阅读时可以根据前面的流程图一步一步跟着源码走,这样阅读起来会更加的清晰方便。
需要注意的是,ZK的很多代码构成都是通过内部类完成的,因此等下分析源码时可能方法名不会按源码的方式组排,只是简单的展示源码的大致流程和作用。
上篇文章《新建连接交互流程分析(单机Server服务端与Client客户端)》分析了Client端发起连接的源码,本篇继续看后面的两个步骤。
2. Server端接收处理响应数据
其实在第一步调用SocketChannel.connect()方法时,第二步就已经接收新建连接的通信并且生成了session信息了,但为了便于理解,我们还是把第二步当成依赖于第一步。后面在源码会详细说明。
2.1 NIOServerCnxnFactory接收NIO请求
NIOServerCnxnFactory负责使用Selector多路复用选择器来从多个Client端获取Socket的新建和发送数据,因此在交互流程中,此类为Server端的起始点,也是通过线程轮询的方式不断地获取其它Socket发送的请求数据。
这里面有几个内部类如下:
// Common base for the selector-driven threads listed below.
private abstract class AbstractSelectThread extends ZooKeeperThread;

// Accepts connection requests coming from clients (the TCP connection is established here).
private class AcceptThread extends AbstractSelectThread;

/**
 * Takes over sockets whose connection has been established, reads the commands
 * arriving on them and sends the results back to the client.
 * Its main loop calls select() to watch the sockets for read/write events and
 * hands every such event to handleIO(key).
 */
public class SelectorThread extends AbstractSelectThread;

/**
 * IOWorkRequest is a small wrapper class that allows doIO() calls to be run
 * on a connection using a WorkerService.
 */
private class IOWorkRequest extends WorkerService.WorkRequest

/**
 * This thread is responsible for closing stale connections so that
 * connections on which no session is established are properly expired.
 */
private class ConnectionExpirerThread extends ZooKeeperThread
AcceptThread 线程负责接收来自客户端的连接,并将SocketChannel放入到SelectorThread的acceptedQueue队列中。
SelectorThread 线程负责将读写事件通过 workerPool.schedule(workRequest) 交给工作线程池处理,随后由 IOWorkRequest.doWork() 方法调用 NIOServerCnxn.doIO() 完成实际的读写处理。详细代码如下:
/**
 * Excerpt of the NIO based connection factory: an accept thread takes new
 * sockets, selector threads watch them for I/O readiness, and worker requests
 * run the actual I/O on the connection object.
 */
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Server-side NIO channel that client SocketChannels connect to.
    ServerSocketChannel ss;
    // NIO multiplexing selector.
    final Selector selector = Selector.open();
    // Maps a client IP address to the set of connection objects opened from it.
    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( );
    // Per-IP connection limit checked in doAccept(); a value <= 0 disables the check.
    int maxClientCnxns = 60;

    /** Base class of the accept/selector threads; each instance owns its own Selector. */
    private abstract class AbstractSelectThread extends ZooKeeperThread {
        protected final Selector selector;

        public AbstractSelectThread(String name) throws IOException {
            super(name);
            // Allows the JVM to shutdown even if this thread is still running.
            setDaemon(true);
            this.selector = Selector.open();
        }

        /** Wakes this thread's selector so a blocked select() returns. */
        public void wakeupSelector() {
            selector.wakeup();
        }
        ...
    }

    /**
     * Accepts new client connections on the server socket and hands each
     * accepted SocketChannel to one of the selector threads.
     */
    private class AcceptThread extends AbstractSelectThread {
        private final ServerSocketChannel acceptSocket;
        private final SelectionKey acceptKey;
        private final RateLogger acceptErrorLogger = new RateLogger(LOG);
        private final Collection<SelectorThread> selectorThreads;
        private Iterator<SelectorThread> selectorIterator;
        private volatile boolean reconfiguring = false;

        public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
            super("NIOServerCxnFactory.AcceptThread:" + addr);
            this.acceptSocket = ss;
            // Only OP_ACCEPT is registered on this thread's selector.
            this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
            // Private, unmodifiable snapshot of the selector threads used for round-robin assignment.
            this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
            selectorIterator = this.selectorThreads.iterator();
        }

        /** Main loop: keeps calling select() until stopped or the server socket is closed. */
        public void run() {
            try {
                while (!stopped && !acceptSocket.socket().isClosed()) {
                    try {
                        // Wait for acceptable keys and queue the accepted connections.
                        select();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
            } finally {
                closeSelector();
                // This will wake up the selector threads, and tell the
                // worker thread pool to begin shutdown.
                if (!reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                LOG.info("accept thread exitted run method");
            }
        }

        /** Marks that the thread is exiting because of a reconfiguration, so run() does not stop the factory. */
        public void setReconfiguring() {
            reconfiguring = true;
        }

        /**
         * Calls selector.select() and then walks the selected keys:
         * acceptable keys are passed to doAccept(), any other readiness is logged.
         */
        private void select() {
            try {
                selector.select();
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    // Skip keys that are no longer valid.
                    if (!key.isValid()) {
                        continue;
                    }
                    // A new connection is waiting to be accepted.
                    if (key.isAcceptable()) {
                        if (!doAccept()) {
                            // If unable to pull a new connection off the accept
                            // queue, pause accepting to give us time to free
                            // up file descriptors and so the accept thread
                            // doesn't spin in a tight loop.
                            pauseAccept(10);
                        }
                    } else {
                        LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Accepts one pending connection, enforces the total and per-IP
         * connection limits, switches the socket to non-blocking mode and
         * assigns it round-robin to a selector thread through
         * addAcceptedConnection(), which queues it and wakes that thread's selector.
         * Any IOException is counted as a rejected connection and the socket is closed.
         *
         * @return true once acceptSocket.accept() has returned without throwing,
         *         even if the connection is rejected afterwards; false otherwise
         */
        private boolean doAccept() {
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                // Take the new connection off the server socket.
                sc = acceptSocket.accept();
                // accept() itself succeeded; later failures do not reset this flag.
                accepted = true;
                if (limitTotalNumberOfCnxns()) {
                    throw new IOException("Too many connections max allowed is " + maxCnxns);
                }
                InetAddress ia = sc.socket().getInetAddress();
                // Number of connections already open from this address,
                // compared against the per-IP limit.
                int cnxncount = getClientCnxnCount(ia);
                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                    // Limit reached: reject; the catch block below closes the socket.
                    throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
                }
                LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
                // Switch the accepted socket to non-blocking mode.
                sc.configureBlocking(false);
                // Round-robin assign this connection to a selector thread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                // Queue the socket on that thread; this also wakes its selector.
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
                // accept, maxClientCnxns, configureBlocking
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
                fastCloseSock(sc);
            }
            return accepted;
        }
    }

    /**
     * Takes over accepted sockets: registers them with its selector, watches
     * them for read/write readiness and passes ready keys to handleIO(key).
     */
    public class SelectorThread extends AbstractSelectThread {
        private final int id;
        // Sockets accepted by the AcceptThread that still have to be registered here.
        private final Queue<SocketChannel> acceptedQueue;
        // Keys waiting for processInterestOpsUpdateRequests() (not shown in this excerpt).
        private final Queue<SelectionKey> updateQueue;

        public SelectorThread(int id) throws IOException {
            super("NIOServerCxnFactory.SelectorThread-" + id);
            this.id = id;
            acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
            updateQueue = new LinkedBlockingQueue<SelectionKey>();
        }

        /**
         * Queues a newly accepted socket and wakes the selector so this thread picks it up.
         *
         * @return false if the factory is stopped or the queue rejected the socket
         */
        public boolean addAcceptedConnection(SocketChannel accepted) {
            if (stopped || !acceptedQueue.offer(accepted)) {
                return false;
            }
            wakeupSelector();
            return true;
        }

        /**
         * Main loop: select ready keys, register newly accepted sockets, then
         * apply queued interest-op updates. After the loop ends the remaining
         * keys and queued sockets are closed.
         */
        public void run() {
            try {
                while (!stopped) {
                    try {
                        select();
                        processAcceptedConnections();
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
                // Close connections still pending on the selector. Any others
                // with in-flight work, let drain out of the work queue.
                for (SelectionKey key : selector.keys()) {
                    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                    if (cnxn.isSelectable()) {
                        cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    }
                    cleanupSelectionKey(key);
                }
                SocketChannel accepted;
                while ((accepted = acceptedQueue.poll()) != null) {
                    fastCloseSock(accepted);
                }
                updateQueue.clear();
            } finally {
                closeSelector();
                // This will wake up the accept thread and the other selector
                // threads, and tell the worker thread pool to begin shutdown.
                NIOServerCnxnFactory.this.stop();
                LOG.info("selector thread exitted run method");
            }
        }

        /** Selects ready keys and dispatches readable/writable ones to handleIO(). */
        private void select() {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                // Process the selected keys in random order; presumably so that no
                // client is systematically served first (reason not stated in the code).
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selected.remove(key);
                    // Invalid key: clean it up and move on.
                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    // The key is ready for reading or writing.
                    if (key.isReadable() || key.isWritable()) {
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Schedules I/O for processing on the connection associated with the
         * given SelectionKey: wraps the key in an IOWorkRequest and submits it
         * to workerPool. According to the upstream comment, the I/O is run
         * directly by this thread when no worker thread pool is in use.
         */
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            // Stop selecting this key while processing on its
            // connection
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

        /** Registers every queued socket with this selector and creates its connection object. */
        private void processAcceptedConnections() {
            SocketChannel accepted;
            while (!stopped && (accepted = acceptedQueue.poll()) != null) {
                SelectionKey key = null;
                try {
                    // Register the socket with the selector; this yields its SelectionKey.
                    key = accepted.register(selector, SelectionKey.OP_READ);
                    // Create the NIO connection object for this socket.
                    NIOServerCnxn cnxn = createConnection(accepted, key, this);
                    // Bind the connection object to the SelectionKey.
                    key.attach(cnxn);
                    // Record the connection (the article notes it is tracked per client IP,
                    // since one host may hold several connections).
                    addCnxn(cnxn);
                } catch (IOException e) {
                    // register, createConnection
                    cleanupSelectionKey(key);
                    fastCloseSock(accepted);
                }
            }
        }
    }

    /**
     * IOWorkRequest is a small wrapper class that allows doIO() calls to be
     * run on a connection using a WorkerService.
     */
    private class IOWorkRequest extends WorkerService.WorkRequest {
        private final SelectorThread selectorThread;
        private final SelectionKey key;
        private final NIOServerCnxn cnxn;

        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
            this.selectorThread = selectorThread;
            this.key = key;
            this.cnxn = (NIOServerCnxn) key.attachment();
        }

        /**
         * Validates the key, runs NIOServerCnxn.doIO(key) when the key is
         * readable or writable, then makes the connection selectable again and
         * asks the selector thread to refresh the key's interest ops.
         */
        public void doWork() throws InterruptedException {
            // Nothing to do for a key that is no longer valid.
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);
                // Check if we shutdown or doIO() closed this connection
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }
            // Mark this connection as once again ready for selection
            cnxn.enableSelectable();
            // Push an update request on the queue to resume selecting
            // on the current set of interest ops, which may have changed
            // as a result of the I/O operations we just performed.
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
    }
}
2.2 连接对象NIOServerCnxn
这个代表着Client端在Server端的连接对象,新连接在Server端的表现便是一个NIOServerCnxn对象。并且这个对象会和对应的SelectionKey、Socket进行绑定。这个类里面最重要的便是doIO()方法,在这个方法中会判断读写事件,并根据相应的值进行处理,在新建连接流程中,只会分析读事件。关键源码如下:
/**
 * Excerpt of the server-side object representing one client connection.
 * doIO() reads length-prefixed packets from the socket and drains queued
 * outgoing buffers to it.
 */
public class NIOServerCnxn extends ServerCnxn {
    // Owning factory, the client socket and its selection key.
    NIOServerCnxnFactory factory;
    final SocketChannel sock;
    private final SelectionKey sk;
    // 4-byte buffer used to read the length prefix of a request.
    ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Buffer currently being filled from the socket; starts out as lenBuffer.
    ByteBuffer incomingBuffer = lenBuffer;
    // Set to true once the ConnectRequest has been processed (see readConnectRequest()).
    boolean initialized;
    private final ZooKeeperServer zkServer;
    // Session id of this connection; per the article it is only assigned
    // after the server has processed the ConnectRequest.
    long sessionId;
    // Buffers queued for writing to the client.
    LinkedBlockingQueue<ByteBuffer> outgoingBuffers;

    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
        ...
        // (field assignments elided) A new connection starts out interested in reads.
        sk.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Handles the read and/or write readiness reported by the given key.
     *
     * @param k selection key whose readiness is being processed
     */
    void doIO(SelectionKey k) throws InterruptedException {
        try {
            // Nothing to do once the socket has been closed.
            if (isSocketOpen() == false) {
                return;
            }
            // ---- read side ----
            if (k.isReadable()) {
                // Fill the current buffer; initially this is the 4-byte lenBuffer.
                int rc = sock.read(incomingBuffer);
                // A negative count is treated as end of stream.
                if (rc < 0) {
                    throw new EndOfStreamException();
                }
                // The current buffer is full: process it.
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    // Same object as lenBuffer means the length prefix has just been
                    // read; readLength() (not shown) decides whether a payload follows.
                    if (incomingBuffer == lenBuffer) {
                        incomingBuffer.flip();
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                    } else {
                        // A payload buffer was already in progress and is now full.
                        isPayload = true;
                    }
                    if (isPayload) {
                        // Read/process the packet body.
                        readPayload();
                    } else {
                        return;
                    }
                }
            }
            // ---- write side ----
            if (k.isWritable()) {
                // Only enter when buffers are queued, e.g. the part of a response
                // that could not be written in one go.
                if (outgoingBuffers.size() > 0) {
                    // Shared staging buffer owned by the factory
                    // (the article states 64 * 1024 bytes; its size is not shown here).
                    ByteBuffer directBuffer = factory.directBuffer;
                    directBuffer.clear();
                    for (ByteBuffer b : outgoingBuffers) {
                        // If the queued buffer does not fit in the space left, copy only
                        // as much as fits by using a slice limited to the free space.
                        if (directBuffer.remaining() < b.remaining()) {
                            b = (ByteBuffer) b.slice().limit( directBuffer.remaining());
                        }
                        int p = b.position();
                        // Copy the pending bytes into the staging buffer.
                        directBuffer.put(b);
                        // Restore the position so the buffer is not marked as consumed
                        // yet; consumption is accounted for after the write below.
                        b.position(p);
                        // Staging buffer is full: stop collecting.
                        if (directBuffer.remaining() == 0) {
                            break;
                        }
                    }
                    directBuffer.flip();
                    // Write the staged bytes to the socket.
                    int sent = sock.write(directBuffer);
                    ByteBuffer bb;
                    // Walk the queue and account for the bytes actually sent:
                    // fully sent buffers are removed, a partially sent one is advanced.
                    while (outgoingBuffers.size() > 0) {
                        bb = outgoingBuffers.peek();
                        if (bb == ServerCnxnFactory.closeConn) {
                            throw new CloseRequestException();
                        }
                        // Bytes of this buffer that were not covered by the write.
                        int left = bb.remaining() - sent;
                        // Only part of this buffer went out: advance it and stop.
                        if (left > 0) {
                            bb.position(bb.position() + sent);
                            break;
                        }
                        // This buffer was sent completely: update the stats,
                        // deduct its size and drop it from the queue.
                        packetSent();
                        sent -= bb.remaining();
                        outgoingBuffers.remove();
                    }
                }
                synchronized(this.factory){
                    if (outgoingBuffers.size() == 0) {
                        // Everything has been written. If the connection never got
                        // initialized and reads are disabled, close it; otherwise
                        // stop watching for OP_WRITE.
                        if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                            throw new CloseRequestException();
                        }
                        sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));
                    } else {
                        // Data is still pending: keep OP_WRITE enabled so the
                        // next selection round continues sending.
                        sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                    }
                }
            }
        }
        // Exception handling elided.
        ...
    }

    /**
     * Reads the rest of the packet body if needed and, once the buffer is
     * full, dispatches it as a connect request or a regular request.
     */
    private void readPayload() throws IOException, InterruptedException {
        // NOTE(review): the original article claims this branch is never taken.
        // readLength() is not shown; if it replaces incomingBuffer with a fresh
        // payload-sized buffer, this branch IS taken on first entry - confirm.
        if (incomingBuffer.remaining() != 0) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException();
            }
        }
        if (incomingBuffer.remaining() == 0) {
            // Update the received-packet statistics.
            packetReceived();
            incomingBuffer.flip();
            // Not initialized yet: the first packet is treated as the ConnectRequest.
            if (!initialized) {
                readConnectRequest();
            } else {
                // All later packets are regular requests (analysed elsewhere in the article series).
                readRequest();
            }
            lenBuffer.clear();
            // Go back to reading a length prefix for the next packet.
            incomingBuffer = lenBuffer;
        }
    }

    /** Passes the buffered ConnectRequest to the server and marks the connection initialized. */
    private void readConnectRequest() throws IOException, InterruptedException {
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
        // Let ZooKeeperServer process the connect request.
        zkServer.processConnectRequest(this, incomingBuffer);
        // Reaching this point means the call returned without throwing.
        initialized = true;
    }
}
2.3 单机运行的ZooKeeperServer
前面文章解释过,这个类就是ZK的Server实例,每个ZK服务器上对应着一个ZooKeeperServer实例,这里面有诸多服务器方面的属性配置,但前面分析过,因此本次流程代码便不做过多的介绍了,有兴趣的可以翻看前面的文章。
在Client端有ping心跳检测间隔时间,在Server端有tickTime存活检测时间,这两个属性代表的意思是不一样的,Client端的ping心跳检测间隔时间是轮询隔一段时间后向Server端发送ping请求,而Server端的tickTime间隔时间作用是每隔一段时间就判断在Server端的Client连接对象是否已经死亡,如果已经过期死亡则将连接对象进行清除关闭。所以ping心跳检测的意义是Client端告诉服务器我还活着,tickTime意义是定期清除没有告诉Server端还存活的连接。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // 默认3S检测一次客户端存活情况 public static final int DEFAULT_TICK_TIME = 3000; // 实际设置的检测存活时间间隔 protected int tickTime = DEFAULT_TICK_TIME; // Server端可接受的最小Client端sessionTimeout,如果未设置则值为tickTime*2 protected int minSessionTimeout = -1; // Server端可接受的最大Client端sessionTimeout,如果未设置则值为tickTime*20 protected int maxSessionTimeout = -1; // 处理客户端请求RequestProcessor的第一个实现类对象 protected RequestProcessor firstProcessor; public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive .getArchive(new ByteBufferInputStream(incomingBuffer)); // 反序列化ByteBuffer对象为ConnectRequest对象 ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); boolean readOnly = false; try { // 是否只可读 readOnly = bia.readBool("readOnly"); cnxn.isOldClient = false; } catch (IOException e) { ... } // 只有ReadOnlyZooKeeperServer类型的Server只接收readOnly为true的 if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { ... throw new CloseRequestException(msg); } // 获取的zxid需要小于Server端最大的zxid if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { ... 
throw new CloseRequestException(msg); } // 这段代码便是Server和Client端协商具体的sessionTimeout值 // 1、获取客户端传来的sessionTimeout int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); // 2、先判断sessionTimeout是否小于Server端可接受的最小值 // 如果小于Server端可接受最小值则设置成Server端的最小sessionTimeout int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } // 3、再判断sessionTimeout是否大于Server端可接受的最大值 // 如果大于Server端可接受最大值则设置成Server端的最大sessionTimeout int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } // 最后把满足协商范围的sessionTimeout设置到Client连接对象中 cnxn.setSessionTimeout(sessionTimeout); // 设置该连接对象不再从Client端接收数据 cnxn.disableRecv(); long sessionId = connReq.getSessionId(); // 第一次连接不手动设置sessionId都是0 if (sessionId != 0) { // 如果不是0则需要关闭原来的session并且重新打开sessionId // 这种情况不常见,只需要知道处理的代码逻辑在这里便行,暂不详细分析 long clientSessionId = connReq.getSessionId(); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { // 开始创建新的session信息 createSession(cnxn, passwd, sessionTimeout); } } long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { // 根据失效时间创建一个新的session信息并返回唯一ID long sessionId = sessionTracker.createSession(timeout); // 设置失效时间和sessionId Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); // 调用该方法使用刚刚获取到的属性去生成Request请求 submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; } private void submitRequest(ServerCnxn cnxn, long sessionId, int type, int xid, ByteBuffer bb, List<Id> authInfo) { // 根据参数生成Request对象,并调用submitRequest()方法开始使用 // RequestProcessor链对Request进行处理 Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); submitRequest(si); } public void submitRequest(Request si) { // 这个方法功能很简单: // 1、判断Server端是否初始化完成,如果未完成则一直持续等待 // 
2、在调用RequestProcessor链前先更新session在Server端的过期时间 // 3、调用firstProcessor对象的processRequest方法开始处理请求 if (firstProcessor == null) { synchronized (this) { try { // 一直轮询直到Server端的各种组件初始化完成 while (state == State.INITIAL) { wait(1000); } } ... // 如果未初始化成功则抛出异常 if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } // todo 将请求放到requestThrottler的submittedRequests队列中 requestThrottler.submitRequest(si); } public void submitRequest(Request request) { if (stopping) { LOG.debug("Shutdown in progress. Request cannot be processed"); dropRequest(request); } else { request.requestThrottleQueueTime = Time.currentElapsedTime(); // todo submittedRequests.add(request); } } 将请求放到requestThrottler的submittedRequests队列中,然后在requestThrottler 的run()方法中调用zks.submitRequestNow(request); public void submitRequestNow(Request si) { .. try { // 更新session的过期时间 touch(si.cnxn); // 校验请求类型是否有效 boolean validpacket = Request.isValid(si.type); if (validpacket) { setLocalSessionFlag(si); // todo 开始调用firstProcessor对象的processRequest()方法处理请求 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type {}", si.type); // Update request accounting/throttling limits requestFinished(si); // 如果处理类型校验不通过则发送无法处理请求并关闭连接 new UnimplementedRequestProcessor().processRequest(si); } } ... } void touch(ServerCnxn cnxn) throws MissingSessionException { if (cnxn == null) { return; } long id = cnxn.getSessionId(); int to = cnxn.getSessionTimeout(); // 获取sessionId和sessionTimeout属性调用sessionTracker去更新session // 在Server端的过期时间 if (!sessionTracker.touchSession(id, to)) { throw new MissingSessionException(); } }}
2.4 session追踪类SessionTracker
取名为SessionTracker,实际上这个类的功能就是维护session生命周期,主要进行session过期判断和更新session状态的操作。判断session过期留到后面分析ping流程时再看,新建连接时只看其如何更新session状态。
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker { // 保存sessionId和对应的Session对象 HashMap<Long, SessionImpl> sessionsById; HashMap<Long, SessionSet> sessionSets; // key为sessionId,value为这个session的过期时间 ConcurrentHashMap<Long, Integer> sessionsWithTimeout; // 下一次新建session时的id long nextSessionId = 0; public long createSession(int sessionTimeout) { long sessionId = nextSessionId.getAndIncrement(); // 在使用RequestProcessor处理请求前会调用该方法为客户端创建一个session trackSession(sessionId, sessionTimeout); return sessionId; } @Override public synchronized boolean trackSession(long id, int sessionTimeout) { boolean added = false; // 如果没有保存对应的Session对象则创建一个并添加 SessionImpl session = sessionsById.get(id); if (session == null) { session = new SessionImpl(id, sessionTimeout); } // findbugs2.0.3 complains about get after put. // long term strategy would be use computeIfAbsent after JDK 1.8 SessionImpl existedSession = sessionsById.putIfAbsent(id, session); if (existedSession != null) { session = existedSession; } else { added = true; LOG.debug("Adding session 0x{}", Long.toHexString(id)); } if (LOG.isTraceEnabled()) { String actionStr = added ? 
"Adding" : "Existing"; ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- " + actionStr + " session 0x" + Long.toHexString(id) + " " + sessionTimeout); } // 添加完session后更新session的过期时间 updateSessionExpiry(session, sessionTimeout); return added; private void updateSessionExpiry(SessionImpl s, int timeout) { logTraceTouchSession(s.sessionId, timeout, ""); sessionExpiryQueue.update(s, timeout); } // 当client与server有交互时(连接请求/读写操作/心跳),该方法就会被调用 // 当zk server启动时会将磁盘中的session恢复到内存,也会调用该方法 // 该方法在做的是会话换桶操作 public Long update(E elem, int timeout) { // elemMap集合的key为session,value为该session的过期时间, // 即该session当前所在的会话桶id Long prevExpiryTime = elemMap.get(elem); long now = Time.currentElapsedTime(); // 计算本次交互应该将会话放入到哪个会话桶 Long newExpiryTime = roundToNextInterval(now + timeout); // 若之前所在会话桶id与本次交互计算的会话桶id相同, // 则无需换桶,即什么也不用做 if (newExpiryTime.equals(prevExpiryTime)) { // No change, so nothing to update return null; } // ---------- 代码能走到这里,说明需要换桶了。 -------------- // 换桶由两步操作完成:将会话放入到新桶;将会话从老桶中清除 // First add the elem to the new expiry time bucket in expiryMap. // 从会话桶集合中获取当前的会话桶,若为null,则创建一个新的会话桶 Set<E> set = expiryMap.get(newExpiryTime); if (set == null) { // Construct a ConcurrentHashSet using a ConcurrentHashMap // 创建会话桶set set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>()); // Put the new set in the map, but only if another thread // hasn't beaten us to it // 将新建的会话桶放入到会话桶集合 Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set); if (existingSet != null) { set = existingSet; } } // 将会话放入到会话桶 set.add(elem); // Map the elem to the new expiry time. If a different previous // mapping was present, clean up the previous expiry bucket. 
// 将会话与会话桶id的对应关系放入到elemMap,并获取到该会话之前所在的会话桶id prevExpiryTime = elemMap.put(elem, newExpiryTime); // 若当前会话桶id与之前会话桶id不相同,说明需要换桶。 // 而前面已经将会话放到了新的会话桶,所以这里要将会话从老桶中清除 if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) { // 获取到之前的会话桶 Set<E> prevSet = expiryMap.get(prevExpiryTime); if (prevSet != null) { // 将会话从老会话桶中清除 prevSet.remove(elem); } } // 返回当前交互引发的会话所在的会话桶id, // 即当前会话的真正过期时间点 return newExpiryTime; }}
2.5 RequestProcessor请求处理链
前面介绍过,在单机运行时RequestProcessor处理链只有三个:PrepRequestProcessor、SyncRequestProcessor和FinalRequestProcessor,其中前两个是线程对象,最后一个是普通的对象,至于原因前面的文章介绍过。接下来的三个RequestProcessor大致作用不做分析,有兴趣可以看下以前的文章。
2.5.1 PrepRequestProcessor
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { // 本RequestProcessor中用来暂时保存需要处理的Request,轮询获取请求处理 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); // 本RequestProcessor的下一个RequestProcessor对象 RequestProcessor nextProcessor; ZooKeeperServer zks; @Override public void processRequest(Request request) { // RequestProcessor的实现方法,由于内部使用轮询方式从submittedRequests // 集合获取数据,因此在这里直接把Request添加到集合中即可 submittedRequests.add(request); } @Override public void run() { try { while (true) { // 轮询从submittedRequests集合中获取Request对象 Request request = submittedRequests.take(); // 如果requestOfDeath代表ZK已经关闭,因此退出循环 if (Request.requestOfDeath == request) { break; } // 开始处理正常的Request pRequest(request); } }... } protected void pRequest(Request request) throws RequestProcessorException { request.setHdr(null); request.setTxn(null); if (!request.isThrottled()) { // todo pRequestHelper(request); } request.zxid = zks.getZxid(); // 调用下个RequestProcessor来处理Request nextProcessor.processRequest(request); } private void pRequestHelper(Request request) throws RequestProcessorException { try { switch (request.type) { ... case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { // 直接处理事务 pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); } break; ... } } } protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { if (request.getHdr() == null) { // 为请求创建事务头TxnHeader对象 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); } switch (type) { ... 
// 创建session case OpCode.createSession: request.request.rewind(); // 此时的to实际上就是sessionTimeout int to = request.request.getInt(); // 使用sessionTimeout创建CreateSessionTxn对象 request.setTxn(new CreateSessionTxn(to)); request.request.rewind(); // only add the global session tracker but not to ZKDb // 根据sessionid和sessionTimeout再次新增session信息 zks.sessionTracker.trackSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner()); break; ... } } protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException{ // 为请求创建事务头TxnHeader对象 request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type); switch (type) { // 无关的case情况忽略 ... case OpCode.createSession: request.request.rewind(); // 此时的to实际上就是sessionTimeout int to = request.request.getInt(); // 使用sessionTimeout创建CreateSessionTxn对象 request.txn = new CreateSessionTxn(to); request.request.rewind(); // 根据sessionid和sessionTimeout再次新增session信息 zks.sessionTracker.addSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner()); break; ... }}
2.5.2 SyncRequestProcessor
/**
 * Excerpt of the second processor in the chain: appends requests to the
 * transaction log, batches them for flushing and forwards them to the next
 * processor.
 */
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    // Requests waiting to be handled by run().
    private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    // Requests that were appended to the log but still wait to be flushed.
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    // Next processor in the chain.
    private final RequestProcessor nextProcessor;
    // Value of ZooKeeperServer.getSnapCount(); together with randRoll it sets the
    // number of logged requests after which a snapshot is taken (see run()).
    private static int snapCount = ZooKeeperServer.getSnapCount();
    // Random offset added to snapCount / 2 when deciding to snapshot.
    private static int randRoll;

    /** Only enqueues the request on queuedRequests; the work is done by run(). */
    public void processRequest(Request request) {
        queuedRequests.add(request);
    }

    /** Polls queuedRequests and logs, batches or forwards each request. */
    @Override
    public void run() {
        try {
            int logCount = 0;
            // Randomize the snapshot threshold; per the article this avoids all
            // servers taking a snapshot at the same time.
            setRandRoll(r.nextInt(snapCount/2));
            while (true) {
                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                // Wait up to pollTime milliseconds for the next request.
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                // The marker request ends the loop.
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // Try to append the request to the transaction log.
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        // Past the randomized threshold: take a snapshot.
                        if (logCount > (snapCount / 2 + randRoll)) {
                            // (snapshot is taken asynchronously - elided) then reset the counter.
                            ...
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // Nothing was appended and nothing is pending flush:
                        // pass the request straight to the next processor.
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // Otherwise queue the request until the next flush.
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        // More than 1000 pending requests: flush them; per the article
                        // flush() also forwards each of them to nextProcessor.
                        flush(toFlush);
                    }
                }
            }
        } ...
    }
}
2.5.3 FinalRequestProcessor
/**
 * Excerpt of FinalRequestProcessor: only the path taken by a createSession
 * request is shown; elided parts are marked with "...".
 */
public class FinalRequestProcessor implements RequestProcessor {
    ZooKeeperServer zks;

    /**
     * Applies the request (unless it is throttled) and, for
     * OpCode.createSession, finishes the session initialisation through
     * ZooKeeperServer.finishSessionInit().
     */
    public void processRequest(Request request) {
        // Apply the request right away unless it has been throttled.
        ProcessTxnResult rc = null;
        if (!request.isThrottled()) {
            rc = applyRequest(request);
        }
        // Without a connection object there is nobody to answer: stop here.
        if (request.cnxn == null) {
            return;
        }
        ServerCnxn cnxn = request.cnxn;
        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        String lastOp = "NA";
        // Notify ZooKeeperServer that the request has finished so that it can
        // update any request accounting/throttling limits
        // (decInProcess() decrements the in-process counter).
        zks.decInProcess();
        zks.requestFinished(request);
        Code err = Code.OK;
        Record rsp = null;
        String path = null;
        int responseSize = 0;
        // NOTE(review): the excerpt shows this try without its catch/finally.
        try {
            // A header of type OpCode.error means the request failed earlier:
            // write the audit log entry and rethrow.
            if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                AuditHelper.addAuditLog(request, rc, true);
                // Prefer the exception stored on the request; otherwise build
                // a KeeperException from the ErrorTxn's error code.
                if (request.getException() != null) {
                    throw request.getException();
                } else {
                    throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
                }
            }
            ...
            // Dispatch on the request's operation type.
            switch (request.type) {
                ...
                case OpCode.createSession: {
                    // Label of the last operation, used for the statistics.
                    lastOp = "SESS";
                    // Update the statistics.
                    updateStats(request, lastOp, lastZxid);
                    // Complete the session initialisation and send the response.
                    zks.finishSessionInit(request.cnxn, true);
                    // Nothing more to do for this request.
                    return;
                }
            }
        }
    }
}
2.6 ZooKeeperServer新建连接生成响应对象
又再次回到了ZooKeeperServer类中,这里面执行了Server端针对新建连接的最后响应,其实我也搞不懂为什么要把新建连接单独的抽出来放到ZooKeeperServer类中来,或许唯一能解释的便是方便处理已存在session重新创建这个流程。
/** Excerpt of ZooKeeperServer showing how the connect response is built and sent. */
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    /**
     * Builds the ConnectResponse for a new connection and sends it on cnxn.
     *
     * @param cnxn  the connection that requested the session
     * @param valid whether the session was created successfully
     */
    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        // Register the connection object (per the original note, for JMX monitoring).
        try {
            // Only a successfully created session is registered.
            if (valid) {
                serverCnxnFactory.registerConnection(cnxn);
            }
        }...
        try {
            // When valid is true the response carries the connection's
            // session timeout, its session id and a password generated from
            // that session id; otherwise 0, 0 and a 16-byte zero-filled array.
            ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            // Serialize the response into a byte array. The leading "len"
            // field is written as a placeholder and overwritten below.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            bos.writeInt(-1, "len");
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool( this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            // Wrap the bytes in a ByteBuffer and store the payload length
            // (total size minus the 4-byte length field) at the front.
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.remaining() - 4).rewind();
            // Send the buffer to the client.
            cnxn.sendBuffer(bb);
            // An invalid session is followed by the close-connection marker.
            if (!valid) {
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            } else {
                // On success, make sure data sent by the client can be read.
                cnxn.enableRecv();
            }
        } catch (Exception e) {
            cnxn.close();
        }
    }
}
2.7 NIOServerCnxn发送新建连接响应
执行到这一步已经到了新建连接的尾声了,这一步只有发送ByteBuffer对象的数据,其它的操作相对而言并不是很重要。
/** Excerpt of NIOServerCnxn showing how a response buffer is written to the socket. */
public class NIOServerCnxn extends ServerCnxn {
    /**
     * Tries to write bb directly to the socket; anything that could not be
     * written (and the closeConn marker) is queued in outgoingBuffers and
     * OP_WRITE is switched on so that the selector thread finishes the job.
     */
    public void sendBuffer(ByteBuffer bb) {
        // NOTE(review): the excerpt shows this try without its catch/finally.
        try {
            // The closeConn marker is never written to the socket directly.
            if (bb != ServerCnxnFactory.closeConn) {
                // Attempt a direct write only while OP_WRITE is not already
                // set in the key's interest ops; if the write is incomplete
                // the selector is woken below to retry.
                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
                    try {
                        // Write the buffered data.
                        sock.write(bb);
                    } catch (IOException e) {
                        // The excerpt ignores the failure here.
                    }
                }
                if (bb.remaining() == 0) {
                    // Everything was written: update the server's packet statistics.
                    packetSent();
                    return;
                }
            }
            // Reaching this point means the buffer was not fully sent (or is
            // the closeConn marker): wake the selector so that the rest is
            // sent from there.
            synchronized(this.factory){
                sk.selector().wakeup();
                // Queue the buffer in outgoingBuffers; the write branch of
                // doIO(), analysed earlier in the article, drains it.
                outgoingBuffers.add(bb);
                if (sk.isValid()) {
                    // Switch OP_WRITE on.
                    sk.interestOps( sk.interestOps() | SelectionKey.OP_WRITE);
                }
            }
        }
    }

    // hdr and txn are the connection-related objects; the calls below add the
    // session information, which by now is the third time the new-connection
    // flow adds it. DataTree.processTxn() is also reached from here (per the
    // original note) but does nothing important for this flow.
    // NOTE(review): this method calls getZKDatabase() and uses
    // outstandingChanges, so it looks like a ZooKeeperServer method rather
    // than an NIOServerCnxn one - confirm the excerpt's class placement.
    public ProcessTxnResult processTxn(Request request) {
        TxnHeader hdr = request.getHdr();
        processTxnForSessionEvents(request, hdr, request.getTxn());
        final boolean writeRequest = (hdr != null);
        final boolean quorumRequest = request.isQuorum();
        // return fast w/o synchronization when we get a read
        if (!writeRequest && !quorumRequest) {
            return new ProcessTxnResult();
        }
        synchronized (outstandingChanges) {
            // Presumably applies the transaction to the database.
            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            if (writeRequest) {
                long zxid = hdr.getZxid();
                // outstandingChanges is empty in the new-connection flow, so
                // this loop is not analysed here.
                while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (cr.zxid < zxid) {
                        LOG.warn( "Zxid outstanding 0x{} is less than current 0x{}", Long.toHexString(cr.zxid), Long.toHexString(zxid));
                    }
                    if (outstandingChangesForPath.get(cr.path) == cr) {
                        outstandingChangesForPath.remove(cr.path);
                    }
                }
            }
            // do not add non quorum packets to the queue.
            if (quorumRequest) {
                getZKDatabase().addCommittedProposal(request);
            }
            return rc;
        }
    }
}
3. Client端接收响应
当第二步走完后便进入到了第三步Client接收Server端响应并调用监听器的步骤了。
3.1 SendThread接收通知
前面已经说了,SendThread负责发送和接收包数据,当Server端发送了新建连接响应后该类就会接收并进行相应的处理。本次分析只会分析经过的逻辑部分,其它的逻辑不做分析。
/** Excerpt of the client's SendThread: only the call that drives the NIO transport is shown. */
class SendThread extends ZooKeeperThread {
    @Override
    public void run() {
        ...
        while (state.isAlive()) {
            // NOTE(review): the excerpt shows this try without its catch/finally.
            try {
                ...
                // Same entry point as before: doTransport() processes the
                // pending NIO events.
                clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            }
        }
        ...
    }
}
3.2 ClientCnxnSocketNIO处理读事件
这次进入到该类处理的便是OP_READ类型的NIO事件。
/**
 * Excerpt of ClientCnxnSocketNIO: the read path that receives the server's
 * connect response. Elided parts are marked with "..." or a comment.
 */
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
    /**
     * Waits up to waitTimeOut for NIO events and dispatches each selected
     * key: OP_CONNECT finishes the connection, OP_READ/OP_WRITE go to doIO().
     */
    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
        // Same logic as analysed before; not repeated here.
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // Every read or write event, including the server's
                // responses, is handled by doIO().
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        // The rest of the method is omitted.
        ...
    }

    /**
     * Handles a readable socket: reads into incomingBuffer and, once it is
     * full, processes it as a length prefix, the connect response, or a
     * regular response. The write path is omitted from this excerpt.
     */
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // Read events.
        if (sockKey.isReadable()) {
            // Read from the socket into incomingBuffer.
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException();
            }
            // incomingBuffer has been filled completely.
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    // A new connection ends up here: initialized is still
                    // false on the client, so this buffer holds the connect
                    // response.
                    readConnectResult();
                    // Enable OP_READ on the socket.
                    enableRead();
                    // If outgoingQueue already holds a packet that can be
                    // sent, enable OP_WRITE so that the write event is
                    // handled on the next poll.
                    if (findSendablePacket(outgoingQueue, cnxn.sendThread
                            .clientTunneledAuthenticationInProgress())!=null){
                        enableWrite();
                    }
                    // Switch back to reading a length prefix, refresh
                    // lastHeard and mark the connection as initialized.
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    // Ordinary response handling once the connection has
                    // been established.
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        // Handling of write events is omitted.
    }

    /**
     * Deserializes the ConnectResponse from incomingBuffer, records the
     * session id and notifies the SendThread that the connection succeeded.
     */
    void readConnectResult() throws IOException {
        // Deserialize the ByteBuffer that was just read into a ConnectResponse.
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");
        boolean isRO = false;
        try {
            // Read the readOnly flag (the exception handler is elided).
            isRO = bbia.readBool("readOnly");
        }...
        // Run the connection-succeeded handling.
        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
    }
}
3.3 ClientCnxn处理连接成功
执行到这里基本上就已经算成功了,接下来的事情便是触发ZK的监听器。
// NOTE(review): this listing is the server-side NIOServerCnxnFactory excerpt
// already shown in section 2.1, although it sits under the heading
// "3.3 ClientCnxn处理连接成功". The ClientCnxn listing that the text refers to
// appears to be missing - confirm against the original article.
/**
 * Excerpt of NIOServerCnxnFactory: one AcceptThread accepts connections and
 * hands them to SelectorThreads, which schedule the socket I/O on workerPool
 * as IOWorkRequests. Elided parts are marked with "...".
 */
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Server-side NIO channel; many client SocketChannels connect to it.
    ServerSocketChannel ss;
    // NIO multiplexing selector.
    final Selector selector = Selector.open();
    // Maps a client IP to all NIO connection objects opened from that IP.
    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( );
    // Default maximum number of client connections allowed from one IP.
    int maxClientCnxns = 60;

    /** Base class of the accept and selector threads; each owns its own Selector. */
    private abstract class AbstractSelectThread extends ZooKeeperThread {
        protected final Selector selector;

        public AbstractSelectThread(String name) throws IOException {
            super(name);
            // Allows the JVM to shutdown even if this thread is still running.
            setDaemon(true);
            this.selector = Selector.open();
        }

        public void wakeupSelector() {
            selector.wakeup();
        }
        ...
    }

    // Accepts connection requests from clients (the TCP connection is
    // established here) and distributes them to the selector threads.
    private class AcceptThread extends AbstractSelectThread {
        private final ServerSocketChannel acceptSocket;
        private final SelectionKey acceptKey;
        private final RateLogger acceptErrorLogger = new RateLogger(LOG);
        private final Collection<SelectorThread> selectorThreads;
        private Iterator<SelectorThread> selectorIterator;
        private volatile boolean reconfiguring = false;

        public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
            super("NIOServerCxnFactory.AcceptThread:" + addr);
            this.acceptSocket = ss;
            this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
            this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
            selectorIterator = this.selectorThreads.iterator();
        }

        // Main loop of the thread: keeps calling select() until the factory
        // is stopped or the accept socket is closed.
        public void run() {
            try {
                while (!stopped && !acceptSocket.socket().isClosed()) {
                    try {
                        // select() accepts new connections and queues them.
                        select();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
            } finally {
                closeSelector();
                // This will wake up the selector threads, and tell the
                // worker thread pool to begin shutdown.
                if (!reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                LOG.info("accept thread exitted run method");
            }
        }

        public void setReconfiguring() {
            reconfiguring = true;
        }

        // Blocks in selector.select() until a registered key is ready, then
        // walks the selected keys. The only key registered on this selector
        // in the excerpt is the accept socket's OP_ACCEPT key.
        private void select() {
            try {
                selector.select();
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    // Skip keys that are no longer valid.
                    if (!key.isValid()) {
                        continue;
                    }
                    // A new connection is ready to be accepted.
                    if (key.isAcceptable()) {
                        if (!doAccept()) {
                            // If unable to pull a new connection off the accept
                            // queue, pause accepting to give us time to free
                            // up file descriptors and so the accept thread
                            // doesn't spin in a tight loop.
                            pauseAccept(10);
                        }
                    } else {
                        LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Handles one acceptable event: accepts the pending connection,
         * enforces the total and per-IP connection limits, switches the
         * channel to non-blocking mode (sc.configureBlocking(false)) and
         * hands it to the next SelectorThread in round-robin order.
         * addAcceptedConnection() puts the channel on that thread's
         * acceptedQueue (a LinkedBlockingQueue) and wakes its selector so
         * that the thread takes over the connection.
         * On IOException the rejection is counted and logged and the socket
         * is closed with fastCloseSock().
         *
         * @return true once acceptSocket.accept() has returned, even if the
         *         connection is rejected afterwards; false if accept() threw
         */
        private boolean doAccept() {
            // Becomes true as soon as accept() has returned.
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                // Accept the connection (TCP handshake with the client is done).
                sc = acceptSocket.accept();
                accepted = true;
                if (limitTotalNumberOfCnxns()) {
                    throw new IOException("Too many connections max allowed is " + maxCnxns);
                }
                InetAddress ia = sc.socket().getInetAddress();
                // Look up how many connections this IP already has and check
                // it against the per-IP maximum.
                int cnxncount = getClientCnxnCount(ia);
                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                    // Limit reached: the exception is handled below, where
                    // the socket is closed.
                    throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
                }
                LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
                // Switch the channel to non-blocking mode.
                sc.configureBlocking(false);
                // Round-robin assign this connection to a selector thread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                // Queue the channel on that thread (this also wakes its selector).
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
                // accept, maxClientCnxns, configureBlocking
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
                fastCloseSock(sc);
            }
            return accepted;
        }
    }

    /**
     * Takes over the accepted sockets: watches them with select() and, when
     * a socket becomes readable or writable, calls handleIO(key) to have the
     * event processed.
     */
    public class SelectorThread extends AbstractSelectThread {
        private final int id;
        // Channels accepted by the AcceptThread and not yet registered here.
        private final Queue<SocketChannel> acceptedQueue;
        private final Queue<SelectionKey> updateQueue;

        public SelectorThread(int id) throws IOException {
            super("NIOServerCxnFactory.SelectorThread-" + id);
            this.id = id;
            acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
            updateQueue = new LinkedBlockingQueue<SelectionKey>();
        }

        /**
         * Queues an accepted channel and wakes this thread's selector.
         *
         * @return false if the factory is stopped or the queue rejected the channel
         */
        public boolean addAcceptedConnection(SocketChannel accepted) {
            if (stopped || !acceptedQueue.offer(accepted)) {
                return false;
            }
            wakeupSelector();
            return true;
        }

        public void run() {
            try {
                while (!stopped) {
                    try {
                        select();
                        processAcceptedConnections();
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
                // Close connections still pending on the selector. Any others
                // with in-flight work, let drain out of the work queue.
                for (SelectionKey key : selector.keys()) {
                    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                    if (cnxn.isSelectable()) {
                        cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    }
                    cleanupSelectionKey(key);
                }
                SocketChannel accepted;
                while ((accepted = acceptedQueue.poll()) != null) {
                    fastCloseSock(accepted);
                }
                updateQueue.clear();
            } finally {
                closeSelector();
                // This will wake up the accept thread and the other selector
                // threads, and tell the worker thread pool to begin shutdown.
                NIOServerCnxnFactory.this.stop();
                LOG.info("selector thread exitted run method");
            }
        }

        private void select() {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                // Process the selected keys in random order; presumably so
                // that requests from different clients get a fair chance.
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                // Walk the selected keys.
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selected.remove(key);
                    // Invalid key: clean it up and move on.
                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    // The key is readable or writable.
                    if (key.isReadable() || key.isWritable()) {
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Schedules the I/O for the connection attached to the given
         * SelectionKey by submitting an IOWorkRequest to workerPool. Per the
         * original notes, a worker (ScheduledWorkRequest) eventually calls
         * workRequest.doWork(), and the I/O is run directly by this thread
         * when no worker thread pool is in use.
         */
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            // Stop selecting this key while processing on its
            // connection
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

        private void processAcceptedConnections() {
            SocketChannel accepted;
            while (!stopped && (accepted = acceptedQueue.poll()) != null) {
                SelectionKey key = null;
                try {
                    // Register the socket with the selector for OP_READ.
                    key = accepted.register(selector, SelectionKey.OP_READ);
                    // Create the matching NIO connection object.
                    NIOServerCnxn cnxn = createConnection(accepted, key, this);
                    // Bind the connection object to the SelectionKey.
                    key.attach(cnxn);
                    // Record the connection under its IP; one IP maps to a
                    // set of connections because one machine may open several.
                    addCnxn(cnxn);
                } catch (IOException e) {
                    // register, createConnection
                    cleanupSelectionKey(key);
                    fastCloseSock(accepted);
                }
            }
        }
    }

    /**
     * IOWorkRequest is a small wrapper class that allows the doIO() call to
     * be run on a connection using a WorkerService.
     */
    private class IOWorkRequest extends WorkerService.WorkRequest {
        private final SelectorThread selectorThread;
        private final SelectionKey key;
        private final NIOServerCnxn cnxn;

        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
            this.selectorThread = selectorThread;
            this.key = key;
            this.cnxn = (NIOServerCnxn) key.attachment();
        }

        // Checks that the key is still valid and then calls
        // NIOServerCnxn.doIO(key) to handle the event.
        public void doWork() throws InterruptedException {
            // Invalid key: clean it up and stop.
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);
                // Check if we shutdown or doIO() closed this connection
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }
            // Mark this connection as once again ready for selection
            cnxn.enableSelectable();
            // Push an update request on the queue to resume selecting
            // on the current set of interest ops, which may have changed
            // as a result of the I/O operations we just performed.
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
    }
}
// NOTE(review): the "0" below trails the listing in the source text and looks like an extraction artifact.
0
3.4 EventThread监听事件
前面说过SendThread负责和ZK的Server端进行交互,完成发送数据包和接收响应的任务,而EventThread则是根据SendThread接收到的响应所产生的事件类型进行轮询处理。也就是说SendThread负责和Server端对接,EventThread则是负责和SendThread对接,处理Client自己产生的ZK事件。
// NOTE(review): this listing is the server-side NIOServerCnxnFactory excerpt
// already shown in section 2.1, although it sits under the heading
// "3.4 EventThread监听事件". The EventThread listing that the text refers to
// appears to be missing - confirm against the original article.
/**
 * Excerpt of NIOServerCnxnFactory: one AcceptThread accepts connections and
 * hands them to SelectorThreads, which schedule the socket I/O on workerPool
 * as IOWorkRequests. Elided parts are marked with "...".
 */
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Server-side NIO channel; many client SocketChannels connect to it.
    ServerSocketChannel ss;
    // NIO multiplexing selector.
    final Selector selector = Selector.open();
    // Maps a client IP to all NIO connection objects opened from that IP.
    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( );
    // Default maximum number of client connections allowed from one IP.
    int maxClientCnxns = 60;

    /** Base class of the accept and selector threads; each owns its own Selector. */
    private abstract class AbstractSelectThread extends ZooKeeperThread {
        protected final Selector selector;

        public AbstractSelectThread(String name) throws IOException {
            super(name);
            // Allows the JVM to shutdown even if this thread is still running.
            setDaemon(true);
            this.selector = Selector.open();
        }

        public void wakeupSelector() {
            selector.wakeup();
        }
        ...
    }

    // Accepts connection requests from clients (the TCP connection is
    // established here) and distributes them to the selector threads.
    private class AcceptThread extends AbstractSelectThread {
        private final ServerSocketChannel acceptSocket;
        private final SelectionKey acceptKey;
        private final RateLogger acceptErrorLogger = new RateLogger(LOG);
        private final Collection<SelectorThread> selectorThreads;
        private Iterator<SelectorThread> selectorIterator;
        private volatile boolean reconfiguring = false;

        public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
            super("NIOServerCxnFactory.AcceptThread:" + addr);
            this.acceptSocket = ss;
            this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
            this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
            selectorIterator = this.selectorThreads.iterator();
        }

        // Main loop of the thread: keeps calling select() until the factory
        // is stopped or the accept socket is closed.
        public void run() {
            try {
                while (!stopped && !acceptSocket.socket().isClosed()) {
                    try {
                        // select() accepts new connections and queues them.
                        select();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
            } finally {
                closeSelector();
                // This will wake up the selector threads, and tell the
                // worker thread pool to begin shutdown.
                if (!reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                LOG.info("accept thread exitted run method");
            }
        }

        public void setReconfiguring() {
            reconfiguring = true;
        }

        // Blocks in selector.select() until a registered key is ready, then
        // walks the selected keys. The only key registered on this selector
        // in the excerpt is the accept socket's OP_ACCEPT key.
        private void select() {
            try {
                selector.select();
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    // Skip keys that are no longer valid.
                    if (!key.isValid()) {
                        continue;
                    }
                    // A new connection is ready to be accepted.
                    if (key.isAcceptable()) {
                        if (!doAccept()) {
                            // If unable to pull a new connection off the accept
                            // queue, pause accepting to give us time to free
                            // up file descriptors and so the accept thread
                            // doesn't spin in a tight loop.
                            pauseAccept(10);
                        }
                    } else {
                        LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Handles one acceptable event: accepts the pending connection,
         * enforces the total and per-IP connection limits, switches the
         * channel to non-blocking mode (sc.configureBlocking(false)) and
         * hands it to the next SelectorThread in round-robin order.
         * addAcceptedConnection() puts the channel on that thread's
         * acceptedQueue (a LinkedBlockingQueue) and wakes its selector so
         * that the thread takes over the connection.
         * On IOException the rejection is counted and logged and the socket
         * is closed with fastCloseSock().
         *
         * @return true once acceptSocket.accept() has returned, even if the
         *         connection is rejected afterwards; false if accept() threw
         */
        private boolean doAccept() {
            // Becomes true as soon as accept() has returned.
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                // Accept the connection (TCP handshake with the client is done).
                sc = acceptSocket.accept();
                accepted = true;
                if (limitTotalNumberOfCnxns()) {
                    throw new IOException("Too many connections max allowed is " + maxCnxns);
                }
                InetAddress ia = sc.socket().getInetAddress();
                // Look up how many connections this IP already has and check
                // it against the per-IP maximum.
                int cnxncount = getClientCnxnCount(ia);
                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                    // Limit reached: the exception is handled below, where
                    // the socket is closed.
                    throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
                }
                LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
                // Switch the channel to non-blocking mode.
                sc.configureBlocking(false);
                // Round-robin assign this connection to a selector thread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                // Queue the channel on that thread (this also wakes its selector).
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
                // accept, maxClientCnxns, configureBlocking
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
                fastCloseSock(sc);
            }
            return accepted;
        }
    }

    /**
     * Takes over the accepted sockets: watches them with select() and, when
     * a socket becomes readable or writable, calls handleIO(key) to have the
     * event processed.
     */
    public class SelectorThread extends AbstractSelectThread {
        private final int id;
        // Channels accepted by the AcceptThread and not yet registered here.
        private final Queue<SocketChannel> acceptedQueue;
        private final Queue<SelectionKey> updateQueue;

        public SelectorThread(int id) throws IOException {
            super("NIOServerCxnFactory.SelectorThread-" + id);
            this.id = id;
            acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
            updateQueue = new LinkedBlockingQueue<SelectionKey>();
        }

        /**
         * Queues an accepted channel and wakes this thread's selector.
         *
         * @return false if the factory is stopped or the queue rejected the channel
         */
        public boolean addAcceptedConnection(SocketChannel accepted) {
            if (stopped || !acceptedQueue.offer(accepted)) {
                return false;
            }
            wakeupSelector();
            return true;
        }

        public void run() {
            try {
                while (!stopped) {
                    try {
                        select();
                        processAcceptedConnections();
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
                // Close connections still pending on the selector. Any others
                // with in-flight work, let drain out of the work queue.
                for (SelectionKey key : selector.keys()) {
                    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                    if (cnxn.isSelectable()) {
                        cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    }
                    cleanupSelectionKey(key);
                }
                SocketChannel accepted;
                while ((accepted = acceptedQueue.poll()) != null) {
                    fastCloseSock(accepted);
                }
                updateQueue.clear();
            } finally {
                closeSelector();
                // This will wake up the accept thread and the other selector
                // threads, and tell the worker thread pool to begin shutdown.
                NIOServerCnxnFactory.this.stop();
                LOG.info("selector thread exitted run method");
            }
        }

        private void select() {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                // Process the selected keys in random order; presumably so
                // that requests from different clients get a fair chance.
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                // Walk the selected keys.
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selected.remove(key);
                    // Invalid key: clean it up and move on.
                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    // The key is readable or writable.
                    if (key.isReadable() || key.isWritable()) {
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Schedules the I/O for the connection attached to the given
         * SelectionKey by submitting an IOWorkRequest to workerPool. Per the
         * original notes, a worker (ScheduledWorkRequest) eventually calls
         * workRequest.doWork(), and the I/O is run directly by this thread
         * when no worker thread pool is in use.
         */
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            // Stop selecting this key while processing on its
            // connection
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

        private void processAcceptedConnections() {
            SocketChannel accepted;
            while (!stopped && (accepted = acceptedQueue.poll()) != null) {
                SelectionKey key = null;
                try {
                    // Register the socket with the selector for OP_READ.
                    key = accepted.register(selector, SelectionKey.OP_READ);
                    // Create the matching NIO connection object.
                    NIOServerCnxn cnxn = createConnection(accepted, key, this);
                    // Bind the connection object to the SelectionKey.
                    key.attach(cnxn);
                    // Record the connection under its IP; one IP maps to a
                    // set of connections because one machine may open several.
                    addCnxn(cnxn);
                } catch (IOException e) {
                    // register, createConnection
                    cleanupSelectionKey(key);
                    fastCloseSock(accepted);
                }
            }
        }
    }

    /**
     * IOWorkRequest is a small wrapper class that allows the doIO() call to
     * be run on a connection using a WorkerService.
     */
    private class IOWorkRequest extends WorkerService.WorkRequest {
        private final SelectorThread selectorThread;
        private final SelectionKey key;
        private final NIOServerCnxn cnxn;

        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
            this.selectorThread = selectorThread;
            this.key = key;
            this.cnxn = (NIOServerCnxn) key.attachment();
        }

        // Checks that the key is still valid and then calls
        // NIOServerCnxn.doIO(key) to handle the event.
        public void doWork() throws InterruptedException {
            // Invalid key: clean it up and stop.
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);
                // Check if we shutdown or doIO() closed this connection
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }
            // Mark this connection as once again ready for selection
            cnxn.enableSelectable();
            // Push an update request on the queue to resume selecting
            // on the current set of interest ops, which may have changed
            // as a result of the I/O operations we just performed.
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
    }
}
// NOTE(review): the "1" below trails the listing in the source text and looks like an extraction artifact.
1
执行到这里新建连接的流程已经执行完毕了,接下来看下ClientWatchManager是如何将ZK的事件和Watcher进行绑定的。
3.5 ClientWatchManager监听器管理类
这个类会管理四种逻辑类型的监听器,至于具体的类型可以看以前的文章。接下来简单的看下其materialize方法的实现。
/**
 * Excerpt of the NIO-based server connection factory.
 *
 * As shown here, the work is split across three kinds of helpers:
 * an AcceptThread that accepts new sockets and hands them to selector threads,
 * SelectorThreads that select on accepted sockets and schedule I/O work, and
 * IOWorkRequests that run the actual doIO() call on a worker.
 * Members that are referenced but not declared in this excerpt (stopped, LOG,
 * workerPool, closeSelector, cleanupSelectionKey, fastCloseSock, ...) live
 * elsewhere in the real class.
 */
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Server-side NIO channel that client SocketChannels connect to.
    ServerSocketChannel ss;
    // NIO multiplexing selector.
    final Selector selector = Selector.open();
    // Maps a client IP address to the set of NIO connection objects from that address.
    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new HashMap<InetAddress, Set<NIOServerCnxn>>( );
    // Per-IP connection limit checked in doAccept(); the check is skipped when this is <= 0.
    int maxClientCnxns = 60;

    /**
     * Base class for the accept and selector threads: a daemon thread that
     * owns its own Selector.
     */
    private abstract class AbstractSelectThread extends ZooKeeperThread {
        protected final Selector selector;

        /**
         * @param name thread name passed to the superclass
         * @throws IOException if the selector cannot be opened
         */
        public AbstractSelectThread(String name) throws IOException {
            super(name);
            // Allows the JVM to shutdown even if this thread is still running.
            setDaemon(true);
            this.selector = Selector.open();
        }

        /** Wakes up this thread's selector so a blocked select() returns. */
        public void wakeupSelector() {
            selector.wakeup();
        }

        // (remaining members are elided in this excerpt)
        ...
    }

    /**
     * Accepts incoming client connections on the listening socket and
     * distributes them round-robin across the selector threads.
     */
    private class AcceptThread extends AbstractSelectThread {
        private final ServerSocketChannel acceptSocket;
        private final SelectionKey acceptKey;
        private final RateLogger acceptErrorLogger = new RateLogger(LOG);
        private final Collection<SelectorThread> selectorThreads;
        private Iterator<SelectorThread> selectorIterator;
        private volatile boolean reconfiguring = false;

        /**
         * Registers the server socket with this thread's selector for OP_ACCEPT
         * and takes an unmodifiable snapshot of the selector threads.
         *
         * @param ss              listening server socket channel
         * @param addr            address, used only in the thread name here
         * @param selectorThreads selector threads that accepted sockets are handed to
         * @throws IOException if opening the selector or registering the channel fails
         */
        public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
            super("NIOServerCxnFactory.AcceptThread:" + addr);
            this.acceptSocket = ss;
            this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
            this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
            selectorIterator = this.selectorThreads.iterator();
        }

        /**
         * Main loop: keeps calling select() until the factory is stopped or the
         * listening socket is closed. Exceptions from one iteration are logged
         * and the loop continues.
         */
        public void run() {
            try {
                while (!stopped && !acceptSocket.socket().isClosed()) {
                    try {
                        // Wait for accept events and hand new connections to a selector thread.
                        select();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
            } finally {
                closeSelector();
                // This will wake up the selector threads, and tell the
                // worker thread pool to begin shutdown.
                // Skipped while reconfiguring, so the factory itself keeps running.
                if (!reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                LOG.info("accept thread exitted run method");
            }
        }

        /** Marks this thread as reconfiguring so that its exit does not stop the factory. */
        public void setReconfiguring() {
            reconfiguring = true;
        }

        /**
         * Blocks in selector.select() until at least one key is ready, then
         * processes every selected key: acceptable keys go to doAccept(),
         * anything else is logged as unexpected.
         */
        private void select() {
            try {
                selector.select();

                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();

                    // Skip keys that are no longer valid.
                    if (!key.isValid()) {
                        continue;
                    }
                    // The only operation expected on this selector is OP_ACCEPT.
                    if (key.isAcceptable()) {
                        // Try to accept the pending connection.
                        if (!doAccept()) {
                            // If unable to pull a new connection off the accept
                            // queue, pause accepting to give us time to free
                            // up file descriptors and so the accept thread
                            // doesn't spin in a tight loop.
                            // NOTE(review): unit of the argument is not visible here,
                            // presumably milliseconds; confirm against pauseAccept().
                            pauseAccept(10);
                        }
                    } else {
                        LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Accepts one pending connection, enforces the connection limits,
         * switches the socket to non-blocking mode and hands it to the next
         * selector thread in round-robin order via addAcceptedConnection(),
         * which also wakes that thread's selector.
         *
         * Any IOException along the way is counted as a rejected connection,
         * rate-limit logged, and the socket is closed via fastCloseSock().
         *
         * @return true if acceptSocket.accept() returned without throwing, even
         *         when the connection was subsequently rejected and closed;
         *         false if accept() itself threw
         */
        private boolean doAccept() {
            // Becomes true as soon as accept() has returned without throwing.
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                // Pull the pending connection off the listening socket.
                sc = acceptSocket.accept();
                // accept() did not throw.
                accepted = true;
                if (limitTotalNumberOfCnxns()) {
                    throw new IOException("Too many connections max allowed is " + maxCnxns);
                }
                InetAddress ia = sc.socket().getInetAddress();
                // Look up how many connections this address already has and
                // compare against the per-IP limit.
                int cnxncount = getClientCnxnCount(ia);

                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                    // Over the limit: reject; the catch block below closes the socket.
                    throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
                }

                LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
                // Switch the accepted socket to non-blocking mode.
                sc.configureBlocking(false);

                // Round-robin assign this connection to a selector thread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                // Queue the socket on that selector thread; this also wakes its selector.
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
                // accept, maxClientCnxns, configureBlocking
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
                fastCloseSock(sc);
            }
            return accepted;
        }
    }

    /**
     * Takes over sockets accepted by the AcceptThread. In its main loop it
     * selects on those sockets for read/write readiness and calls
     * handleIO(key) for each ready key, then registers newly accepted sockets
     * and applies queued interest-op updates.
     */
    public class SelectorThread extends AbstractSelectThread {
        private final int id;
        // Sockets handed over by the accept thread, waiting to be registered with this selector.
        private final Queue<SocketChannel> acceptedQueue;
        // Keys queued for an interest-ops update (drained by processInterestOpsUpdateRequests, not shown).
        private final Queue<SelectionKey> updateQueue;

        /**
         * @param id numeric id, used in the thread name
         * @throws IOException if the selector cannot be opened
         */
        public SelectorThread(int id) throws IOException {
            super("NIOServerCxnFactory.SelectorThread-" + id);
            this.id = id;
            acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
            updateQueue = new LinkedBlockingQueue<SelectionKey>();
        }

        /**
         * Queues a newly accepted socket for this thread and wakes its selector.
         *
         * @param accepted the accepted socket channel
         * @return false if the factory is stopped or the queue rejected the
         *         socket; true otherwise
         */
        public boolean addAcceptedConnection(SocketChannel accepted) {
            if (stopped || !acceptedQueue.offer(accepted)) {
                return false;
            }
            wakeupSelector();
            return true;
        }

        /**
         * Main loop: select, register newly accepted connections, apply
         * interest-op updates; repeated until the factory is stopped. On exit,
         * closes what is still pending and stops the factory.
         */
        public void run() {
            try {
                while (!stopped) {
                    try {
                        // One iteration: ready I/O first, then new sockets, then interest-op updates.
                        select();
                        processAcceptedConnections();
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }

                // Close connections still pending on the selector. Any others
                // with in-flight work, let drain out of the work queue.
                for (SelectionKey key : selector.keys()) {
                    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                    if (cnxn.isSelectable()) {
                        cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    }
                    cleanupSelectionKey(key);
                }
                // Close sockets that were accepted but never registered.
                SocketChannel accepted;
                while ((accepted = acceptedQueue.poll()) != null) {
                    fastCloseSock(accepted);
                }
                updateQueue.clear();
            } finally {
                closeSelector();
                // This will wake up the accept thread and the other selector
                // threads, and tell the worker thread pool to begin shutdown.
                NIOServerCnxnFactory.this.stop();
                LOG.info("selector thread exitted run method");
            }
        }

        /**
         * Blocks in selector.select(), then walks the selected keys in
         * shuffled order: invalid keys are cleaned up, readable/writable keys
         * go to handleIO(), anything else is logged as unexpected.
         */
        private void select() {
            try {
                selector.select();

                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                // Process the ready keys in random order.
                // NOTE(review): presumably so that no client is consistently served first; confirm.
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                // Iterate over the shuffled copy and remove each key from the live selected set.
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selected.remove(key);

                    // Key is no longer valid: clean it up and move on.
                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    // Key is ready for reading and/or writing.
                    if (key.isReadable() || key.isWritable()) {
                        // Schedule the I/O for this connection.
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Schedules I/O processing for the connection attached to the given
         * key: wraps it in an IOWorkRequest and submits it to workerPool.
         * While the request is outstanding the connection is marked
         * non-selectable and the key's interest ops are cleared.
         *
         * @param key selection key whose attachment is the NIOServerCnxn to process
         */
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

            // Stop selecting this key while processing on its
            // connection
            cnxn.disableSelectable();
            key.interestOps(0);
            // NOTE(review): touchCnxn is not shown here; presumably it refreshes the
            // connection's expiry tracking; confirm.
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

        /**
         * Drains acceptedQueue: registers each socket with this selector for
         * OP_READ, creates its NIOServerCnxn, attaches it to the key and
         * records it via addCnxn(). On IOException the key is cleaned up and
         * the socket closed.
         */
        private void processAcceptedConnections() {
            SocketChannel accepted;
            while (!stopped && (accepted = acceptedQueue.poll()) != null) {
                SelectionKey key = null;
                try {
                    // Register the socket with this selector to obtain its SelectionKey.
                    key = accepted.register(selector, SelectionKey.OP_READ);
                    // Create the NIO connection object for this socket.
                    NIOServerCnxn cnxn = createConnection(accepted, key, this);
                    // Bind the connection object to the key.
                    key.attach(cnxn);
                    // NOTE(review): addCnxn is not shown here; presumably it records the
                    // connection in the per-IP bookkeeping (one address may hold several
                    // connections); confirm.
                    addCnxn(cnxn);
                } catch (IOException e) {
                    // register, createConnection
                    // key is still null here if register() itself failed.
                    cleanupSelectionKey(key);
                    fastCloseSock(accepted);
                }
            }
        }
    }

    /**
     * IOWorkRequest is a small wrapper class to allow doIO() calls to be
     * run on a connection using a WorkerService.
     */
    private class IOWorkRequest extends WorkerService.WorkRequest {
        private final SelectorThread selectorThread;
        private final SelectionKey key;
        private final NIOServerCnxn cnxn;

        /**
         * @param selectorThread selector thread that owns the key
         * @param key            selection key whose attachment is the connection to process
         */
        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
            this.selectorThread = selectorThread;
            this.key = key;
            this.cnxn = (NIOServerCnxn) key.attachment();
        }

        /**
         * Validates the key, runs NIOServerCnxn.doIO(key) if the key is
         * readable or writable, then re-enables selection for the connection
         * and queues an interest-ops update on the owning selector thread.
         *
         * @throws InterruptedException declared by the signature; the visible
         *         body does not throw it directly
         */
        public void doWork() throws InterruptedException {
            // Nothing to do for a key that is no longer valid.
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }

            if (key.isReadable() || key.isWritable()) {
                // Perform the actual read/write on the connection.
                cnxn.doIO(key);

                // Check if we shutdown or doIO() closed this connection
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }

            // Mark this connection as once again ready for selection
            cnxn.enableSelectable();
            // Push an update request on the queue to resume selecting
            // on the current set of interest ops, which may have changed
            // as a result of the I/O operations we just performed.
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
    }
}
不得不说,这是一项工程量很大的工作。阅读完ZK的源码后,对平时的使用以及某些配置都有了更加深刻的理解,只是对于ZK中ByteBuffer空间大小的4字节分配还有些犯迷糊,后续再补回来。
能耐心看到这里的想必也是决定了把ZK琢磨透的秀儿吧。
参考文章
- zookeeper3.7版本github源码注释分析
- zk源码分析系列
- Zookeeper原理和源码学习系列
- Zookeeper学习系列
- Zookeeper源码系列
原文:https://juejin.cn/post/7100111462823100424