2. Server端接收、处理请求并返回响应


2.1 NIOServerCnxnFactory接收NIO请求



// Outline of the thread / work classes nested in NIOServerCnxnFactory
// (declarations only; the bodies are shown in the full listing further down).

/** Common base of the accept and selector threads; each one owns its own Selector. */
private abstract class AbstractSelectThread extends ZooKeeperThread;
/**
 * Accepts incoming client connections (the TCP handshake itself is completed by
 * the OS before accept() returns) and hands each new socket to a SelectorThread.
 */
private class AcceptThread extends AbstractSelectThread;
/**
 * Takes over accepted sockets and watches them for read/write readiness with
 * select(); every ready key is dispatched through handleIO(key).
 */
public class SelectorThread extends AbstractSelectThread;
/**
 * Small wrapper that lets a connection's doIO() call run on the WorkerService
 * thread pool.
 */
private class IOWorkRequest extends WorkerService.WorkRequest
/**
 * Closes stale connections so that connections which never established a
 * session are expired properly.
 */
private class ConnectionExpirerThread extends ZooKeeperThread

AcceptThread 线程负责接收来自客户端的连接,并以轮询(round-robin)方式选出一个SelectorThread,将对应的SocketChannel放入该SelectorThread的acceptedQueue队列中。

SelectorThread 线程负责监听其管理的连接上的读写事件:一旦有事件就绪,便将其封装为IOWorkRequest,通过workerPool.schedule(workRequest)提交给工作线程池;工作线程执行IOWorkRequest.doWork(),再由它调用NIOServerCnxn.doIO()完成实际的读写处理。详细代码如下:

