Nacos源码—6.Nacos升级gRPC分析一

大纲

1.Nacos 2.x版本的一些变化

2.客户端升级gRPC发起服务注册

3.服务端进行服务注册时的处理

4.客户端服务发现和服务端处理服务订阅的源码分析

1.Nacos 2.x版本的一些变化

变化一:客户端和服务端的交互方式由HTTP升级为gRPC

Nacos 1.x服务端会提供一系列的HTTP接口供客户端请求调用,Nacos 2.x服务端会定义一些列Handler处理类来处理客户端的gRPC请求。

Nacos 1.x进行服务注册时使用的是HTTP短连接,Nacos 2.x进行服务注册时使用的是RPC长连接。

变化二:注册中心的注册表由双重Map结构变为轻量的一个Map

Nacos 1.x版本中的注册中心是使用双重Map结构来存储注册表的,这样使得注册表还是比较重量级的,在并发高时也需要考虑并发冲突。

Nacos 2.x版本则把注册表轻量化了,服务端在处理服务注册时,只是简单地往一个Map记写入客户端连接的ID。

变化三:大量使用了事件驱动

Nacos 2.x版本的Nacos里,使用了非常多的事件驱动。比如服务注册、服务销毁、服务变更等都先通过通知中心来发布一个事件,然后通过处理事件来来处理后续的逻辑流程。

2.客户端升级gRPC发起服务注册

(1)客户端和服务端的版本选择

(2)Nacos客户端项目启动时自动触发服务实例注册

(3)Nacos客户端通过gRPC方式发起服务实例注册

(4)总结

(1)客户端和服务端的版本选择

<!-- 使用的是Nacos 1.4.1 -->
<!-- <properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    <spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
</properties> -->

<!-- 使用的是Nacos 2.1.0 -->
<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
    <spring-cloud-alibaba.version>2.2.8.RELEASE</spring-cloud-alibaba.version>
</properties>

(2)Nacos客户端项目启动时自动触发服务实例注册

spring-cloud-starter-alibaba-nacos-discovery依赖包会自动注册服务。查看这个依赖包中的spring.factories文件,会指定一些Configuration类。Spring Boot启动时会扫描spring.factories文件,然后创建里面的配置类。

在spring.pactories文件中,与注册相关的类就是:NacosServiceRegistryAutoConfiguration这个Nacos服务注册自动配置类。

Nacos服务注册自动配置类NacosServiceRegistryAutoConfiguration如下,该配置类创建了三个Bean:

第一个Bean:NacosServiceRegistry

这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。

第二个Bean:NacosRegistration

这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。

第三个Bean:NacosAutoServiceRegistration

这个Bean在创建时,会传入NacosServiceRegistry和NacosRegistration两个Bean,然后该Bean继承了AbstractAutoServiceRegistration抽象类。该抽象类实现了ApplicationListener接口,所以项目启动时便是利用了Spring的监听事件来实现自动注册服务的。因为在Spring容器启动的最后会执行finishRefresh()方法,然后会发布一个事件,该事件会触发调用onApplicationEvent()方法。

调用AbstractAutoServiceRegistration的onApplicationEvent()方法时,首先会调用AbstractAutoServiceRegistration的bind()方法,然后调用AbstractAutoServiceRegistration的start()方法,接着调用AbstractAutoServiceRegistration的register()方法发起注册,也就是调用this.serviceRegistry的register()方法完成服务注册的具体工作。

而AbstractAutoServiceRegistration的serviceRegistry属性,是在服务注册自动配置类NacosServiceRegistryAutoConfiguration,创建第三个Bean—NacosAutoServiceRegistration时,通过传入其创建的第一个Bean—NacosServiceRegistry进行赋值的。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}

public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
    ...
    private NacosRegistration registration;

    public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        super(serviceRegistry, autoServiceRegistrationProperties);
        this.registration = registration;
    }
    ...
}

public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
    ...
    private final ServiceRegistry<R> serviceRegistry;
    private AutoServiceRegistrationProperties properties;

    protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {
        this.serviceRegistry = serviceRegistry;
        this.properties = properties;
    }
    ...

    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }

    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }

    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
            //发起注册
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }

    protected void register() {
        //调用创建NacosAutoServiceRegistration时传入的NacosServiceRegistry实例的register()方法
        this.serviceRegistry.register(getRegistration());
    }
    ...
}

public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }

    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        NamingService namingService = namingService();
        //获取服务ID、分组
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();
        //创建Instance对象
        Instance instance = getNacosInstanceFromRegistration(registration);
        try {
            //发起服务实例注册
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
        } catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
                rethrowRuntimeException(e);
            } else {
                log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e);
            }
        }
    }
    ...
}

总结:

Nacos客户端项目启动时自动触发注册服务实例的流程:Spring监听器调用onApplicationEvent() -> bind() -> start() -> register(),最后register()方法会调用serviceRegistry属性的register()方法进行注册。

整个流程具体来说就是:首先通过spring.factories文件,找到一个注册相关的Configuration配置类,这个配置类里面定义了三个Bean对象。创建第三个Bean对象时,需要第一个、第二个Bean对象作为参数传进去。第一个Bean对象里面就有真正进行服务注册的register()方法,并且第一个Bean对象会赋值给第三个Bean对象中的serviceRegistry属性,在第三个Bean对象的父类会实现Spring的监听器方法。所以在Spring容器启动时会发布监听事件,从而触发执行Nacos注册逻辑。

(3)Nacos客户端通过gRPC方式发起服务实例注册

