RPC项目知识点

23秋招写了三个项目:一个RPC、一个springboot、一个算法,至于面试官会不会问,私企会问一些,国企像银行那种一般不会过问RPC项目,自己整理了一些知识点,大家有需要的可以看看,有点乱,大家见谅

都是些理论知识用于学习,面经大家可以搜索别的帖子,这个项目要是面试官深究就会很难,所以没有弄清楚,不要写上去

PS:感谢几位牛友的资料https://www.nowcoder.com/discuss/353159058410643456?sourceSSR=users

https://www.nowcoder.com/discuss/353157479641063424?sourceSSR=users

简介

做这个项目的原因

两个不同的服务器上的服务提供的方法不在一个内存空间,需要网络编程才能传递方法调用所需要的参数,方法调用的结果也需要通过网络编程来接收。如果手动网络编程来实现这个调用过程的话工作量大,因为需要考虑底层传输方式(TCP 还是UDP)、序列化方式等等方面。RPC可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。

原理

①服务端启动的时候先扫描,将服务名称及其对应的地址(ip+port)注册到注册中心,这样客户端才能根据服务名称找到对应的服务地址。

②客户端去注册中心找服务地址,有了服务地址之后,客户端就可以通过网络请求服务端了。

②客户端调用远程方法的时候,实际会通过创建代理对象来传输网络请求。

④客户端生成request对象(类名、方法名以及相关参数)作为消息体,拼接消息头,然后通过Netty传输过去。

⑤当客户端发起请求的时候,多台服务器都可以处理这个请求。通过负载均衡选一台服务器。

⑥服务器收到客户端传过来的信息后进行解码,看看客户端需要要干什么,然后反射调用方法得到result,把result封装成rpcMessage,再传回去。

⑦客户端收到rpcMessage,解码,拿到自己想要的结果;

背景

Dubbo、Motan、gRPC

Dubbo

微服务框架,为大规模微服务实践提供高性能 RPC 通信、流量治理、可观测性等解决方案, 涵盖 Java、Golang 等多种语言 SDK 实现。提供了从服务定义、服务发现、服务通信到流量管控等几乎所有的服务治理能力,支持 Triple 协议、应用级服务发现、Dubbo Mesh等特性。

Motan

新浪微博开源的一款 RPC 框架,据说在新浪微博正支撑着千亿次调用。不过笔者倒是很少看到有公司使用,而且网上的资料也比较少。

gRPC

Google 开源的一个高性能、通用的开源 RPC 框架。其由主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf 序列化协议开发,并且支持众多开发语言。

Thrift

Facebook 开源的跨语言的 RPC 通信框架,给 Apache ,由于其跨语言特性和出色的性能,在很多互联网公司得到应用。

对比

gRPC 和 Thrift 虽然支持跨语言的 RPC 调用,只提供了最基本的 RPC 框架功能,缺乏一系列配套的服务化组件和服务治理功能的支撑。

Dubbo 功能完善程度、生态系统还是社区活跃。Dubbo在国内有很多成功的案例比如当当网、滴滴等等,是经得起生产考验的成熟稳定的 RPC 框架。

服务端 暴露->扫描->注册

定义了两个注解:RcpService注册服务,RpcReference消费服务。生命周期都为 runtime,也就是程序运行期能够读取的注解。

RcpService注册服务,用于暴露服务

服务端的HelloServiceImpl实现类上面,将标有@RpcService的注解的bean进行缓存spring包里面实现扫描。

@RpcService(group = "test1", version = "version1")
public class HelloServiceImpl implements HelloService{}

这个注解有两个属性:

version :服务版本,主要是为后续不兼容升级提供可能

group : 服务所在的组。主要用于处理一个接口有多个类实现的情况。

RpcService这个注解还使用了 Spring 提供的 @Component 注解。这样的话,使用 RpcService注解的类就会交由 Spring 管理(前提是被被@ComponentScan注解扫描到 )。

RpcScan自定义包扫描,来扫描自定义的注解

public @interface RpcScan {
    String[] basePackage();
}

两个启动类上面:@RpcScan(basePackage = {"github.javaguide"})

放在启动类上(main 方法所在的类),标识服务的扫描的包的范围,找到标记有 RpcService 的类,并注册。

扫描

【提供扫描的类】

CustomScanner类继承ClassPathBeanDefinitionScanner类;

使用父类的scan方法,来进行扫描,传入的参数就是包的类名(github.javaguide);

可以看到会先得到一个int,也就是已经存在的BeanDefinition,这个在AnnotatedBeanDefinitionReader中将讲过,也就是spring内置的几个bd;

然后通过doScan方法正式进行扫描;

返回的是通过扫描得到的类数;

【扫描】

registerBeanDefinitions类;

扫描被RpcService和Component注释的类;

RpcReference注册服务

注入属性 将代理对象注入 自动装配服务实现类

在客户端的HelloController类里面,

@RpcReference(version = "version1", group = "test1")
private HelloService helloService;

HelloService 是接口,HelloServiceImpl是该接口的实现类。

CustomScannerRegistrar去扫描被@RpcService和Component注解的bean;

向zookeeper注册服务