/**
 * NIO-based ServerCnxnFactory (abridged excerpt; "..." marks elided members).
 *
 * Thread model visible below: one AcceptThread accepts sockets and hands them
 * round-robin to SelectorThreads; each SelectorThread watches its sockets for
 * read/write readiness and schedules an IOWorkRequest on workerPool, whose
 * doWork() calls NIOServerCnxn.doIO().
 */
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Listening server socket channel that client connections arrive on.
    ServerSocketChannel ss;
    // Factory-level selector. Note: AbstractSelectThread declares its own
    // `selector` field, which shadows this one inside the accept/selector threads.
    final Selector selector = Selector.open();
    // Client IP address -> the NIO connections from that address.
    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
            new HashMap<InetAddress, Set<NIOServerCnxn>>( );
    // Per-IP connection limit (default 60); doAccept() enforces it when > 0.
    int maxClientCnxns = 60;

    /** Common base of the accept and selector threads: a daemon thread owning its own Selector. */
    private abstract class AbstractSelectThread extends ZooKeeperThread {
        protected final Selector selector;

        public AbstractSelectThread(String name) throws IOException {
            super(name);
            // Allows the JVM to shutdown even if this thread is still running.
            setDaemon(true);
            this.selector = Selector.open();
        }

        /** Makes a blocked selector.select() call in this thread return immediately. */
        public void wakeupSelector() {
            selector.wakeup();
        }

       ...
    }

    /**
     * Accepts new client connections from the listening socket and hands each
     * one, round-robin, to a SelectorThread.
     */
    private class AcceptThread extends AbstractSelectThread {
        private final ServerSocketChannel acceptSocket;
        private final SelectionKey acceptKey;
        private final RateLogger acceptErrorLogger = new RateLogger(LOG);
        private final Collection<SelectorThread> selectorThreads;
        // Round-robin cursor over selectorThreads; restarted when exhausted.
        private Iterator<SelectorThread> selectorIterator;
        // Set when this thread exits because of a reconfiguration; run() then
        // does not stop the whole factory on exit.
        private volatile boolean reconfiguring = false;

        /**
         * Registers the listening socket for OP_ACCEPT on this thread's selector
         * and snapshots the selector threads into an unmodifiable list.
         */
        public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
            super("NIOServerCxnFactory.AcceptThread:" + addr);
            this.acceptSocket = ss;
            this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
            this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
            selectorIterator = this.selectorThreads.iterator();
        }

        /**
         * Main loop: calls select() until the factory is stopped or the listening
         * socket is closed. Unexpected exceptions are logged and the loop goes on.
         * On exit the selector is closed and, unless a reconfiguration is in
         * progress, the whole factory is stopped.
         */
        public void run() {
            try {
                while (!stopped && !acceptSocket.socket().isClosed()) {
                    try {
                        // Wait for pending connections and accept them.
                        select();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
            } finally {
                closeSelector();
                // This will wake up the selector threads, and tell the
                // worker thread pool to begin shutdown.
                if (!reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                LOG.info("accept thread exitted run method");
            }
        }

        /** Marks that this thread is exiting because of a reconfiguration. */
        public void setReconfiguring() {
            reconfiguring = true;
        }

        /**
         * Blocks in selector.select() until a registered key is ready (or the
         * selector is woken up), then accepts a connection for every acceptable
         * key. IOExceptions from select() are logged and ignored.
         */
        private void select() {
            try {
                selector.select();
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    // Remove the key from the selected set ourselves; the selector
                    // never clears it.
                    selectedKeys.remove();
                    // Skip keys that were cancelled or whose channel was closed.
                    if (!key.isValid()) {
                        continue;
                    }
                    // A connection is waiting on the listening socket.
                    if (key.isAcceptable()) {
                        if (!doAccept()) {
                            // If unable to pull a new connection off the accept
                            // queue, pause accepting to give us time to free
                            // up file descriptors and so the accept thread
                            // doesn't spin in a tight loop.
                            pauseAccept(10);
                        }
                    } else {
                        LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Accepts one pending connection and hands it to a SelectorThread.
         *
         * Steps: accept() the socket; enforce the global limit
         * (limitTotalNumberOfCnxns()) and the per-IP limit (maxClientCnxns);
         * switch the socket to non-blocking mode; pick the next SelectorThread
         * round-robin and queue the socket via addAcceptedConnection(), which
         * also wakes that thread's selector so the socket is registered promptly.
         *
         * Any IOException along the way (accept, a limit check, configureBlocking,
         * or a rejected queue offer) is counted in CONNECTION_REJECTED, logged
         * through acceptErrorLogger.rateLimitLog(), and the socket is closed.
         *
         * @return true once accept() has returned, even if the connection was
         *         rejected afterwards; false only if accept() itself threw
         */
        private boolean doAccept() {
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                // Take an already-established connection off the listen backlog;
                // the TCP handshake was completed by the OS, not by this call.
                // NOTE(review): acceptSocket is registered with a selector, so it is
                // non-blocking, and accept() may then return null when nothing is
                // pending; sc is dereferenced below without a null check — confirm
                // this cannot happen right after isAcceptable().
                sc = acceptSocket.accept();
                accepted = true;
                if (limitTotalNumberOfCnxns()) {
                    throw new IOException("Too many connections max allowed is " + maxCnxns);
                }
                InetAddress ia = sc.socket().getInetAddress();
                // Enforce the per-IP connection limit.
                int cnxncount = getClientCnxnCount(ia);
                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                    // Over the limit: reject; the catch block below closes the socket.
                    throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
                }
                LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
                // Selector-based I/O requires a non-blocking channel.
                sc.configureBlocking(false);
                // Round-robin assign this connection to a selector thread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                // Queue the socket on that thread; addAcceptedConnection() also
                // wakes its selector.
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue"
                                          + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
                // accept, maxClientCnxns, configureBlocking
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
                fastCloseSock(sc);
            }
            return accepted;
        }
    }

    /**
     * Services the connections assigned to it: registers sockets handed over by
     * the AcceptThread, detects read/write readiness with select(), and hands
     * each ready key to the worker pool through handleIO(key).
     */
    public class SelectorThread extends AbstractSelectThread {
        private final int id;
        // Sockets handed over by the AcceptThread that still have to be
        // registered with this thread's selector.
        private final Queue<SocketChannel> acceptedQueue;
        // Keys whose interest ops must be re-applied after a worker finished
        // I/O on them (drained by processInterestOpsUpdateRequests(), not shown).
        private final Queue<SelectionKey> updateQueue;

        public SelectorThread(int id) throws IOException {
            super("NIOServerCxnFactory.SelectorThread-" + id);
            this.id = id;
            acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
            updateQueue = new LinkedBlockingQueue<SelectionKey>();
        }

        /**
         * Queues an accepted socket for registration and wakes this thread's
         * selector so it is picked up promptly.
         *
         * @return false if the factory is stopped or the queue rejected the socket
         */
        public boolean addAcceptedConnection(SocketChannel accepted) {
            if (stopped || !acceptedQueue.offer(accepted)) {
                return false;
            }
            wakeupSelector();
            return true;
        }

        /**
         * Main loop: select ready keys, register newly accepted sockets, and
         * re-apply pending interest-op updates until the factory is stopped.
         * Then closes idle connections and queued sockets, and stops the factory.
         */
        public void run() {
            try {
                while (!stopped) {
                    try {
                        select();
                        processAcceptedConnections();
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }

                // Close connections still pending on the selector. Any others
                // with in-flight work, let drain out of the work queue.
                // NOTE(review): if createConnection() failed in
                // processAcceptedConnections(), that key was registered but never
                // attached; if it is still in selector.keys() here, cnxn is null and
                // cnxn.isSelectable() throws — confirm cleanupSelectionKey() rules this out.
                for (SelectionKey key : selector.keys()) {
                    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                    if (cnxn.isSelectable()) {
                        cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    }
                    cleanupSelectionKey(key);
                }
                // Close sockets that were accepted but never registered.
                SocketChannel accepted;
                while ((accepted = acceptedQueue.poll()) != null) {
                    fastCloseSock(accepted);
                }
                updateQueue.clear();
            } finally {
                closeSelector();
                // This will wake up the accept thread and the other selector
                // threads, and tell the worker thread pool to begin shutdown.
                NIOServerCnxnFactory.this.stop();
                LOG.info("selector thread exitted run method");
            }
        }

        /**
         * Blocks in selector.select(), then dispatches every ready key: invalid
         * keys are cleaned up, readable/writable keys go to handleIO().
         */
        private void select() {
            try {
                selector.select();

                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                // Process ready keys in random order, presumably so that no
                // connection is consistently serviced first.
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    // We iterate over a copy, so remove the key from the real
                    // selected set explicitly.
                    selected.remove(key);

                    // Cancelled key or closed channel.
                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    // Hand read/write readiness to the worker pool.
                    if (key.isReadable() || key.isWritable()) {
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

        /**
         * Schedules the I/O for the connection attached to {@code key} on the
         * worker pool; IOWorkRequest.doWork() eventually calls
         * NIOServerCnxn.doIO(key).
         *
         * Before scheduling, the connection is marked non-selectable and the
         * key's interest ops are cleared, so the selector does not report this
         * key again while a worker is using it. doWork() re-enables selection
         * when it is done.
         */
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

            // Stop selecting this key while processing on its
            // connection
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

        /**
         * Registers every socket queued by the AcceptThread with this thread's
         * selector (interest OP_READ), creates its NIOServerCnxn, attaches the
         * connection to the key and records it with addCnxn(). On IOException
         * the key is cleaned up and the socket closed.
         */
        private void processAcceptedConnections() {
            SocketChannel accepted;
            while (!stopped && (accepted = acceptedQueue.poll()) != null) {
                SelectionKey key = null;
                try {
                    // Register the socket with this selector for reads.
                    key = accepted.register(selector, SelectionKey.OP_READ);
                    // Create the connection object for the socket.
                    NIOServerCnxn cnxn = createConnection(accepted, key, this);
                    // Bind the connection to its key so handleIO() can find it.
                    key.attach(cnxn);
                    // Record the connection; presumably this is where ipMap (one
                    // client IP -> many connections) is updated — addCnxn() not shown.
                    addCnxn(cnxn);
                } catch (IOException e) {
                    // register, createConnection
                    cleanupSelectionKey(key);
                    fastCloseSock(accepted);
                }
            }
        }
    }

    /**
     * Small wrapper class that lets a connection's doIO() call run on the
     * WorkerService thread pool.
     */
    private class IOWorkRequest extends WorkerService.WorkRequest {
        private final SelectorThread selectorThread;
        private final SelectionKey key;
        private final NIOServerCnxn cnxn;

        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
            this.selectorThread = selectorThread;
            this.key = key;
            this.cnxn = (NIOServerCnxn) key.attachment();
        }

        /**
         * Runs on a worker thread: validates the key, performs the connection's
         * I/O via NIOServerCnxn.doIO(key), then makes the connection selectable
         * again and asks the owning SelectorThread to restore its interest ops.
         */
        public void doWork() throws InterruptedException {
            // The key may have been cancelled since it was selected.
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }

            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);

                // Check if we shutdown or doIO() closed this connection
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }

            // Mark this connection as once again ready for selection
            cnxn.enableSelectable();
            // Push an update request on the queue to resume selecting
            // on the current set of interest ops, which may have changed
            // as a result of the I/O operations we just performed.
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
    }
}

2.2 连接对象NIOServerCnxn


/**
 * Server-side state of a single NIO client connection (abridged excerpt;
 * "..." marks elided code). Requests arrive as a 4-byte length prefix followed
 * by a payload; responses are queued in outgoingBuffers and flushed when the
 * socket becomes writable.
 */
public class NIOServerCnxn extends ServerCnxn {
    // Owning factory, the client socket, and the socket's selection key.
    NIOServerCnxnFactory factory;
    final SocketChannel sock;
    private final SelectionKey sk;
    // 4-byte buffer for the length prefix that precedes every request.
    ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Buffer currently being filled from the socket. It starts out as, and is
    // reset to, lenBuffer; presumably readLength() (not shown) swaps in a buffer
    // sized for the payload — TODO confirm.
    ByteBuffer incomingBuffer = lenBuffer;
    // Whether the ConnectRequest has been processed; false until
    // readConnectRequest() completes.
    boolean initialized;
    private final ZooKeeperServer zkServer;
    // Session id of this connection; only assigned after the server has
    // processed the ConnectRequest.
    long sessionId;
    // Serialized responses waiting to be written, in send order.
    LinkedBlockingQueue<ByteBuffer> outgoingBuffers;

    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
        SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
        ...
        // (field assignments elided) A new connection starts out interested in reads.
        sk.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Performs the I/O that key {@code k} is ready for: reads request bytes when
     * readable and flushes queued responses when writable.
     */
    void doIO(SelectionKey k) throws InterruptedException {
        try {
            // Nothing to do if the socket has already been closed.
            if (isSocketOpen() == false) {
                return;
            }
            if (k.isReadable()) {
                // Read as much as fits into the current buffer: either the 4-byte
                // length prefix or the payload buffer.
                int rc = sock.read(incomingBuffer);
                // A negative count means the peer closed the stream.
                if (rc < 0) {
                    throw new EndOfStreamException();
                }
                // The current buffer is full.
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    // Still pointing at lenBuffer: the length prefix just
                    // completed, so parse it. readLength() (not shown) reports
                    // whether a payload follows.
                    if (incomingBuffer == lenBuffer) {
                        incomingBuffer.flip();
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                    } else {
                        // The payload buffer itself is full.
                        isPayload = true;
                    }
                    if (isPayload) {
                        // Read (the rest of) the payload and dispatch the request.
                        readPayload();
                    }
                    else {
                        return;
                    }
                }
            }
            if (k.isWritable()) {
                // Responses not written in full earlier are still queued here and
                // are sent in order.
                if (outgoingBuffers.size() > 0) {
                    // Gather as much queued data as fits into the factory's shared
                    // directBuffer (allocated elsewhere) and write it in one call.
                    ByteBuffer directBuffer = factory.directBuffer;
                    directBuffer.clear();

                    for (ByteBuffer b : outgoingBuffers) {
                        // b doesn't fit: copy only a slice that fits into the
                        // space left in directBuffer.
                        if (directBuffer.remaining() < b.remaining()) {
                            b = (ByteBuffer) b.slice().limit(
                                    directBuffer.remaining());
                        }
                        // put() advances b's position; remember it and restore it
                        // afterwards so the queued buffer is not consumed yet. Bytes
                        // are consumed below, once sock.write() reports how many
                        // actually went out.
                        int p = b.position();
                        directBuffer.put(b);
                        b.position(p);
                        // directBuffer is full: stop gathering.
                        if (directBuffer.remaining() == 0) {
                            break;
                        }
                    }
                    directBuffer.flip();

                    // write() may send only part of directBuffer; `sent` is the
                    // number of bytes actually written.
                    int sent = sock.write(directBuffer);
                    ByteBuffer bb;

                    // Consume `sent` bytes from the front of the queue: fully sent
                    // buffers are counted via packetSent() and removed; a partially
                    // sent buffer has its position advanced.
                    while (outgoingBuffers.size() > 0) {
                        bb = outgoingBuffers.peek();
                        // closeConn is a sentinel queued to request closing the connection.
                        if (bb == ServerCnxnFactory.closeConn) {
                            throw new CloseRequestException();
                        }
                        int left = bb.remaining() - sent;
                        // Only part of bb went out: skip past the sent bytes and stop.
                        if (left > 0) {
                            bb.position(bb.position() + sent);
                            break;
                        }
                        // bb was sent completely.
                        packetSent();
                        sent -= bb.remaining();
                        outgoingBuffers.remove();
                    }
                }

                // Adjust write interest under the factory lock.
                synchronized(this.factory){
                    if (outgoingBuffers.size() == 0) {
                        // Everything flushed. A connection that never finished its
                        // handshake and is no longer reading is closed.
                        if (!initialized && (sk.interestOps()
                                 & SelectionKey.OP_READ) == 0) {
                            throw new CloseRequestException();
                        }
                        // Nothing left to send: stop watching for writability.
                        sk.interestOps(sk.interestOps()
                                & (~SelectionKey.OP_WRITE));
                    } else {
                        // Data still pending: keep OP_WRITE so the next select
                        // round continues flushing.
                        sk.interestOps(sk.interestOps()
                                | SelectionKey.OP_WRITE);
                    }
                }
            }
        }
        // (exception handling elided)
        ...
    }

    /**
     * Fills the payload buffer and, once it is complete, dispatches the request:
     * the first request on a connection is the ConnectRequest, later ones go to
     * readRequest(). Afterwards the connection goes back to reading a length
     * prefix.
     */
    private void readPayload() throws IOException, InterruptedException {
        // Non-zero when called right after the length prefix was parsed
        // (incomingBuffer presumably being a fresh, empty payload buffer); zero
        // when doIO() already filled the payload buffer.
        if (incomingBuffer.remaining() != 0) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException();
            }
        }

        if (incomingBuffer.remaining() == 0) {
            // Presumably updates the received-packet statistics (not shown).
            packetReceived();
            incomingBuffer.flip();
            // No ConnectRequest has been processed on this connection yet, so
            // this payload must be the ConnectRequest.
            if (!initialized) {
                readConnectRequest();
            } else {
                // Any other request type.
                readRequest();
            }
            // Go back to reading the next length prefix.
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }

    /**
     * Hands the ConnectRequest in incomingBuffer to the ZooKeeperServer and marks
     * the connection initialized once that call returns without throwing.
     *
     * @throws IOException if no ZooKeeperServer is attached, or from processing
     */
    private void readConnectRequest()
            throws IOException, InterruptedException {
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
    }
}

2.3 单机运行的ZooKeeperServer



public class ZooKeeperServer implements SessionExpirer,         ServerStats.Provider {    // 默认3S检测一次客户端存活情况    public static final int DEFAULT_TICK_TIME = 3000;    // 实际设置的检测存活时间间隔    protected int tickTime = DEFAULT_TICK_TIME;    // Server端可接受的最小Client端sessionTimeout,如果未设置则值为tickTime*2    protected int minSessionTimeout = -1;    // Server端可接受的最大Client端sessionTimeout,如果未设置则值为tickTime*20    protected int maxSessionTimeout = -1;    // 处理客户端请求RequestProcessor的第一个实现类对象    protected RequestProcessor firstProcessor;    public void processConnectRequest(ServerCnxn cnxn,             ByteBuffer incomingBuffer) throws IOException {        BinaryInputArchive bia = BinaryInputArchive                .getArchive(new ByteBufferInputStream(incomingBuffer));        // 反序列化ByteBuffer对象为ConnectRequest对象        ConnectRequest connReq = new ConnectRequest();        connReq.deserialize(bia, "connect");        boolean readOnly = false;        try {            // 是否只可读            readOnly = bia.readBool("readOnly");            cnxn.isOldClient = false;        } catch (IOException e) {            ...        }        // 只有ReadOnlyZooKeeperServer类型的Server只接收readOnly为true的        if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {            ...            throw new CloseRequestException(msg);        }        // 获取的zxid需要小于Server端最大的zxid        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {            ...            
throw new CloseRequestException(msg);        }        // 这段代码便是Server和Client端协商具体的sessionTimeout值        // 1、获取客户端传来的sessionTimeout        int sessionTimeout = connReq.getTimeOut();        byte passwd[] = connReq.getPasswd();        // 2、先判断sessionTimeout是否小于Server端可接受的最小值        // 如果小于Server端可接受最小值则设置成Server端的最小sessionTimeout        int minSessionTimeout = getMinSessionTimeout();        if (sessionTimeout < minSessionTimeout) {            sessionTimeout = minSessionTimeout;        }        // 3、再判断sessionTimeout是否大于Server端可接受的最大值        // 如果大于Server端可接受最大值则设置成Server端的最大sessionTimeout        int maxSessionTimeout = getMaxSessionTimeout();        if (sessionTimeout > maxSessionTimeout) {            sessionTimeout = maxSessionTimeout;        }        // 最后把满足协商范围的sessionTimeout设置到Client连接对象中        cnxn.setSessionTimeout(sessionTimeout);        // 设置该连接对象不再从Client端接收数据        cnxn.disableRecv();        long sessionId = connReq.getSessionId();        // 第一次连接不手动设置sessionId都是0        if (sessionId != 0) {            // 如果不是0则需要关闭原来的session并且重新打开sessionId            // 这种情况不常见,只需要知道处理的代码逻辑在这里便行,暂不详细分析            long clientSessionId = connReq.getSessionId();            serverCnxnFactory.closeSession(sessionId);            cnxn.setSessionId(sessionId);            reopenSession(cnxn, sessionId, passwd, sessionTimeout);        } else {            // 开始创建新的session信息            createSession(cnxn, passwd, sessionTimeout);        }    }    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {        // 根据失效时间创建一个新的session信息并返回唯一ID        long sessionId = sessionTracker.createSession(timeout);        // 设置失效时间和sessionId        Random r = new Random(sessionId ^ superSecret);        r.nextBytes(passwd);        ByteBuffer to = ByteBuffer.allocate(4);        to.putInt(timeout);        cnxn.setSessionId(sessionId);        // 调用该方法使用刚刚获取到的属性去生成Request请求        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);        return sessionId;    }    private 
void submitRequest(ServerCnxn cnxn, long sessionId, int type,        int xid, ByteBuffer bb, List<Id> authInfo) {        // 根据参数生成Request对象,并调用submitRequest()方法开始使用        // RequestProcessor链对Request进行处理        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);        submitRequest(si);    }    public void submitRequest(Request si) {        // 这个方法功能很简单:        // 1、判断Server端是否初始化完成,如果未完成则一直持续等待        // 2、在调用RequestProcessor链前先更新session在Server端的过期时间        // 3、调用firstProcessor对象的processRequest方法开始处理请求        if (firstProcessor == null) {            synchronized (this) {                try {                    // 一直轮询直到Server端的各种组件初始化完成                    while (state == State.INITIAL) {                        wait(1000);                    }                } ...                // 如果未初始化成功则抛出异常                if (firstProcessor == null || state != State.RUNNING) {                    throw new RuntimeException("Not started");                }            }        }        // todo 将请求放到requestThrottler的submittedRequests队列中        requestThrottler.submitRequest(si);    }    public void submitRequest(Request request) {        if (stopping) {            LOG.debug("Shutdown in progress. Request cannot be processed");            dropRequest(request);        } else {            request.requestThrottleQueueTime = Time.currentElapsedTime();            // todo            submittedRequests.add(request);        }    }    将请求放到requestThrottler的submittedRequests队列中,然后在requestThrottler 的run()方法中调用zks.submitRequestNow(request);    public void submitRequestNow(Request si) {        ..         
try {            // 更新session的过期时间            touch(si.cnxn);            // 校验请求类型是否有效            boolean validpacket = Request.isValid(si.type);            if (validpacket) {                setLocalSessionFlag(si);                // todo 开始调用firstProcessor对象的processRequest()方法处理请求                firstProcessor.processRequest(si);                if (si.cnxn != null) {                    incInProcess();                }            } else {                LOG.warn("Received packet at server of unknown type {}", si.type);                // Update request accounting/throttling limits                requestFinished(si);                // 如果处理类型校验不通过则发送无法处理请求并关闭连接                new UnimplementedRequestProcessor().processRequest(si);            }        }         ...    }    void touch(ServerCnxn cnxn) throws MissingSessionException {        if (cnxn == null) {            return;        }        long id = cnxn.getSessionId();        int to = cnxn.getSessionTimeout();        // 获取sessionId和sessionTimeout属性调用sessionTracker去更新session        // 在Server端的过期时间        if (!sessionTracker.touchSession(id, to)) {            throw new MissingSessionException();        }    }}

2.4 session追踪类SessionTracker


public class SessionTrackerImpl extends ZooKeeperCriticalThread         implements SessionTracker {    // 保存sessionId和对应的Session对象    HashMap<Long, SessionImpl> sessionsById;    HashMap<Long, SessionSet> sessionSets;    // key为sessionId,value为这个session的过期时间    ConcurrentHashMap<Long, Integer> sessionsWithTimeout;    // 下一次新建session时的id    long nextSessionId = 0;    public long createSession(int sessionTimeout) {        long sessionId = nextSessionId.getAndIncrement();        // 在使用RequestProcessor处理请求前会调用该方法为客户端创建一个session        trackSession(sessionId, sessionTimeout);        return sessionId;    }    @Override    public synchronized boolean trackSession(long id, int sessionTimeout) {        boolean added = false;        // 如果没有保存对应的Session对象则创建一个并添加        SessionImpl session = sessionsById.get(id);        if (session == null) {            session = new SessionImpl(id, sessionTimeout);        }        // findbugs2.0.3 complains about get after put.        // long term strategy would be use computeIfAbsent after JDK 1.8        SessionImpl existedSession = sessionsById.putIfAbsent(id, session);        if (existedSession != null) {            session = existedSession;        } else {            added = true;            LOG.debug("Adding session 0x{}", Long.toHexString(id));        }        if (LOG.isTraceEnabled()) {            String actionStr = added ? 
"Adding" : "Existing";            ZooTrace.logTraceMessage(                LOG,                ZooTrace.SESSION_TRACE_MASK,                "SessionTrackerImpl --- " + actionStr                + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);        }        // 添加完session后更新session的过期时间        updateSessionExpiry(session, sessionTimeout);        return added;    private void updateSessionExpiry(SessionImpl s, int timeout) {        logTraceTouchSession(s.sessionId, timeout, "");        sessionExpiryQueue.update(s, timeout);    }    // 当client与server有交互时(连接请求/读写操作/心跳),该方法就会被调用    // 当zk server启动时会将磁盘中的session恢复到内存,也会调用该方法    // 该方法在做的是会话换桶操作    public Long update(E elem, int timeout) {        // elemMap集合的key为session,value为该session的过期时间,        // 即该session当前所在的会话桶id        Long prevExpiryTime = elemMap.get(elem);        long now = Time.currentElapsedTime();        // 计算本次交互应该将会话放入到哪个会话桶        Long newExpiryTime = roundToNextInterval(now + timeout);        // 若之前所在会话桶id与本次交互计算的会话桶id相同,        // 则无需换桶,即什么也不用做        if (newExpiryTime.equals(prevExpiryTime)) {            // No change, so nothing to update            return null;        }        // ---------- 代码能走到这里,说明需要换桶了。 --------------        // 换桶由两步操作完成:将会话放入到新桶;将会话从老桶中清除        // First add the elem to the new expiry time bucket in expiryMap.        
// 从会话桶集合中获取当前的会话桶,若为null,则创建一个新的会话桶        Set<E> set = expiryMap.get(newExpiryTime);        if (set == null) {            // Construct a ConcurrentHashSet using a ConcurrentHashMap            // 创建会话桶set            set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());            // Put the new set in the map, but only if another thread            // hasn't beaten us to it            // 将新建的会话桶放入到会话桶集合            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);            if (existingSet != null) {                set = existingSet;            }        }        // 将会话放入到会话桶        set.add(elem);        // Map the elem to the new expiry time. If a different previous        // mapping was present, clean up the previous expiry bucket.        // 将会话与会话桶id的对应关系放入到elemMap,并获取到该会话之前所在的会话桶id        prevExpiryTime = elemMap.put(elem, newExpiryTime);        // 若当前会话桶id与之前会话桶id不相同,说明需要换桶。        // 而前面已经将会话放到了新的会话桶,所以这里要将会话从老桶中清除        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {            // 获取到之前的会话桶            Set<E> prevSet = expiryMap.get(prevExpiryTime);            if (prevSet != null) {                // 将会话从老会话桶中清除                prevSet.remove(elem);            }        }        // 返回当前交互引发的会话所在的会话桶id,        // 即当前会话的真正过期时间点        return newExpiryTime;    }}

2.5 RequestProcessor请求处理链


2.5.1 PrepRequestProcessor

/**
 * First processor of the server's request chain (excerpt): builds the
 * transaction (header + body) for each request and forwards the request to
 * nextProcessor.
 */
public class PrepRequestProcessor extends ZooKeeperCriticalThread
        implements RequestProcessor {
    // Requests waiting to be processed; run() takes them off one at a time.
    LinkedBlockingQueue<Request> submittedRequests =
            new LinkedBlockingQueue<Request>();
    // Next RequestProcessor in the chain.
    RequestProcessor nextProcessor;
    ZooKeeperServer zks;

    /**
     * Only enqueues the request. The actual work happens in run(), which takes
     * requests from submittedRequests.
     */
    @Override
    public void processRequest(Request request) {
        submittedRequests.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Block until the next Request is available.
                Request request = submittedRequests.take();
                // requestOfDeath signals that ZooKeeper is shutting down: leave the loop.
                if (Request.requestOfDeath == request) {
                    break;
                }
                // Ordinary request.
                pRequest(request);
            }
        }...
    }

    /**
     * Clears any previous header/txn and builds the transaction unless the
     * request is throttled. Then stamps the request with the server's current
     * zxid and hands it to nextProcessor.
     */
    protected void pRequest(Request request)
            throws RequestProcessorException {
        request.setHdr(null);
        request.setTxn(null);

        if (!request.isThrottled()) {
            // todo
            pRequestHelper(request);
        }

        request.zxid = zks.getZxid();
        // Pass the request on to the next RequestProcessor.
        nextProcessor.processRequest(request);
    }

    /**
     * Dispatches on the request type; only the session open/close cases are
     * shown.
     * NOTE(review): the catch for this try is elided. pRequest2Txn declares
     * KeeperException and IOException, which this method does not, so the
     * elided code must handle them.
     */
    private void pRequestHelper(Request request) throws RequestProcessorException {
        try {
            switch (request.type) {
            ...
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    // Non-local (global) session: build the transaction right away.
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                }
                break;
                ...
            }
        }
    }

    /**
     * Builds the transaction for a request. It attaches a TxnHeader if the
     * request has none yet, then sets the txn body that matches {@code type}.
     *
     * @param type    request op code
     * @param zxid    zxid assigned to this transaction
     * @param request request being prepared
     * @param record  not used by the cases shown
     * @param deserialize not used by the cases shown
     */
    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
        if (request.getHdr() == null) {
            // Create the transaction header for this request.
            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                    Time.currentWallTime(), type));
        }

        switch (type) {
        ...
        // Session creation.
        case OpCode.createSession:
            request.request.rewind();
            // This int is the session timeout.
            int to = request.request.getInt();
            // Transaction body carrying the session timeout.
            request.setTxn(new CreateSessionTxn(to));
            request.request.rewind();
            // only add the global session tracker but not to ZKDb
            // Registers the session (id + timeout) with the tracker again.
            zks.sessionTracker.trackSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
        ...
        }
    }

    // NOTE(review): this second pRequest2Txn has exactly the same signature as
    // the one above. The two cannot coexist in one class (duplicate method).
    // It looks like an older version of the same method: it assigns
    // request.hdr/request.txn directly, uses zks.getTime(), and calls
    // sessionTracker.addSession instead of trackSession. Confirm which one
    // applies to the version being described.
    // NOTE(review): the excerpt also never closes the PrepRequestProcessor class.
    protected void pRequest2Txn(int type, long zxid, Request request,
            Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException{
        // Create the transaction header for this request.
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                    zks.getTime(), type);
        switch (type) {
            // Unrelated cases omitted.
            ...
            case OpCode.createSession:
                request.request.rewind();
                // This int is the session timeout.
                int to = request.request.getInt();
                // Transaction body carrying the session timeout.
                request.txn = new CreateSessionTxn(to);
                request.request.rewind();
                // Registers the session (id + timeout) with the tracker again.
                zks.sessionTracker.addSession(request.sessionId, to);
                zks.setOwner(request.sessionId, request.getOwner());
                break;
            ...
        }
    }