nacos-client会提供接口给Nacos客户端调用来进行服务实例注册。

在NacosNamingService提供的服务注册接口registerInstance()中,会调用NamingClientProxyDelegate的registerService()方法来注册服务。此时会先调用NamingClientProxyDelegate的getExecuteClientProxy()方法,来判断要注册的服务实例是否为临时实例来获取gRPC代理还是HTTP代理。如果注册的是临时实例,则使用gRPC方式注册,否则用HTTP方式注册,然后再调用NamingGrpcClientProxy的registerService()方法注册服务。

在NamingGrpcClientProxy的registerService()方法中,则会调用NamingGrpcClientProxy的doRegisterService()方法执行注册。此时先根据要注册的服务信息创建一个InstanceRequest请求参数对象,然后调用NamingGrpcClientProxy的requestToServer()方法发出请求,也就是通过调用RpcClient的request()方法向Nacos服务端发出gRPC请求。

至于RpcClient的request()方法的底层实现,则是通过一个本地存根代理类grpcFutureServiceStub调用gRPC的接口,来实现向Nacos服务端发起RPC调用的。

//Nacos Naming Service.
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {
    ...
    private NamingClientProxy clientProxy;

    private void init(Properties properties) throws NacosException {
        ...
        //初始化clientProxy属性为NamingClientProxyDelegate对象实例
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }

    //register a instance to service with specified instance properties.
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法
        clientProxy.registerService(serviceName, groupName, instance);
    }
    ...
}

//Delegate of naming client proxy.
public class NamingClientProxyDelegate implements NamingClientProxy {
    private final NamingHttpClientProxy httpClientProxy;
    private final NamingGrpcClientProxy grpcClientProxy;

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
        ...
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
    }
    ...

    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }

    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }
    ...
}

//Naming grpc client proxy.
public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
    private final String namespaceId;
    private final String uuid;
    private final Long requestTimeout;
    private final RpcClient rpcClient;
    private final NamingGrpcRedoService redoService;

    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        super(securityProxy);
        this.namespaceId = namespaceId;
        this.uuid = UUID.randomUUID().toString();
        this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
        Map<String, String> labels = new HashMap<String, String>();
        labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
        labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);

        //通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        this.redoService = new NamingGrpcRedoService(this);
        start(serverListFactory, serviceInfoHolder);
    }

    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        //调用RpcClient.start()方法建立与服务端的连接
        rpcClient.start();
        NotifyCenter.registerSubscriber(this);
    }
    ...

    //Register a instance to service with specified instance properties.
    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);
        redoService.cacheInstanceForRedo(serviceName, groupName, instance);
        //执行服务实例的注册
        doRegisterService(serviceName, groupName, instance);
    }

    //Execute register operation.
    public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        //创建请求参数对象InstanceRequest
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);
        //向服务端发起请求
        requestToServer(request, Response.class);
        redoService.instanceRegistered(serviceName, groupName);
    }

    private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
        try {
            request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
            //实际会调用RpcClient.request()方法发起gRPC请求
            Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
            if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            if (responseClass.isAssignableFrom(response.getClass())) {
                return (T) response;
            }
            NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
        }
        throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
    }
    ...
}

//abstract remote client to connect to server.
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RpcClient implements Closeable {
    protected volatile Connection currentConnection;
    ...

    public final void start() throws NacosException {
        ...
        // connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.
        Connection connectToServer = null;
        rpcClientStatus.set(RpcClientStatus.STARTING);
        int startUpRetryTimes = RETRY_TIMES;
        while (startUpRetryTimes > 0 && connectToServer == null) {
            try {
                startUpRetryTimes--;
                ServerInfo serverInfo = nextRpcServer();
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);
                //调用GrpcClient.connectToServer()方法建立和服务端的长连接
                connectToServer = connectToServer(serverInfo);
            } catch (Throwable e) {
                LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);
            }
        }

        if (connectToServer != null) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
            //currentConnection其实就是一个用于客户端的GrpcConnection对象实例
            this.currentConnection = connectToServer;
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        } else {
            switchServerAsync();
        }
        ...
    }

    //connect to server.
    public abstract Connection connectToServer(ServerInfo serverInfo) throws Exception;
    ...

    //send request.
    public Response request(Request request) throws NacosException {
        return request(request, DEFAULT_TIMEOUT_MILLS);
    }

    //send request.
    public Response request(Request request, long timeoutMills) throws NacosException {
        int retryTimes = 0;
        Response response;
        Exception exceptionThrow = null;
        long start = System.currentTimeMillis();

        while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
            boolean waitReconnect = false;
            try {
                if (this.currentConnection == null || !isRunning()) {
                    waitReconnect = true;
                    throw new NacosException(NacosException.CLIENT_DISCONNECT, "Client not connected, current status:" + rpcClientStatus.get());
                }
                //发起gRPC请求,调用GrpcConnection.request()方法
                response = this.currentConnection.request(request, timeoutMills);
                if (response == null) {
                    throw new NacosException(SERVER_ERROR, "Unknown Exception.");
                }
                if (response instanceof ErrorResponse) {
                    if (response.getErrorCode() == NacosException.UN_REGISTER) {
                        synchronized (this) {
                            waitReconnect = true;
                            if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                                LoggerUtils.printIfErrorEnabled(LOGGER, "Connection is unregistered, switch server, connectionId = {}, request = {}", currentConnection.getConnectionId(), request.getClass().getSimpleName());
                                switchServerAsync();
                            }
                        }
                    }
                    throw new NacosException(response.getErrorCode(), response.getMessage());
                }
                //return response.
                lastActiveTimeStamp = System.currentTimeMillis();
                return response;
            } catch (Exception e) {
                if (waitReconnect) {
                    try {
                        //wait client to reconnect.
                        Thread.sleep(Math.min(100, timeoutMills / 3));
                    } catch (Exception exception) {
                        //Do nothing.
                    }
                }
                LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes, e.getMessage());
                exceptionThrow = e;
            }
            retryTimes++;
        }

        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
            switchServerAsyncOnRequestFail();
        }

        if (exceptionThrow != null) {
            throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow : new NacosException(SERVER_ERROR, exceptionThrow);
        } else {
            throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
        }
    }
    ...
}