在创建bean之前先查看类是否被注解,在SpringBeanPostProcessor这个类中,实现 BeanPostProcessor接口并且重写了里面的postProcessBeforeInitializtion方法和postProcessAfterInitialization方法。

【postProcessBeforeInitialization(Object bean, String beanName)】

Spring Bean 在实例化之前会调用 BeanPostProcessor 接口的 postProcessBefore Initialization()方法。在方法中获取RpcService注解,发布服务。

①入参的bean就是github.javaguide.serviceimpl.HelloServiceImpl;

②通过bean反射调用得到RpcService对象;

③创建RpcServiceConfig对象,把RpcService和bean的信息传进去,version,group,服务名;

④接口serviceProvider.publishService(rpcServiceConfig)发布服务;

【postProcessAfterInitialization(Object bean, String beanName)】

遍历类的属性上否有RpcReference注解,如果有的话,就通过反射将这个属性赋值即可。

①查看bean的字段中有没有RpcReference注解;

②获取代理对象RpcClientProxy;

③通过反射注入属性;

④返回bean;

服务发布

【ServiceProvider接口】

addService本地添加;

getService;

publishService连接zookeeper,实现真正的注册;

【ZkServiceProviderImpl】serviceProvider的实现类

变量:

Map<String, Object> serviceMap:代理对象名,方法;

Set<String> registeredService:代理对象名;

(zk包里面的)ServiceRegistry serviceRegistry:接口,用来连接zk;

方法:

addService(在本地记录一些信息);

getService(去map里面找);

publishService(连接zk,实现真正的注册)

publishService(RpcServiceConfig rpcServiceConfig)

调用serviceRegistry.registerService方法,传入方法名和地址;

进入zk包下面的ZkServiceRegistryImpl实现类,连接zk创建结点;

传输实体

RpcRequest implements Serializable

调用对象是要进行序列化的,所以实现序列化接口

serialVersionUID = 1905122041950251207L;序列化ID,为了保证序列化前和反序列化后的数据是一致的

private String requestId; //UUID.randomUUID().toString()

private String interfaceName;

private String methodName;

private Object[] parameters;

private Class<?>[] paramTypes;

private String version; 为后续不兼容情况的升级提供记录信息

private String group;处理一个接口有多个实现类

RpcResponse<T> implements Serializable

serialVersionUID = 715745410605631233L;//自定义序列化ID

private String requestId; //为了对应请求,这里也需要附带上请求的id

private Integer code;状态码 200 500

private String message; “成功”“失败”

private T data;消息体

在invoke方法里面会检查,rpcResponse是否为空,requestId是否相同,状态码。

RpcMessage

private byte messageType;

private byte codec;

private byte compress;

private int requestId; 使用ATOMIC_INTEGER生成

private Object data;

data里面还有一个requestId,使用UUID生成。

步骤:RpcRequest或RpcResponse -> RpcMessage -> 编码器

注册中心

--zookeeper理论知识--

为什么用zookeeper做注册中心?

作为注册中心而言,配置是不经常变动的,只有当新版本发布或者服务器出故障时会变动。CP 不合适于配置经常变动的,而 AP 在遇到问题时可以牺牲其一致性来保证系统服务的高可用性,既返回旧数据。

只是一个demo,环境稳定,流量小,不会遇到注册中心的实例(节点)半数以上都挂了的情况。所以在实际生产环境中,选择 Zookeeper 还是选择 Eureka ,这个就要取决于系统架构师对于业务环境的权衡了。

对比zookeeper和Eureka

CP,AP;

强一致性,高可用;

3个角色,平等;

选取leader时不可用,不保证强一致性。

数据节点

zk数据模型中的最小数据单元,数据模型是一棵树,由斜杠(/)分割的路径名唯一标识,数据节点可以存储数据内容及一系列属性信息,同时还可以挂载子节点,构成一个层次化的命名空间。

会话(Session)

zk客户端与zk服务器之间的一个TCP长连接,通过这个长连接,客户端能够使用心跳检测与服务器保持有效的会话,也能向服务器发送请求并接收响应,还可接收服务器的Watcher事件通知。

Session的sessionTimeout,是会话超时时间,如果这段时间内,客户端未与服务器发生任何沟通(心跳或请求),服务器端会清除该session数据,客户端的TCP长连接将不可用,这种情况下,客户端需要重新实例化一个Zookeeper对象。

事务 ZXID

事务是指能够改变Zookeeper服务器状态的操作,一般包括数据节点的创建与删除、数据节点内容更新和客户端会话创建与失效等操作。对于每个事务请求,zk都会为其分配一个全局唯一的事务ID,即ZXID,是一个64位的数字,高32位表示该事务发生的集群选举周期(集群每发生一次leader选举,值加1),低32位表示该事务在当前选择周期内的递增次序(leader每处理一个事务请求,值加1,发生一次leader选择,低32位要清0)。

事务日志

所有事务操作都是需要记录到日志文件中的,可通过 dataLogDir配置文件目录,文件是以写入的第一条事务zxid为后缀,方便后续的定位查找。zk会采取“磁盘空间预分配”的策略,来避免磁盘Seek频率,提升zk服务器对事务请求的影响能力。默认设置下,每次事务日志写入操作都会实时刷入磁盘,也可以设置成非实时(写到内存文件流,定时批量写入磁盘),但那样断电时会带来丢失数据的风险。