2.5.2 SyncRequestProcessor

/**
 * Processor that appends requests to the transaction log, periodically takes
 * snapshots, and forwards requests to nextProcessor (excerpt).
 * NOTE(review): r and zks are used below but declared outside this excerpt.
 */
public class SyncRequestProcessor extends ZooKeeperCriticalThread
        implements RequestProcessor {
    // Requests waiting to be processed; run() polls them one at a time.
    private final LinkedBlockingQueue<Request> queuedRequests =
            new LinkedBlockingQueue<Request>();
    // Requests already appended to the log but not yet flushed.
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    // Next RequestProcessor in the chain.
    private final RequestProcessor nextProcessor;
    // Controls snapshot frequency. A snapshot is triggered once more than
    // snapCount / 2 + randRoll requests have been logged since the last one
    // (see run()).
    private static int snapCount = ZooKeeperServer.getSnapCount();
    // Random offset added to snapCount / 2, chosen in run().
    private static int randRoll;

    /**
     * Only enqueues the request (like PrepRequestProcessor). run() polls
     * queuedRequests and does the actual work.
     */
    public void processRequest(Request request) {
        queuedRequests.add(request);
    }

    @Override
    public void run() {
        try {
            int logCount = 0;
            // Randomize the snapshot point so that servers don't all take a
            // snapshot at the same time.
            setRandRoll(r.nextInt(snapCount/2));
            while (true) {
                // Wait at most pollTime ms for the next request.
                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                // Shutdown sentinel: stop.
                if (si == requestOfDeath) {
                    break;
                }
                // NOTE(review): when the poll times out (si == null) this excerpt
                // does nothing. Pending toFlush entries are then only flushed
                // once the size threshold below is exceeded. The full source
                // presumably flushes on timeout as well; confirm.
                if (si != null) {
                    // Append the request to the transaction log. Presumably
                    // append() returns false for requests that carry no
                    // transaction (e.g. reads); confirm.
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        // Enough requests logged since the last snapshot: take
                        // a new one (asynchronously; details elided), then
                        // reset logCount.
                        if (logCount > (snapCount / 2 + randRoll)) {
                            ...
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // Nothing was logged and nothing is waiting to be
                        // flushed: pass the request straight to nextProcessor
                        // (and flush it if it is Flushable).
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // Reached when the request was logged, or when it was not
                    // logged but earlier requests are still awaiting flush:
                    // queue it behind them.
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        // More than 1000 requests pending: flush them all. Per
                        // the article, flush() also hands each one to
                        // nextProcessor; its body is not shown.
                        flush(toFlush);
                    }
                }
            }
        } ...
    }
}

