Follow github.com/hsfxuebao. I hope this helps; if you find it useful, a Star would be much appreciated.
1. Client connection source code analysis
For the ZKClient and Curator clients themselves, see: Zookeeper Installation and Client Usage
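Conclusion first:
To create a connection, the client first creates a ZooKeeper object locally to represent the server it connects to. Once the connection succeeds, the connection's various transient data are initialized into this zk object. After the connection is closed, the zk object representing the server is deleted.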
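The commonly used ZK client libraries are ZKClient and Curator. When a client connects to a ZK server, it is configured with the cluster's address list. The specific server it connects to is chosen by round-robin: the configured addresses are shuffled first and then tried in turn. This is the default behavior; a reconnection policy can also be specified.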
1.1 ZKClient source code analysis
Here is a ZKClient demo:
```java
public class ZKClientTest {
    // ZK cluster address
    private static final String CLUSTER = "zkOS:2181";
    // Node path
    private static final String PATH = "/mylog";

    public static void main(String[] args) {
        // ---------------- Create the session -----------
        // Create the zkClient
        ZkClient zkClient = new ZkClient(CLUSTER);
        // Set a serializer for zkClient
        zkClient.setZkSerializer(new SerializableSerializer());

        // ---------------- Create a node -----------
        // Create a persistent node
        CreateMode mode = CreateMode.PERSISTENT;
        // Node data
        String data = "first log";
        // Create the node
        String nodeName = zkClient.create(PATH, data, mode);
        ...
```
Next, trace the ZKClient source to see how it connects, starting from the ZkClient constructor:
```java
public class ZkClient implements Watcher {
    ...
    public ZkClient(String serverstring) {
        this(serverstring, Integer.MAX_VALUE);
    }

    public ZkClient(String zkServers, int connectionTimeout) {
        // Key point: a ZkConnection object is created here
        this(new ZkConnection(zkServers), connectionTimeout);
    }
    ...
    // Following the constructor chain, we eventually reach this constructor
    public ZkClient(final IZkConnection zkConnection, final int connectionTimeout,
                    final ZkSerializer zkSerializer, final long operationRetryTimeout) {
        if (zkConnection == null) {
            throw new NullPointerException("Zookeeper connection is null!");
        }
        // Store the newly created ZkConnection in the _connection field
        _connection = zkConnection;
        _zkSerializer = zkSerializer;
        _operationRetryTimeoutInMillis = operationRetryTimeout;
        _isZkSaslEnabled = isZkSaslEnabled();
        connect(connectionTimeout, this);
    }

    public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
            throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
        boolean started = false;
        acquireEventLock();
        try {
            setShutdownTrigger(false);
            _eventThread = new ZkEventThread(_connection.getServers());
            _eventThread.start();
            // Connect through ZkConnection.connect
            _connection.connect(watcher);
            LOG.debug("Awaiting connection to Zookeeper server");
            boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
            if (!waitSuccessful) {
                throw new ZkTimeoutException("Unable to connect to zookeeper server '"
                        + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
            }
            started = true;
        } finally {
            getEventLock().unlock();
            // we should close the zookeeper instance, otherwise it would keep
            // on trying to connect
            if (!started) {
                close();
            }
        }
    }
}
```
The trace above shows that ZKClient actually connects through the ZkConnection.connect method. The next step is to follow ZkConnection:
```java
public class ZkConnection implements IZkConnection {
    ...
    // Key object: ZooKeeper
    private ZooKeeper _zk = null;
    ...
    public ZkConnection(String zkServers, int sessionTimeOut) {
        _servers = zkServers;
        _sessionTimeOut = sessionTimeOut;
    }

    @Override
    public void connect(Watcher watcher) {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                throw new IllegalStateException("zk client has already been started");
            }
            try {
                LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                // !!! ZkClient's connection to the server actually relies on this ZooKeeper object
                _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
            } catch (IOException e) {
                throw new ZkException("Unable to connect to " + _servers, e);
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }
}
```
1.2 Curator source code analysis
Here is a Curator demo:
```java
public class FluentTest {
    public static void main(String[] args) throws Exception {
        // ---------------- Create the session -----------
        // Retry policy: base sleep time of 1 second, at most 3 retries
        // (the actual wait between retries grows exponentially, with randomization, from this base)
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // Create the client
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("zkOS:2181")
                .sessionTimeoutMs(15000)
                .connectionTimeoutMs(13000)
                .retryPolicy(retryPolicy)
                // namespace: the root path; all operations are relative to it
                .namespace("logs")
                .build();
        // Start the client
        client.start();
        ...
```
Next, trace the Curator source to see how it connects, starting from client.start():
```java
public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public void start() {
        log.info("Starting");
        if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        try {
            ...
            this.getConnectionStateListenable().addListener(listener);
            client.start();
            ...
        } catch (Exception e) {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }
}
```
Focus on the client.start() call:
```java
public class CuratorZookeeperClient implements Closeable {
    ...
    public void start() throws Exception {
        log.debug("Starting");
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        state.start();
    }
    ...
}
```
Continue tracing state.start():
```java
class ConnectionState implements Watcher, Closeable {
    ...
    void start() throws Exception {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

    synchronized void reset() throws Exception {
        log.debug("reset");
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        handleHolder.closeAndReset();
        handleHolder.getZooKeeper(); // initiate connection
    }
    ...
}
```
The key is the handleHolder.getZooKeeper() method:
```java
class HandleHolder {
    ...
    ZooKeeper getZooKeeper() throws Exception {
        return (helper != null) ? helper.getZooKeeper() : null;
    }
    ...
}

class Helper {
    private final Data data;
    ...
    ZooKeeper getZooKeeper() throws Exception {
        return data.zooKeeperHandle;
    }
    ...
}
```
It simply reads the handle from data. So when is the Helper created? Go back to org.apache.curator.ConnectionState#reset and look at the handleHolder.closeAndReset() method:
```java
class HandleHolder {
    ...
    void closeAndReset() throws Exception {
        internalClose(0);
        Helper.Data data = new Helper.Data();
        helper = new Helper(data) {
            @Override
            ZooKeeper getZooKeeper() throws Exception {
                synchronized (this) {
                    if (data.zooKeeperHandle == null) {
                        resetConnectionString(ensembleProvider.getConnectionString());
                        data.zooKeeperHandle = zookeeperFactory.newZooKeeper(
                                data.connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }
                    helper = new Helper(data);
                    return super.getZooKeeper();
                }
            }
        };
    }
    ...
}
```
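closeAndReset() installs an anonymous Helper subclass as the first Helper. The first call to its getZooKeeper() creates the ZooKeeper handle. It then replaces itself with a plain Helper that only returns the stored handle, so later calls skip the synchronized creation path. The following is a minimal, self-contained Java sketch of this self-replacing lazy holder. LazyHandleHolder, its nested Data/Helper types and the Supplier-based factory are hypothetical stand-ins for Curator's HandleHolder, Helper.Data and ZookeeperFactory. They are not part of Curator's API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the self-replacing lazy helper pattern seen in Curator's HandleHolder.
// T stands in for the ZooKeeper handle; the Supplier stands in for ZookeeperFactory.
final class LazyHandleHolder<T> {
    // Shared slot for the handle (plays the role of Helper.Data).
    private static final class Data<T> {
        T handle;
    }

    // Plain helper: returns whatever is already stored in the slot.
    private static class Helper<T> {
        final Data<T> data;

        Helper(Data<T> data) {
            this.data = data;
        }

        T get() {
            return data.handle;
        }
    }

    private final Supplier<T> factory;
    private volatile Helper<T> helper; // null until reset() is called

    LazyHandleHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // Mirrors closeAndReset(): install a helper that creates the handle on first use.
    void reset() {
        Data<T> slot = new Data<>();
        helper = new Helper<T>(slot) {
            @Override
            T get() {
                synchronized (this) {
                    if (data.handle == null) {
                        data.handle = factory.get(); // created at most once per reset()
                    }
                    // Replace ourselves with a plain helper so later calls skip this path.
                    helper = new Helper<>(data);
                    return super.get();
                }
            }
        };
    }

    T get() {
        Helper<T> h = helper;
        return (h != null) ? h.get() : null;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        LazyHandleHolder<String> holder =
                new LazyHandleHolder<>(() -> "handle-" + created.incrementAndGet());
        holder.reset();
        System.out.println(holder.get()); // handle-1
        System.out.println(holder.get()); // handle-1 (served by the plain helper)
        System.out.println(created.get()); // 1
    }
}
```

In the sketch, as in the Curator code, the handle is created on the first get() after reset(). This is why ConnectionState.reset() calls handleHolder.getZooKeeper() immediately after closeAndReset(): that call initiates the connection.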
Now look at how data.zooKeeperHandle is actually created:
```java
public class NonAdminZookeeperFactory implements ZookeeperFactory {
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                                  boolean canBeReadOnly) throws Exception {
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}
```
Whichever client library is used, a ZooKeeper object is ultimately created locally. The next step is to analyze the ZooKeeper object in the ZK source.
1.3 The client object ZooKeeper in the ZK source
In the ZK source, the ZooKeeper constructor performs the following key steps:
ConnectStringParser connectStringParser = new ConnectStringParser(connectString)
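Creates a parser for the ZK cluster connection string. Each parsed ip and port is built into an address instance and placed into a cached collection.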
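The parsing step can be illustrated with a simplified Java sketch. It is an assumption-based approximation, not the real org.apache.zookeeper.client.ConnectStringParser. The sketch splits the connection string on ',', applies the default port 2181 when an entry has no port, and stores unresolved InetSocketAddress instances. Chroot suffixes and IPv6 literals are deliberately not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical sketch of the parsing step; not ZooKeeper's ConnectStringParser.
// Chroot suffixes and IPv6 literals are deliberately not handled.
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    // Turns "host1:port1,host2,..." into a list of (unresolved) socket addresses.
    static List<InetSocketAddress> parse(String connectString) {
        List<InetSocketAddress> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue; // tolerate stray commas
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // Unresolved: no DNS lookup here; resolution can be deferred until a connection attempt.
            servers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return servers;
    }

    public static void main(String[] args) {
        for (InetSocketAddress a : parse("zk1:2181,zk2:2182,zk3")) {
            System.out.println(a.getHostString() + ":" + a.getPort());
        }
        // zk1:2181
        // zk2:2182
        // zk3:2181
    }
}
```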
createDefaultHostProvider(connectString)
Creates the host provider and shuffles the addresses in the cached collection.
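Shuffling is done for load balancing. Without it, every client's round-robin would start from, and connect to, the first server.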
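The shuffle-then-round-robin behavior can be sketched as follows. This is a simplified, hypothetical host provider loosely modeled on the idea behind ZooKeeper's StaticHostProvider; it is not that class's actual code. Addresses are shuffled once with Collections.shuffle, and next() then hands them out in order. The spin-delay behavior is an assumed reading of the argument to next(1000): the provider pauses for spinDelayMs when it wraps back to the start of a cycle without a successful connection in between.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified host provider: shuffle once, then hand out addresses round-robin.
// Loosely modeled on the idea behind ZooKeeper's StaticHostProvider; not its actual code.
final class ShuffledHostProvider {
    private final List<InetSocketAddress> servers;
    private int currentIndex = -1; // index handed out by the previous next() call
    private int lastIndex = -1;    // start of the current cycle (last successful connection)

    ShuffledHostProvider(List<InetSocketAddress> addresses, Random random) {
        servers = new ArrayList<>(addresses);
        // Every client shuffles independently, so clients spread across the ensemble
        // instead of all starting with the first configured server.
        Collections.shuffle(servers, random);
    }

    int size() {
        return servers.size();
    }

    // Returns the next address; pauses spinDelayMs when a full cycle ended without success.
    InetSocketAddress next(long spinDelayMs) {
        InetSocketAddress addr;
        boolean needToSleep;
        synchronized (this) {
            currentIndex = (currentIndex + 1) % servers.size();
            addr = servers.get(currentIndex);
            needToSleep = currentIndex == lastIndex && spinDelayMs > 0;
            if (lastIndex == -1) {
                lastIndex = 0; // the very first call starts the first cycle
            }
        }
        if (needToSleep) { // sleep outside the lock
            try {
                Thread.sleep(spinDelayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
        }
        return addr;
    }

    // Call after a successful connection: the next cycle is measured from this server.
    synchronized void onConnected() {
        lastIndex = currentIndex;
    }

    public static void main(String[] args) {
        List<InetSocketAddress> addrs = List.of(
                InetSocketAddress.createUnresolved("zk1", 2181),
                InetSocketAddress.createUnresolved("zk2", 2181),
                InetSocketAddress.createUnresolved("zk3", 2181));
        ShuffledHostProvider provider = new ShuffledHostProvider(addrs, new Random());
        // One full pass visits every server once, in this client's shuffled order.
        for (int i = 0; i < provider.size(); i++) {
            System.out.println(provider.next(1000).getHostString());
        }
    }
}
```

Each client shuffles with its own random source, so different clients tend to start from different servers. This produces the load-balancing effect of the shuffle.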
cnxn = new ClientCnxn(…);
Creates a connection instance.
cnxn.start();
Starts the connection.
Next, look at the run method of sendThread, the connection thread that cnxn.start() launches:
while (state.isAlive())
Checks whether the current connection is still alive.
serverAddress = hostProvider.next(1000);
Gets the address of the zkServer to connect to.
startConnect(serverAddress);
Starts a connection attempt. The attempt may fail; if it does, the loop fetches the next address and tries again.
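The following deterministic Java simulation shows this retry behavior. It takes the next address, tries it, and on failure moves on to the next one. It gives up once the connection timeout is used up, counting from the session's first connection attempt. This is a hypothetical sketch, not ClientCnxn.SendThread. Reachability is modeled as a Predicate, and time is simulated by charging attemptMs for each failed attempt.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, deterministic simulation of the connect loop; not ClientCnxn.SendThread.
final class ConnectLoopSketch {
    // Returns the server we connected to, or null if the connection timeout ran out first.
    // attemptMs must be > 0, otherwise an all-down cluster would loop forever.
    static String connect(List<String> shuffledServers, Predicate<String> reachable,
                          long connectionTimeoutMs, long attemptMs) {
        long elapsedMs = 0; // simulated time since the session's first connection attempt
        int index = 0;
        while (elapsedMs < connectionTimeoutMs) {
            // Plays the role of hostProvider.next(...): round-robin over the shuffled list.
            String server = shuffledServers.get(index);
            index = (index + 1) % shuffledServers.size();
            // Plays the role of startConnect(serverAddress).
            if (reachable.test(server)) {
                return server;
            }
            elapsedMs += attemptMs; // the attempt failed: charge its cost, try the next address
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("zk2", "zk1", "zk3"); // assume already shuffled
        System.out.println(connect(servers, s -> s.equals("zk3"), 1000, 100)); // zk3
        System.out.println(connect(servers, s -> false, 1000, 100));           // null
    }
}
```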
1.4 Flowchart of ZK object creation
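2. Server-side connection source code analysis
2.1 ZooKeeper session fundamentals
The session is one of the most important concepts in zk: every interaction between a client and a server is tied to a session.
When a ZooKeeper client starts, it first establishes a long-lived TCP connection with a zk server. Once that connection is established, the lifecycle of the client's session begins.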
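2.1.1 Session states
There are three common session states:
CONNECTING: connecting. To create a connection, the client first creates a zk object locally to represent the server it connects to.
CONNECTED: connected. Once the connection succeeds, the connection's various transient data are initialized into the zk object.
CLOSED: closed. After the connection is closed, the zk object representing the server is deleted.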
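The three states can be modeled as a small state machine. SessionState below is a hypothetical, simplified Java enum, not a ZooKeeper type. The real ZooKeeper.States enum has additional values such as ASSOCIATING and AUTH_FAILED, and it provides the isAlive() check used by the while (state.isAlive()) loop in sendThread. The transition table is an assumption for illustration. CONNECTING can lead to CONNECTED or CLOSED. A CONNECTED session falls back to CONNECTING when its connection is lost. CLOSED is terminal.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the three session states; not ZooKeeper.States.
enum SessionState {
    CONNECTING, CONNECTED, CLOSED;

    // States that may follow this one in the simplified model.
    Set<SessionState> next() {
        switch (this) {
            case CONNECTING:
                return EnumSet.of(CONNECTED, CLOSED);
            case CONNECTED:
                return EnumSet.of(CONNECTING, CLOSED); // back to CONNECTING after a connection loss
            default:
                return EnumSet.noneOf(SessionState.class); // CLOSED is terminal
        }
    }

    boolean canMoveTo(SessionState target) {
        return next().contains(target);
    }

    public static void main(String[] args) {
        System.out.println(CONNECTING.canMoveTo(CONNECTED)); // true
        System.out.println(CONNECTED.canMoveTo(CONNECTING)); // true
        System.out.println(CLOSED.canMoveTo(CONNECTING));    // false
    }
}
```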
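2.1.2 Session connection timeout management (maintained by the client)
Session connection timeout management here refers to how the client times its attempts to connect to the server. The clock starts when the current session first initiates a connection to a server.
ZK is a CP system, so the servers do not serve clients while they are synchronizing data. That window is very short, however. Within its connection timeout the client keeps retrying until it succeeds, so it does not notice the period during which the server was unavailable.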
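2.1.3 Session connection events
When the long-lived connection between client and server fails, the client reconnects. During reconnection the client can produce three session connection events:
CONNECTION_LOSS: connection lost.
SESSION_MOVED: session moved. The session is moved if the client reconnects within its connection timeout to a different server from before (another machine in the cluster).
SESSION_EXPIRED: session expired. The session has expired if the client only reconnects after its connection timeout has elapsed and the server has already removed this session's sessionId.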
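2.1.4 Session idle timeout management (maintained by the server)
The session connection timeout is a client-side concern; the session idle timeout is a server-side one.
For every client session, the server records how long the session has been idle since its last interaction. It also records the point in time at which the session will hit its idle timeout, counted from the end of that last interaction. Once the idle time exceeds the timeout, the server removes the session's SessionId. This is why a client must send heartbeats to the server periodically while idle: they keep the long-lived session alive. The server relies on idle-timeout management to decide whether a session has been interrupted.
For session idle-timeout management, the server uses a special approach: the bucket strategy.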
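Bucket strategy
The bucket strategy places sessions whose idle timeouts expire at similar times into the same bucket and manages them together, which reduces management complexity. When checking for timeouts, the server only has to look at the sessions remaining in a bucket. A session that has not timed out was already moved to a later bucket when it last interacted with the server. Every session still in the bucket has therefore timed out.
As a consequence, zk's session idle-timeout management is not exact. A session is not expired the instant its timeout elapses, but when its bucket is processed.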
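Bucket calculation
The bucket is computed from the following quantities:
CurrentTime: the current time (a point on the timeline)
SessionTimeout: the session timeout (a duration)
ExpirationTime: the point in time at which the current session will next time out (a point on the timeline)
ExpirationInterval: the bucket size (a duration)
BucketTime: the bucket containing the point in time at which the current session will next time out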
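They are related as follows (the division is integer division):

$$\text{ExpirationTime} = \text{CurrentTime} + \text{SessionTimeout}$$

$$\text{BucketTime} = \left(\left\lfloor \frac{\text{ExpirationTime}}{\text{ExpirationInterval}} \right\rfloor + 1\right) \times \text{ExpirationInterval}$$

These formulas show that each bucket spans ExpirationInterval of time. All sessions whose ExpirationTime falls into the same bucket have their timeouts managed together.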
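The bucket calculation and the rule of only checking what remains in a bucket can be sketched in Java as follows. The sketch assumes the rounding ExpirationTime = CurrentTime + SessionTimeout and BucketTime = (ExpirationTime / ExpirationInterval + 1) × ExpirationInterval with integer division, as used by ZooKeeper's ExpiryQueue. Otherwise it is a simplified, hypothetical model: the class and method names are made up, and time is passed in as a parameter instead of read from a clock. Due buckets are collected with a TreeMap range view rather than by a background thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified bucket-based expiry; loosely modeled on the idea behind
// ZooKeeper's ExpiryQueue. Times are passed in explicitly to keep the example deterministic.
final class SessionBuckets {
    private final long expirationInterval;                            // bucket size in ms
    private final TreeMap<Long, Set<Long>> buckets = new TreeMap<>(); // BucketTime -> session ids
    private final Map<Long, Long> bucketOf = new HashMap<>();         // session id -> BucketTime

    SessionBuckets(long expirationIntervalMs) {
        this.expirationInterval = expirationIntervalMs;
    }

    // BucketTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval (integer division)
    long bucketTime(long expirationTime) {
        return (expirationTime / expirationInterval + 1) * expirationInterval;
    }

    // Called when a session is created and whenever it interacts with the server:
    // ExpirationTime = CurrentTime + SessionTimeout, then move the session into that bucket.
    void touch(long sessionId, long now, long sessionTimeoutMs) {
        long newBucket = bucketTime(now + sessionTimeoutMs);
        Long oldBucket = bucketOf.put(sessionId, newBucket);
        if (oldBucket != null && oldBucket != newBucket) {
            Set<Long> old = buckets.get(oldBucket);
            old.remove(sessionId);
            if (old.isEmpty()) {
                buckets.remove(oldBucket);
            }
        }
        buckets.computeIfAbsent(newBucket, k -> new HashSet<>()).add(sessionId);
    }

    // Every session still sitting in a bucket whose time has come is expired:
    // sessions that interacted in the meantime were already moved to a later bucket.
    Set<Long> expire(long now) {
        NavigableMap<Long, Set<Long>> due = buckets.headMap(now, true);
        Set<Long> expired = new HashSet<>();
        for (Set<Long> ids : due.values()) {
            expired.addAll(ids);
        }
        due.clear(); // headMap is a view, so this also removes the buckets from `buckets`
        expired.forEach(bucketOf::remove);
        return expired;
    }

    public static void main(String[] args) {
        SessionBuckets tracker = new SessionBuckets(2000); // 2 s buckets
        tracker.touch(1L, 0, 5000);    // deadline 5000 -> bucket 6000
        tracker.touch(2L, 500, 5000);  // deadline 5500 -> same bucket 6000
        tracker.touch(2L, 3000, 5000); // session 2 interacts: deadline 8000 -> bucket 10000
        System.out.println(tracker.expire(6000));  // [1]
        System.out.println(tracker.expire(10000)); // [2]
    }
}
```

Session 2 escapes the 6000 bucket only because its later interaction moved it forward. This is why checking just the contents of a due bucket is enough.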
2.2 Server-side connection source code analysis
Start with the ZooKeeperServer.startup method, which is triggered as soon as the server starts.
createSessionTracker();
Creates a sessionTracker (session tracker) thread.
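The updateSessionExpiry method is called when the sessionTracker thread is created, but many other scenarios call it as well. For example:
The touchSession method also calls updateSessionExpiry() under the hood, so calling touchSession also updates the session's bucket.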
The following scenarios also call touchSession:
When the session interacts with the current server
When the client re-initiates a connection request after the connection was lost
```java
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
    // If the password for this connection is correct, revalidate the session; otherwise close it
    if (checkPasswd(sessionId, passwd)) {
        // Revalidate the session
        revalidateSession(cnxn, sessionId, sessionTimeout);
    } else {
        // Close the session
        finishSessionInit(cnxn, false);
    }
}

protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
    // Check whether the session for this sessionId is still valid; if it is, put it into the matching session bucket.
    // rc == true means the session is valid; false means it does not exist or has been closed (invalid)
    boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
    ...
}
```
```java
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    try {
        if (valid) {
            // If the session is valid, re-register the original connection, i.e. the original connection is still valid
            if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                serverCnxnFactory.registerConnection(cnxn);
            } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                secureServerCnxnFactory.registerConnection(cnxn);
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
    }
    ...
}
```
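The control flow of this reconnect path can be modeled with a small, self-contained sketch: password check, then touchSession, then finishSessionInit with the resulting valid flag. Everything in the sketch is hypothetical. Sessions live in a plain HashMap, and each session stores its own password, which is an assumption for illustration that differs from however the real checkPasswd verifies it. The sketch's touchSession only reports whether the session is still live; it does not also move the session to a new bucket.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of reopenSession -> revalidateSession -> touchSession -> finishSessionInit.
// None of these types are ZooKeeper classes.
final class ReopenSessionSketch {
    private static final class Session {
        final byte[] password;
        boolean closing; // set once the session has been closed or is being closed

        Session(byte[] password) {
            this.password = password.clone();
        }
    }

    private final Map<Long, Session> sessions = new HashMap<>();

    void createSession(long sessionId, byte[] password) {
        sessions.put(sessionId, new Session(password));
    }

    void closeSession(long sessionId) {
        Session s = sessions.get(sessionId);
        if (s != null) {
            s.closing = true;
        }
    }

    boolean checkPasswd(long sessionId, byte[] passwd) {
        Session s = sessions.get(sessionId);
        return s != null && Arrays.equals(s.password, passwd);
    }

    // true = the session exists and is not closing (the real method would also re-bucket it)
    boolean touchSession(long sessionId) {
        Session s = sessions.get(sessionId);
        return s != null && !s.closing;
    }

    // Returns the "valid" flag that would be passed to finishSessionInit(cnxn, valid).
    boolean reopenSession(long sessionId, byte[] passwd) {
        if (checkPasswd(sessionId, passwd)) {
            return touchSession(sessionId); // revalidateSession: rc comes from touchSession
        }
        return false;                       // wrong password: finishSessionInit(cnxn, false)
    }

    public static void main(String[] args) {
        ReopenSessionSketch server = new ReopenSessionSketch();
        byte[] pw = {1, 2, 3};
        server.createSession(42L, pw);
        System.out.println(server.reopenSession(42L, pw));             // true
        System.out.println(server.reopenSession(42L, new byte[] {9})); // false: bad password
        server.closeSession(42L);
        System.out.println(server.reopenSession(42L, pw));             // false: session closed
    }
}
```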
2.3 Session management flowchart
Original article: https://juejin.cn/post/7100150142904303624