Nacos源码—8.Nacos升级gRPC分析三
大纲
7.服务端对服务实例进行健康检查
8.服务下线如何注销注册表和客户端等信息
9.事件驱动架构源码分析
7.服务端对服务实例进行健康检查
(1)服务端对服务实例进行健康检查的设计逻辑
(2)服务端对服务实例进行健康检查的源码
(3)服务端检查服务实例不健康后的注销处理
(1)服务端对服务实例进行健康检查的设计逻辑
一.首先会获取所有客户端的Connection连接对象
Connection连接对象里有个属性叫lastActiveTime,表示的是最后存活时间。
二.然后判断当前时间-最后存活时间是否大于20s
如果大于,则把该Connection连接对象的connectionId放入到一个集合里。这个集合是一个名为outDatedConnections的待移除集合Set,此时该Connection连接对象并不会马上删除。
三.当判断完全部的Connection连接对象后会遍历outDatedConnections集合
向遍历到的Connection连接对象发起一次请求,确认是否真的下线。如果响应成功,则往successConnections集合中添加connectionId,并且刷新Connection连接对象的lastActiveTime属性。这个机制有一个专业的名称叫做:探活机制。
四.遍历待移除集合进行注销并且在注销之前先判断一下是否探活成功
也就是connectionId存在于待移除集合outDatedConnections中,但是不存在于探活成功集合successConnections中,那么这个connectionId对应的客户端就会被注销掉。
(2)服务端对服务实例进行健康检查的源码
对服务实例进行健康检查的源码入口是ConnectionManager的start()方法。
@Service public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> { Map<String, Connection> connections = new ConcurrentHashMap<>(); ... //Start Task:Expel the connection which active Time expire. @PostConstruct public void start() { //Start UnHealthy Connection Expel Task. RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() { @Override public void run() { ... //一.首先获取所有的连接 Set<Map.Entry<String, Connection>> entries = connections.entrySet(); ... //二.然后判断客户端是否超过20s没有发来心跳信息了,如果是则会将clientId加入outDatedConnections集合中 Set<String> outDatedConnections = new HashSet<>(); long now = System.currentTimeMillis(); for (Map.Entry<String, Connection> entry : entries) { Connection client = entry.getValue(); String clientIp = client.getMetaInfo().getClientIp(); AtomicInteger integer = expelForIp.get(clientIp); if (integer != null && integer.intValue() > 0) { integer.decrementAndGet(); expelClient.add(client.getMetaInfo().getConnectionId()); expelCount--; } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {//判断心跳时间 //添加到待移除列表 outDatedConnections.add(client.getMetaInfo().getConnectionId()); } } ... //client active detection. //三.初次检测完超过20s的Connection连接对象后,并不会立马进行删除,而是进行探活,服务端主动请求客户端,来确认是否真的下线 Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size()); if (CollectionUtils.isNotEmpty(outDatedConnections)) { Set<String> successConnections = new HashSet<>(); final CountDownLatch latch = new CountDownLatch(outDatedConnections.size()); //遍历超过20s没有心跳的客户端clientId for (String outDateConnectionId : outDatedConnections) { try { Connection connection = getConnection(outDateConnectionId); if (connection != null) { ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest(); //调用GrpcConnection.asyncRequest()方法异步发送请求 connection.asyncRequest(clientDetectionRequest, new RequestCallBack() { @Override public Executor getExecutor() { return null; } @Override public long getTimeout() { return 1000L; } @Override public void onResponse(Response response) { latch.countDown(); if (response != null && response.isSuccess()) { //响应成功刷新心跳时间 connection.freshActiveTime(); //并且加入到探活成功的集合列表中 successConnections.add(outDateConnectionId); } } @Override public void onException(Throwable e) { latch.countDown(); } }); Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId); } else { latch.countDown(); } } catch (ConnectionAlreadyClosedException e) { latch.countDown(); } catch (Exception e) { Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e); latch.countDown(); } } latch.await(3000L, TimeUnit.MILLISECONDS); Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size()); //经过探活还是不成功的Connection连接对象,就准备进行移除了 //遍历20s没有心跳的客户端,准备移除客户端信息 for (String outDateConnectionId : outDatedConnections) { //判断探活是否成功,如果成功了则不需要移除 if (!successConnections.contains(outDateConnectionId)) { Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId); //执行客户端注销逻辑 unregister(outDateConnectionId); } } } ... } }, 1000L, 3000L, TimeUnit.MILLISECONDS); } ... }
(3)服务端检查服务实例不健康后的注销处理
进行注销处理的方法是ConnectionManager的unregister()方法。该方法主要会移除Connection连接对象 + 清除一些数据,以及发布一个ClientDisconnectEvent客户端注销事件。
@Service public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> { private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16); Map<String, Connection> connections = new ConcurrentHashMap<>(); @Autowired private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry; ... //unregister a connection . public synchronized void unregister(String connectionId) { //移除客户端信息 Connection remove = this.connections.remove(connectionId); if (remove != null) { String clientIp = remove.getMetaInfo().clientIp; AtomicInteger atomicInteger = connectionForClientIp.get(clientIp); if (atomicInteger != null) { int count = atomicInteger.decrementAndGet(); if (count <= 0) { connectionForClientIp.remove(clientIp); } } remove.close(); Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId); //通知客户端注销连接 clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); } } ... } @Service public class ClientConnectionEventListenerRegistry { ... //notify where a new client disconnected. public void notifyClientDisConnected(final Connection connection) { for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { try { //调用ConnectionBasedClientManager.clientDisConnected()方法 clientConnectionEventListener.clientDisConnected(connection); } catch (Throwable throwable) { Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable); } } } ... } @Component("connectionBasedClientManager") public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager { private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>(); ... @Override public boolean clientDisconnected(String clientId) { Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId); ConnectionBasedClient client = clients.remove(clientId); if (null == client) { return true; } client.release(); //最后发布客户端注销事件 NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); return true; } ... }
ClientDisconnectEvent客户端注销事件会被两个监听响应:一是ClientServiceIndexesManager的onEvent()方法用来移除注册表 + 订阅表信息,二是DistroClientDataProcessor的onEvent()方法用来同步服务实例被注销后的数据。
@Component public class ClientServiceIndexesManager extends SmartSubscriber { //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>(); //订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>(); ... @Override public void onEvent(Event event) { if (event instanceof ClientEvent.ClientDisconnectEvent) { handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event); } else if (event instanceof ClientOperationEvent) { handleClientOperation((ClientOperationEvent) event); } } private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) { Client client = event.getClient(); for (Service each : client.getAllSubscribeService()) { //移除订阅者列表的元素 removeSubscriberIndexes(each, client.getClientId()); } for (Service each : client.getAllPublishedService()) { //移除注册表的元素 removePublisherIndexes(each, client.getClientId()); } } private void removePublisherIndexes(Service service, String clientId) { if (!publisherIndexes.containsKey(service)) { return; } publisherIndexes.get(service).remove(clientId); NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } private void removeSubscriberIndexes(Service service, String clientId) { if (!subscriberIndexes.containsKey(service)) { return; } subscriberIndexes.get(service).remove(clientId); if (subscriberIndexes.get(service).isEmpty()) { subscriberIndexes.remove(service); } } ... } public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor { ... @Override public void onEvent(Event event) { ... if (event instanceof ClientEvent.ClientVerifyFailedEvent) { syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event); } else { syncToAllServer((ClientEvent) event); } } private void syncToAllServer(ClientEvent event) { Client client = event.getClient(); //Only ephemeral data sync by Distro, persist client should sync by raft. //临时实例使用Distro协议,持久化实例使用Raft协议 //ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步 if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; } if (event instanceof ClientEvent.ClientDisconnectEvent) { //如果event是客户端注销实例时需要进行集群节点同步的事件 DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.DELETE); } else if (event instanceof ClientEvent.ClientChangedEvent) { //如果event是客户端注册实例时需要进行集群节点同步的事件 DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.CHANGE); } } ... }
(4)总结
在Nacos 1.4.1版本中的服务健康检查:是15s没心跳则把健康状态修改为不健康,30s没心跳则把实例对象移除。
在Nacos 2.1.0版本中的服务健康检查:是20s没心跳则把客户端放入一个过期集合,此时并不移除客户端连接。由于引入了gRPC长连接,所以可以新增探活机制检查过期集合中的连接。服务端发送探活请求给客户端时的代价并不大,可确保客户端下线。
8.服务下线如何注销注册表和客户端等信息
(1)客户端发出服务下线请求的源码
(2)服务端处理服务下线请求的源码
(1)客户端发出服务下线请求的源码
Nacos客户端发起服务下线的入口在AbstractAutoServiceRegistration这个类之中,而AbstractAutoServiceRegistration是nacos-discovery中的类。
由于AbstractAutoServiceRegistration的destroy()方法被@PreDestroy修饰,所以当容器关闭时,会调用AbstractAutoServiceRegistration的destroy()方法。该方法最后会触发调用NacosNamingService的deregisterInstance()方法,然后调用NamingClientProxyDelegate的deregisterService()方法,接着调用NamingGrpcClientProxy的deregisterService()方法。和客户端发起服务注册一样,首先会创建请求参数对象,然后通过NamingGrpcClientProxy的requestToServer()方法发起请求,也就是调用RpcClient的request()方法发起gRPC请求进行服务下线。
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> { ... @PreDestroy public void destroy() { stop(); } public void stop() { if (this.getRunning().compareAndSet(true, false) && isEnabled()) { deregister(); if (shouldRegisterManagement()) { deregisterManagement(); } this.serviceRegistry.close(); } } protected void deregister() { this.serviceRegistry.deregister(getRegistration()); } ... } public class NacosServiceRegistry implements ServiceRegistry<Registration> { ... @Override public void deregister(Registration registration) { log.info("De-registering from Nacos Server now..."); if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No dom to de-register for nacos client..."); return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); try { //调用NacosNamingService.deregisterInstance()方法 namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName()); } catch (Exception e) { log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e); } log.info("De-registration finished."); } ... } @SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") public class NacosNamingService implements NamingService { private NamingClientProxy clientProxy; ... public NacosNamingService(Properties properties) throws NacosException { init(properties); } private void init(Properties properties) throws NacosException { ... this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier); } @Override public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException { Instance instance = new Instance(); instance.setIp(ip); instance.setPort(port); instance.setClusterName(clusterName); deregisterInstance(serviceName, groupName, instance); } @Override public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException { //调用NamingClientProxyDelegate.deregisterService()方法 clientProxy.deregisterService(serviceName, groupName, instance); } ... } public class NamingClientProxyDelegate implements NamingClientProxy { private final NamingHttpClientProxy httpClientProxy; private final NamingGrpcClientProxy grpcClientProxy; ... public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException { ... this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder); this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder); } @Override public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException { getExecuteClientProxy(instance).deregisterService(serviceName, groupName, instance); } private NamingClientProxy getExecuteClientProxy(Instance instance) { return instance.isEphemeral() ? grpcClientProxy : httpClientProxy; } ... } public class NamingGrpcClientProxy extends AbstractNamingClientProxy { private final NamingGrpcRedoService redoService; ... @Override public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, instance); redoService.instanceDeregister(serviceName, groupName); doDeregisterService(serviceName, groupName, instance); } //Execute deregister operation. public void doDeregisterService(String serviceName, String groupName, Instance instance) throws NacosException { //创建请求参数对象 InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.DE_REGISTER_INSTANCE, instance); //向服务端发起请求 requestToServer(request, Response.class); redoService.removeInstanceForRedo(serviceName, groupName); } private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException { try { request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName())); //实际会调用RpcClient.request()方法发起gRPC请求 Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout); if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) { throw new NacosException(response.getErrorCode(), response.getMessage()); } if (responseClass.isAssignableFrom(response.getClass())) { return (T) response; } NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e); } throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response"); } ... }
(2)服务端处理服务下线请求的源码
服务注册时请求参数InstanceRequest的类型是REGISTER_INSTANCE,服务下线时请求参数InstanceRequest的类型是DE_REGISTER_INSTANCE。
处理服务注册和服务下线的入口是InstanceRequestHandler的handle()方法,这个方法会触发调用InstanceRequestHandler的deregisterInstance()方法,也就是调用EphemeralClientOperationServiceImpl的deregisterInstance()方法。
在EphemeralClientOperationServiceImpl的deregisterInstance()方法中,会在移除Client对象中的instance信息时,发布ClientChangedEvent事件,然后接着发布客户端注销服务实例的事件ClientDeregisterServiceEvent。
其中ClientChangedEvent事件是用来同步数据给集群节点的,ClientDeregisterServiceEvent事件是用来移除注册表 + 订阅表的服务实例。移除注册表 + 订阅表的服务实例时,还会发布ServiceChangeEvent事件,ServiceChangeEvent事件是用来通知订阅了该服务的Nacos客户端的。同理,服务注册时其实也会发布类似的三个事件。
@Component public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> { private final EphemeralClientOperationServiceImpl clientOperationService; public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) { this.clientOperationService = clientOperationService; } @Override @Secured(action = ActionTypes.WRITE) public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException { //根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名 Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true); switch (request.getType()) { case NamingRemoteConstants.REGISTER_INSTANCE: //注册实例 return registerInstance(service, request, meta); case NamingRemoteConstants.DE_REGISTER_INSTANCE: //注销实例 return deregisterInstance(service, request, meta); default: throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType())); } } private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) { //调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数; //参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名 //参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息 //参数meta.getConnectionId():这个参数很关键,它是连接ID clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId()); return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE); } private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) { //调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance() clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId()); return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE); } } @Component("ephemeralClientOperationService") public class EphemeralClientOperationServiceImpl implements ClientOperationService { private final ClientManager clientManager; ... @Override public void deregisterInstance(Service service, Instance instance, String clientId) { if (!ServiceManager.getInstance().containSingleton(service)) { Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service); return; } //从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象 Service singleton = ServiceManager.getInstance().getSingleton(service); //从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象 Client client = clientManager.getClient(clientId); if (!clientIsLegal(client, clientId)) { return; } //调用AbstractClient.removeServiceInstance()方法 //移除Client对象中的instance信息并发布ClientChangedEvent事件来同步集群节点 InstancePublishInfo removedInstance = client.removeServiceInstance(singleton); client.setLastUpdatedTime(); if (null != removedInstance) { //发布客户端注销服务实例的事件 NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId)); NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true)); } } ... } public abstract class AbstractClient implements Client { //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务 //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象 //key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象 protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1); ... @Override public InstancePublishInfo removeServiceInstance(Service service) { InstancePublishInfo result = publishers.remove(service); if (null != result) { MetricsMonitor.decrementInstanceCount(); //发布客户端改变事件,用于处理集群间的数据同步 NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); } Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId()); return result; } ... }
一.处理ClientChangedEvent事件
也就是同步数据到集群节点:
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor { ... @Override public void onEvent(Event event) { ... if (event instanceof ClientEvent.ClientVerifyFailedEvent) { syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event); } else { syncToAllServer((ClientEvent) event); } } private void syncToAllServer(ClientEvent event) { Client client = event.getClient(); //Only ephemeral data sync by Distro, persist client should sync by raft. //临时实例使用Distro协议,持久化实例使用Raft协议 //ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步 if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; } if (event instanceof ClientEvent.ClientDisconnectEvent) { //如果event是客户端注销实例时需要进行集群节点同步的事件 DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.DELETE); } else if (event instanceof ClientEvent.ClientChangedEvent) { //如果event是客户端注册实例时需要进行集群节点同步的事件 DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.CHANGE); } } ... } @Component public class DistroProtocol { private final ServerMemberManager memberManager; private final DistroTaskEngineHolder distroTaskEngineHolder; ... //Start to sync by configured delay. public void sync(DistroKey distroKey, DataOperation action) { sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis()); } //Start to sync data to all remote server. public void sync(DistroKey distroKey, DataOperation action, long delay) { //遍历集群中除自身节点外的其他节点 for (Member each : memberManager.allMembersWithoutSelf()) { syncToTarget(distroKey, action, each.getAddress(), delay); } } //Start to sync to target server. public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) { //先把要同步的集群节点targetServer包装成DistroKey对象 DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer); //然后根据DistroKey对象创建DistroDelayTask任务 DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); //接着调用NacosDelayTaskExecuteEngine.addTask()方法 //往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTask distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer); } } ... }
二.处理ClientDeregisterServiceEvent事件
也就是移除注册表 + 订阅表的服务实例:
@Component public class ClientServiceIndexesManager extends SmartSubscriber { //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>(); //订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>(); ... @Override public void onEvent(Event event) { if (event instanceof ClientEvent.ClientDisconnectEvent) { handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event); } else if (event instanceof ClientOperationEvent) { handleClientOperation((ClientOperationEvent) event); } } private void handleClientOperation(ClientOperationEvent event) { Service service = event.getService(); String clientId = event.getClientId(); if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) { //处理客户端注册事件ClientRegisterServiceEvent addPublisherIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { //处理客户端注销事件ClientDeregisterServiceEvent removePublisherIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) { //处理客户端订阅服务事件ClientSubscribeServiceEvent addSubscriberIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) { //处理客户端取消订阅事件ClientUnsubscribeServiceEvent removeSubscriberIndexes(service, clientId); } } private void removePublisherIndexes(Service service, String clientId) { if (!publisherIndexes.containsKey(service)) { return; } //移除注册表中的服务实例 publisherIndexes.get(service).remove(clientId); //发布服务改变事件ServiceChangedEvent NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } ... }
三.处理ServiceChangeEvent事件
也就是通知订阅了该服务的客户端:
@org.springframework.stereotype.Service public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService { ... @Override public void onEvent(Event event) { if (!upgradeJudgement.isUseGrpcFeatures()) { return; } if (event instanceof ServiceEvent.ServiceChangedEvent) { //If service changed, push to all subscribers. //如果服务变动,会向Service服务的所有订阅者推送Service服务的实例信息,让订阅者(客户端)更新本地缓存 ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event; Service service = serviceChangedEvent.getService(); //调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay())); } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) { //If service is subscribed by one client, only push this client. //如果Service服务被一个客户端订阅,则只推送Service服务的实例信息给该客户端 ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event; Service service = subscribedEvent.getService(); //调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId())); } } ... }
(3)总结
9.事件驱动架构源码分析
(1)如何使用Nacos的事件发布
(2)Nacos通知中心的事件发布源码
(3)Nacos通知中心注册订阅者的源码
Nacos 2.x大量使用了事件发布的动作,比如客户端注册服务实例、客户端下线服务实例、服务改变、服务订阅等。
(1)如何使用Nacos的事件发布
一.首先自定义一个事件
下面定义了一个名为TestEvent的事件,继承自Nacos的Event类。
import com.alibaba.nacos.common.notify.Event; public class TestEvent extends Event { }
二.然后定义一个订阅者
有了事件之后,还需要一个订阅者,这样发布的事件才能被这个订阅者进行处理。
自定义的订阅者需要继承Nacos的SmartSubscriber抽象类,自定义的订阅者需要实现三个方法。
方法一:构造方法
需要将自定义的订阅者注册到Nacos的通知中心NotifyCenter里,这样NotifyCenter在发布自定义事件时,才能让自定义的订阅者进行响应。
方法二:subscribeTypes()方法
实现该方法时,需要把自定义的事件添加到方法的返回结果中,所以可以通过该方法获取自定义订阅者监听了哪些事件。
方法三:onEvent()方法
Nacos的通知中心NotifyCenter在发布自定义事件时,便会调用该方法,所以该方法中需要实现自定义订阅者对自定义事件的处理。
import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.SmartSubscriber; import org.springframework.stereotype.Component; import java.util.LinkedList; import java.util.List; //自定义的订阅者需要继承Nacos的SmartSubscriber抽象类 @Component public class TestSubscriber extends SmartSubscriber { //构造方法中需要将自定义的订阅者TestSubscriber注册到Nacos的通知中心NotifyCenter public TestSubscriber() { NotifyCenter.registerSubscriber(this); } //实现subscribeTypes()方法时,把自定义的事件TestEvent添加进去返回 @Override public List<Class<? extends Event>> subscribeTypes() { List<Class<? extends Event>> result = new LinkedList<>(); result.add(TestEvent.class); return result; } //实现onEvent()方法 //当Nacos的通知中心NotifyCenter发布一个TestEvent事件时,就会响应该方法处理订阅者的逻辑 @Override public void onEvent(Event event) { System.out.println("TestSubscriber onEvent"); } }
三.最后通过Nacos的通知中心NotifyCenter发布自定义事件
这样便完成了自定义事件、自定义订阅者通过Nacos实现发布订阅功能。
@RestController @RequestMapping("/sub/") public class SubscriberController { @GetMapping("/test") public void test() { NotifyCenter.publishEvent(new TestEvent()); } }
(2)Nacos通知中心的事件发布源码
通知中心NotifyCenter执行publishEvent()方法发布事件时,比如会调用DefaultPublisher的publish()方法来发布事件。
DefaultPublisher的publish()方法会先把事件放入到一个阻塞队列queue中,而在DefaultPublisher创建时会启动一个线程从阻塞队列取出事件来处理。处理时就会调用到DefaultPublisher的receiveEvent()方法通知事件订阅者,也就是执行DefaultPublisher的notifySubscriber()方法通知事件订阅者。
在DefaultPublisher的notifySubscriber()方法中,首先会创建一个调用订阅者的onEvent()方法的任务,然后如果订阅者有线程池,则将任务提交给订阅者的线程池去执行。如果订阅者没有线程池,则直接执行该任务。
可见事件的发布也使用了阻塞队列 + 异步任务,来实现对订阅者的通知。
public class NotifyCenter { private static final NotifyCenter INSTANCE = new NotifyCenter(); //key是事件Class的canonicalName,value是EventPublisher对象,一个事件对应一个EventPublisher对象 //在EventPublisher对象中就会包含订阅了该事件的所有订阅者 //EventPublisher的实现类有DefaultPublisher、NamingEventPublisher private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16); ... //Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published. public static boolean publishEvent(final Event event) { try { return publishEvent(event.getClass(), event); } catch (Throwable ex) { LOGGER.error("There was an exception to the message publishing : ", ex); return false; } } //Request publisher publish event Publishers load lazily, calling publisher. private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } //获取发布的事件的Class的canonicalName final String topic = ClassUtils.getCanonicalName(eventType); //根据发布事件类型获取EventPublisher对象,该对象中会包含所发布事件的所有订阅者信息 EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { //比如调用DefaultPublisher.publish()方法发布事件 return publisher.publish(event); } LOGGER.warn("There are no [{}] publishers for this event, please register", topic); return false; } ... } //The default event publisher implementation. //一个事件只会对应一个DefaultPublisher public class DefaultPublisher extends Thread implements EventPublisher { private Class<? extends Event> eventType; //阻塞队列存放待发布的事件 private BlockingQueue<Event> queue; //Class为eventType的事件的所有订阅者 protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>(); @Override public void init(Class<? extends Event> type, int bufferSize) { ... start(); } @Override public synchronized void start() { if (!initialized) { super.start(); ... } } @Override public void run() { openEventHandler(); } void openEventHandler() { try { ... for (; ;) { ... //从阻塞队列取数据 final Event event = queue.take(); //处理事件 receiveEvent(event); ... } } catch (Throwable ex) { LOGGER.error("Event listener exception : ", ex); } } ... @Override public boolean publish(Event event) { checkIsStart(); //把事件放入到了一个阻塞队列queue中,由DefaultPublisher创建时启动的线程来处理 boolean success = this.queue.offer(event); if (!success) {//如果事件放入阻塞队列失败,则直接处理 LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); //通知事件的订阅者去进行事件处理 receiveEvent(event); return true; } return true; } //通知事件的订阅者去进行事件处理 void receiveEvent(Event event) { ... //遍历当前事件的订阅者,对订阅者执行notifySubscriber()方法,实际上就是执行订阅者的onEvent()方法 for (Subscriber subscriber : subscribers) { ... //触发执行订阅者的onEvent()方法,实现对订阅者的通知 notifySubscriber(subscriber, event); } } @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { //创建一个任务,该任务会调用订阅者的onEvent方法 final Runnable job = () -> subscriber.onEvent(event); final Executor executor = subscriber.executor(); if (executor != null) { //将任务提交给订阅者的线程池去执行 executor.execute(job); } else { try { //如果订阅者没有线程池,则直接执行该任务 job.run(); } catch (Throwable e) { LOGGER.error("Event callback exception: ", e); } } } ... }
(3)Nacos通知中心注册订阅者的源码
在执行NotifyCenter的registerSubscriber()方法注册订阅者时,会调用订阅者实现的subscribeTypes()方法获取订阅者要监听的所有事件,然后遍历这些事件并调用NotifyCenter的addSubscriber()方法。
执行NotifyCenter的addSubscriber()方法时会为这些事件添加订阅者。由于每个事件都会对应一个EventPublisher对象,所以会先从NotifyCenter.publisherMap中获取EventPublisher对象,然后调用EventPublisher的addSubscriber()方法向EventPublisher添加订阅者,从而完成向通知中心注册订阅者。
public class NotifyCenter { private static final NotifyCenter INSTANCE = new NotifyCenter(); //key是事件Class的canonicalName,value是EventPublisher对象,一个事件对应一个EventPublisher对象 //在EventPublisher对象中就会包含订阅了该事件的所有订阅者 //EventPublisher的实现类有DefaultPublisher、NamingEventPublisher private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16); ... public static void registerSubscriber(final Subscriber consumer) { //注册订阅者 registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY); } public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) { if (consumer instanceof SmartSubscriber) { //调用subscribeTypes()方法获取订阅者consumer需要监听的事件,然后对这些事件进行遍历 for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { //For case, producer: defaultSharePublisher -> consumer: smartSubscriber. if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { //添加订阅者 INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); } else { //For case, producer: defaultPublisher -> consumer: subscriber. //添加订阅者 addSubscriber(consumer, subscribeType, factory); } } return; } final Class<? extends Event> subscribeType = consumer.subscribeType(); if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); return; } addSubscriber(consumer, subscribeType, factory); } //Add a subscriber to publisher. private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType, EventPublisherFactory factory) { //获取订阅的事件的Class的canonicalName final String topic = ClassUtils.getCanonicalName(subscribeType); synchronized (NotifyCenter.class) { //MapUtils.computeIfAbsent is a unsafe method. //创建EventPublisher对象,一个事件会对应一个EventPublisher对象 MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize); } //获取事件对应的EventPublisher对象,比如DefaultPublisher对象 EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher instanceof ShardedEventPublisher) { ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType); } else { //往EventPublisher对象添加订阅者信息,比如调用DefaultPublisher.addSubscriber()方法 publisher.addSubscriber(consumer); } } ... } //一个事件只会对应一个DefaultPublisher public class DefaultPublisher extends Thread implements EventPublisher { private Class<? extends Event> eventType; //Class为eventType的事件的所有订阅者 protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>(); ... @Override public void addSubscriber(Subscriber subscriber) { //添加订阅者 subscribers.add(subscriber); } ... }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等