2.5.3 FinalRequestProcessor

/**
 * Last processor of the request chain (excerpt): applies the request and,
 * for the createSession case shown, completes the session and replies to
 * the client.
 */
public class FinalRequestProcessor implements RequestProcessor {
    ZooKeeperServer zks;

    public void processRequest(Request request) {
        // Apply the request unless it was throttled.
        ProcessTxnResult rc = null;
        if (!request.isThrottled()) {
            rc = applyRequest(request);
        }
        // No client connection at this point: nobody to reply to.
        if (request.cnxn == null) {
            return;
        }
        ServerCnxn cnxn = request.cnxn;

        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

        String lastOp = "NA";
        // Notify ZooKeeperServer that the request has finished so that it can
        // update any request accounting/throttling limits
        // (decInProcess decrements the in-process request count).
        zks.decInProcess();
        zks.requestFinished(request);
        Code err = Code.OK;
        Record rsp = null;
        String path = null;
        int responseSize = 0;
        // NOTE(review): the catch for this try is elided in the excerpt.
        try {
            // The request was turned into an error txn: rethrow the error.
            if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                AuditHelper.addAuditLog(request, rc, true);
                // Prefer the request's own exception; otherwise rebuild one
                // from the ErrorTxn's error code.
                if (request.getException() != null) {
                    throw request.getException();
                } else {
                    throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
                }
            }
            ...
            // Dispatch on the request type.
            switch (request.type) {
            ...
            case OpCode.createSession: {
                // Operation label used for stats.
                lastOp = "SESS";
                updateStats(request, lastOp, lastZxid);
                // Finish session initialization and send the connect response.
                zks.finishSessionInit(request.cnxn, true);
                // Nothing more to do for this request.
                return;
            }
            }
        }
    }
}