//gRPC connection.
public class GrpcConnection extends Connection {
    //stub to send request.
    //grpcFutureServiceStub属性会在NamingGrpcClientProxy初始化 -> 调用RpcClient.start() -> GrpcClient.connectToServer()时,
    //通过GrpcConnection.setGrpcFutureServiceStub()方法进行设置
    protected RequestGrpc.RequestFutureStub grpcFutureServiceStub;
    ...

    @Override
    public Response request(Request request, long timeouts) throws NacosException {
        Payload grpcRequest = GrpcUtils.convert(request);
        //调用gRPC提供的接口发起请求,属于io.grpc包下的内容了
        ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
        Payload grpcResponse;
        try {
            grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
        return (Response) GrpcUtils.parse(grpcResponse);
    }
    ...
}

(4)总结

3.服务端进行服务注册时的处理

(1)服务端处理客户端发起的服务注册请求

(2)服务端对客户端注册事件的处理源码

(3)总结

(1)服务端处理客户端发起的服务注册请求

客户端向服务端发起服务注册时,会先根据要注册的服务信息来创建一个InstanceRequest请求参数对象,再调用NamingGrpcClientProxy的requestToServer()方法向服务端发请求,也就是通过调用RpcClient的request()方法来向服务端发出gRPC请求。

InstanceRequestHandler的handle()方法就是用来处理服务注册请求的,该方法会继续调用InstanceRequestHandler的registerInstance()方法,根据客户端发过来的请求类型来选择是注册服务还是注销服务。

如果客户端发过来的请求类型是注册服务实例,则调用EphemeralClientOperationServiceImpl的registerInstance()方法,在该方法中:

一.首先会调用ServiceManager的getSingleton()方法

根据由请求信息创建的Service对象获取一个已注册的Service对象。

需要注意:在Nacos 1.x中,ServiceManager使用一个双层Map存放服务和命名空间。在Nacos 2.x中,ServiceManager则使用两个Map存放服务和命名空间。

在Nacos 1.x中的双层Map是:
Map<String, Map<String, Service>>;
例如Map(namespace, Map(group::serviceName, Service));

在Nacos 2.x中的两个Map是:
ConcurrentHashMap<Service, Service>、
ConcurrentHashMap<String_namespace, Set<Service>>;

二.然后调用ClientManagerDelegate的getClient()方法

根据请求参数中的connectionId来获取一个IpPortBasedClient对象。在执行ClientManagerDelegate的getClient()方法时,会先根据connectionId选出具体的ClientManager实现类,接着再调用比如EphemeralIpPortClientManager的getClient()方法,从EphemeralIpPortClientManager.clients属性中获取一个Client对象。clients属性是一个ConcurrentMap,key是请求参数中的connectionId,value是继承了实现Client接口的AbstractClient的IpPortBasedClient对象。

需要注意:Nacos中的gRPC底层是基于Netty实现的。当客户端和服务端建立长连接后,服务端会生成SocketChannel连接对象,这个SocketChannel连接对象就代表了客户端。Nacos会在这个SocketChannel连接对象的基础上,封装一个Client对象,并且生成一个connectionId将SocketChannel对象与Client对象关联起来。

所以当要进行服务注册的客户端和服务端建立好长连接后,服务端就会为客户端创建一个IpPortBasedClient对象,并将该对象存放在EphemeralIpPortClientManager.clients属性里。

三.接着调用ClientOperationService的getPublishInfo()方法

将请求中的instance实例信息封装为InstancePublishInfo对象。

四.然后调用IpPortBasedClient对象的addServiceInstance()方法

往IpPortBasedClient对象里添加Service对象 -> InstancePublishInfo对象。

由于IpPortBasedClient继承自实现了Client接口的AbstractClient抽象类,所以实际是调用AbstractClient的addServiceInstance()方法添加服务实例。

在AbstractClient抽象类中,有一个名为publishers的属性。它是一个ConcurrentHashMap,用于存储客户端服务注册请求中的Instance信息。就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务,当然这些信息已经被封装为InstancePublishInfo对象了。所以AbstractClient.publishers属性的key为已注册的Service,value是根据请求中的Instance实例信息封装的InstancePublishInfo对象。

在AbstractClient的addServiceInstance()方法中,首先往publishers放入这次要注册的Service对象和客户端Instance实例,然后发布客户端改变事件ClientChangedEvent,用来同步集群间的数据。

五.最后发布客户端注册服务实例事件和服务实例元数据事件

客户端注册服务实例事件是ClientRegisterServiceEvent,服务实例元数据事件是InstanceMetadataEvent。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        //根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名
        Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                //注册实例
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                //注销实例
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));
        }
    }

    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        //调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;
        //参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名
        //参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息
        //参数meta.getConnectionId():这个参数很关键,它是连接ID
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        //调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance()
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    private final ClientManager clientManager;

    public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
        this.clientManager = clientManager;
    }

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //1.从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));
        }
        //2.从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //3.将请求中的instance实例信息封装为InstancePublishInfo对象
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //4.往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法
        client.addServiceInstance(singleton, instanceInfo);
        //设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间
        client.setLastUpdatedTime();
        //5.发布客户端注册服务实例的事件
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        //5.发布服务实例元数据的事件
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
    ...
}