数据快照

数据快照是zk数据存储中另一个非常核心的运行机制。数据快照用来记录zk服务器上某一时刻的全量内存数据内容,并将其写入到指定的磁盘文件中,可通过dataDir配置文件目录。可配置参数snapCount,设置两次快照之间的事务操作个数,zk节点记录完事务日志时,会统计判断是否需要做数据快照(距离上次快照,事务操作次数等于snapCount/2~snapCount 中的某个值时,会触发快照生成操作,随机值是为了避免所有节点同时生成快照,导致集群影响缓慢)。

过半

所谓“过半”是指大于集群机器数量的一半,即大于或等于(n/2+1),此处的“集群机器数量”不包括observer角色节点。leader广播一个事务消息后,当收到半数以上的ack信息时,就认为集群中所有节点都收到了消息,然后leader就不需要再等待剩余节点的ack,直接广播commit消息,提交事务。选举中的投票提议及数据同步时,也是如此,leader不需要等到所有learner节点的反馈,只要收到过半的反馈就可进行下一步操作。

zookeeper服务节点挂掉之后,怎么删除它?

使用临时节点,会话失效,节点自动清除。

zookeeper集群节点宕机了怎么发现剔除的?

发现:watcher机制

剔除:临时节点

zookeeper心跳检测更新列表并利用watcher机制发给客户端

分布式协议[1]

ZAB是paxos的改版,Mysql是paxos、redis sentinel是raft、zookeeper是ZAB。

心跳机制

服务提供者定时向注册中心发送本机地址(心跳数据包),而注册中心的监控则维持一个channelId和具体地址的map,并且通过IdleHandler监听空闲事件,到达一定的空闲次数则认为不活跃,当不活跃时(这里的不活跃条件是5分钟内3次以上没有发送心跳包),zookeeper删除相应的url节点,但后续的逻辑没有继续做。

删掉结点后,通知客户端。

注册中心对于服务端掉线时怎么处理

移出ip链表,发送给客户端,等待服务器上线,重新连接;

集群

多装几个,修改配置文件文件;

因为是在一台机器上模拟集群,所以端口不能重复,这里用2181~2183,2287~2289,以及3387~3389相互错开;

在每个zk server配置文件的dataDir所对应的目录下,必须创建一个名为myid的文件,其中的内容必须与zoo.cfg中server.x 中的x相同;

【启动】ZooKeeper zk = new ZooKeeper(" 172.28.20.102:2181, 172.28.20.102:2182, 172.28.20.102:2183", 300000, new DemoWatcher());

数据模型

zk维护的数据主要有:客户端的会话(session)状态及数据节点(dataNode)信息。

--zookeeper实际使用--

注册过程

【Curator】

①创建CuratorFramework对象zkClient,zkClient:地址127.0.0.1:2181,重试策略,start连接zookeeper;

②先去本地路径map里面看看有没有创建过;

③创建结点,根结点:完整的服务名;子节点:IP+port;

④创建完了把路径添加到map中去

服务调用

①参数为RpcRequest对象,从中取出服务名,创建对象zkClient,连接zookeeper;

②获取结点下面的子节点,存到本地map中去,返回string的list;

③在本地Map<String, List<String>>中找;

④没找到,连接zookeeper,获取子节点路径列表,在map中保存;

⑤负载均衡策略选出一个子节点服务地址进行连接;

监听器

Zookeeper会通过心跳检测机制,来判断服务提供端的运行状态,来决定是否应该把这个服务从地址列表剔除。

监听器的作用在于监听某一节点,若该节点的子节点发生变化,比如增加减少,更新操作的时候,我们可以自定义回调函数。

一旦这个节点下的子节点发生变化,Zookeeper Server就会发送一个事件通知客户端。

客户端收到事件以后,就会把本地缓存的这个服务地址删除,这样后续就不会把请求发送到失败的节点上,完成服务下线感知。

为什么不选择Redis作为注册中心?

zookeeper临时节点自动宕机自动清除;

zookeeper服务容灾?zookeeper服务节点挂掉之后,怎么删除它?

容灾:在集群若干台故障后,整个集群仍然可以对外提供可用的服务。

​ 一般配置奇数台去构成集群,以避免资源的浪费。

​ 三机房部署是最常见的、容灾性最好的部署方案。

删除:使用临时节点,会话失效,节点自动清除。

zookeeper的问题

崩溃恢复无法提供服务、写的性能瓶颈是一个问题、选举过程速度缓慢、无法进行有效的权限控制;

负载均衡

【从现有服务地址list中选择一个,参数:List<String> serviceAddresses, RpcRequest rpcRequest】

背景

系统中的某个服务的访问量大,将这个服务部署在了多台服务器上,当客户端发起请求的时候,多台服务器都可以处理这个请求。如何正确选择处理该请求的服务器就很关键。负载均衡为了避免单个服务器响应同一请求,容易造成服务器宕机、崩溃等问题。