2.6 ZooKeeperServer新建连接生成响应对象

又再次回到了ZooKeeperServer类中,这里面执行了Server端针对新建连接的最后响应,其实我也搞不懂为什么要把新建连接单独的抽出来放到ZooKeeperServer类中来,或许唯一能解释的便是方便处理已存在session重新创建这个流程。

public class ZooKeeperServer implements
        SessionExpirer, ServerStats.Provider {

    /**
     * Sends the connect response for a newly created session.
     *
     * @param cnxn  client connection to reply on
     * @param valid whether the session was created successfully; if not, the
     *              reply carries zeroed fields and the connection is closed
     */
    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        // Register the connection with the connection factory (per the
        // article, for JMX monitoring).
        try {
            if (valid) {
                serverCnxnFactory.registerConnection(cnxn);
            }
        }...
        try {
            // When valid: the connection's session timeout, its session id, and
            // a password generated from that id. Otherwise: 0, 0, and an empty
            // 16-byte password.
            ConnectResponse rsp = new ConnectResponse(0,
                    valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0,
                    valid ? generatePasswd(cnxn.getSessionId())
                    : new byte[16]);
            // Serialize: placeholder length (-1), the ConnectResponse, and
            // (for non-old clients) the readOnly flag.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            bos.writeInt(-1, "len");
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool(
                    this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            // Wrap the bytes into a ByteBuffer and overwrite the placeholder
            // with the real length (total size minus the 4-byte length field).
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.remaining() - 4).rewind();
            // Send the response.
            cnxn.sendBuffer(bb);

            if (!valid) {
                // Invalid session: queue the close-connection marker after the reply.
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            } else {
                // Valid session: start receiving data from the client.
                cnxn.enableRecv();
            }
        } catch (Exception e) {
            // Any failure while building/sending the reply closes the connection.
            // NOTE(review): the exception is dropped without being logged.
            cnxn.close();
        }
    }
}

