Nacos源码—9.Nacos升级gRPC分析四
大纲
10.gRPC客户端初始化分析
11.gRPC客户端的心跳机制(健康检查)
12.gRPC服务端如何处理客户端的建立连接请求
13.gRPC服务端如何映射各种请求与对应的Handler处理类
14.gRPC简单介绍
10.gRPC客户端初始化分析
(1)gRPC客户端代理初始化的源码
(2)gRPC客户端启动的源码
(3)gRPC客户端发起与服务端建立连接请求的源码
(1)gRPC客户端代理初始化的源码
Nacos客户端注册服务实例时会调用NacosNamingService的registerInstance()方法,接着会调用NamingClientProxyDelegate的registerService()方法,然后判断注册的服务实例是不是临时的。如果注册的服务实例是临时的,那么就使用gRPC客户端代理去进行注册。如果注册的服务实例不是临时的,那么就使用HTTP客户端代理去进行注册。
NacosNamingService的init()方法在创建客户端代理,也就是执行NamingClientProxyDelegate的构造方法时,便会创建和初始化gRPC客户端代理NamingGrpcClientProxy。
创建和初始化gRPC客户端代理NamingGrpcClientProxy时,首先会由RpcClientFactory的createClient()方法创建一个RpcClient对象,并将GrpcClient对象赋值给NamingGrpcClientProxy的rpcClient属性,然后调用NamingGrpcClientProxy的start()方法启动RPC客户端连接。
在NamingGrpcClientProxy的start()方法中,会先注册一个用于处理服务端推送请求的NamingPushRequestHandler,然后调用RpcClient的start()方法启动RPC客户端即RpcClient对象,最后将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册。
public class NacosNamingService implements NamingService { ... private NamingClientProxy clientProxy; private void init(Properties properties) throws NacosException { ... this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier); } ... @Override public void registerInstance(String serviceName, Instance instance) throws NacosException { registerInstance(serviceName, Constants.DEFAULT_GROUP, instance); } @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); //调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法 clientProxy.registerService(serviceName, groupName, instance); } ... } //客户端代理 public class NamingClientProxyDelegate implements NamingClientProxy { private final NamingHttpClientProxy httpClientProxy; private final NamingGrpcClientProxy grpcClientProxy; public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException { ... //初始化HTTP客户端代理 this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder); //初始化gRPC客户端代理 this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder); } ... @Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { getExecuteClientProxy(instance).registerService(serviceName, groupName, instance); } private NamingClientProxy getExecuteClientProxy(Instance instance) { return instance.isEphemeral() ? grpcClientProxy : httpClientProxy; } ... } //gRPC客户端代理 public class NamingGrpcClientProxy extends AbstractNamingClientProxy { private final String namespaceId; private final String uuid; private final Long requestTimeout; private final RpcClient rpcClient; private final NamingGrpcRedoService redoService; //初始化gRPC客户端代理 public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException { super(securityProxy); this.namespaceId = namespaceId; this.uuid = UUID.randomUUID().toString(); this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1")); Map<String, String> labels = new HashMap<String, String>(); labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING); //1.通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性 this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels); this.redoService = new NamingGrpcRedoService(this); //2.启动gRPC客户端代理NamingGrpcClientProxy start(serverListFactory, serviceInfoHolder); } private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { rpcClient.serverListFactory(serverListFactory); //注册连接监听器 rpcClient.registerConnectionListener(redoService); //1.注册一个用于处理服务端推送请求的NamingPushRequestHandler rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder)); //2.启动RPC客户端RpcClient rpcClient.start(); //3.将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册 NotifyCenter.registerSubscriber(this); } ... @Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance); redoService.cacheInstanceForRedo(serviceName, groupName, instance); //执行服务实例的注册 doRegisterService(serviceName, groupName, instance); } //Execute register operation. public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException { //创建请求参数对象 InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance); //向服务端发起请求 requestToServer(request, Response.class); redoService.instanceRegistered(serviceName, groupName); } private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException { try { request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName())); //实际会调用RpcClient.request()方法发起gRPC请求 Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout); if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) { throw new NacosException(response.getErrorCode(), response.getMessage()); } if (responseClass.isAssignableFrom(response.getClass())) { return (T) response; } NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e); } throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response"); } ... } public class RpcClientFactory { private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>(); ... //create a rpc client. public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) { if (!ConnectionType.GRPC.equals(connectionType)) { throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType()); } return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> { LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName); try { //创建GrpcClient对象 GrpcClient client = new GrpcSdkClient(clientNameInner); //设置线程核心数和最大数 client.setThreadPoolCoreSize(threadPoolCoreSize); client.setThreadPoolMaxSize(threadPoolMaxSize); client.labels(labels); return client; } catch (Throwable throwable) { LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable); throw throwable; } }); } ... }
(2)gRPC客户端启动的源码
NamingGrpcClientProxy的start()方法会通过调用RpcClient的start()方法,来启动RPC客户端即RpcClient对象。
在RpcClient的start()方法中,首先会利用CAS来修改RPC客户端(RpcClient)的状态,也就是将RpcClient.rpcClientStatus属性从INITIALIZED更新为STARTING。
然后会创建一个核心线程数为2的线程池,并提交两个任务。任务一是处理连接成功或连接断开时的线程,任务二是处理重连或健康检查的线程。
接着会创建Connection连接对象,也就是在while循环中调用GrpcClient的connectToServer()方法,尝试与服务端建立连接。如果连接失败,则会抛出异常并且进行重试,由于是同步连接,所以最大重试次数是3。
最后当客户端与服务端成功建立连接后,会把对应的Connection连接对象赋值给RpcClient.currentConnection属性,并且修改RpcClient.rpcClientStatus属性即RPC客户端状态为RUNNING。
如果客户端与服务端连接失败,则会通过异步尝试进行连接,也就是调用RpcClient的switchServerAsync()方法,往RpcClient的reconnectionSignal队列中放入一个ReconnectContext对象,reconnectionSignal队列中的元素会交给任务2来处理。
public abstract class RpcClient implements Closeable { protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT); protected ScheduledExecutorService clientEventExecutor; protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>(); //在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性 protected volatile Connection currentConnection; private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1); ... public final void start() throws NacosException { //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING); if (!success) { return; } //接下来创建调度线程池执行器,并提交两个任务 clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.remote.worker"); t.setDaemon(true); return t; }); //任务1:处理连接成功或连接断开时的线程 clientEventExecutor.submit(() -> { ... }); //任务2:处理重连或健康检查的线程 clientEventExecutor.submit(() -> { ... }); //创建连接对象 Connection connectToServer = null; rpcClientStatus.set(RpcClientStatus.STARTING); //重试次数为3次 int startUpRetryTimes = RETRY_TIMES; //在while循环中尝试与服务端建立连接,最多循环3次 while (startUpRetryTimes > 0 && connectToServer == null) { try { startUpRetryTimes--; //获取服务端信息 ServerInfo serverInfo = nextRpcServer(); LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo); //调用GrpcClient.connectToServer()方法建立和服务端的长连接 connectToServer = connectToServer(serverInfo); } catch (Throwable e) { LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes); } } //如果连接成功,connectToServer对象就不为空 if (connectToServer != null) { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId()); //连接对象赋值,currentConnection其实就是一个在客户端使用的GrpcConnection对象实例 this.currentConnection = connectToServer; //更改RPC客户端RpcClient的状态 rpcClientStatus.set(RpcClientStatus.RUNNING); //往eventLinkedBlockingQueue队列放入ConnectionEvent事件 eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED)); } else { //尝试进行异步连接 switchServerAsync(); } registerServerRequestHandler(new ConnectResetRequestHandler()); //register client detection request. registerServerRequestHandler(request -> { if (request instanceof ClientDetectionRequest) { return new ClientDetectionResponse(); } return null; }); } protected ServerInfo nextRpcServer() { String serverAddress = getServerListFactory().genNextServer(); //获取服务端信息 return resolveServerInfo(serverAddress); } private ServerInfo resolveServerInfo(String serverAddress) { Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress); if (matcher.find()) { serverAddress = matcher.group(1); } String[] ipPortTuple = serverAddress.split(Constants.COLON, 2); int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848")); String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort)); return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort)); } public void switchServerAsync() { //异步注册逻辑 switchServerAsync(null, false); } protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) { //往reconnectionSignal队列里放入一个对象 reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail)); } ... }
(3)gRPC客户端发起与服务端建立连接请求的源码
gRPC客户端与服务端建立连接的方法是GrpcClient的connectToServer()方法。该方法首先会获取进行网络通信的端口号,因为gRPC服务需要额外占用一个端口的,所以这个端口号是在Nacos的8848基础上 + 偏移量1000,变成9848。
在建立连接之前,会先检查一下服务端,如果没问题才发起连接请求,接着就会调用GrpcConnection的sendRequest()方法发起连接请求,最后返回GrpcConnection连接对象。
public abstract class GrpcClient extends RpcClient { ... @Override public Connection connectToServer(ServerInfo serverInfo) { try { if (grpcExecutor == null) { this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp()); } //获取端口号:gRPC服务需要额外占用一个端口的,这个端口是在Nacos 8848的基础上,+ 偏移量1000,所以是9848 int port = serverInfo.getServerPort() + rpcPortOffset(); RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port); if (newChannelStubTemp != null) { //检查一下服务端,没问题才会发起RPC连接请求 Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); if (response == null || !(response instanceof ServerCheckResponse)) { shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel()); return null; } BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel()); //创建连接对象 GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId()); //create stream request and bind connection event to this connection. //创建流请求并将连接事件绑定到此连接 StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); //stream observer to send response to server grpcConn.setPayloadStreamObserver(payloadStreamObserver); grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel()); //send a setup request. ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); conSetupRequest.setLabels(super.getLabels()); conSetupRequest.setAbilities(super.clientAbilities); conSetupRequest.setTenant(super.getTenant()); //发起连接请求 grpcConn.sendRequest(conSetupRequest); //wait to register connection setup Thread.sleep(100L); return grpcConn; } return null; } catch (Exception e) { LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); } return null; } private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) { try { if (requestBlockingStub == null) { return null; } ServerCheckRequest serverCheckRequest = new ServerCheckRequest(); Payload grpcRequest = GrpcUtils.convert(serverCheckRequest); //向服务端发送一个检查请求 ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest); Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS); //receive connection unregister response here,not check response is success. return (Response) GrpcUtils.parse(response); } catch (Exception e) { LoggerUtils.printIfErrorEnabled(LOGGER, "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e); return null; } } private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub, final GrpcConnection grpcConn) { //调用BiRequestStreamStub.requestBiStream()方法连接服务端 return streamStub.requestBiStream(new StreamObserver<Payload>() { @Override public void onNext(Payload payload) { LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", grpcConn.getConnectionId(), payload.toString()); try { Object parseBody = GrpcUtils.parse(payload); final Request request = (Request) parseBody; if (request != null) { try { Response response = handleServerRequest(request); if (response != null) { response.setRequestId(request.getRequestId()); sendResponse(response); } else { LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(), request.getRequestId()); } } catch (Exception e) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}", grpcConn.getConnectionId(), payload.toString(), e.getMessage()); Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR, "Handle server request error"); errResponse.setRequestId(request.getRequestId()); sendResponse(errResponse); } } } catch (Exception e) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8()); } } @Override public void onError(Throwable throwable) { boolean isRunning = isRunning(); boolean isAbandon = grpcConn.isAbandon(); if (isRunning && !isAbandon) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", grpcConn.getConnectionId(), throwable); if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { switchServerAsync(); } } else { LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon); } } @Override public void onCompleted() { boolean isRunning = isRunning(); boolean isAbandon = grpcConn.isAbandon(); if (isRunning && !isAbandon) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", grpcConn.getConnectionId()); if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { switchServerAsync(); } } else { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon); } } }); } ... }
(4)总结
11.gRPC客户端的心跳机制(健康检查)
(1)线程任务一:处理连接成功或连接断开时的通知
(2)线程任务二:处理重连或健康检查
RpcClient的start()方法会调用GrpcClient的connectToServer()方法连接服务端,不管连接是否成功,最后都会往不同的阻塞队列中添加事件。
如果连接成功,那么就往RpcClient的eventLinkedBlockingQueue添加连接事件。如果连接失败,那么就往RpcClient的reconnectionSignal队列添加重连对象。而这两个阻塞队列中的数据处理,便是由执行RpcClient的start()方法时启动的两个线程任务进行处理的。
(1)线程任务一:处理连接成功或连接断开时的通知
这个任务主要在连接成功或者连接断开时,修改一些属性状态。通过eventLinkedBlockingQueue的take()方法从队列取到连接事件后,会判断连接事件是否建立连接还是断开连接。
如果是建立连接,那么就调用RpcClient的notifyConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为true。
如果是断开连接,那么就调用RpcClient的notifyDisConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为false。
public abstract class RpcClient implements Closeable { protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT); protected ScheduledExecutorService clientEventExecutor; protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>(); private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1); //listener called where connection's status changed. 连接状态改变的监听器 protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>(); ... public final void start() throws NacosException { //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING); if (!success) { return; } //接下来创建调度线程池执行器,并提交两个任务 clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.remote.worker"); t.setDaemon(true); return t; }); //任务1:处理连接成功或连接断开时的线程 clientEventExecutor.submit(() -> { while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) { ConnectionEvent take; try { take = eventLinkedBlockingQueue.take(); if (take.isConnected()) { notifyConnected(); } else if (take.isDisConnected()) { notifyDisConnected(); } } catch (Throwable e) { // Do nothing } } }); //任务2:向服务端上报心跳或重连的线程 clientEventExecutor.submit(() -> { ... }); } ... //Notify when client new connected. protected void notifyConnected() { if (connectionEventListeners.isEmpty()) { return; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { connectionEventListener.onConnected(); } catch (Throwable throwable) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name, connectionEventListener.getClass().getName()); } } } //Notify when client disconnected. protected void notifyDisConnected() { if (connectionEventListeners.isEmpty()) { return; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { connectionEventListener.onDisConnect(); } catch (Throwable throwable) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name, connectionEventListener.getClass().getName()); } } } ... //Register connection handler. Will be notified when inner connection's state changed. //在执行NamingGrpcClientProxy.start()方法时会将NamingGrpcRedoService对象注册到connectionEventListeners中 public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName()); this.connectionEventListeners.add(connectionEventListener); } ... } public class NamingGrpcRedoService implements ConnectionEventListener { private volatile boolean connected = false; ... @Override public void onConnected() { connected = true; LogUtils.NAMING_LOGGER.info("Grpc connection connect"); } @Override public void onDisConnect() { connected = false; LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo"); synchronized (registeredInstances) { registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false)); } synchronized (subscribes) { subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false)); } LogUtils.NAMING_LOGGER.warn("mark to redo completed"); } ... }
(2)线程任务二:处理重连或健康检查
如果RpcClient的start()方法在调用GrpcClient的connectToServer()方法连接服务端时失败了,那么会往RpcClient.reconnectionSignal队列添加重连对象的,而这个任务就会获取reconnectionSignal队列中的重连对象进行重连。
因为reconnectionSignal中的数据是当连接失败时放入的,所以如果从reconnectionSignal中获取不到重连对象,等同于连接成功。
注意:这个任务从reconnectionSignal阻塞队列中获取重连对象时,调用的是阻塞队列的take()方法,而不是阻塞队列的poll()方法。BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态。BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null。
情况一:如果从reconnectionSignal队列中获取到的重连对象为null
首先判断存活时间是否大于 5s,如果大于则调用RpcClient.healthCheck()方法发起健康检查的RPC请求。健康检查的触发方法是currentConnection.request()方法,健康检查的请求类型是HealthCheckRequest。
如果健康检查成功,只需刷新存活时间即可。如果健康检查失败,则需要尝试与服务端重新建立连接。
情况二:如果从reconnectionSignal队列中获取到的重连对象不为null
那么就调用RpcClient的reconnect()方法进行重新连接,该方法会通过GrpcClient的connectToServer()方法尝试与服务端建立连接。
public abstract class RpcClient implements Closeable { protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT); protected ScheduledExecutorService clientEventExecutor; protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>(); private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1); ... public final void start() throws NacosException { //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING); if (!success) { return; } //接下来创建调度线程池执行器,并提交两个任务 clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.remote.worker"); t.setDaemon(true); return t; }); //任务1:处理连接成功或连接断开时的线程 clientEventExecutor.submit(() -> { ... }); //任务2:向服务端上报心跳或重连的线程 clientEventExecutor.submit(() -> { while (true) { try { if (isShutdown()) { break; } //这里从reconnectionSignal阻塞队列中获取任务不是调用take()方法,而是调用poll()方法,并且指定了5s的最大读取时间 //BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态 //BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS); //reconnectContext为null,说明从reconnectionSignal中获取不到数据 //由于reconnectionSignal中的数据是当连接失败时放入的 //所以从reconnectionSignal中获取不到数据,等同于连接成功 if (reconnectContext == null) { //check alive time. //检查存活时间,默认存活时间为5s,超过5s就需要做健康检查 if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) { //调用RpcClient.healthCheck()方法,发起健康检查请求 boolean isHealthy = healthCheck(); //如果向服务端发起健康检查请求失败,则需要尝试重新建立连接 if (!isHealthy) { if (currentConnection == null) { continue; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", name, currentConnection.getConnectionId()); //判断连接状态是否关闭,如果是则结束异步任务 RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get(); if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) { break; } //修改RpcClient的连接状态为不健康 boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY); //给reconnectContext属性赋值,准备尝试重连 if (statusFLowSuccess) { //重新赋值,注意这里没有continue,所以逻辑会接着往下执行 reconnectContext = new ReconnectContext(null, false); } else { continue; } } else { //如果向服务端发起健康检查请求成功,则刷新RpcClient的存活时间 lastActiveTimeStamp = System.currentTimeMillis(); continue; } } else { continue; } } if (reconnectContext.serverInfo != null) { //clear recommend server if server is not in server list. //如果服务器不在服务器列表中,则清除推荐服务器,即设置reconnectContext.serverInfo为null boolean serverExist = false; //遍历服务端列表 for (String server : getServerListFactory().getServerList()) { ServerInfo serverInfo = resolveServerInfo(server); if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) { serverExist = true; reconnectContext.serverInfo.serverPort = serverInfo.serverPort; break; } } //reconnectContext.serverInfo不存在服务端列表中,就清除服务器信息,设置reconnectContext.serverInfo为null if (!serverExist) { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", name, reconnectContext.serverInfo.getAddress()); reconnectContext.serverInfo = null; } } //进行重新连接,RpcClient.reconnect()方法中会调用GrpcClient.connectToServer()方法尝试与服务端建立连接 reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail); } catch (Throwable throwable) { //Do nothing } } }); } private boolean healthCheck() { HealthCheckRequest healthCheckRequest = new HealthCheckRequest(); if (this.currentConnection == null) { return false; } try { //利用currentConnection连接对象,发起RPC请求,请求类型是HealthCheckRequest Response response = this.currentConnection.request(healthCheckRequest, 3000L); //not only check server is ok, also check connection is register. return response != null && response.isSuccess(); } catch (NacosException e) { //ignore } return false; } ... }
(3)总结
12.gRPC服务端如何处理客户端的建立连接请求
(1)gRPC服务端是如何启动的
(2)connectionId如何绑定Client对象的
(1)gRPC服务端是如何启动的
BaseRpcServer类有一个被@PostConstruct修饰的start()方法,该方法会调用BaseGrpcServer的startServer()方法来启动gRPC服务端。
在BaseGrpcServer的startServer()方法中,首先会调用BaseGrpcServer的addServices()方法添加服务,然后会使用建造者模式通过ServerBuilder创建gRPC框架的Server对象,最后启动gRPC框架的Server服务端,即启动一个NettyServer服务端。
//abstract rpc server. public abstract class BaseRpcServer { ... //Start sever. 启动gRPC服务端 @PostConstruct public void start() throws Exception { String serverName = getClass().getSimpleName(); Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort()); //调用BaseGrpcServer.startServer()方法启动gRPC服务端 startServer(); Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort()); Runtime.getRuntime().addShutdownHook(new Thread(() -> { Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName); try { BaseRpcServer.this.stopServer(); Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName); } catch (Exception e) { Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e); } })); } //get service port. public int getServicePort() { return EnvUtil.getPort() + rpcPortOffset(); } ... } //Grpc implementation as a rpc server. public abstract class BaseGrpcServer extends BaseRpcServer { ... @Override public void startServer() throws Exception { final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry(); //server interceptor to set connection id. 定义请求拦截器 ServerInterceptor serverInterceptor = new ServerInterceptor() { @Override public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers, ServerCallHandler<T, S> next) { Context ctx = Context.current() .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID)) .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP)) .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT)) .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT)); if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) { Channel internalChannel = getInternalChannel(call); ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel); } return Contexts.interceptCall(ctx, call, headers, next); } }; //1.调用BaseGrpcServer.addServices()方法添加服务 addServices(handlerRegistry, serverInterceptor); //2.创建一个gRPC框架的Server对象,使用了建造者模式 server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor()) .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry) .compressorRegistry(CompressorRegistry.getDefaultInstance()) .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) .addTransportFilter(new ServerTransportFilter() { @Override public Attributes transportReady(Attributes transportAttrs) { InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); InetSocketAddress localAddress = (InetSocketAddress) transportAttrs.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); int remotePort = remoteAddress.getPort(); int localPort = localAddress.getPort(); String remoteIp = remoteAddress.getAddress().getHostAddress(); Attributes attrWrapper = transportAttrs.toBuilder() .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort) .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort) .set(TRANS_KEY_LOCAL_PORT, localPort).build(); String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID); Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId); return attrWrapper; } @Override public void transportTerminated(Attributes transportAttrs) { String connectionId = null; try { connectionId = transportAttrs.get(TRANS_KEY_CONN_ID); } catch (Exception e) { //Ignore } if (StringUtils.isNotBlank(connectionId)) { Loggers.REMOTE_DIGEST.info("Connection transportTerminated,connectionId = {} ", connectionId); connectionManager.unregister(connectionId); } } }).build(); //3.启动gRPC框架的Server server.start(); } ... }
(2)connectionId如何绑定Client对象的
BaseGrpcServer的startServer()方法在执行addServices()方法添加服务时,就会对connectionId与Client对象进行绑定。
绑定会由GrpcBiStreamRequestAcceptor的requestBiStream()方法触发。具体就是会调用ConnectionManager.register()方法来实现绑定,即先通过执行"connections.put(connectionId, connection)"代码,将connectionId和connection连接对象,放入到ConnectionManager的connections这个Map属性中。再执行ClientConnectionEventListenerRegistry的notifyClientConnected()方法,把Connection连接对象包装成Client对象。
将Connection连接对象包装成Client对象时,又会继续调用ConnectionBasedClientManager的clientConnected()方法,该方法便会根据connectionId创建出一个Client对象,然后将其放入到ConnectionBasedClientManager的clients这个Map中,从而实现connectionId与Client对象的关联。
//Grpc implementation as a rpc server. public abstract class BaseGrpcServer extends BaseRpcServer { ... private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) { //unary common call register. final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder() .setType(MethodDescriptor.MethodType.UNARY) .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME)) .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())) .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); //对gRPC客户端请求的服务进行映射处理 final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver)); //构建ServerServiceDefinition服务 final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build(); //添加服务到gRPC的请求流程中 handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor)); //bi stream register. //处理客户端连接对象的关联 //也就是调用GrpcBiStreamRequestAcceptor.requestBiStream()方法对ConnectionId与Client对象进行绑定 final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)); final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder() .setType(MethodDescriptor.MethodType.BIDI_STREAMING) .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)) .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())) .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build(); handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor)); } ... } @Service public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase { ... @Override public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) { StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() { ... @Override public void onNext(Payload payload) { ... //创建连接信息对象,把一些元信息放入到这个对象中 ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(), remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName, setUpRequest.getLabels()); metaInfo.setTenant(setUpRequest.getTenant()); //把连接信息包装到连接对象中 Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get()); connection.setAbilities(setUpRequest.getAbilities()); boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted(); //ConnectionManager.register()方法,会将connectionId和连接对象进行绑定 if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) { ... } ... } ... }; return streamObserver; } ... } @Service public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> { //存储connectionId对应的Connection对象 Map<String, Connection> connections = new ConcurrentHashMap<>(); ... //register a new connect. public synchronized boolean register(String connectionId, Connection connection) { if (connection.isConnected()) { if (connections.containsKey(connectionId)) { return true; } if (!checkLimit(connection)) { return false; } if (traced(connection.getMetaInfo().clientIp)) { connection.setTraced(true); } //将connectionId与Connection连接对象进行绑定 connections.put(connectionId, connection); connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement(); //把Connection连接对象包装成Client对象 clientConnectionEventListenerRegistry.notifyClientConnected(connection); Loggers.REMOTE_DIGEST.info("new connection registered successfully, connectionId = {},connection={} ", connectionId, connection); return true; } return false; } ... } @Service public class ClientConnectionEventListenerRegistry { final List<ClientConnectionEventListener> clientConnectionEventListeners = new ArrayList<ClientConnectionEventListener>(); //notify where a new client connected public void notifyClientConnected(final Connection connection) { for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { try { //调用ConnectionBasedClientManager.clientConnected()方法 clientConnectionEventListener.clientConnected(connection); } catch (Throwable throwable) { Loggers.REMOTE.info("[NotifyClientConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable); } } } ... } @Component("connectionBasedClientManager") public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager { private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>(); @Override public void clientConnected(Connection connect) { if (!RemoteConstants.LABEL_MODULE_NAMING.equals(connect.getMetaInfo().getLabel(RemoteConstants.LABEL_MODULE))) { return; } //把Connection对象中的信息取出来,放到ClientAttributes对象中 ClientAttributes attributes = new ClientAttributes(); attributes.addClientAttribute(ClientConstants.CONNECTION_TYPE, connect.getMetaInfo().getConnectType()); attributes.addClientAttribute(ClientConstants.CONNECTION_METADATA, connect.getMetaInfo()); //传入connectionId和连接信息 clientConnected(connect.getMetaInfo().getConnectionId(), attributes); } @Override public boolean clientConnected(String clientId, ClientAttributes attributes) { String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE); ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type); //这里的clientId就是connectionId,根据connectionId创建出Client对象 return clientConnected(clientFactory.newClient(clientId, attributes)); } @Override public boolean clientConnected(final Client client) { //最后将connectionId与Client对象进行绑定,放入到ConnectionBasedClientManager的clients这个Map中 clients.computeIfAbsent(client.getClientId(), s -> { Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId()); return (ConnectionBasedClient) client; }); return true; } ... }
(3)总结
13.gRPC服务端如何映射各种请求与对应的Handler处理类
gRPC服务端会如何处理客户端请求,如何找到对应的Handler处理类。
在gRPC服务端启动时,会调用BaseGrpcServer的startServer()方法,其中就会执行到BaseGrpcServer的addServices()方法。在BaseGrpcServer的addServices()方法中,就会进行请求与Handler映射,也就是调用GrpcRequestAcceptor的request()方法进行请求与Handler映射。
在GrpcRequestAcceptor的request()方法中,首先会从请求对象中获取请求type,然后会通过请求type获取一个Handler对象,最后调用RequestHandler的模版方法handleRequest(),从而调用具体Handler对象的handle()方法。
//Grpc implementation as a rpc server. public abstract class BaseGrpcServer extends BaseRpcServer { @Autowired private GrpcRequestAcceptor grpcCommonRequestAcceptor; ... private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) { //unary common call register. final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder() .setType(MethodDescriptor.MethodType.UNARY) .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME)) .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())) .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); //对gRPC客户端发出的请求进行Handler处理类的映射处理 final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver)); //构建ServerServiceDefinition服务 final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build(); //添加服务到gRPC的请求流程中 handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor)); //bi stream register. //处理客户端连接对象的关联 //也就是调用GrpcBiStreamRequestAcceptor.requestBiStream()方法对ConnectionId与Client对象进行绑定 final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)); final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder() .setType(MethodDescriptor.MethodType.BIDI_STREAMING) .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)) .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())) .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build(); handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor)); } ... } @Service public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase { ... @Override public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) { ... //首先从请求对象中获取请求type String type = grpcRequest.getMetadata().getType(); ... //然后通过请求type获取一个Handler对象 RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type); ... //最后调用RequestHandler的模版方法handleRequest(),从而调用具体Handler对象的handle()方法 Response response = requestHandler.handleRequest(request, requestMeta); ... } ... } public abstract class RequestHandler<T extends Request, S extends Response> { @Autowired private RequestFilters requestFilters; //Handler request. public Response handleRequest(T request, RequestMeta meta) throws NacosException { for (AbstractRequestFilter filter : requestFilters.filters) { try { Response filterResult = filter.filter(request, meta, this.getClass()); if (filterResult != null && !filterResult.isSuccess()) { return filterResult; } } catch (Throwable throwable) { Loggers.REMOTE.error("filter error", throwable); } } //调用具体Handler的handle()方法 return handle(request, meta); } //Handler request. public abstract S handle(T request, RequestMeta meta) throws NacosException; } @Service public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> { Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>(); @Autowired private TpsMonitorManager tpsMonitorManager; //Get Request Handler By request Type. public RequestHandler getByRequestType(String requestType) { return registryHandlers.get(requestType); } @Override public void onApplicationEvent(ContextRefreshedEvent event) { //获取全部继承了RequestHandler类的实现类 Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class); Collection<RequestHandler> values = beansOfType.values(); for (RequestHandler requestHandler : values) { Class<?> clazz = requestHandler.getClass(); boolean skip = false; while (!clazz.getSuperclass().equals(RequestHandler.class)) { if (clazz.getSuperclass().equals(Object.class)) { skip = true; break; } clazz = clazz.getSuperclass(); } if (skip) { continue; } try { Method method = clazz.getMethod("handle", Request.class, RequestMeta.class); if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) { TpsControl tpsControl = method.getAnnotation(TpsControl.class); String pointName = tpsControl.pointName(); TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName); tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint); } } catch (Exception e) { //ignore. } Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0]; registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler); } } }
14.gRPC简单介绍
(1)gRPC是什么
(2)gRPC的特性
(3)gRPC和Dubbo的区别
(1)gRPC是什么
gRPC是一个高性能、开源和通用的RPC框架。gRPC基于ProtoBuf序列化协议开发,且支持众多开发语言。gRPC是面向服务端和移动端,基于HTTP 2设计的,带来诸如双向流、流控、头部压缩、单TCP连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。
(2)gRPC的特性
一.gRPC可以跨语言使用
二.基于IDL(接口定义语言Interface Define Language)文件定义服务
通过proto3工具生成指定语言的数据结构、服务端接口以及客户端Stub。
三.通信协议基于标准的HTTP 2设计
支持双向流、消息头压缩、单TCP的多路复用、服务端推送等特性,这些特性使得gRPC在移动端设备上更加省电和节省网络流量。
四.序列化支持ProtoBuf和JSON
ProtoBuf是一种语言无关的高性能序列化框架,它是基于HTTP2和ProtoBuf的,这保障了gRPC调用的高性能。
五.安装简单,扩展方便
使用gRPC框架每秒可达到百万RPC。
(3)gRPC和Dubbo的区别
一.通讯协议
gRPC基于HTTP 2.0,Dubbo基于TCP。
二.序列化
gRPC使用ProtoBuf,Dubbo使用Hession2等基于Java的序列化技术。
三.服务注册与发现
gRPC是应用级别的服务注册,Dubbo2.0及之前的版本都是基于更细力度的服务来进行注册,Dubbo3.0之后转向应用级别的服务注册。
四.编程语言
gRPC可以使用任何语言(HTTP和ProtoBuf天然就是跨语言的),而Dubbo只能使用在构建在JVM之上的语言。
五.服务治理
gRPC自身的服务治理能力很弱,只能基于HTTP连接维度进行容错,而Dubbo可以基于服务维度进行治理。
总结:gRPC的优势在于跨语言、跨平台,但服务治理能力弱。Dubbo服务治理能力强,但受编程语言限制无法跨语言使用。
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等