随机int

一致性哈希

【原理】hash空间组成一个虚拟的圆环,将各个服务器使用 Hash 函数进行哈希,可以选择服务器的IP或主机名作为关键字进行哈希,从而确定每台机器在哈希环上的位置。将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针寻找,第一台遇到的服务器就是其应该定位到的服务器。

【优点】对于节点的增减都只需重定位环空间中的一小部分数据,只有部分缓存会失效,不至于将所有压力都在同一时间集中到后端服务器上,具有较好的容错性和可扩展性。

【结构】一个服务名对应一棵树,ConcurrentHashMap<String,ConsistentHashSelec

tor(TreeMap)>,服务名和TreeMap的对应。

【插入】ConsistentHashSelector中有TreeMap<Long, String>(<哈希值,地址>)。传入一个装着地址的list,对每一个地址生成160个虚拟结点,求MD5,再求hash,最后存放在树中。

【查询】入参为rpcRequest,拿到服务名,去Map里面找对应的selector;如果为空,则新建;如果不为空,对服务名求MD5,拿到hashcode。每个hashcode对应一棵树。

【TreeMap】<哈希值Long,地址String>,到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 String。找到了就返回ip + port,没找到就返回第一个。

dubbo的负载均衡算法

①RandomLoadBalance:根据权重随机选择(对加权随机算法的实现);

②LeastActiveLoadBalance:最小活跃数负载均衡。初始状态下所有服务提供者的活跃数均为 0(每个服务提供者的中特定方法都对应一个活跃数),每收到一个请求后,对应的服务提供者的活跃数 +1,当这个请求处理完之后,活跃数 -1。

Dubbo 就认为谁的活跃数越少,谁的处理速度就越快,性能也越好,这样的话,我就优先把请求给活跃数少的服务提供者处理;

③ConsistentHashLoadBalance:一致性哈希;

④RoundRobinLoadBalance:加权轮询负载均衡,加权轮询就是在轮询的基础上,让更多的请求落到权重更大的服务提供者上。

序列化协议

不使用JDK自带

①不支持跨语言调用;

②性能差:相比于其他序列化框架性能更低,主要原因是序列化之后的字节数组体积较大,导致传输成本加大。

为什么选Kryo?

Kryo 是专门针对 Java 语言序列化方式并且性能非常好,并且 Dubbo 官网的一篇文章中提到说推荐使用 Kryo 作为生产环境的序列化方式。

序列化协议对比[2]

Kryo、Hessian、Protobuf

Kryo特点

Kryo由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的字节码体积。

因为 Kryo 不是线程安全的。使用 ThreadLocal 来存储 Kryo 对象,一个线程一个 Kryo 实例。

I / O

【I/O】linux系统内核read()、write()函数,read把内核缓冲区中的数据复制到用户缓冲区中,write把用户缓冲区的数据写入到内核缓冲区中。

【阻塞和非阻塞】进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,是一种读取或者写入操作函数的实现方式。用户进程发起read操作,阻塞则会一直等待内存缓冲区数据完整后再解除阻塞;非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

【同步/异步】是用户线程与内核的交互方式。①同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;②异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

非阻塞IO怎么判断数据是否准备好

轮询

BIO NIO AIO

【BIO】服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间。

【NIO】应用程序会一直发起 read 调用,等待数据从内核空间拷贝到用户空间的这段时间里,线程依然是阻塞的,直到在内核把数据拷贝到用户空间。

【IO多路复用(异步阻塞)】多路网络连接可以复用一个I/O线程。应用程序不断进行 I/O 系统调用轮询数据是否已经准备好是十分消耗 CPU 资源的。IO 多路复用模型中,线程首先发起 select 调用,询问内核数据是否准备就绪,阻塞等待select系统调用返回。等内核把数据准备好了返回一个ready,用户线程再发起 read 调用。read 调用的过程(数据从内核空间 -> 用户空间)还是阻塞的。

【AIO异步非阻塞】基于事件和回调机制实现的,发起IO操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作,Proactor。

适用场景

BIO方式适用于连接数目比较小,长请求且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,下载一个大文件;

NIO的适用场景:高并发数目多,比较轻,高访问量,短请求,聊天服务器,弹幕系统,服务器间通讯;

AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂。

多路复用

IO多路复用的本质就是通过系统内核缓冲IO数据,让单个线程可以监视多个文件描述符(FD),一旦某个描述符读就绪或者写就绪,可以通知程序进行相应的读写操作,也就是使用单个线程同时处理多个网络连接IO,它的原理就是select、poll、epoll不断轮询所负责的socket,当某个socket有数据达到了,就通知用户进程。

select

过程是阻塞的。仅知道有几个I/O事件发生了,但并不知道具体是哪几个socket连接有I/O事件,还需要轮询去查找,时间复杂度为O(n),处理的请求数越多,所消耗的时间越长。

【select函数执行流程】①从用户空间拷贝fd_set(注册的事件集合)到内核空间;②遍历所有fd文件,并将当前进程挂到每个fd的等待队列中,当某个fd文件设备收到消息后,会唤醒设备等待队列上睡眠的进程,那么当前进程就会被唤醒;③如果遍历完所有的fd没有I/O事件,则当前进程进入睡眠,当有某个fd文件有I/O事件或当前进程睡眠超时后,当前进程重新唤醒再次遍历所有fd文件。

