使用Netty三分钟手写一个RPC

个人技术博客:www.zhenganwen.top

流程概览

项目结构

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
</dependency>

<!-- 服务提供方根据调用信息反射获取实现类时需要 -->
<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.10</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
    <scope>provided</scope>
</dependency>
复制代码

通用模块

ClassInfo

实体类,封装了服务调用信息:

package top.zhenganwen.rpc.common;

import lombok.Data;

import java.io.Serializable;

/** * ClassInfo class * 使用JDK的序列化技术必须实现接口Serializable * * @author : zaw * @date : 2019/3/30 */
@Data
public class ClassInfo implements Serializable {

    /** * 调用服务的接口名 */
    private String className;
    /** * 调用服务的方法名 */
    private String methodName;
    /** * 调用方法的参数列表类型 */
    private Class[] paramTypes;
    /** * 调用服务传参 */
    private Object[] params;
}
复制代码

需要注意的是客户端在发送调用信息时会将该类对象序列化并发送给服务端,而服务的则需要反序列化回来,如果使用的是JDK的序列化技术则需要将此类实现Serializable接口

服务接口

为了便于维护,服务接口通常会被独立出来到通用模块中,以jar包的形式被服务调用方和服务提供方依赖。这里简单的写了两个接口,一个包含无參服务,一个包含有参服务。

public interface HasArgsHelloService {
    String hello(String msg);
}

public interface NoArgsHelloService {
    String hello();
}
复制代码

服务调用方

client

这个包中是依赖Service接口的一些类,RPC服务的调用对于他们来说是透明的,他们仅通过client_stub中的ServiceProxy来获取服务实现类并调用服务。

public class RPCClient {

    public static void main(String[] args){
        NoArgsHelloService noArgsHelloService = (NoArgsHelloService) ServiceProxy.create(NoArgsHelloService.class);
        System.out.println(noArgsHelloService.hello());

        HasArgsHelloService hasArgsHelloService = (HasArgsHelloService) ServiceProxy.create(HasArgsHelloService.class);
        System.out.println(hasArgsHelloService.hello("hello netty rpc"));
    }

}
复制代码

client_stub

真正处理RPC调用逻辑的包,ServiceProxy通过JDK代理Proxy.newProxyInstance来代理所有的服务,所有client中调用服务的动作都将被该代理逻辑中设置的InvocationHandler拦截,拦截后获取调用信息(接口名、方法名、方法参列类型、实参列表)并通过Netty与服务端建立连接发送调用信息,然后阻塞等待连接关闭事件(RPCClientHandler在收到服务端返回的调用结果时会保存该结果并关闭连接),若此事件被触发说明RPCClientHandler已拿到调用结果,于是此次InvocationHandler的拦截可以返回了。

  • ServiceProxy
package top.zhenganwen.rpc.client_stub;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import top.zhenganwen.rpc.common.ClassInfo;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** * ServiceProxy class * * @author : zaw * @date : 2019/3/30 */
public class ServiceProxy {

    public static Object create(Class clazz) {
        return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                //构造调用信息
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(clazz.getName());
                classInfo.setMethodName(method.getName());
                classInfo.setParamTypes(method.getParameterTypes());
                classInfo.setParams(args);

                //使用netty发送调用信息给服务提供方
                NioEventLoopGroup group = new NioEventLoopGroup();
                Bootstrap bootstrap = new Bootstrap();
                RPCClientHandler rpcClientHandler = new RPCClientHandler();
                try {
                    bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.SO_KEEPALIVE, true)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ObjectEncoder());
                                //反序列化对象时指定类解析器,null表示使用默认的类加载器
                                ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null)));
                                ch.pipeline().addLast(rpcClientHandler);

                            }
                        });
                    //connect是异步的,但调用其future的sync则是同步等待连接成功
                    ChannelFuture future = bootstrap.connect("127.0.0.1", 80).sync();
                    //同步等待调用信息发送成功
                    future.channel().writeAndFlush(classInfo).sync();
                    //同步等待RPCClientHandler的channelRead被触发后(意味着收到了调用结果)
                    future.channel().closeFuture().sync();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    group.shutdownGracefully();
                }

                //返回调用结果
                return rpcClientHandler.getRpcResult();
            }
        });
    }

}
复制代码
  • PRCClientHandler
