Nacos源码—7.Nacos升级gRPC分析二

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

5.服务变动时如何通知订阅的客户端

(1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理

(2)延迟任务的执行引擎源码

(3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理

(1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理

一.服务注册

Nacos客户端注册服务实例时,Nacos服务端会发布ClientRegisterServiceEvent客户端注册服务实例事件。Nacos服务端在处理客户端注册服务实例事件时,会把clientId写入到注册表,然后接着发布ServiceChangedEvent服务改变事件。

//Instance request handler.
//Handles gRPC InstanceRequests: registers or deregisters an ephemeral instance on behalf of
//the connection that sent the request.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    /**
     * Dispatches the request according to its type.
     *
     * @param request the instance request (namespace, group, service name, instance and type)
     * @param meta    request metadata; its connection id identifies the calling client
     * @return a response carrying the operation type that was performed
     * @throws NacosException INVALID_PARAM when the type is neither register nor deregister
     */
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        // Target service built from the request coordinates; the last argument marks it ephemeral.
        final Service target = Service.newService(request.getNamespace(), request.getGroupName(),
                request.getServiceName(), true);
        final String operation = request.getType();
        switch (operation) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(target, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(target, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", operation));
        }
    }

    // Registers request.getInstance() under the caller's connection id, which acts as the client id.
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        final Instance instance = request.getInstance();
        final String connectionId = meta.getConnectionId();
        clientOperationService.registerInstance(service, instance, connectionId);
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    // Deregisters request.getInstance() for the caller's connection id.
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        final Instance instance = request.getInstance();
        final String connectionId = meta.getConnectionId();
        clientOperationService.deregisterInstance(service, instance, connectionId);
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    private final ClientManager clientManager;
    
    public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
        this.clientManager = clientManager;
    }
    
    //Registers an ephemeral instance for the client identified by clientId.
    //@param service  Service built from the request (namespace, group, service name)
    //@param instance the instance sent by the client (IP, port, ...)
    //@param clientId the client id; for gRPC requests the caller passes the connection id
    //Throws NacosRuntimeException(INVALID_PARAM) if the registered service is persistent;
    //returns silently if clientIsLegal() rejects the client.
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //Get the registered singleton Service from ServiceManager that matches the Service built from the request
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));
        }
        //Look up the Client object for this clientId via the ClientManagerDelegate
        //NOTE(review): the article calls this an IpPortBasedClient; for gRPC connection ids it is
        //presumably a connection-based client instead — verify against ClientManagerDelegate.
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //Wrap the instance data from the request into an InstancePublishInfo
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //Record the published instance for the singleton service on the Client (AbstractClient.addServiceInstance())
        client.addServiceInstance(singleton, instanceInfo);
        //Refresh the client's lastUpdatedTime
        client.setLastUpdatedTime();
        //Publish the client-registered-service event
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        //Publish the instance-metadata event
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
    ...
}

//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //Publisher index (service providers): each Service maps to the clientIds of its registered instances
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    ...
    //Entry point for subscribed events: disconnect events and client operation events
    //(e.g. ClientRegisterServiceEvent) are dispatched to their handlers
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    //Updates the publisher/subscriber indexes according to the concrete operation event type
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            //Client registered an instance of the service
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            //Client deregistered its instance of the service
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            //Client subscribed to the service
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            //Client unsubscribed from the service
            removeSubscriberIndexes(service, clientId);
        }
    }
    
    //Adds clientId to the publishers of service, then publishes a ServiceChangedEvent
    //so that the service's subscribers get notified.
    private void addPublisherIndexes(Service service, String clientId) {
        //Add to the set returned by computeIfAbsent itself; a separate get() afterwards could
        //return null (and throw NPE) if the entry were removed concurrently in between.
        publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>()).add(clientId);
        //Publish the service changed event
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    ...
}

二.服务订阅

客户端查询微服务实例列表进行服务发现时,调用的是订阅接口。服务端处理客户端的订阅请求时会发布ClientSubscribeServiceEvent事件,这个事件的处理逻辑是先向订阅表添加clientId到所订阅服务对应的集合中,如果第一次添加clientId则发布一个ServiceSubscribedEvent服务订阅事件。

//Handler to handle subscribe service.
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
    private final ServiceStorage serviceStorage;
    private final EphemeralClientOperationServiceImpl clientOperationService;
    ...
  
    //Example: if order-service calls stock-service, order-service (a Nacos client) subscribes to
    //stock-service on the server, i.e. it queries the server for all instances of stock-service.
    //Returns the service's current instances (after healthy protection for this subscriber) and,
    //depending on request.isSubscribe(), adds or removes the caller as a subscriber.
    //NOTE(review): metadataManager is presumably a field among the elided members.
    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //The Service being queried (stock-service in the example)
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
        //The subscriber of that service (order-service in the example)
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
        //1. Read the cached data via ServiceStorage.getData(), then apply healthy protection for this subscriber
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber);
        if (request.isSubscribe()) {
            //2. Add the subscriber so it is notified when the subscribed service changes
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
}

//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    ...
    //Adds a subscriber.
    //@param service    the Service being queried, e.g. stock-service
    //@param subscriber the subscriber, e.g. order-service
    //@param clientId   id of client: the connection id between order-service and the Nacos server
    //Returns silently if clientIsLegal() rejects the client.
    @Override
    public void subscribeService(Service service, Subscriber subscriber, String clientId) {
        //Use the registered singleton if one exists, otherwise the Service built from the request
        Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
        //Look up the subscribing client's Client object (order-service in the example) by clientId
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //Record on the subscribing Client that it subscribes to this service
        client.addServiceSubscriber(singleton, subscriber);
        client.setLastUpdatedTime();
        //Publish ClientSubscribeServiceEvent: this client has subscribed to the service
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
    }
    ...
}

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //Publisher index (service providers): each Service maps to the clientIds of its registered instances
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    //Subscriber index (service consumers): each Service maps to the clientIds subscribed to it
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    ...
    
    //Entry point for subscribed events: disconnect events and client operation events
    //(e.g. ClientSubscribeServiceEvent) are dispatched to their handlers
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    //Updates the publisher/subscriber indexes according to the concrete operation event type
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            //Client registered an instance of the service
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            //Client deregistered its instance of the service
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            //Client subscribed to the service
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            //Client unsubscribed from the service
            removeSubscriberIndexes(service, clientId);
        }
    }
    
    //Adds clientId (the subscriber's connection, e.g. order-service) to the subscribers of service
    //(e.g. stock-service) and publishes a ServiceSubscribedEvent only when it was newly added.
    private void addSubscriberIndexes(Service service, String clientId) {
        // Fix #5404, Only first time add need notify event.
        //Set.add() returns true only for a new clientId. Using the set returned by computeIfAbsent
        //avoids a separate get() that could return null if the entry were removed concurrently.
        boolean firstSubscription = subscriberIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>()).add(clientId);
        if (firstSubscription) {
            //Publish the service subscribed event
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }
    ...
}

(2)延迟任务的执行引擎源码

一.什么是延迟任务执行引擎

延迟任务执行引擎就是可以往执行引擎中添加任务,该任务会被延时执行。Nacos的延迟任务执行引擎就是NacosDelayTaskExecuteEngine类。

Nacos会通过延迟任务执行引擎来处理服务改变事件和服务订阅事件,即ServiceChangedEvent和ServiceSubscribedEvent。

二.延迟任务执行引擎的执行原理

首先,Nacos定义了一个名为NacosTaskProcessor的任务处理器接口(Interface),它有多个实现类。

然后,执行引擎会记录相关的任务处理器实现类。NacosDelayTaskExecuteEngine继承自AbstractNacosTaskExecuteEngine,AbstractNacosTaskExecuteEngine相当于任务执行引擎中心。AbstractNacosTaskExecuteEngine有两个属性来记录这些处理器实现类,并提供了两个方法可以向任务执行引擎中心添加处理器,这两个方法分别是addProcessor()方法和setDefaultTaskProcessor()方法。

接着,创建NacosDelayTaskExecuteEngine时会开启一个定时执行的任务,该定时执行的任务会定时执行ProcessRunnable的run()方法。

延时任务执行引擎有一个Map类型的tasks属性存放所有延迟执行的任务,而在ProcessRunnable的run()方法中,会触发调用其processTasks()方法。processTasks()方法会从tasks属性中获取全部的延迟任务,然后遍历处理。即先通过任务key获取具体的任务,再通过任务key获取对应的处理器,接着调用NacosTaskProcessor的process()方法,来完成延迟任务的执行。

最后,NacosDelayTaskExecuteEngine会提供一个addTask()方法,这个方法可以将延迟执行的任务添加到延时任务执行引擎的tasks属性中。

//Abstract nacos task execute engine: central registry of task processors.
public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implements NacosTaskExecuteEngine<T> {
    //Processors registered per task key
    private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<>();
    //Fallback processor for keys without a registered processor (may be null if never set)
    private NacosTaskProcessor defaultTaskProcessor;
    ...
    
    //Registers taskProcessor for key; an existing registration for the same key is kept (putIfAbsent).
    @Override
    public void addProcessor(Object key, NacosTaskProcessor taskProcessor) {
        taskProcessors.putIfAbsent(key, taskProcessor);
    }
    
    @Override
    public void removeProcessor(Object key) {
        taskProcessors.remove(key);
    }
    
    //Returns the processor registered for key, or defaultTaskProcessor if none is registered.
    @Override
    public NacosTaskProcessor getProcessor(Object key) {
        //Single lookup: containsKey() followed by get() could return null instead of the default
        //if the processor were removed concurrently between the two calls.
        return taskProcessors.getOrDefault(key, defaultTaskProcessor);
    }
    
    //Returns a live view of the registered processor keys.
    @Override
    public Collection<Object> getAllProcessorKey() {
        return taskProcessors.keySet();
    }
    
    @Override
    public void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) {
        this.defaultTaskProcessor = defaultTaskProcessor;
    }
    ...
}

//Nacos delay task execute engine: tasks added via addTask() are processed later by a periodic pass.
//A scheduler created by ExecutorFactory.newSingleScheduledExecutorService calls processTasks() every processInterval ms.
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    private final ScheduledExecutorService processingExecutor;
    //Task pool: pending delay tasks keyed by task key
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
    //Guards compound operations on tasks (add/merge, conditional remove, key snapshot)
    protected final ReentrantLock lock = new ReentrantLock();
    ...
    
    //@param name            thread name used for the scheduler
    //@param initCapacity    initial capacity of the task pool
    //@param logger          engine logger
    //@param processInterval initial delay and fixed delay between processTasks() runs, in milliseconds
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        //Schedule ProcessRunnable with a fixed delay of processInterval ms
        processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }
    ...   
    
    //Removes and returns the task for key only if task.shouldProcess() is true;
    //otherwise returns null and leaves any existing task in the pool.
    @Override
    public AbstractDelayTask removeTask(Object key) {
        lock.lock();
        try {
            AbstractDelayTask task = tasks.get(key);
            if (null != task && task.shouldProcess()) {
                return tasks.remove(key);
            } else {
                return null;
            }
        } finally {
            lock.unlock();
        }
    }
    
    //Returns a snapshot copy of the current task keys, taken under the lock.
    @Override
    public Collection<Object> getAllTaskKeys() {
        Collection<Object> keys = new HashSet<Object>();
        lock.lock();
        try {
            keys.addAll(tasks.keySet());
        } finally {
            lock.unlock();
        }
        return keys;
    }
    
    //Clears pending tasks and shuts down the scheduler (does not wait for a running pass to finish).
    @Override
    public void shutdown() throws NacosException {
        tasks.clear();
        processingExecutor.shutdown();
    }
    
    //Adds newTask under key. If a task already exists for key, newTask.merge(existTask) is called
    //first and newTask then replaces it.
    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            //Put the (possibly merged) task into the task pool
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }
    
    //process tasks in execute engine.
    protected void processTasks() {
        //Snapshot all task keys and iterate over them
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            //Take the task out of the pool (null if absent or shouldProcess() is false)
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            //Get the NacosTaskProcessor for this task key
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                //Invoke the processor's process()
                if (!processor.process(task)) {
                    //On failure (false), put the task back into tasks for a retry
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error ", e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    
    //Resets the task's lastProcessTime to now and re-adds it; presumably this makes
    //shouldProcess() delay the retry by another interval — verify in AbstractDelayTask.
    private void retryFailedTask(Object key, AbstractDelayTask task) {
        task.setLastProcessTime(System.currentTimeMillis());
        addTask(key, task);
    }
    
    //Periodic driver. Catches every Throwable, because an exception escaping a
    //scheduleWithFixedDelay task would suppress all subsequent executions.
    private class ProcessRunnable implements Runnable {
        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
}

(3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理

一.服务端处理服务变动和服务订阅事件的入口

二.执行推送的任务PushExecuteTask说明

三.客户端收到服务端发送的Service服务实例数据推送的处理

一.服务端处理服务变动和服务订阅事件的入口

处理入口是NamingSubscriberServiceV2Impl的onEvent()方法。其中,对事件的处理采用了双层内存队列(延迟任务池 + 执行任务阻塞队列)的异步处理方式。

onEvent()方法主要会往延迟任务执行引擎中添加任务,也就是首先会根据不同的事件类型构建不同的PushDelayTask任务,然后调用延迟任务执行引擎NacosDelayTaskExecuteEngine的addTask()方法,把PushDelayTask延迟任务添加到PushDelayTaskExecuteEngine的任务池。

创建继承自NacosDelayTaskExecuteEngine的PushDelayTaskExecuteEngine延迟任务执行引擎时会创建一个定时任务,定时从任务池中取出任务,然后调用对应的任务处理器的process()方法。

PushDelayTask任务对应的任务处理器是PushDelayTaskProcessor,所以最终会触发执行PushDelayTaskProcessor的process()方法。

在执行PushDelayTaskProcessor的process()方法时,会调用NamingExecuteTaskDispatcher的dispatchAndExecuteTask()方法,提交由PushDelayTask任务封装的PushExecuteTask任务给NacosExecuteTaskExecuteEngine进行处理,此时会调用NacosExecuteTaskExecuteEngine的addTask()方法添加任务。

其中,PushExecuteTask任务会被分发到NacosExecuteTaskExecuteEngine执行引擎中的一个TaskExecuteWorker处理,TaskExecuteWorker的process()方法会把PushExecuteTask任务放入队列。由于TaskExecuteWorker初始化时会启动一个线程不断从队列中获取任务并执行,所以最终便会执行到PushExecuteTask的run()方法。

//Naming subscriber service for v2.x.
@org.springframework.stereotype.Service
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {
    private final PushDelayTaskExecuteEngine delayTaskEngine;
    ...
    //Turns service events into PushDelayTasks keyed by the service.
    //Does nothing unless upgradeJudgement.isUseGrpcFeatures() is true.
    @Override
    public void onEvent(Event event) {
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            return;
        }
        if (event instanceof ServiceEvent.ServiceChangedEvent) {
            //If service changed, push to all subscribers.
            //All subscribers receive the service's instances and refresh their local cache
            ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
            Service service = serviceChangedEvent.getService();
            //Queue a push-to-all PushDelayTask (NacosDelayTaskExecuteEngine.addTask())
            delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
        } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
            //If service is subscribed by one client, only push this client.
            ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
            Service service = subscribedEvent.getService();
            //Queue a PushDelayTask targeting only the subscribing client (NacosDelayTaskExecuteEngine.addTask())
            delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));
        }
    }
    ...
}

public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
    ...
    //Processor for PushDelayTask: converts each delay task into a PushExecuteTask and dispatches it.
    private static class PushDelayTaskProcessor implements NacosTaskProcessor {    
        private final PushDelayTaskExecuteEngine executeEngine;
        
        public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
            this.executeEngine = executeEngine;
        }
        
        //Always returns true, so the delay engine never re-adds the task itself;
        //the actual push runs later inside PushExecuteTask.
        @Override
        public boolean process(NacosTask task) {
            //Cast to PushDelayTask
            PushDelayTask pushDelayTask = (PushDelayTask) task;
            //The service whose instances are pushed (e.g. after it changed, or when a client subscribed to it)
            Service service = pushDelayTask.getService();
            //Wrap it in a PushExecuteTask and hand it to NamingExecuteTaskDispatcher,
            //which forwards it to NacosExecuteTaskExecuteEngine
            NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
            return true;
        }
    }
}

//Singleton facade that routes naming execute tasks to one shared NacosExecuteTaskExecuteEngine.
public class NamingExecuteTaskDispatcher {

    private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();

    //Shared engine, created once together with the singleton
    private final NacosExecuteTaskExecuteEngine engine =
            new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);

    private NamingExecuteTaskDispatcher() {
    }

    public static NamingExecuteTaskDispatcher getInstance() {
        return INSTANCE;
    }

    //Hands task to the engine; dispatchTag decides which processor or worker handles it.
    public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {
        final NacosExecuteTaskExecuteEngine target = engine;
        target.addTask(dispatchTag, task);
    }

    //Returns the engine's worker status description.
    public String workersStatus() {
        final NacosExecuteTaskExecuteEngine target = engine;
        return target.workersStatus();
    }
}

public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    private final TaskExecuteWorker[] executeWorkers;
    
    //Creates dispatchWorkerCount TaskExecuteWorkers, each given its index (mod) and the total count.
    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    ...
    
    //Runs task with the processor registered for tag if there is one;
    //otherwise hands it to the TaskExecuteWorker selected by getWorker(tag).
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        //A processor registered for this tag takes precedence over the workers
        NacosTaskProcessor processor = getProcessor(tag);
        if (null != processor) {
            processor.process(task);
            return;
        }
        TaskExecuteWorker worker = getWorker(tag);
        //TaskExecuteWorker.process() puts the task into the worker's queue
        worker.process(task);
    }
    ...
}

public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    //Bounded task queue consumed by the InnerWorker thread
    private final BlockingQueue<Runnable> queue;
    
    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        ...
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        //Start the consumer thread right away
        new InnerWorker(name).start();
    }
    ...
    //Enqueues task if it is an AbstractExecuteTask; other NacosTask types are ignored.
    //Always returns true. Blocks while the queue is full.
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            putTask((Runnable) task);
        }
        return true;
    }
    
    //Puts task into the blocking queue, waiting for space if necessary.
    private void putTask(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException ire) {
            //The task is dropped. Restore the interrupt flag so the calling thread
            //(and its executor) can still observe that it was interrupted.
            log.error(ire.toString(), ire);
            Thread.currentThread().interrupt();
        }
    }
    ...
    //Non-daemon consumer thread: takes tasks from the queue and runs them until closed is set.
    //Logs tasks that take more than 1000 ms, and logs any Throwable before continuing the loop.
    private class InnerWorker extends Thread {
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    //Block until a task is available
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    //Run the task on this worker thread
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

二.执行推送的任务PushExecuteTask说明

在PushExecuteTask的run()方法中,首先会从ServiceStorage获取要推送的服务Service的最新实例数据,并包装为PushDataWrapper;然后调用PushExecuteTask的getTargetClientIds()方法获取要推送的clientId;接着根据clientId获取订阅了Service服务的客户端订阅者对象;最后调用PushExecutorDelegate的doPushWithCallback()方法回调客户端。对于gRPC客户端,这一调用会依次转到PushExecutorRpcImpl的doPushWithCallback()方法、RpcPushService的pushWithCallback()方法,最终由GrpcConnection的asyncRequest()方法向客户端发送RPC请求。

执行PushExecuteTask的getTargetClientIds()方法获取要推送的clientId时,会根据PushDelayTask的pushToAll属性来获取对应的clientId。因为在NamingSubscriberServiceV2Impl的onEvent()方法中,如果处理的是服务改变事件,则构造的PushDelayTask是面向所有客户端。如果处理的是服务订阅事件,则构造的PushDelayTask是面向一个客户端。

所以如果PushDelayTask要面向所有客户端推送Service服务实例数据,那么就调用ClientServiceIndexesManager的getAllClientsSubscribeService()方法,从订阅者列表中获取订阅了Service服务的所有clientId。如果PushDelayTask要面向单个客户端推送Service服务实例数据,则通过PushDelayTask的getTargetClients()方法获取对应的clientId即可。

总结:服务变动需要通知全部订阅了该Service服务的客户端对象,服务订阅只需要通知当前订阅者客户端对象即可。

//Nacos naming push execute task.
public class PushExecuteTask extends AbstractExecuteTask {
    //The Service whose instances are pushed: after it changed (push to all its subscribers)
    //or when a client subscribed to it (push to that client only)
    private final Service service;
    private final PushDelayTaskExecuteEngine delayTaskEngine;
    private final PushDelayTask delayTask;
    
    public PushExecuteTask(Service service, PushDelayTaskExecuteEngine delayTaskEngine, PushDelayTask delayTask) {
        this.service = service;
        this.delayTaskEngine = delayTaskEngine;
        this.delayTask = delayTask;
    }
    
    //Builds the latest push data for service and pushes it to every target client.
    //Clients that have disappeared, or no longer hold a subscriber for service, are skipped.
    //On any exception, a push-to-all PushDelayTask with a 1000 ms delay is re-queued for service.
    @Override
    public void run() {
        try {
            //Latest instance data of the service, wrapped with its metadata
            PushDataWrapper wrapper = generatePushData();
            ClientManager clientManager = delayTaskEngine.getClientManager();
            //Every clientId that should receive this service's instances
            for (String each : getTargetClientIds()) {
                Client client = clientManager.getClient(each);
                if (null == client) {
                    //The client has disconnected
                    continue;
                }
                //Reuse the client fetched above: a second lookup could return null if the client
                //disconnected in between, and the resulting NPE would abort the push for all targets.
                Subscriber subscriber = client.getSubscriber(service);
                //The client may have unsubscribed after the target ids were collected
                if (null == subscriber) {
                    continue;
                }
                //Push via PushExecutorDelegate.doPushWithCallback()
                delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
            }
        } catch (Exception e) {
            Loggers.PUSH.error("Push task for service " + service.getGroupedServiceName() + " execute failed ", e);
            delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
        }
    }
    
    //Builds the PushDataWrapper from ServiceStorage.getPushData() (all instances of the service)
    //and the service metadata (null if none).
    private PushDataWrapper generatePushData() {
        ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
        ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);
        return new PushDataWrapper(serviceMetadata, serviceInfo);
    }
    
    //Push-to-all tasks (service changed) target every clientId in the subscriber index;
    //otherwise (service subscribed) only the task's target clients.
    private Collection<String> getTargetClientIds() {
        return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
            : delayTask.getTargetClients();
    }
    ...
}

//Delay engine for push tasks; these collaborators are exposed to push tasks
//(other members are elided in this excerpt).
public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
    //Looks up Client objects by clientId
    private final ClientManager clientManager;
    //Publisher/subscriber indexes (service -> clientIds)
    private final ClientServiceIndexesManager indexesManager;
    //Builds and caches the ServiceInfo to push
    private final ServiceStorage serviceStorage;
    //Service metadata lookup
    private final NamingMetadataManager metadataManager;
    //Performs the actual push to a client
    private final PushExecutor pushExecutor;
    ...
}

//Delay task describing a pending push of a service's instances, either to all subscribers or to specific clients.
public class PushDelayTask extends AbstractDelayTask {    
    private final Service service;
    //true: push to every subscriber of the service; false: push only to targetClients
    private boolean pushToAll;
    //Target clientIds when pushToAll is false; null otherwise
    private Set<String> targetClients;

    //Constructor used for service changed events: push to all subscribers
    public PushDelayTask(Service service, long delay) {
        this.service = service;
        pushToAll = true;
        targetClients = null;
        setTaskInterval(delay);
        setLastProcessTime(System.currentTimeMillis());
    }
    
    //Constructor used for service subscribed events: push only to targetClient
    public PushDelayTask(Service service, long delay, String targetClient) {
        this.service = service;
        this.pushToAll = false;
        this.targetClients = new HashSet<>(1);
        //targetClient is the connection id of the client that subscribed over its long-lived connection
        this.targetClients.add(targetClient);
        setTaskInterval(delay);
        setLastProcessTime(System.currentTimeMillis());
    }
    ...
}

@Component
public class ServiceStorage {
    //Cache of the ServiceInfo last built for each Service
    private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
    ...
    //Builds the ServiceInfo to push for service. If the service is not a registered singleton,
    //returns the empty ServiceInfo without caching it; otherwise fills in all instances and
    //caches the result in serviceDataIndexes.
    public ServiceInfo getPushData(Service service) {
        //Start from an empty ServiceInfo (ServiceStorage.emptyServiceInfo())
        ServiceInfo result = emptyServiceInfo(service);
        if (!ServiceManager.getInstance().containSingleton(service)) {
            return result;
        }
        //Get the service's instance list via ServiceStorage.getAllInstancesFromIndex();
        //the hosts field then holds all Instance data of the service
        result.setHosts(getAllInstancesFromIndex(service));
        //Cache the built ServiceInfo
        serviceDataIndexes.put(service, result);
        return result;
    }
    ...
}

//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //Publisher index (service providers): each Service maps to the clientIds of its registered instances
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    //Subscriber index (service consumers): each Service maps to the clientIds subscribed to it
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    ...
    //Returns the clientIds subscribed to service, or a new empty set if there are none.
    //A non-empty result is the live index set, not a copy.
    public Collection<String> getAllClientsSubscribeService(Service service) {
        //Single lookup: containsKey() followed by get() could return null (and make callers
        //iterating the result throw NPE) if the entry were removed concurrently in between.
        Set<String> clientIds = subscriberIndexes.get(service);
        return null != clientIds ? clientIds : new ConcurrentHashSet<>();
    }
    ...
}

public abstract class AbstractClient implements Client {
    //subscribers: the services this client (acting as a Subscriber) has subscribed to.
    //key = the subscribed service (e.g. stock-service); value = this client's subscriber info
    //(e.g. a concrete order-service instance with its IP)
    protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    ...
    //Returns this client's subscriber for service, or null if it has not subscribed to it.
    @Override
    public Subscriber getSubscriber(Service service) {
        return subscribers.get(service);
    }
    
    //Records (or replaces) the subscriber for service. The subscribe metric is incremented only
    //when the service was not subscribed before (put returned null). Always returns true.
    @Override
    public boolean addServiceSubscriber(Service service, Subscriber subscriber) {
        //key = subscribed service (e.g. stock-service), value = subscriber (e.g. an order-service instance)
        if (null == subscribers.put(service, subscriber)) {
            MetricsMonitor.incrementSubscribeCount();
        }
        return true;
    }
    ...
}

//Chooses the concrete PushExecutor per client and delegates the push to it.
@Component
public class PushExecutorDelegate implements PushExecutor {
    private final PushExecutorRpcImpl rpcPushExecuteService;
    private final PushExecutorUdpImpl udpPushExecuteService;
    ...
    @Override
    public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
        getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);
    }
    
    //Selection order: an SPI implementation if SpiImplPushExecutorHolder finds one; otherwise UDP
    //for client ids containing IpPortBasedClient.ID_DELIMITER, and RPC (gRPC) for all others.
    private PushExecutor getPushExecuteService(String clientId, Subscriber subscriber) {
        Optional<SpiPushExecutor> result = SpiImplPushExecutorHolder.getInstance().findPushExecutorSpiImpl(clientId, subscriber);
        if (result.isPresent()) {
            return result.get();
        }
        return clientId.contains(IpPortBasedClient.ID_DELIMITER) ? udpPushExecuteService : rpcPushExecuteService;
    }
    ...
}

//RPC-based PushExecutor: pushes service data to clients over their RPC connection.
@Component
public class PushExecutorRpcImpl implements PushExecutor {
    private final RpcPushService pushService;
    ...
    //Wraps the ServiceInfo produced by getServiceInfo(data, subscriber) in a NotifySubscriberRequest and pushes it to
    //the connection whose id is clientId. The callback is run on GlobalExecutor.getCallbackExecutor().
    @Override
    public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
        pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)), callBack, GlobalExecutor.getCallbackExecutor());
    }
    ...
}

//Pushes server requests to connected clients.
@Service
public class RpcPushService {
    @Autowired
    private ConnectionManager connectionManager;
    ...
    //Pushes the request asynchronously over the connection identified by connectionId and reports the outcome
    //through requestCallBack, whose handlers run on the given executor.
    //onSuccess() is called for a successful response and also when there is nothing to push to
    //(unknown or already-closed connection). onFail() is called for an error response or any other exception.
    public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) {
        Connection connection = connectionManager.getConnection(connectionId);
        if (connection != null) {
            try {
                //Send the push asynchronously (per the original note this is GrpcConnection.asyncRequest());
                //the request timeout is taken from requestCallBack
                connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
                    @Override
                    public Executor getExecutor() {
                        return executor;
                    }
                    
                    //An unsuccessful response becomes a NacosException carrying its error code and message
                    @Override
                    public void onResponse(Response response) {
                        if (response.isSuccess()) {
                            requestCallBack.onSuccess();
                        } else {
                            requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                        }
                    } 
                                   
                    @Override
                    public void onException(Throwable e) {
                        requestCallBack.onFail(e);
                    }
                });
            } catch (ConnectionAlreadyClosedException e) {
                //The client is gone: drop the stale connection and treat the push as done
                connectionManager.unregister(connectionId);
                requestCallBack.onSuccess();
            } catch (Exception e) {
                Loggers.REMOTE_DIGEST.error("error to send push response to connectionId ={},push response={}", connectionId, request, e);
                requestCallBack.onFail(e);
            }
        } else {
            //No such connection: nothing to push to, so report success
            requestCallBack.onSuccess();
        }
    }
    ...
}

三.客户端收到服务端发送的Service服务实例数据推送的处理

NamingPushRequestHandler的requestReply()方法会处理服务端的推送,即调用ServiceInfoHolder的processServiceInfo()方法更新本地缓存。

//Client-side handler for server pushes: applies NotifySubscriberRequest payloads to the local service cache.
public class NamingPushRequestHandler implements ServerRequestHandler {
    private final ServiceInfoHolder serviceInfoHolder;
    
    public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
        this.serviceInfoHolder = serviceInfoHolder;
    }
    
    //Updates the local cache from a NotifySubscriberRequest and acknowledges it with a NotifySubscriberResponse.
    //Returns null for any other request type.
    @Override
    public Response requestReply(Request request) {
        if (!(request instanceof NotifySubscriberRequest)) {
            return null;
        }
        NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
        //Refresh the client's local cache with the pushed service data
        serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
        return new NotifySubscriberResponse();
    }
}

//Client-side holder of the local ServiceInfo cache, keyed by ServiceInfo.getKey().
public class ServiceInfoHolder implements Closeable {
    private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
    ...
    //Applies a ServiceInfo received from the server (e.g. via a push) to the local cache.
    //Returns null if the ServiceInfo has no key. Returns the previously cached entry (possibly null) when
    //isEmptyOrErrorPush() rejects the update. Otherwise caches and returns the new ServiceInfo.
    //When isChangedServiceInfo() reports a change, an InstancesChangeEvent is published and the data is
    //written to the disk cache.
    public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        //Reuse the key computed above rather than calling getKey() again for each map access
        ServiceInfo oldService = serviceInfoMap.get(serviceKey);
        //Keep the existing cache entry when the push is rejected (presumably empty or failed pushes)
        if (isEmptyOrErrorPush(serviceInfo)) {
            return oldService;
        }
        //Replace the cached entry with the new data
        serviceInfoMap.put(serviceKey, serviceInfo);
        //Compare against the previous entry to decide whether listeners must be notified
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceKey, JacksonUtils.toJson(serviceInfo.getHosts()));
            //Publish the instances-changed event
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
            //Persist the service data to the local disk cache
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
    ...
}

(4)总结

6.微服务实例信息如何同步集群节点

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

(2)ClientChangedEvent事件的处理源码

(3)集群节点处理数据同步请求的源码

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

ClientChangedEvent事件的作用就是向集群节点同步服务实例数据的。

//Instance request handler.
//Handles InstanceRequest messages: registers or deregisters an instance for the calling connection.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    //Dispatches on the request type. Any type other than register/deregister is rejected with INVALID_PARAM.
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        //Build a Service from the request (namespace, group name, service name); the trailing true presumably marks it ephemeral
        Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                //Register the instance
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                //Deregister the instance
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    //Registers the instance through EphemeralClientOperationServiceImpl.registerInstance(). Arguments passed:
    //  service                - the Service built from the request (namespace, group name, service name)
    //  request.getInstance()  - the client's instance object (IP, port, ...)
    //  meta.getConnectionId() - the connection id; it is the clientId under which the instance is registered
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    ...
}

//Client operation service for ephemeral instances.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    private final ClientManager clientManager;
    
    public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
        this.clientManager = clientManager;
    }
    
    //Registers an ephemeral instance of the service for the client identified by clientId.
    //Throws NacosRuntimeException(INVALID_PARAM) if the service is persistent. Silently returns if
    //clientIsLegal() rejects the client. On success publishes ClientRegisterServiceEvent and InstanceMetadataEvent.
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //Resolve the registered singleton Service that corresponds to the Service built from the request
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));
        }
        //Look up the Client for this clientId (the request's connection id) through ClientManagerDelegate.
        //NOTE(review): the original note says this is an IpPortBasedClient; for a gRPC connection id (no
        //IpPortBasedClient.ID_DELIMITER) it is presumably a connection-based client - confirm in ClientManagerDelegate.
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //Wrap the request's instance data in an InstancePublishInfo
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //Record the published instance on the client; AbstractClient.addServiceInstance() also publishes a ClientChangedEvent
        client.addServiceInstance(singleton, instanceInfo);
        //Refresh the client's lastUpdatedTime
        client.setLastUpdatedTime();
        //Publish the client-registered-service event (updates the service indexes)
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        //Publish the instance metadata event
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
    ...
}

//Nacos naming client identified by IP and port.
//The client is bound to the IP and port the user registered; it is an abstraction that simulates a TCP session client.
public class IpPortBasedClient extends AbstractClient {
    ...
    //Converts the publish info with parseToHealthCheckInstance() (presumably adding health-check state) and then
    //stores it through AbstractClient.addServiceInstance().
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
    }
    ...
}

//Abstract implementation of {@code Client}.
public abstract class AbstractClient implements Client {
    //publishers: the services this client provides and its instance for each (one client may provide several services).
    //key = the registered Service, value = the InstancePublishInfo built from the instance in the register request.
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    //subscribers: the services this client (acting as a Subscriber) has subscribed to.
    //Example: key = stock-service (the service being subscribed to), value = the Subscriber for the subscribing
    //instance, e.g. a concrete order-service instance with its own IP.
    protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    ...
    //Records (or replaces) the instance this client publishes for the service. The instance metric is
    //incremented only the first time the service is added. A ClientChangedEvent is then published; it drives
    //Distro data sync to the other cluster nodes. Always returns true.
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        //put() returns null only the first time this Service is added for the client
        if (null == publishers.put(service, instancePublishInfo)) {
            //Metrics
            MetricsMonitor.incrementInstanceCount();
        }
        //Publish the client-changed event, used for data sync between cluster nodes
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
    ...
}

(2)ClientChangedEvent事件的处理源码

DistroClientDataProcessor的onEvent()方法会响应ClientChangedEvent。除ClientVerifyFailedEvent外,其他事件都会交给DistroClientDataProcessor的syncToAllServer()方法处理。syncToAllServer()先确认该client是临时实例,并且当前节点是它的责任节点;然后如果判断出事件类型为ClientChangedEvent,就调用DistroProtocol的sync()方法进行集群节点同步处理。

DistroProtocol的sync()方法会遍历集群中除自身节点外的其他节点,然后对遍历到的每个节点执行DistroProtocol的syncToTarget()方法。

在DistroProtocol的syncToTarget()方法中,首先把要同步的集群节点targetServer包装成DistroKey对象,然后根据DistroKey对象创建DistroDelayTask延迟任务,接着调用NacosDelayTaskExecuteEngine的addTask()方法,往延迟任务执行引擎的tasks中添加任务。

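NacosDelayTaskExecuteEngine在初始化时会通过scheduleWithFixedDelay()启动一个定时任务,每隔processInterval毫秒执行一次ProcessRunnable的run()方法。每次执行时,ProcessRunnable的run()方法会调用processTasks(),遍历任务池tasks,取出延迟任务并交给对应的处理器。处理DistroDelayTask任务时会调用DistroDelayTaskProcessor的process()方法;处理失败或抛出异常的任务,会通过retryFailedTask()重新放回任务池,等待下一轮重试。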

在执行DistroDelayTaskProcessor的process()方法时,会先根据DistroDelayTask任务封装一个DistroSyncChangeTask任务,然后调用NacosExecuteTaskExecuteEngine的addTask()方法。如果没有为该tag注册专门的处理器,addTask()就会根据tag的哈希值选出一个TaskExecuteWorker,并调用它的process()方法,将DistroSyncChangeTask任务放入该TaskExecuteWorker的阻塞队列。TaskExecuteWorker在创建时就会启动InnerWorker线程,不断从队列中取出任务执行,因此最终会执行DistroSyncChangeTask的run()方法。

//Distro data processor for client data (excerpt showing the event/sync side).
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    private final ClientManager clientManager;
    private final DistroProtocol distroProtocol;
    ...
    //Event entry point. In the visible part, ClientVerifyFailedEvent goes to syncToVerifyFailedServer() and
    //every other event goes to syncToAllServer(), which itself decides whether a sync is needed.
    @Override
    public void onEvent(Event event) {
        ...
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }
    
    //Syncs a client's data to all other cluster members through DistroProtocol.sync():
    //DELETE for ClientDisconnectEvent, CHANGE for ClientChangedEvent; other events are ignored.
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        //Only ephemeral data sync by Distro, persist client should sync by raft.
        //isResponsibleClient(): only the node responsible for this client syncs it to the cluster,
        //so nodes that merely received a copy do not re-broadcast it.
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            //Client disconnected: tell the other nodes to delete this client's data
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            //Client changed (e.g. an instance was registered): tell the other nodes to apply the change
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }
    ...
}

//Distro protocol (excerpt showing the sync side): schedules delayed sync tasks towards other cluster members.
@Component
public class DistroProtocol {
    private final ServerMemberManager memberManager;
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    ...
    //Start to sync by configured delay (DistroConfig.getSyncDelayMillis()).
    public void sync(DistroKey distroKey, DataOperation action) {
        sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
    }
    
    //Start to sync data to all remote servers: one task per cluster member, excluding this node.
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }
    
    //Start to sync to target server.
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        //Copy the key with the target server added, so each target gets its own task key
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
        //Create the DistroDelayTask for this key, action and delay
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        //Add it to the delay task engine (presumably a NacosDelayTaskExecuteEngine, whose addTask() merges
        //it with any task already pending under the same key)
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }
    ...
}

//Delay task execution engine.
//Holds tasks keyed by an arbitrary key and periodically hands them to the registered NacosTaskProcessor.
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    private final ScheduledExecutorService processingExecutor;
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//task pool
   
    //processInterval: initial delay and fixed delay (ms) between the end of one processTasks() pass and the next.
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        //Start the periodic ProcessRunnable on the single-threaded scheduler
        processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }
    ...
    //Adds a task under the key. If a task is already pending for that key, the new task merges it
    //(newTask.merge(existTask)) and then replaces it. The get/merge/put sequence runs while holding lock.
    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            //Finally store the (possibly merged) task in the pool
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }
    
    //One pass over the task pool: each task handed back by removeTask() is passed to its processor and
    //re-queued through retryFailedTask() if the processor returns false or throws.
    protected void processTasks() {
        //Keys currently in the task pool (getAllTaskKeys() body not shown)
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            //Take the task out of the pool; null means there is nothing to process for this key in this pass
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            //Look up the NacosTaskProcessor registered for this key
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                //No processor: the task has already been removed, so it is dropped
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                //ReAdd task if process failed
                if (!processor.process(task)) {
                    //Processing failed: put the task back so a later pass retries it
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error ", e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    ...
    //Scheduled body. Any Throwable is caught and logged, because an exception escaping a
    //scheduleWithFixedDelay task would suppress all of its later executions.
    private class ProcessRunnable implements Runnable {
        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
}

//Distro delay task processor.
//Turns a due DistroDelayTask into an executable DistroSyncDeleteTask / DistroSyncChangeTask and hands it to the execute engine.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    ...
    //Returns true once the task has been handed off, and also for tasks that are not DistroDelayTasks (they are ignored).
    //Returns false for any other action, which makes NacosDelayTaskExecuteEngine re-queue the task.
    @Override
    public boolean process(NacosTask task) {
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                //Client data deleted (e.g. client disconnected): wrap it in a DistroSyncDeleteTask
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                //Submit to the execute engine (presumably NacosExecuteTaskExecuteEngine.addTask())
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                //Client data added or changed (e.g. instance registered): wrap it in a DistroSyncChangeTask
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                //Submit to the execute engine (presumably NacosExecuteTaskExecuteEngine.addTask())
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

//Task execution engine.
//Runs AbstractExecuteTasks through a processor registered for the tag, or else on one of a fixed set of TaskExecuteWorkers.
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    private final TaskExecuteWorker[] executeWorkers;
    
    //Creates dispatchWorkerCount workers; each TaskExecuteWorker starts its own thread when constructed.
    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    ...
    //Submits a task. A NacosTaskProcessor registered for the tag processes it directly; otherwise the task is
    //queued on the TaskExecuteWorker selected by getWorker(tag).
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        //Prefer a processor registered for this tag
        NacosTaskProcessor processor = getProcessor(tag);
        if (null != processor) {
            processor.process(task);
            return;
        }
        //No processor: pick a worker by tag hash and queue the task via TaskExecuteWorker.process()
        TaskExecuteWorker worker = getWorker(tag);
        worker.process(task);
    }
    
    //Maps a tag to a worker by hash code. Masking with Integer.MAX_VALUE keeps the index non-negative,
    //and the same tag always lands on the same worker.
    private TaskExecuteWorker getWorker(Object tag) {
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }    
    ...
}

//Single-threaded worker: queues tasks in a bounded blocking queue and runs them one by one on its own thread.
public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    private final BlockingQueue<Runnable> queue;//task queue
    
    //Creates the bounded queue and starts the worker thread immediately.
    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        ...
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        new InnerWorker(name).start();
    }
    
    //Queues AbstractExecuteTasks for execution; other task types are ignored. Always returns true.
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            putTask((Runnable) task);
        }
        return true;
    }
    
    //Puts the task on the queue, blocking while the queue is full.
    private void putTask(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
            //Restore the interrupt status, which put() cleared when it threw, so the calling thread can still see it
            Thread.currentThread().interrupt();
        }
    }
    ...
    //Worker thread: takes tasks from the queue until the worker is closed, runs them, and warns about tasks
    //that take longer than one second. Errors from a task are logged so the loop keeps going.
    private class InnerWorker extends Thread {
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    //Block until a task is available
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    //Run the task, e.g. DistroSyncChangeTask.run()
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

执行DistroSyncChangeTask的run()方法,其实就是执行其父类AbstractDistroExecuteTask的run()方法。当传输代理支持回调时,AbstractDistroExecuteTask的run()方法会调用DistroSyncChangeTask的doExecuteWithCallback()方法:先获取请求数据,然后调用DistroClientTransportAgent的syncData()方法(带callback参数的重载版本)同步集群节点。下面列出的是不带callback参数的syncData()版本:它调用ClusterRpcClientProxy的sendRequest()方法发送数据同步请求,最终会调用RpcClient的request()方法 -> GrpcConnection的request()方法。

//Executable Distro task that syncs one changed client's data to one target server.
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    ...
    //Loads the current data for this task's key and sends it to the target server with the given callback.
    //If there is no data, it logs a warning and returns without invoking the callback.
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        //Load the data to send
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //Send through the transport agent for this type (for client data, the callback overload of DistroClientTransportAgent.syncData())
        getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    //Reads the data for this task's DistroKey from the DistroDataStorage registered for the type and tags it with
    //OPERATION (defined outside this excerpt; presumably DataOperation.CHANGE). Returns null if there is no data.
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
    ...
}

//Base class of executable Distro tasks such as DistroSyncChangeTask.
public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {
    ...
    //Worker entry point: finds the transport agent for the resource type, then runs the sync in callback mode
    //(doExecuteWithCallback) when the agent supports it, or through executeDistroTask() otherwise.
    @Override
    public void run() {
        //Resource type, e.g. Nacos:Naming:v2:ClientData
        String type = getDistroKey().getResourceType();
        //Transport agent for this type (per the original note, DistroClientTransportAgent for client data)
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        //NOTE(review): the original note says this returns true by default - confirm for the agent in use
        if (transportAgent.supportCallbackTransport()) {
            //Callback mode: delegate to the subclass's doExecuteWithCallback()
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }
    
    //Performs the sync in callback mode; implemented by each concrete task.
    protected abstract void doExecuteWithCallback(DistroCallback callback);
    ...
}

//Distro transport agent for client data: sends DistroData to other cluster members over cluster RPC.
public class DistroClientTransportAgent implements DistroTransportAgent {
    private final ClusterRpcClientProxy clusterRpcClientProxy;
    private final ServerMemberManager memberManager;
    ...
    //Synchronously sends the data to targetServer (the overload without a callback).
    //Returns true when isNoExistTarget() reports the target as absent (nothing to do) or when checkResponse() accepts
    //the response. Returns false when the target is unhealthy, the response is rejected, or the request throws NacosException.
    @Override
    public boolean syncData(DistroData data, String targetServer) {
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        //Build the request carrying the data and its operation type
        DistroDataRequest request = new DistroDataRequest(data, data.getType());
        //Resolve the target cluster member
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            //Blocking RPC call to the member: sendRequest() returns the Response directly (two-argument overload not shown)
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
        }
        return false;
    }
    ...
}

//Sends requests to other cluster members through the RpcClient kept for each member.
@Service
public class ClusterRpcClientProxy extends MemberChangeListener {
    ...
    //Sends the request to the member and waits for the response (RpcClient.request() bounds the wait by timeoutMills).
    //Throws NacosException(CLIENT_INVALID_PARAM) when no RpcClient is registered for the member.
    public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
        RpcClient memberClient = RpcClientFactory.getClient(memberClientKey(member));
        if (memberClient == null) {
            throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
        }
        //Delegate to RpcClient.request()
        return memberClient.request(request, timeoutMills);
    }
    ...
}

//Base RPC client (excerpt): request() sends a request over the current connection, retrying within limits.
public abstract class RpcClient implements Closeable {
    //Connection currently used for requests. Per the original note: when NamingGrpcClientProxy is initialized and
    //calls RpcClient.start(), the value returned by GrpcClient.connectToServer() is assigned to currentConnection.
    protected volatile Connection currentConnection;
    ...
    //send request.
    //Loops while retryTimes < RETRY_TIMES and less than timeoutMills has passed since start
    //(retryTimes is presumably incremented in the elided part of the loop).
    public Response request(Request request, long timeoutMills) throws NacosException {
        int retryTimes = 0;
        Response response;
        Exception exceptionThrow = null;
        long start = System.currentTimeMillis();
        while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
            ...
            //Send the request over the current connection (GrpcConnection.request() for gRPC)
            response = this.currentConnection.request(request, timeoutMills);
            ...
        }
        ...
    }
    ...
}

(3)集群节点处理数据同步请求的源码

通过DistroClientTransportAgent的syncData()方法发送的数据同步请求,会被DistroDataRequestHandler的handle()方法处理。然后会调用DistroDataRequestHandler的handleSyncData()方法,接着调用DistroProtocol的onReceive()方法,于是最终会调用到DistroClientDataProcessor.processData()方法。

在执行DistroClientDataProcessor的processData()方法时,如果是同步服务实例新增、修改后的数据,则执行DistroClientDataProcessor的handlerClientSyncData()方法。该方法会把同步过来的每个服务实例与本地记录对比:只有发生变化时,才会更新Client并发布客户端注册服务实例的事件(与处理服务注册时一样);对于本地仍存在、但同步数据中已不存在的服务,则会移除并发布客户端注销服务实例的事件。如果是同步服务实例删除后的数据,则调用EphemeralIpPortClientManager的clientDisconnected()方法:首先移除客户端对象信息,然后发布一个客户端断开连接事件ClientDisconnectEvent。

其中客户端断开连接事件ClientDisconnectEvent,首先会被ClientServiceIndexesManager的onEvent()方法处理:调用handleClientDisconnect()方法,从ClientServiceIndexesManager的订阅者列表和注册表中移除该客户端的clientId。然后会被DistroClientDataProcessor的onEvent()方法处理,进行集群节点之间的数据同步。不过syncToAllServer()只允许该client的责任节点发起同步,所以只是收到同步数据的节点不会再次向外广播。

//Handles DistroDataRequest messages sent by other cluster members.
@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {
    private final DistroProtocol distroProtocol;
    ...
    //Dispatches on the data operation. Any exception is logged and turned into a FAIL response instead of
    //being thrown; unknown operations get an empty DistroDataResponse.
    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                case VERIFY:
                    return handleVerify(request.getDistroData(), meta);
                case SNAPSHOT:
                    return handleSnapshot();
                case ADD:
                case CHANGE:
                case DELETE:
                    //Instance add, change and delete syncs are all handled by handleSyncData()
                    return handleSyncData(request.getDistroData());
                case QUERY:
                    return handleQueryData(request.getDistroData());
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    
    //Applies synced data through DistroProtocol.onReceive(); a false result is reported as a FAIL response.
    private DistroDataResponse handleSyncData(DistroData distroData) {
        DistroDataResponse result = new DistroDataResponse();
        if (!distroProtocol.onReceive(distroData)) {
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("[DISTRO-FAILED] distro data handle failed");
        }
        return result;
    }
    ...
}

//Distro protocol (excerpt showing the receive side).
@Component
public class DistroProtocol {
    ...
    //Receive synced distro data and pass it to the DistroDataProcessor registered for its resource type.
    //Returns false if no processor is registered, otherwise the processor's result.
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
        //Resource type, e.g. Nacos:Naming:v2:ClientData
        String resourceType = distroData.getDistroKey().getResourceType();
        //Processor for this type (DistroClientDataProcessor for client data)
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processData(distroData);
    }
    ...
}

//Distro data processor for client data (excerpt showing the receive side).
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    ...
    //Applies data synced from another node.
    //ADD/CHANGE: deserialize the ClientSyncData and merge it into the local client.
    //DELETE: disconnect (remove) the client whose id is the key's resource key.
    //Returns false for any other operation.
    @Override
    public boolean processData(DistroData distroData) {
        switch (distroData.getType()) {
            case ADD:
            case CHANGE:
                //Instance added or changed
                ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);
                handlerClientSyncData(clientSyncData);
                return true;
            case DELETE:
                //Instance deleted: remove the client (EphemeralIpPortClientManager.clientDisconnected() per the original note)
                String deleteClientId = distroData.getDistroKey().getResourceKey();
                Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                clientManager.clientDisconnected(deleteClientId);
                return true;
            default:
                return false;
        }
    }
    
    //Registers the synced client locally (syncClientConnected) and then makes its published instances match the synced data.
    //NOTE(review): assumes getClient() returns non-null right after syncClientConnected() - confirm.
    private void handlerClientSyncData(ClientSyncData clientSyncData) {
        Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
        clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
        Client client = clientManager.getClient(clientSyncData.getClientId());
        upgradeClient(client, clientSyncData);
    }
    
    //Makes the client's published services match clientSyncData.
    //Entry i of namespaces/groupNames/serviceNames/instancePublishInfos together describes one published instance.
    //NOTE(review): the four lists are assumed to have equal length; this is not checked here.
    private void upgradeClient(Client client, ClientSyncData clientSyncData) {
        List<String> namespaces = clientSyncData.getNamespaces();
        List<String> groupNames = clientSyncData.getGroupNames();
        List<String> serviceNames = clientSyncData.getServiceNames();
        List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
        Set<Service> syncedService = new HashSet<>();
        
        for (int i = 0; i < namespaces.size(); i++) {
            Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
            Service singleton = ServiceManager.getInstance().getSingleton(service);
            syncedService.add(singleton);
            InstancePublishInfo instancePublishInfo = instances.get(i);
            //Only update and publish an event when the synced instance differs from the local one
            if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
                client.addServiceInstance(singleton, instancePublishInfo);
                //Same event a local service registration publishes
                NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
            }
        }
        
        //Services still published locally but absent from the synced data are removed and deregistered
        for (Service each : client.getAllPublishedService()) {
            if (!syncedService.contains(each)) {
                client.removeServiceInstance(each);
                NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
            }
        }
    }
    ...
}

@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
    // Key: the clientId (the connectionId carried by the request).
    // Value: the IpPortBasedClient (a subclass of AbstractClient, which implements Client).
    private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    ...
    /**
     * Handles a client disconnect.
     *
     * <p>Removes the client from {@code clients}. If a client was registered under the id, a
     * {@code ClientDisconnectEvent} is published for it and then the client is released.
     *
     * @param clientId id of the disconnected client
     * @return always {@code true}
     */
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        IpPortBasedClient removed = clients.remove(clientId);
        if (removed != null) {
            // The event is published before release() so subscribers still see the client.
            NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(removed));
            removed.release();
        }
        return true;
    }
    ...
}

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    // Registry (providers): each Service maps to the clientIds of the clients publishing it.
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // Subscriber list (consumers): each Service maps to the clientIds of the clients subscribed to it.
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    ...
    /**
     * Routes incoming events.
     *
     * <p>A disconnect goes to {@code handleClientDisconnect}. Any other
     * {@code ClientOperationEvent} goes to {@code handleClientOperation}. All remaining events
     * are ignored.
     */
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
            return;
        }
        if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    /**
     * Drops a disconnected client from the subscriber index of every service it subscribed to,
     * then from the publisher index of every service it published.
     */
    private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
        Client disconnected = event.getClient();
        String clientId = disconnected.getClientId();
        for (Service subscribed : disconnected.getAllSubscribeService()) {
            removeSubscriberIndexes(subscribed, clientId);
        }
        for (Service published : disconnected.getAllPublishedService()) {
            removePublisherIndexes(published, clientId);
        }
    }
    ...
}

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    ...
    /**
     * Dispatches client events for cluster replication.
     *
     * <p>{@code ClientVerifyFailedEvent}s are handed to {@code syncToVerifyFailedServer}. Every
     * other event is treated as a {@code ClientEvent} and handed to {@code syncToAllServer}.
     */
    @Override
    public void onEvent(Event event) {
        ...
        if (!(event instanceof ClientEvent.ClientVerifyFailedEvent)) {
            syncToAllServer((ClientEvent) event);
            return;
        }
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    }
    
    /**
     * Replicates a client's state to the cluster through the Distro protocol.
     *
     * <p>Nothing is synced when the client is null, is not ephemeral (persistent clients are
     * synced by Raft, not Distro), or {@code clientManager.isResponsibleClient} reports that this
     * node is not responsible for it. A disconnect is synced as a DELETE, a change as a CHANGE,
     * and any other event type is ignored.
     */
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        boolean shouldSync = client != null && client.isEphemeral() && clientManager.isResponsibleClient(client);
        if (!shouldSync) {
            return;
        }
        DataOperation operation;
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            // Client deregistered: remove its data on the other nodes.
            operation = DataOperation.DELETE;
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            // Client registered or changed: push its current data to the other nodes.
            operation = DataOperation.CHANGE;
        } else {
            return;
        }
        distroProtocol.sync(new DistroKey(client.getClientId(), TYPE), operation);
    }
    ...
}

(4)总结

一.执行引擎的总结

延时任务执行引擎的实现原理是:引擎内部有一个Map类型的tasks任务池,任务池以key存放待执行的延迟任务,引擎再根据任务的key找到对应的任务处理器。引擎会定时从任务池中取出任务,并调用对应任务处理器的处理方法来处理任务。

任务执行引擎的实现原理是:创建多个任务执行Worker,每个任务执行Worker都有一个阻塞队列。向任务执行引擎添加任务时,会将任务添加到其中一个Worker的阻塞队列中;Worker在初始化时就会启动一个线程,不断从阻塞队列中取出任务来处理。所以任务执行引擎是通过"阻塞队列 + 异步任务"的方式来实现的。

二.用于向集群节点同步数据的客户端改变事件的处理流程总结

步骤一:先创建DistroDelayTask延迟任务放入到延迟任务执行引擎的任务池,DistroDelayTask延迟任务会由DistroDelayTaskProcessor处理器处理。

步骤二:DistroDelayTaskProcessor处理时会创建DistroSyncChangeTask任务,然后再将任务分发添加到执行引擎中的任务执行Worker的阻塞队列中。

步骤三:任务执行Worker会从队列中获取并执行DistroSyncChangeTask任务,也就是执行引擎会触发调用AbstractDistroExecuteTask的run()方法,从而调用DistroSyncChangeTask的doExecuteWithCallback()方法。

步骤四:doExecuteWithCallback()方法会获取最新的微服务实例列表,然后通过DistroClientTransportAgent的syncData()方法发送数据同步请求。

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务