【缺点】①单个进程所打开的FD是有限制的,通过 FD_SETSIZE 设置,默认1024;②每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大。

poll

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。

epoll epoll_create() , epoll_ctl() , epoll_wait()

事件驱动机制,修改主动轮询为被动通知,当有事件发生时,被动接收通知。所以epoll模型注册套接字后,主程序可做其他事情,当事件发生时,接收到通知后再去处理。epoll会把哪个流发生哪种I/O事件通知我们。epoll是事件驱动,即每个事件关联上fd,每当fd就绪,系统注册的回调函数就会被调用,将就绪的fd放到readyList里面,是基于红黑树实现的。

【流程】①通过epoll_create() 函数创建一个文件,返回一个文件描述符(Linus系统一切对象皆为文件)fd ② 创建socket接口号4,绑定socket号与端口号,监听事件,标记为非阻塞。通过epoll_ctl() 函数将该socket号 以及 需要监听的事件(如listen事件)写入fd中。③循环调用epoll_wait() 函数进行监听,返回已经就绪事件序列的长度(返回0则说明无状态,大于0则说明有n个事件已就绪)。例如如果有客户端进行连接,则,再调用accept()函数与4号socket进行连接,连接后返回一个新的socket号,且需要监听读事件,则再通过epoll_ctl()将新的socket号以及对应的事件(如read读事件)写入fd中,epoll_wait()进行监听。循环往复。

【优点】不需要再遍历所有的socket号来获取每一个socket的状态,只需要管理活跃的连接。即监听在通过epoll_create()创建的文件中注册的socket号以及对应的事件。只有产生就绪事件,才会处理,所以操作都是有效的,为O(1)。

epoll的水平触发和边缘触发

LT(水平触发):当被监控的fd上有IO事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!

ET(边缘触发):当被监控的fd上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。

NIO

NIO组件

一个线程对应一个selector,一个selector对应多个channel(连接),每个channel 都会对应一个Buffer;

buffer:可以读写数据的内存块;

channel:用于数据的读写;

selector:

NIO的缺点

①NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。

②工作量和难度都非常大。

③JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK 1.6版本修复了该问题,但是直到JDK 1.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而已,它并没有得到根本性解决。

Reactor线程模型

Reactor模式基于事件驱动,分为单Reactor单线程模型、单Reactor多线程模型、主从Reactor多线程模型。

单reactor单线程

【工作原理】

服务器端用一个线程通过多路复用搞定所有的I0操作(建立连接,读、写等)。但是如果客户端连接数量较多,将无法支撑,NIO案例就属于这种模型。

【特点】

模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。只有一个线程,无法完全发挥多核CPU的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。

单reactor多线程

【原理】

一个线程负责管理连接,一组线程负责处理IO操作。

【特点】

充分的利用多核cpu 的处理能力,多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。

主从 Reactor 多线程

①主线程 MainReactor对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件;

②当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor;

③SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理;

④当有新事件发生时,subreactor 就会调用对应的handler处理;

⑤handler 通过read 读取数据,分发给后面的worker 线程处理;

⑥worker 线程池分配独立的worker 线程进行业务处理,并返回结果;

⑦handler 收到响应的结果后,再通过send 将结果返回给client;

Reactor 和 Proactor区别

①Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。

②主要区别就是真正的读取和写入操作是有谁来完成的,Reactor中需要应用程序自己读取或者写入数据,而Proactor模式中,应用程序不需要进行实际的读写过程,它只需要从缓存区读取或者写入即可,操作系统会读取缓存区或者写入缓存区到真正的IO设备。

Netty基础理论知识

对Netty的认识

①基于NIO的client-server框架,使用它可以快速简单地开发网络应用程序。

②简化了TCP和UDP套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。

③支持多种协议如FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。

Netty优点

①高并发:Netty 是一款基于 NIO开发的网络通信框架,对比于 BIO,并发性能得到了很大提高,修复了已经发现 NIO BUG。

②传输快:传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输。

③封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。

④API使用简单,开发门槛低;功能强大,预置了多种编解码功能,支持多种主流协议;

零拷贝[3]

①Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

②Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。

③Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。

Netty 高性能

①IO线程模型:同步非阻塞,用最少的资源做更多的事。

②内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。

③内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。

④可靠性,链路有效性检测:链路空闲检测机制,读/写空闲超时机制;

⑤内存保护机制:通过内存池重用ByteBuf;ByteBuf的解码保护;优雅停机:不再接收新消息、退出前的预处理操作、资源的释放操作。

⑥串行无锁化设计:消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。

⑦高性能序列化协议:支持 protobuf 等高性能序列化协议。

⑧安全性:SSL V2和V3,TLS,SSL单向认证、双向认证和第三方CA认证。

⑨TCP参数配置:SO_RCVBUF和SO_SNDBUF:通常建议值为128K或者256K;SO_TCPNODELAY:NAGLE算法通过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提高网络应用效率。但是对于时延敏感的应用场景需要关闭该优化算法