package top.zhenganwen.rpc.client_stub;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/** * RPCClientHandler class * * @author : zaw * @date : 2019/3/30 */
public class RPCClientHandler extends ChannelHandlerAdapter {

    /** * RPC调用返回的结果 */
    private Object rpcResult;

    public Object getRpcResult() {
        return rpcResult;
    }

    public void setRpcResult(Object rpcResult) {
        this.rpcResult = rpcResult;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        setRpcResult(msg);
        ctx.close();
    }
}
复制代码

服务提供方

server

首先服务提供方有具体的服务实现类,然后它通过RPCServer建立Netty服务端24小时监听客户端的服务调用请求。请求将被RPCServerHandler处理,它根据请求中的调用信息通过反射找到实现类和服务方法并反射调用获取结果,并立即将结果发送给客户端。

  • 服务实现类
public class NoArgsHelloServiceImpl implements NoArgsHelloService {

    @Override
    public String hello() {
        return "hello";
    }
}

public class HasArgsHelloServiceImpl implements HasArgsHelloService {

    @Override
    public String hello(String msg) {
        return msg;
    }
}
复制代码
  • PRCServer
package top.zhenganwen.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import top.zhenganwen.rpc.server_stub.RPCServerHandler;

/** * RPCServer class * * @author : zaw * @date : 2019/3/30 */
public class RPCServer {

    public static void main(String[] args){
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null)));
                        ch.pipeline().addLast(new RPCServerHandler());
                    }
                });
            //bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功
            ChannelFuture future = bootstrap.bind(80).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}
复制代码

server_stub

真正根据调用请求反射调用的业务处理类

  • RPCServerHandler
package top.zhenganwen.rpc.server_stub;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.reflections.Reflections;
import top.zhenganwen.rpc.common.ClassInfo;

import java.lang.reflect.Method;
import java.util.Set;

/** * RPCServerHandler class * * @author : zaw * @date : 2019/3/30 */
public class RPCServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取调用信息,寻找服务实现类
        ClassInfo classInfo = (ClassInfo) msg;
        String implName = getImplClassName(classInfo.getClassName());
        Class<?> clazz = Class.forName(implName);
        Method method = clazz.getMethod(classInfo.getMethodName(), classInfo.getParamTypes());
        Object result = method.invoke(clazz.newInstance(), classInfo.getParams());
        ctx.writeAndFlush(result);
    }

    private String getImplClassName(String interfaceName) throws ClassNotFoundException {
        Class interClass = Class.forName(interfaceName);
        String servicePath = "top.zhenganwen.rpc.server";
        Reflections reflections = new Reflections(servicePath);
        Set<Class> implClasses = reflections.getSubTypesOf(interClass);
        if (implClasses.isEmpty()) {
            System.err.println("impl class is not found!");
        } else if (implClasses.size() > 1) {
            System.err.println("there are many impl classes, not sure invoke which");
        } else {
            Class[] classes = implClasses.toArray(new Class[1]);
            return classes[0].getName();
        }
        return null;
    }
}
复制代码
全部评论

相关推荐

来个厂收我吧:首先,市场侧求职我不是很懂。 但是,如果hr把这份简历给我,我会觉得求职人不适合做产品经理。 问题点: 1,简历的字体格式不统一,排版不尽如人意 2,重点不突出,建议参考star法则写个人经历 3,印尼官方货币名称为印度尼西亚卢比(IDR),且GMV690000印尼盾换算为305人民币,总成交额不高。 4,右上角的意向职位在发给其他公司时记得删除。 5,你所有的经历都是新媒体运营,但是你要投市场营销岗位,jd和简历不匹配,建议用AI+提示词,参照多个jd改一下经历内容。 修改建议: 1,统一字体(中文:思源黑体或微软雅黑,英文数字:time new romans),在word中通过表格进行排版(b站学) 2,校招个人经历权重:实习经历=创业经历(大创另算)>项目经历>实训经历>校园经历 3,请将项目经历时间顺序改为倒序,最新的放最上方。 4,求职方向不同,简历文字描述侧重点也需要不同。
点赞 评论 收藏
分享
06-13 10:15
门头沟学院 Java
想去夏威夷的大西瓜在...:我也是27届,但是我现在研一下了啥项目都没有呀咋办,哎,简历不知道咋写
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务