public interface ClientOperationService {
    ...    
    //get publish info.
    default InstancePublishInfo getPublishInfo(Instance instance) {
        InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
        Map<String, Object> extendDatum = result.getExtendDatum();
        if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
            extendDatum.putAll(instance.getMetadata());
        }
        if (StringUtils.isNotEmpty(instance.getInstanceId())) {
            extendDatum.put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
        }
        if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
            extendDatum.put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
        }
        if (!instance.isEnabled()) {
            extendDatum.put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
        }
        String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME : instance.getClusterName();
        result.setHealthy(instance.isHealthy());
        result.setCluster(clusterName);
        return result;
    }
}

//Nacos service manager for v2.
public class ServiceManager {
    private static final ServiceManager INSTANCE = new ServiceManager();

    //key是根据请求参数创建的Service对象,value是已经注册的Service对象
    private final ConcurrentHashMap<Service, Service> singletonRepository;

    //key是命名空间,value是相同命名空间的已注册的Service对象集合
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;

    private ServiceManager() {
        singletonRepository = new ConcurrentHashMap<>(1 << 10);
        namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);
    }

    public static ServiceManager getInstance() {
        return INSTANCE;
    }

    public Set<Service> getSingletons(String namespace) {
        return namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1));
    }

    //Get singleton service. Put to manager if no singleton.
    //@param service new service
    //@return if service is exist, return exist service, otherwise return new service
    public Service getSingleton(Service service) {
        //往singletonRepository这个ConcurrentHashMap中添加一个Service对象,如果存在就不添加
        singletonRepository.putIfAbsent(service, service);
        //从这个ConcurrentHashMap中把已注册的Service对象取出来
        Service result = singletonRepository.get(service);
        //将已注册的Service对象添加到namespaceSingletonMaps中
        //由于namespaceSingletonMaps会按命名空间对已注册的Service对象进行分类
        //所以namespaceSingletonMaps的key是命名空间,value是相同命名空间的已注册的Service对象集合
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
    ...
}

//Client manager delegate.
@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {
    private final ConnectionBasedClientManager connectionBasedClientManager;
    private final EphemeralIpPortClientManager ephemeralIpPortClientManager;    
    private final PersistentIpPortClientManager persistentIpPortClientManager;

    public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager, EphemeralIpPortClientManager ephemeralIpPortClientManager, PersistentIpPortClientManager persistentIpPortClientManager) {
        this.connectionBasedClientManager = connectionBasedClientManager;
        this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
        this.persistentIpPortClientManager = persistentIpPortClientManager;
    }
    ...

    @Override
    public Client getClient(String clientId) {
        //通过请求参数中的connectionId获取一个Client对象,比如调用EphemeralIpPortClientManager.getClient()方法
        return getClientManagerById(clientId).getClient(clientId);
    }

    private ClientManager getClientManagerById(String clientId) {
        if (isConnectionBasedClient(clientId)) {
            return connectionBasedClientManager;
        }
        return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
    }
    ...
}

//The manager of {@code IpPortBasedClient} and ephemeral.
@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
    //key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象
    private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    private final DistroMapper distroMapper;
    private final ClientFactory<IpPortBasedClient> clientFactory;

    public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
        this.distroMapper = distroMapper;
        GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0, Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
        clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
    }

    @Override
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        return clientConnected(clientFactory.newClient(clientId, attributes));
    }

    @Override
    public boolean clientConnected(final Client client) {
        clients.computeIfAbsent(client.getClientId(), s -> {
            Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
            IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
            ipPortBasedClient.init();
            return ipPortBasedClient;
        });
        return true;
    }

    @Override
    public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
        return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
    }

    @Override
    public Client getClient(String clientId) {
        //客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clients
        return clients.get(clientId);
    }
    ...
}

public class IpPortBasedClient extends AbstractClient {
    ...
    ...
}

public abstract class AbstractClient implements Client {
    //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务;
    //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象
    //key为已注册的Service,value是根据请求中的Instance实例信息封装的InstancePublishInfo对象
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    ...

    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        //服务注册时,如果是第一次put进去Service对象,会返回null
        if (null == publishers.put(service, instancePublishInfo)) {
            //监视器记录
            MetricsMonitor.incrementInstanceCount();
        }
        //发布客户端改变事件ClientChangedEvent,用于处理集群间的数据同步
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
    ...
}

(2)服务端对客户端注册事件的处理源码

一.服务端处理客户端的服务注册请求时的主要工作

往三个ConcurrentHashMap里放入内容 + 发布三个事件。

三个ConcurrentHashMap分别是:第一个是ServiceManager中的,key是根据请求参数创建的Service对象,value是已经注册的Service对象。第二个是ServiceManager中的,key是命名空间,value是相同命名空间的已注册的Service对象集合。第三个是AbstractClient中的,key是注册的Service服务对象,value是根据请求中的Instance实例封装的InstancePublishInfo对象。