Netty 和 Tomcat 的区别

作用不同:Tomcat 是 Servlet 容器,可以视为 Web 服务器,而 Netty 是异步事件驱动的网络应用程序框架和工具用于简化网络编程,例如TCP和UDP套接字服务器。

协议不同:Tomcat 是基于 http 协议的 Web 服务器,而 Netty 能通过编程自定义各种协议,因为 Netty 本身自己能编码/解码字节流,所有 Netty 可以实现,HTTP 服务器、FTP 服务器、UDP 服务器、RPC 服务器、WebSocket 服务器、Redis 的 Proxy 服务器、MySQL 的 Proxy 服务器等等。

Netty 和 Socket的区别

【socket】

Socket编程主要涉及到客户端和服务端两个方面,首先是在服务器端创建一个服务器套接字(ServerSocket),并把它附加到一个端口上,服务器从这个端口监听连接。

客户端请求与服务器进行连接的时候,根据服务器的域名或者IP地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作。

【socket缺点】

①需对传输的数据进行解析,转化成应用级的数据;

②对开发人员的开发水平要求高;

③相对于Http协议传输,增加了开发量;

Netty 发送消息方式

Netty 有两种发送消息的方式:

①直接写入 Channel 中,消息从 ChannelPipeline 当中尾部开始移动;

②(用的这个)写入和 ChannelHandler 绑定的 ChannelHandlerContext 中,消息从 ChannelPipeline 中的下一个 ChannelHandler 中移动。

Netty线程模型

Netty线程模型

Netty主要基于主从Reactors多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。

①Netty抽象出两组线程池,Boss负责客户端的连接,Worker专门负责网络的读写,都是NioEventLoopGroup;

②NioEventLoopGroup一个事件循环组,一个组中含有多个事件循环,每一个事件循环是NioEventLoop;

③NioEventLoop 表示不断循环的执行处理任务的线程,每个NioEventLoop都有一个 selector,用于监听绑定在其上的socket的网络通讯。还有一个TaskQueue;

④Boss里面的NioEventLoop循环

轮询accept事件;

处理accept事件,与client建立连接,生成 NioScocketChannel,并将其注册到某个workerNioEventLoop上的selector;

处理任务队列的任务,即runAllTasks;

⑤Worker里面的NioEventLoop循环

轮询read/write事件;

处理i/o事件,在NioScocket;

Channel处理;处理任务队列的任务,即runAllTasks;

⑥每个WorkerNioEventLoop处理业务时,会使用pipeline,pipeline 中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器;

Netty异步模型

①异步和同步的区别是:当一个异步过程开始时,调用者是不能立刻得到结果的。是调用完成之后通过回调来通知调用者的。

②Netty的IO操作是异步的,Bind、 Write、 Connect 等操作会返回一个Channel Future。

③调用者虽然不能直接获得结果,但是可以通过Future-Listener机制监听,让调用者获得结果。

④Netty模型是建立在future - callback 机制的,callback 就是回调。Future的核心思想是,假设一个方法fun计算过程非常耗时,等待fun 返回显然不合适。那么可以在调用fun 的时候,立马返回一个Future, 后续可以通过Future 去监控方法fun 的处理过程(即: Future-Listener 机制)。

处理方法

①客户端:通过ChannelFuture 接口的addListener() 方法注册一个ChannelFuture Listener ,当操作执行成功或者失败时,监听就会自动触发返回结果。

②服务器:可以通过ChannelFuture接口的sync() 方法让异步的操作编程同步的。

Netty核心组件

一个线程- > 一个EventLoop - > 多个channel - > 一个pepiline - > 很多handler

Bytebuf (字节容器)

通过字节流进行传输的。

Channel (网络读写操作抽象类)

通过Channel可以进行I/O操作,客户端成功连接服务端,就会新建一个Channel 同该用户端进行绑定,为每个Channel分配一个EventLoop。

Bootstrap启动类

设置线程组,设置通道NioSocketChannel,连接超时时间,handler(pipeline、心跳IdleStateHandler、编码器、解码器、处理器),connect某个端口(连接TCP),添加监听器,连接成功获得channel,放到CompletableFuture中。

Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,来实现所有的网络交换。

ServerBootstrap服务端启动类

设置2个线程组,设置通道NioServerSocketChannel,设置保持活动连接状态,childHandler(pipeline、心跳IdleStateHandler、编码器、解码器、处理器),启动服务器(并绑定bind端口),异步改为同步,生成了一个 ChannelFuture 对象。

ServerBootstarp在调用 bind()方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

EventLoopGroup,NioEventLoopGroup

EventLoop 的主要作用实际就是责监听网络事件并调用事件处理器进行相关I/0操作(读写)的处理,处理连接的生命周期中所发生的事件。客户端一个线程组,服务器两个,boss接收客户端的连接,worker专门负责网络的读写。

Channel-Pepiline-handler

1个Channel 包含1个ChannelPipeline。1个ChannelPipeline上可以有多个ChannelHandler。ChannelHandler是消息的具体处理器,主要负责处理客户端/服务端接收和发送的数据。