2.7 NIOServerCnxn发送新建连接响应

执行到这一步已经到了新建连接的尾声了,这一步只有发送ByteBuffer对象的数据,其它的操作相对而言并不是很重要。

public class NIOServerCnxn extends ServerCnxn {

    /**
     * Sends bb to the client. It writes directly to the socket when OP_WRITE
     * interest is off. Anything not fully written, and the closeConn marker,
     * is queued on outgoingBuffers and OP_WRITE is enabled.
     * NOTE(review): the catch for the outer try is elided in the excerpt.
     */
    public void sendBuffer(ByteBuffer bb) {
        try {
            // closeConn is a marker buffer: never write it directly; always queue it.
            if (bb != ServerCnxnFactory.closeConn) {
                // Write directly only while OP_WRITE interest is off. Otherwise
                // the selector-driven write path is already active.
                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
                    try {
                        sock.write(bb);
                    } catch (IOException e) {
                        // Ignored here; whatever was not written is queued below.
                    }
                }
                // Fully written: update server stats and return.
                if (bb.remaining() == 0) {
                    packetSent();
                    return;
                }
            }

            // Not (fully) sent: wake the selector and hand the rest to the
            // write path of doIO() via outgoingBuffers.
            synchronized(this.factory){
                sk.selector().wakeup();
                outgoingBuffers.add(bb);
                if (sk.isValid()) {
                    // Turn OP_WRITE interest on.
                    sk.interestOps(
                            sk.interestOps() | SelectionKey.OP_WRITE);
                }
            }
        }
    }

    /**
     * Applies a request's transaction. Per the article, for a new connection
     * this is the third place where session information is registered (via
     * the session-event handling). It also reaches the DataTree, although
     * nothing significant happens there for this case.
     * NOTE(review): outstandingChanges, processTxnInDB and getZKDatabase()
     * look like ZooKeeperServer members rather than NIOServerCnxn ones;
     * confirm which class this method belongs to.
     */
    public ProcessTxnResult processTxn(Request request) {
        TxnHeader hdr = request.getHdr();
        processTxnForSessionEvents(request, hdr, request.getTxn());

        final boolean writeRequest = (hdr != null);
        final boolean quorumRequest = request.isQuorum();

        // return fast w/o synchronization when we get a read
        if (!writeRequest && !quorumRequest) {
            return new ProcessTxnResult();
        }
        synchronized (outstandingChanges) {
            // Apply the transaction to the database.
            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            if (writeRequest) {
                long zxid = hdr.getZxid();
                // Drop outstanding changes with zxid <= this txn's zxid. For a
                // new-connection request outstandingChanges is empty (per the
                // article), so the loop does nothing there.
                while (!outstandingChanges.isEmpty()
                        && outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (cr.zxid < zxid) {
                        LOG.warn(
                            "Zxid outstanding 0x{} is less than current 0x{}",
                            Long.toHexString(cr.zxid),
                            Long.toHexString(zxid));
                    }
                    if (outstandingChangesForPath.get(cr.path) == cr) {
                        outstandingChangesForPath.remove(cr.path);
                    }
                }
            }

            // do not add non quorum packets to the queue.
            if (quorumRequest) {
                getZKDatabase().addCommittedProposal(request);
            }
            return rc;
        }
    }
}