三个事件分别是:第一个是在AbstractClient的addServiceInstance()方法中,发布的ClientChangedEvent客户端改变事件。第二个是在EphemeralClientOperationService的registerInstance()方法中,发布的ClientRegisterServiceEvent客户端注册事件。第三个是在EphemeralClientOperationService的registerInstance()方法中,发布的InstanceMetadataEvent服务实例元数据事件。

二.服务端对客户端注册事件ClientRegisterServiceEvent的处理

当执行EphemeralClientOperationService的registerInstance()方法,发布一个ClientRegisterServiceEvent客户端注册事件时,便会触发执行ClientServiceIndexesManager的onEvent()方法,然后执行ClientServiceIndexesManager的handleClientOperation()方法,最终调用ClientServiceIndexesManager的addPublisherIndexes()方法。

其中addPublisherIndexes()方法会把clientId放入到publisherIndexes中。publisherIndexes是一个ConcurrentMap,它的key是要注册的服务实例所属的服务Service对象,它的value是某服务Service对象下的所有clientId即connectionId。由于connectionId代表了一个Client对象,也就是一个客户端,所以publisherIndexes的value可理解为服务Service下的所有客户端实例。

//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    ...

    //可以处理客户端注册事件ClientRegisterServiceEvent
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }

    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            //处理客户端注册事件ClientRegisterServiceEvent
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            //处理客户端注销事件ClientDeregisterServiceEvent
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            //处理客户端订阅服务事件ClientSubscribeServiceEvent
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            //处理客户端取消订阅事件ClientUnsubscribeServiceEvent
            removeSubscriberIndexes(service, clientId);
        }
    }

    private void addPublisherIndexes(Service service, String clientId) {
        //判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSet
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        //把clientId放入到对应的Service中
        publisherIndexes.get(service).add(clientId);
        //发布服务改变事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    ...
}

(3)总结

服务端处理服务实例注册时,会使用多个Map来存储微服务实例的信息。在注册表也就是ClientServiceIndexesManager.publisherIndexes属性中,只是简单记录每个Service服务对象下包含的clientId字符串集合。通过clientId可以在clients属性中获取到IpPortBasedClient对象,IpPortBasedClient的父类AbstractClient会存储对应的Instance实例信息,所以这样的注册表是可记录每个Service服务包含的所有Instance实例的。

4.客户端服务发现和服务端处理服务订阅的源码分析

(1)Nacos客户端进行服务发现的源码

(2)Nacos服务端处理服务订阅请求的源码

(1)Nacos客户端进行服务发现的源码

一.nacos-discovery引入Ribbon实现服务调用时的负载均衡

二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡

三.nacos-client如何进行服务发现

一.nacos-discovery引入Ribbon实现服务调用时的负载均衡

Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。由于nacos-discovery整合了Ribbon,所以Ribbon可以调用Nacos服务端的服务实例查询列表接口。于是Nacos客户端便借助Ribbon实现了服务调用时的负载均衡,即Ribbon会从服务实例列表中选择一个服务实例给客户端进行服务调用。

在nacos-discovery的pom.xml中,可以看到它引入了Ribbon依赖:

二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡

在Ribbon中会有一个ServerList接口,如下所示:ServerList就是一个扩展接口,这个接口的作用就是获取Server列表。然后nacos-discovery会针对这个接口进行实现,从而整合Ribbon。

从引入的包来看,loadbalancer是属于Ribbon源码包下的。而LoadBalancer则是Ribbon中的负载均衡器。负载均衡器会结合IRule负载均衡策略,从服务实例列表中选择一个实例。

package com.netflix.loadbalancer;

import java.util.List;

//Interface that defines the methods sed to obtain the List of Servers
public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();

    //Return updated list of servers. This is called say every 30 secs
    public List<T> getUpdatedListOfServers();
}

当Nacos客户端进行微服务调用时,会通过Ribbon来选出一个服务实例,此时Ribbon会调用NacosServerList的getUpdatedListOfServers()方法获取服务实例列表。

nacos-discovery的NacosServerList类继承了AbstractServerList类,而且实现了Ribbon的ServerList接口的两个方法,如下所示:

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
    ...
    ...
}

public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            //读取分组
            String group = discoveryProperties.getGroup();
            //通过服务名称、分组、true(表示只需要健康实例),
            //调用NacosNamingService.selectInstances()方法来查询服务实例列表
            List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
            //把Instance转换成NacosServer类型
            return instancesToServerList(instances);
        } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
        List<NacosServer> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(instances)) {
            return result;
        }
        for (Instance instance : instances) {
            result.add(new NacosServer(instance));
        }
        return result;
    }

    public String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        this.serviceId = iClientConfig.getClientName();
    }
}

NacosServerList的核心方法是getServers(),因为nacos-discovery实现Ribbon的两个接口都调用到了该方法。

在nacos-discovery的NacosServerList的getServers()方法中,会调用nacos-client的NacosNamingService的selectInstances()方法,来获取服务实例列表。

三.nacos-client如何进行服务发现

在nacos-client的NacosNamingService的selectInstances()方法中,首先会调用ServiceInfoHolder的getServiceInfo()方法从本地缓存获取数据。ServiceInfoHolder的serviceInfoMap中的value是一个ServiceInfo对象,在ServiceInfo对象中会有一个Listhosts属性来存放实例数据。

如果ServiceInfoHolder中的本地缓存没有对应的ServiceInfo对象,那么就会调用NamingClientProxyDelegate的subscribe()方法。该方法首先会开启一个查询服务实例列表的延时执行的任务,然后通过Client对象发送订阅请求,去服务端实时获取服务实例数据。