ChannelFuture

Netty中所有的I/O操作都为异步的,不能立刻得到操作结果。法一(客户端):通过ChannelFuture 接口的addListener() 方法注册一个ChannelFutureListener ,当操作执行成功或者失败时,监听就会自动触发返回结果。法二(服务器):可以通过ChannelFuture接口的sync() 方法让异步的操作编程同步的。

客户端channel中收到Response后,会自动先进入pipeline中add的Decoder()解码(从bytebuf到Object),解码后的object数据再进入handler强转成Response类型后续处理

Netty实际应用

客户端

客户端NettyRpcClient继承RpcRequestTransport

【NettyRpcClient()】

①配置一个启动类,一个线程组;

②配置channel为NioSocketChannel;

③配置handler,对应一个pipeline;

④配置pipeline,心跳机制(0,5,0),编码器,解码器,处理器。(如果 5 秒内没有数据发送到服务器,则发送心跳请求)

【doConnect(InetSocketAddress):Channel】用于连接服务端并返回对应的Channel,使用completableFuture接收。知道了服务端的地址之后,可以通过NettyClient 成功连接服务端了,(有了Channel 之后就能发送数据到服务端了)。

(异步变同步)通过ChannelFuture 接口的addListener() 方法注册一个Channel FutureListener ,当操作执行成功或者失败时,监听就会自动触发返回结果。

【sendRpcRequest(rpcRequest):CompletableFuture】发RpcRequest到服务端

①创建返回值CompletableFuture;

②根据rpcRequest获得服务地址,已经负载均衡了;

③调用getChannel,获取服务器地址相关通道;

④新建新建rpcMessage,传入rpcRequest,指定序列化、压缩方式;

⑤channel.writeAndFlush(rpcMessage),channel数据冲刷,发动数据到服务器;

【getChannel(inetSocketAddress):Channel】连接复用

根据地址去channelProvider的map里面找,Map<String, Channel> channelMap,找到了就复用,没找到就连接服务器返回新的channel,并且保存channel。

客户端NettyClientHandler继承ChannelInboundHandlerAdapter

【NettyClientHandler()】单例生成未被处理的请求UnprocessedRequests,单例生成NettyRpcClient。

【channelRead(ChannelHandlerContext ctx, Object msg)】读取服务器发送的消息。

①ctx:上下文对象, 含有管道pipeline , 通道channel, 地址;

②Object msg: 就是服务器发送的数据;

③判断信息的类型,如果是心跳就简单收下。如果是response,就去UnprocessedRequests的map里面记录这个request已经被处理了。

【userEventTriggered(ChannelHandlerContext ctx, Object evt)】心跳机制。判断信息是不是心跳机制,是就返回ping,刷新到channel中去。

【exceptionCaught(ChannelHandlerContext ctx, Throwable cause)】处理客户端消息发生异常时调用,关闭ctx。

客户端UnprocessedRequests

Map<String, CompletableFuture<RpcResponse<Object>>> 键:requestId 值:feature

用于存放未被服务端处理的请求(建议限制map容器大小,避免未处理请求过多OOM),处理完了就complete。

【put(String requestId, CompletableFuture<RpcResponse<Object>> future)】客户端发送的时候,把还没有处理的请求存进来。

【complete(RpcResponse<Object> rpcResponse)】客户端收到之后,把其从map中删掉。

客户端ChannelProvider 连接复用

Map<String, Channel> 键:服务端实例地址和序列化算法的字符串表示,值: Channel 对象

当客户端请求到一个服务端地址时,会到 map 里面查询是否已经建立过连接,如果建立过连接则查看状态是否存活,存活则直接使用该 Channel 对象进行传输。没活着就从map里面删掉。连接未建立则建立连接后再将连接放入。

服务器

服务器NettyRpcServer

【端口号】

【注册中心registerService】

【registerService(RpcServiceConfig rpcServiceConfig)】调用ServiceProvider.publ ishService(rpcServiceConfig)完成服务注册;

【start()】

①设置端口号,得到主机地址;

②配置一个启动类,两个线程组;

③配置channel为NioSocketChannel;

④TCP开启Nagle算法,尽可能的发送大数据快,减少网络传输。设置保持活动连接状态。

⑤配置handler,对应一个pipeline;

配置pipeline,心跳机制(30,0,0),编码器,解码器,处理器。(30 秒之内没有收到客户端请求的话就关闭连接);

⑥绑定端口并且同步生成了一个 ChannelFuture 对象,sync异步变同步。

服务器NettyRpcServerHandler继承ChannelInboundHandlerAdapter

【作用】

将标有@RpcService的注解的bean进行缓存spring包里面实现扫描,标在了HelloServiceImpl上面。@RpcService(group = "test1", version = "version1")。

接收客户端的请求。

根据传递进来的beanName从缓存中查找。

通过反射调用bean中的方法 handler包中的RpcRequestHandler类。

给客户端响应。

【channelRead(ChannelHandlerContext ctx, Object msg)】即发送又接收

ctx:上下文对象, 含有管道pipeline , 通道channel, 地址;

Object msg: 就是服务器发送的数据;

①将信息msg转为RpcMessage类型;

