【JAVA】如何基于Netty实现简单的RPC 框架
如何基于Netty实现简单的RPC 框架

1. 项目模块与依赖

common 模块依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myRPC</artifactId>
<groupId>com.sgg</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>common</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!--netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!--json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!--lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>rpc-client模块依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myRPC</artifactId>
<groupId>com.sgg</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-client</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.sgg</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>rpc-server 模块依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myRPC</artifactId>
<groupId>com.sgg</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-server</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.sgg</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--spring相关依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>
</project>myRPC
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sgg</groupId>
<artifactId>myRPC</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>common</module>
<module>rpc-client</module>
<module>rpc-server</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
</project>2. common 通用模块

2.1 RpcRequest
package com.sgg.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author sz * @DATE 2022/5/6 21:54 */ @Data @AllArgsConstructor @NoArgsConstructor public class RpcRequest { /** * 全限定类名 */ private String className; /** * 方法名 */ private String methodName; /** * 参数类型 */ private Class<?>[] parameterTypes; /** * 实参 */ private Object[] paramters; }
2.2 RpcResponse
package com.sgg.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author sz * @DATE 2022/5/6 21:54 */ @Data @AllArgsConstructor @NoArgsConstructor public class RpcResponse { //返回状态码 private Integer code; //返回结果 private String result; //错误信息 private String error; }
2.3 User
package com.sgg.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author sz * @DATE 2022/5/6 21:55 */ @Data @AllArgsConstructor @NoArgsConstructor public class User { private Integer id; private String name; }
2.4 UserService
package com.sgg.service;
import com.sgg.pojo.User;
public interface UserService {
User getUserById(Integer id);
}
3. rpc-server 服务端模块

3.1 MyServiceRpc
package com.sgg.anno;
import java.lang.annotation.*;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}
3.2 MyServerHandler
package com.sgg.handler; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.sgg.anno.MyServiceRpc; import com.sgg.pojo.RpcRequest; import com.sgg.pojo.RpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.springframework.beans.BeansException; import org.springframework.cglib.reflect.FastClass; import org.springframework.cglib.reflect.FastMethod; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationTargetException; import java.util.*; /** * @author sz * @DATE 2022/5/6 22:16 */ @Component public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware { private static ApplicationContext app; private static HashMap<String, Object> cache = new HashMap<>(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { app = applicationContext; //拿到容器中所有标注了@MyServiceRpc 注解的 bean Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class); //拿到bean实现的接口的全限定类名 Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet(); entries.stream().forEach(ent->{ Class<?>[] interfaces = ent.getValue().getClass().getInterfaces(); if (null!=interfaces && interfaces.length != 0){ Arrays.stream(interfaces).forEach(inter->{ cache.put(inter.getName(),ent.getValue()); }); } }); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception { //封装结果 RpcResponse rpcResponse = new RpcResponse(); Object result = null; try { //将json字符串转换为RpcRequest 对象 RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class); //拿到需要调用的类 String className = rpcRequest.getClassName(); Object bean = cache.get(className); //需要调用的方法名 String methodName = rpcRequest.getMethodName(); //方法参数类型 Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); //方法实参 Object[] paramters = rpcRequest.getParamters(); //反射调用方法 FastClass fastClass = FastClass.create(bean.getClass()); FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes); result = fastClassMethod.invoke(bean, paramters); rpcResponse.setCode(200); rpcResponse.setResult((String) result); } catch (Exception e) { e.printStackTrace(); rpcResponse.setCode(400); rpcResponse.setError(e.getMessage()); } //将结果用json字符串写回去 channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse)); } }
3.3 ServerProvider
package com.sgg.provider;
import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
/**
* @author sz
* @DATE 2022/5/6 22:07
*/
@Service
public class ServerProvider implements Closeable {
private NioEventLoopGroup boss ;
private NioEventLoopGroup work ;
public void start(String ip,Integer port) {
//创建两个线程组
boss = new NioEventLoopGroup(1);
//默认线程数 = CPU数 * 2
work = new NioEventLoopGroup();
//创建启动组手
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//解析字符串
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//内容处理
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
if (null!=boss){
boss.shutdownGracefully();
}
if (null!=boss){
work.shutdownGracefully();
}
}
}
@Override
public void close() throws IOException {
System.out.println("容器关闭我被调用了");
if (null!=boss){
boss.shutdownGracefully();
}
if (null!=boss){
work.shutdownGracefully();
}
}
}
3.4 UserServiceImpl
package com.sgg.service.impl;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;
import java.util.HashMap;
/**
* @author sz
* @DATE 2022/5/6 22:18
*/
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {
private static HashMap<Integer,User> map = new HashMap();
static {
map.put(1,new User(1,"张三"));
map.put(2,new User(2,"李四"));
}
@Override
public User getUserById(Integer id) {
return map.get(id);
}
}
3.5 ServerApp
package com.sgg;
import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author sz
* @DATE 2022/5/6 22:04
*/
@SpringBootApplication
public class ServerApp implements CommandLineRunner {
@Autowired
private ServerProvider serverProvider;
public static void main(String[] args) {
SpringApplication.run(ServerApp.class,args);
}
@Override
public void run(String... args) throws Exception {
new Thread(()->{
serverProvider.start("127.0.0.1",9999);
}).start();
}
}
4. rpc-client 客户端模块

4.1 RpcClient
package com.sgg.client; import com.sgg.handler.MyClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author sz * @DATE 2022/5/6 22:54 */ @Data @AllArgsConstructor @NoArgsConstructor public class RpcClient { private String ip; private Integer port; public RpcClient(String ip, Integer port) { this.ip = ip; this.port = port; init(); } private NioEventLoopGroup eventLoopGroup; private Channel channel; private MyClientHandler myClientHandler = new MyClientHandler(); private ExecutorService executorService = Executors.newCachedThreadPool(); public Object sendMess(String message) throws ExecutionException, InterruptedException { myClientHandler.setRequestMsg(message); Future submit = executorService.submit(myClientHandler); return submit.get(); } public void init() { //创建线程组 eventLoopGroup = new NioEventLoopGroup(); //创建启动组手 Bootstrap bootstrap = new Bootstrap(); //分组 try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); //业务 pipeline.addLast(myClientHandler); } }); channel = bootstrap.connect(ip, port).sync().channel(); } catch (Exception e) { e.printStackTrace(); if (null != channel) { channel.close(); } if (null != eventLoopGroup) { eventLoopGroup.shutdownGracefully(); } } } public void close() { if (null != channel) { channel.close(); } if (null != eventLoopGroup) { eventLoopGroup.shutdownGracefully(); } } }
4.2 MyClientHandler
package com.sgg.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.Callable;
/**
* @author sz
* @DATE 2022/5/6 23:04
*/
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {
private String requestMsg;
private String responseMsg;
private ChannelHandlerContext context;
public void setRequestMsg(String str) {
this.requestMsg = str;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
this.responseMsg = str;
//唤醒
notify();
}
@Override
public synchronized Object call() throws Exception {
this.context.writeAndFlush(requestMsg);
//线程等待 拿到响应数据
wait();
return responseMsg;
}
}
4.3 RpcProxy
package com.sgg.proxy;
import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author sz
* @DATE 2022/5/6 22:46
*/
public class RpcProxy {
public static Object createProxy(Class target) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{target},
(Object proxy, Method method, Object[] args) -> {
RpcRequest rpcRequest = new RpcRequest();
//设置类名
rpcRequest.setClassName(method.getDeclaringClass().getName());
//设置方法名
rpcRequest.setMethodName(method.getName());
//设置方法参数类型
rpcRequest.setParameterTypes(method.getParameterTypes());
//设置方法实际参数
rpcRequest.setParamters(args);
//发送信息,拿到返回值
RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
//转换为rpcResponse
RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
//拿到返回结果
if (200==rpcResponse.getCode()){
return JSON.parseObject(rpcResponse.getResult(), User.class);
}
return null;
}
);
}
}
4.4 ClientApp
package com.sgg;
import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;
/**
* @author sz
* @DATE 2022/5/6 22:44
*/
public class ClientApp {
public static void main(String[] args) {
UserService proxy = (UserService) RpcProxy.createProxy(UserService.class);
User userById = proxy.getUserById(2);
System.out.println("userById = " + userById);
}
}
#Java开发##Java#