具体来说就是先调用ServiceInfoUpdateService的scheduleUpdateIfAbsent()方法,开启一个延迟执行查询服务实例列表的UpdateTask任务,然后再次调用ServiceInfoHolder的getServiceInfoMap()方法查询本地缓存。如果本地缓存为空,则向服务端发起gRPC请求获取服务实例数据,也就是通过调用NamingGrpcClientProxy的subscribe()方法,触发调用NamingGrpcClientProxy的doSubscribe()方法,再触发调用NamingGrpcClientProxy的requestToServer()方法,接着调用RpcClient的request()方法发送gRPC请求给服务端,最后调用ServiceInfoHolder的processServiceInfo()方法更新本地缓存。

public class NacosNamingService implements NamingService {
    private ServiceInfoHolder serviceInfoHolder;
    private NamingClientProxy clientProxy;
    ...

    private void init(Properties properties) throws NacosException {
        ...
        this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        String clusterString = StringUtils.join(clusters, ",");
        //判断是否需要订阅,默认为true
        if (subscribe) {
            //查询Nacos本地缓存数据
            serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
            if (null == serviceInfo) {
                //如果本地缓存数据为空,则调用Client代理NamingClientProxyDelegate的订阅方法subscribe(),通过Client对象请求服务端获取数据
                serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
            }
        } else {
            serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
        }
        //返回数据
        return selectInstances(serviceInfo, healthy);
    }
    ...
}

//Naming client service information holder.
public class ServiceInfoHolder implements Closeable {
    private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
    ...

    public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
        //获取key
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        String key = ServiceInfo.getKey(groupedServiceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        //通过key从本地缓存中获取数据
        return serviceInfoMap.get(key);
    }
    ...
}

public class NamingClientProxyDelegate implements NamingClientProxy {
    private final ServiceInfoUpdateService serviceInfoUpdateService;
    private final ServiceInfoHolder serviceInfoHolder;
    private final NamingGrpcClientProxy grpcClientProxy;
    ...

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
        ...
        this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this, changeNotifier);
        this.serviceInfoHolder = serviceInfoHolder;
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
    }
    ...

    @Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
        NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
        String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
        String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
        //1.调用ServiceInfoUpdateService.scheduleUpdateIfAbsent()方法开启一个查询服务实例列表的定时任务
        serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
        //再次查询本地的缓存数据
        ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
            //2.如果本地缓存还是为空,则使用gRPC来请求服务端,也就是调用NamingGrpcClientProxy.subscribe()方法
            result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
        }
        //更新本地缓存
        serviceInfoHolder.processServiceInfo(result);
        return result;
    }
    ...
}

public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
    ...
    private final RpcClient rpcClient;
    private final NamingGrpcRedoService redoService;

    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        ...
        //通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        this.redoService = new NamingGrpcRedoService(this);
        start(serverListFactory, serviceInfoHolder);
    }

    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        rpcClient.start();
        NotifyCenter.registerSubscriber(this);
    }
    ...

    @Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
        }
        redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
        //执行订阅
        return doSubscribe(serviceName, groupName, clusters);
    }

    //Execute subscribe operation.
    public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
        //构建订阅服务请求——SubscribeServiceRequest对象
        SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);
        //向服务端发送SubscribeServiceRequest类型的请求
        SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
        redoService.subscriberRegistered(serviceName, groupName, clusters);
        return response.getServiceInfo();
    }

    private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
        try {
            request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
            //实际会调用RpcClient.request()方法发起gRPC请求
            Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
            if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            if (responseClass.isAssignableFrom(response.getClass())) {
                return (T) response;
            }
            NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
        }
        throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
    }
    ...
}

其中UpdateTask任务的run()方法会先调用NamingClientProxy.queryInstancesOfService()方法,然后调用ServiceInfoHolder的processServiceInfo()方法向服务端查询服务实例列表以及更新本地服务实例缓存。当该任务执行完毕时,会继续向调度线程池提交一个延迟6s执行的任务,从而实现不断更新本地缓存的服务实例列表。

在ServiceInfoHolder的processServiceInfo()方法更新本地服务实例缓存中,会判断服务实例是否发生改变。如果有改变,那么客户端会先发布一个服务实例改变事件InstancesChangeEvent,然后把新的服务实例数据写入本地磁盘。

public class ServiceInfoUpdateService implements Closeable {
    private final NamingClientProxy namingClientProxy;
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    ...