②判断信息的类型,如果是心跳就简单返回pong。

③不是就把msg转成RpcRequest;

④Object result = rpcRequestHandler.handle(rpcRequest)反射调用,拿到结果。

⑤ctx.writeAndFlush(rpcMessage).addListener将结果写入到缓存,并刷新。

RpcRequestHandler

【handle】

①去注册中心找,得到实现类Object;

②反射调用invokeTargetMethod;

【invokeTargetMethod】

传入request和实现类,反射调用拿到result;

异步改为同步

Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future 和ChanellFutures, 他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

自定义协议

步骤:RpcRequest或RpcResponse -> RpcMessage -> 编码器

通过设计协议,我们定义需要传输哪些类型的数据,并且还会规定每一种类型的数据应该占多少字节。这样我们在接收到二级制数据之后,就可以正确的解析出我们需要的数据。

编码器 RpcMessage变为ByteBuf

encode(ChannelHandlerContext ctx, RpcMessage a, ByteBuf out)

ctx:上下文对象, 含有管道pipeline , 通道channel, 地址

【自定义编码器 负责网络传输出去的消息,将消息格式转换为字节数组,然后写入到字节数据的容器ByteBuf对象中。】

【信息头】(魔数4:筛选来到服务端的数据包,识别出这个数据包并非是遵循自定义协议)(版本1:version)(信息长度4:总长度,头有16B+body)(信息类型1:心跳还是普通)(序列化类型1)(压缩类型1:GZIP)(requestid4:ATOMIC_INTEGER递增)

【信息体】byte[]

如果是心跳,写body为ping或者是pong;

不是心跳,序列化,压缩,求长度,记录长度。

解码器 ByteBuf变为RpcMessage

decode(ChannelHandlerContext ctx, ByteBuf in)

【背景】TCP 是面向连接的,面向流的,提供高可靠性服务。发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的。

【长度】继承LengthFieldBasedFrameDecoder,它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。

--lengthFieldOffset字段长度偏移量:magic code 是 4B,version 是 1B,所以值为 5;

--lengthFieldLength字段长度中的字节数:full length为4B,所以值为 4;

--lengthAdjustment添加到长度字段值的补偿值,负数:full length包括所有数据并读取前9个字节,所以左边的长度是(fullLength-9)。 所以值是-9;

--initialBytesToStrip跳过的字节数:我们将手动检查魔数和版本,所以不要剥离任何字节,所以值为 0;

【过程】

调用父类的解码方法,传入(ctx,in),得到ByteBuf对象in;

检查魔数和版本号;

拿到长度,信息类型,序列化号,压缩号,requestId;

新建RpcMessage对象,设置序列化号,压缩号,requestId;

判断信息类型,如果是心跳的ping,设置RpcMessage的data为pong,返回;

判断信息类型,如果是心跳的pong,设置RpcMessage的data为ping,返回;

都不是,用减法求出body的长度bodyLength;

根据bodyLength生成byte[],提取in对应的长度;

解压,反序列化,变成RpcRequest或者RpcResponse;

设置RpcMessage的data为RpcRequest或者RpcResponse;

序列化

现在项目里的编码器是通过SerializationType的getName方法获得序列化方式,然后通过扩展类加载器得到序列化类的全限定类名加载类文件,创建序列化对象,这个时候的对象就是KryoSerializer类的对象,然后serializer调用serialize就用的是kryo序列化。

未完待续...

#你的秋招进展怎么样了#
全部评论
要是有人看就接着更了
8 回复
分享
发布于 2023-07-12 22:32 四川
接着更吧,大佬。
1 回复
分享
发布于 2023-07-12 23:01 山东
滴滴
校招火热招聘中
官网直投
面试官会不会问一些zk的八股
1 回复
分享
发布于 2023-07-13 00:17 北京
催更!
1 回复
分享
发布于 2023-07-13 13:17 四川
继续更下去吧
点赞 回复
分享
发布于 2023-07-13 00:11 上海
点赞 回复
分享
发布于 2023-07-13 00:29 四川
求更新!
点赞 回复
分享
发布于 2023-07-13 00:44 四川
m
点赞 回复
分享
发布于 2023-07-14 00:20 甘肃
请问简历上rpc项目应该怎么写呢,求参考
点赞 回复
分享
发布于 2023-07-14 10:00 上海
很强
点赞 回复
分享
发布于 2023-07-15 01:07 浙江
两个我也是rpc
点赞 回复
分享
发布于 2023-07-15 08:35 上海
哈人
点赞 回复
分享
发布于 2023-07-15 10:32 陕西
rpc啊,还得看我这篇实战:https://www.nowcoder.com/discuss/353158960331038720
点赞 回复
分享
发布于 2023-07-15 11:15 北京
牛逼 感谢
点赞 回复
分享
发布于 2023-07-15 15:32 天津
支持!
点赞 回复
分享
发布于 2023-09-07 21:27 四川
m
点赞 回复
分享
发布于 03-20 17:45 山东

相关推荐

国电投广西核电 维修 23w 本科985
点赞 评论 收藏
转发
115 810 评论
分享
牛客网
牛客企业服务