3. Client端接收响应


3.1 SendThread接收通知


// Client-side I/O thread (an inner class of ClientCnxn, as ClientCnxn.this shows).
// While the connection state is alive it delegates all socket work to
// clientCnxnSocket.doTransport().
class SendThread extends ZooKeeperThread {
    @Override
    public void run() {
        ...
        while (state.isAlive()) {
            // NOTE(review): the catch for this try is elided in the excerpt.
            try {
                ...
                // Same entry point as before: doTransport() handles the NIO events.
                clientCnxnSocket.doTransport(to, pendingQueue,
                        outgoingQueue, ClientCnxn.this);
            }
        }
        ...
    }
}

3.2 ClientCnxnSocketNIO处理读事件


/** NIO socket layer of the client (excerpt): dispatches selector events and reads responses. */
public class ClientCnxnSocketNIO extends ClientCnxnSocket {

    /**
     * Waits up to waitTimeOut ms for socket events and dispatches them: a
     * finished connect primes the connection, read/write readiness goes to doIO().
     */
    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue,
            LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException {
        // Same logic as covered earlier; not analyzed again.
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() &
                    (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // Every server response to the client arrives through here.
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        // Remainder omitted.
        ...
    }

    /**
     * Handles a ready socket. Only the read path is shown: it reads either the
     * length prefix, the connect response (first reply on a new connection),
     * or a regular response.
     */
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
            ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // Read event.
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            // Negative result: end of stream.
            if (rc < 0) {
                throw new EndOfStreamException();
            }
            // incomingBuffer has been filled completely.
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    // The length prefix was read; readLength() presumably
                    // prepares incomingBuffer for the body.
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    // A new connection lands here: the client is not
                    // initialized yet, so this is the connect response.
                    readConnectResult();
                    // Turn OP_READ interest on.
                    enableRead();
                    // If outgoingQueue holds a packet that can be sent now,
                    // turn OP_WRITE on so it is written on the next select pass.
                    if (findSendablePacket(outgoingQueue, cnxn.sendThread
                            .clientTunneledAuthenticationInProgress())!=null){
                        enableWrite();
                    }
                    // Go back to reading a length prefix, record that the
                    // server was heard from, and mark the client initialized.
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    // Regular response once the connection is established.
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        // Write handling omitted.
    }

    /**
     * Deserializes the ConnectResponse (plus the optional readOnly flag) from
     * incomingBuffer. It then records the session id and notifies sendThread
     * that the connection succeeded.
     */
    void readConnectResult() throws IOException {
        ByteBufferInputStream bbis =
                new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        boolean isRO = false;
        try {
            // The readOnly flag follows the response; the elided catch
            // presumably tolerates servers that do not send it.
            isRO = bbia.readBool("readOnly");
        }...
        // Connection succeeded.
        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }
}

3.3 ClientCnxn处理连接成功


3.4 EventThread监听事件


3.5 ClientWatchManager监听器管理类