    //Schedule update if absent.
    public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
        //生成一个serverKey
        String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        //判断当前serviceKey是否已经开启了对应的定时任务,如果已经开启就不开启了
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //加一把同步锁,避免并发冲突
        synchronized (futureMap) {
            //加锁之后再进行一次判断,双重检测Double Check
            if (futureMap.get(serviceKey) != null) {
                return;
            }
            //添加UpdateTask任务到executor延迟执行
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
            futureMap.put(serviceKey, future);
        }
    }

    private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }
    ...

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private boolean isCancel;
        private final String serviceName;
        private final String groupName;
        private final String clusters;
        private final String groupedServiceName;
        private final String serviceKey;
        //the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
        private int failCount = 0;

        public UpdateTask(String serviceName, String groupName, String clusters) {
            this.serviceName = serviceName;
            this.groupName = groupName;
            this.clusters = clusters;
            this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
            this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
        }

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
                    NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
                    isCancel = true;
                    return;
                }
                //获取本地缓存
                ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
                if (serviceObj == null) {
                    //如果本地缓存为空,则通过gRPC去查询服务端的服务实例数据
                    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                    //更新本地缓存
                    serviceInfoHolder.processServiceInfo(serviceObj);
                    //更新UpdateTask的lastRefTime属性,即更新UpdateTask任务的数据获取时间
                    lastRefTime = serviceObj.getLastRefTime();
                    return;
                }
                //如果本地缓存不为空,则判断本地缓存最后一次刷新时间 是否小于等于 最后一次UpdateTask任务的数据获取时间
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    //如果小于等于,则重新查询服务端的服务实例数据
                    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                    //更新本地缓存
                    serviceInfoHolder.processServiceInfo(serviceObj);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                //TODO multiple time can be configured.
                //计算下一次定时任务执行的时间,这个时间默认是6s
                delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
                //请求成功后,重置错误次数为0
                resetFailCount();
            } catch (Throwable e) {
                //记录请求失败的次数
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
            } finally {
                if (!isCancel) {
                    //根据请求失败的次数,动态调整重新提交的UpdateTask定时任务的执行时间
                    executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
                }
            }
        }

        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }
    }
    ...
}

//Naming client service information holder.
public class ServiceInfoHolder implements Closeable {
    private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
    ...

    //Process service info.
    public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        //获取本地缓存中的服务实例
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            return oldService;
        }
        //更新本地缓存中的服务实例
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        //判断服务实例是否有改变
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(), JacksonUtils.toJson(serviceInfo.getHosts()));
            //发布服务实例改变事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
            //将服务实例信息写入本地磁盘
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
    ...
}

(2)Nacos服务端处理服务订阅请求的源码

一.根据要查询的Service对象读取缓存

二.添加订阅者即Subscriber对象

假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向Nacos服务端订阅stock-service服务,也就是order-service需要从服务端获取到stock-service的所有服务实例。

客户端向服务端发起订阅请求的参数类型是SubscribeServiceRequest。服务订阅请求的处理方法是SubscribeServiceRequestHandler的handle(),该方法会先从SubscribeServiceRequest对象里获取信息构建Service对象,然后再根据RequestMeta请求元数据构建Subscriber对象,接着就会调用ServiceStorage的getData()方法读取缓存中的服务实例,以及通过clientOperationService的subscribeService()方法添加订阅者。

//Handler to handle subscribe service.
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
    private final ServiceStorage serviceStorage;
    private final EphemeralClientOperationServiceImpl clientOperationService;
    ...

    //假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向服务端订阅stock-service服务
    //也就是order-service需要从服务端获取到(查询出)stock-service的所有服务实例
    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //构建要查询的Service服务对象,对应的是stock-serivce
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
        //构建要订阅Service服务的订阅者,对应的是order-service
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
        //1.调用ServiceStorage.getData()方法读取缓存
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber);
        if (request.isSubscribe()) {
            //2.添加订阅者,如果订阅的服务有变动,则需要通知订阅者
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
 }

一.根据要查询的Service对象读取缓存

调用的方法是ServiceStorage.getData():

-> serviceDataIndexes.get()
-> ServiceStorage.getPushData()方法
-> ServiceStorage.emptyServiceInfo()方法
-> ServiceStorage.getAllInstancesFromIndex()方法
-> ClientServiceIndexesManager.getAllClientsRegisteredService()方法 
-> ServiceStorage.getInstanceInfo()方法根据clientId获取Instance对象
-> EphemeralIpPortClientManager.getClient()方法
-> AbstractClient.getInstancePublishInfo()方法
-> ServiceStorage.parseInstance()方法
-> serviceDataIndexes.put() + serviceClusterIndex.put()

ServiceStorage的getData()方法在读取缓存时,获取要查询的Service服务对象下的全部Instance实例会分三步:一是从注册表中获取要查询的Service对象下的全部clientId,二是根据clientId获取对应的Client对象,三是根据Client对象获取对应的Instance信息。

@Component
public class ServiceStorage {
    //缓存要查询的Service服务对象对应的已注册的服务详情
    private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;

    //缓存要查询的Service服务对象对应的服务集群
    private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;
    ...

    public ServiceInfo getData(Service service) {
        //判断缓存中是否有数据,传入的参数service就是要查询的Service服务对象,对应的是stock-serivce
        //如果有则直接读取缓存数据,如果没有则调用ServiceStorage.getPushData(service)
        return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
    }

    public ServiceInfo getPushData(Service service) {
        //调用ServiceStorage.emptyServiceInfo()方法创建空的ServiceInfo对象
        ServiceInfo result = emptyServiceInfo(service);
        if (!ServiceManager.getInstance().containSingleton(service)) {
            return result;
        }
        //调用ServiceStorage.getAllInstancesFromIndex()方法获服务取实例列表
        //ServiceInfo的hosts属性就包含了该服务的所有Instance实例数据
        result.setHosts(getAllInstancesFromIndex(service));
        //将获取到的ServiceInfo对象放入到缓存中
        serviceDataIndexes.put(service, result);
        return result;
    }

    private ServiceInfo emptyServiceInfo(Service service) {
        ServiceInfo result = new ServiceInfo();
        result.setName(service.getName());
        result.setGroupName(service.getGroup());
        result.setLastRefTime(System.currentTimeMillis());
        result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
        return result;
    }

    private List<Instance> getAllInstancesFromIndex(Service service) {
        //传入的参数service就是要查询的Service服务对象,对应的是stock-serivce
        Set<Instance> result = new HashSet<>();
        Set<String> clusters = new HashSet<>();
        //each = clientId,调用ClientServiceIndexesManager.getAllClientsRegisteredService()方法获取Service服务对象的所有clientId
        for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
            //调用ServiceStorage.getInstanceInfo()方法,根据clientId查询出对应的Instance信息,即InstancePublishInfo对象
            Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
            if (instancePublishInfo.isPresent()) {
                //对象转换,将InstancePublishInfo对象转换成Instance对象
                Instance instance = parseInstance(service, instancePublishInfo.get());
                result.add(instance);
                clusters.add(instance.getClusterName());
            }
        }
        //cache clusters of this service
        //缓存要查询的Service服务对象,对应有哪些集群
        serviceClusterIndex.put(service, clusters);
        return new LinkedList<>(result);
    }

    private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
        //通过调用EphemeralIpPortClientManager.getClient()方法,根据clientId获取对应的Client连接对象
        Client client = clientManager.getClient(clientId);
        if (null == client) {
            return Optional.empty();
        }
        //调用AbstractClient.getInstancePublishInfo()方法,获取该Client客户端注册的服务实例信息InstancePublishInfo
        return Optional.ofNullable(client.getInstancePublishInfo(service));
    }

    private Instance parseInstance(Service service, InstancePublishInfo instanceInfo) {
        Instance result = InstanceUtil.parseToApiInstance(service, instanceInfo);
        Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instanceInfo.getMetadataId());
        metadata.ifPresent(instanceMetadata -> InstanceUtil.updateInstanceMetadata(result, instanceMetadata));
        return result;
    }
    ...
}

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    ...

    public Collection<String> getAllClientsRegisteredService(Service service) {
        //服务端在处理服务注册时,最后会调用ClientServiceIndexesManager.addPublisherIndexes()方法:
        //将客户端对应的clientId放入到publisherIndexes注册表中
        //所有下面的代码会从注册表中获取要查询的Service服务对象对应的clientId集合
        return publisherIndexes.containsKey(service) ? publisherIndexes.get(service) : new ConcurrentHashSet<>();
    }

    private void addPublisherIndexes(Service service, String clientId) {
        //判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSet
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        //把clientId放入到对应的Service中
        publisherIndexes.get(service).add(clientId);
        //发布服务改变事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    ...
}

@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
    //key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象
    private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    ...

    @Override
    public Client getClient(String clientId) {
        //客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clients
        return clients.get(clientId);
    }
    ...
}

public abstract class AbstractClient implements Client {
    //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务
    //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象
    //key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    ...

    @Override
    public InstancePublishInfo getInstancePublishInfo(Service service) {
        return publishers.get(service);
    }
    ...
}

二.添加订阅者即Subscriber对象

调用的方法是EphemeralClientOperationServiceImpl.subscribeService(),添加订阅者其实就是先根据clientId找出对应的客户端Client对象,然后往AbstractClient.subscribers属性放入服务对象和对应的订阅者对象,最后再发布一个客户端订阅服务事件ClientSubscribeServiceEvent。

这个事件会被ClientServiceIndexesManager的onEvent()方法处理,即调用ClientServiceIndexesManager的addSubscriberIndexes()方法,该方法会继续发布一个服务订阅事件ServiceSubscribedEvent。

//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    ...
    //添加订阅者
    //@param service    service:要查询的Service对象,比如stock-service
    //@param subscriber subscribe:订阅者,比如对应order-service
    //@param clientId   id of client:对应order-service与Nacos服务端的连接ID
    @Override
    public void subscribeService(Service service, Subscriber subscriber, String clientId) {
        //传入的service是要查询的Service对象,比如stock-service
        Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
        //传入的clientId是代表着order-service的Client对象,调用EphemeralIpPortClientManager.getClient()方法
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //往代表着order-service的Client对象中,添加订阅者
        client.addServiceSubscriber(singleton, subscriber);
        client.setLastUpdatedTime();
        //发布客户端订阅服务事件ClientSubscribeServiceEvent,也就是order-service客户端订阅了service服务
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
    }
    ...
}

@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
    //key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象
    private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    ...

    @Override
    public Client getClient(String clientId) {
        //客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clients
        return clients.get(clientId);
    }
    ...
}

public abstract class AbstractClient implements Client {
    //subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务
    //subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)
    protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    ...

    @Override
    public boolean addServiceSubscriber(Service service, Subscriber subscriber) {
        //服务订阅时,添加订阅者
        //subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)
        if (null == subscribers.put(service, subscriber)) {
            MetricsMonitor.incrementSubscribeCount();
        }
        return true;
    }
    ...
}

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    //订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    ...

    //可以处理客户端注册事件ClientRegisterServiceEvent
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }

    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            //处理客户端注册事件ClientRegisterServiceEvent
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            //处理客户端注销事件ClientDeregisterServiceEvent
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            //处理客户端订阅服务事件ClientSubscribeServiceEvent
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            //处理客户端取消订阅事件ClientUnsubscribeServiceEvent
            removeSubscriberIndexes(service, clientId);
        }
    }

    private void addSubscriberIndexes(Service service, String clientId) {
        //传入的service是要查询的Service对象stock-service,clientId是订阅者order-service对应的客户端连接对象ID
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        //Fix #5404, Only first time add need notify event. 只有第一次添加时需要发布通知事件
        if (subscriberIndexes.get(service).add(clientId)) {
            //发布服务订阅事件ServiceSubscribedEvent
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }
    ...
}

(3)总结

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务