Netty在RPC项目中的实战应用
为什么使用Netty?
1. 客户端和服务器需要远程通信,Netty是一个基于NIO的客户端服务器框架,可以简单快速地开发网络应用程序
2. 简化了TCP和UDP套接字服务器,性能以及安全性更好
3. 支持多种协议FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议
代码解读
首先定义两个对象,这两个对象是客户端与服务端进行交互的实体类。 客户端将 RpcRequest 类型的对象发送到服务端,服务端进行相应的处理之后将得到结果 RpcResponse 对象返回给客户端。
1. RpcRequest : 客户端请求实体类
@AllArgsConstructor @Getter @NoArgsConstructor @Builder @ToString public class RpcRequest { private String interfaceName; private String methodName; }
2. RpcResponse: 服务端响应类
@AllArgsConstructor @Getter @NoArgsConstructor @Builder @ToString public class RpcResponse { private String message; }
3. 客户端
3.1 初始化客户端
客户端中主要有一个用于向服务端发送消息的 sendMessage()方法,通过这个方法你可以将消息也就是RpcRequest 对象发送到服务端,并且你可以同步获取到服务端返回的结果也就是RpcResponse 对象。
public class NettyClient { private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); private final String host; private final int port; private static final Bootstrap b; public NettyClient(String host, int port) { this.host = host; this.port = port; } // 初始化相关资源比如 EventLoopGroup, Bootstrap static { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); b = new Bootstrap(); KryoSerializer kryoSerializer = new KryoSerializer(); b.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) // 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败 // 如果 15 秒之内没有发送数据给服务端的话,就发送一次心跳请求 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { /* 自定义序列化编解码器 */ // RpcResponse -> ByteBuf ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class)); // ByteBuf -> RpcRequest ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class)); ch.pipeline().addLast(new NettyClientHandler()); } }); } /** * 发送消息到服务端 * * @param rpcRequest 消息体 * @return 服务端返回的数据 */ public RpcResponse sendMessage(RpcRequest rpcRequest) { try { ChannelFuture f = b.connect(host, port).sync(); logger.info("client connect {}", host + ":" + port); Channel futureChannel = f.channel(); logger.info("send message"); if (futureChannel != null) { futureChannel.writeAndFlush(rpcRequest).addListener(future -> { if (future.isSuccess()) { logger.info("client send message: [{}]", rpcRequest.toString()); } else { logger.error("Send failed:", future.cause()); } }); futureChannel.closeFuture().sync(); AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse"); return futureChannel.attr(key).get(); } } catch (InterruptedException e) { logger.error("occur exception when connect server:", e); } return null; }sendMessage()方法分析:
1. 首先初始化了一个 Bootstrap
2. 通过 Bootstrap 对象连接服务端
3. 通过 Channel 向服务端发送消息RpcRequest
4. 发送成功后,阻塞等待 ,直到Channel关闭
5. 拿到服务端返回的结果 RpcResponse
3.2 自定义 ChannelHandler 处理服务端消息
public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { RpcResponse rpcResponse = (RpcResponse) msg; logger.info("client receive msg: [{}]", rpcResponse.toString()); // 声明一个 AttributeKey 对象 AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse"); // 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源 // AttributeMap的key是AttributeKey,value是Attribute ctx.channel().attr(key).set(rpcResponse); ctx.channel().close(); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("client caught exception", cause); ctx.close(); } }NettyClientHandler用于读取服务端发送过来的 RpcResponse 消息对象,并将 RpcResponse 消息对象保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源。这样的话,我们就能通过 channel 和 key 将数据读取出来。
4. 服务端
4.1 初始化服务端
NettyServer 主要作用就是开启了一个服务端用于接受客户端的请求并处理。
public class NettyServer { private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private final int port; private NettyServer(int port) { this.port = port; } private void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); KryoSerializer kryoSerializer = new KryoSerializer(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。 .childOption(ChannelOption.TCP_NODELAY, true) // 是否开启 TCP 底层心跳机制 .childOption(ChannelOption.SO_KEEPALIVE, true) //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数 .option(ChannelOption.SO_BACKLOG, 128) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class)); ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class)); ch.pipeline().addLast(new NettyServerHandler()); } }); // 绑定端口,同步等待绑定成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (InterruptedException e) { logger.error("occur exception when start server:", e); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
4.2 自定义ChannelHandler处理客户端消息
NettyServerHandler 用于处理客户端发送过来的消息并返回结果给客户端。
public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private static final AtomicInteger atomicInteger = new AtomicInteger(1); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { RpcRequest rpcRequest = (RpcRequest) msg; logger.info("server receive msg: [{}] ,times:[{}]", rpcRequest, atomicInteger.getAndIncrement()); RpcResponse messageFromServer = RpcResponse.builder().message("message from server").build(); ChannelFuture f = ctx.writeAndFlush(messageFromServer); f.addListener(ChannelFutureListener.CLOSE); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("server catch exception", cause); ctx.close(); } }
5. 编码器
5.1 自定义编码器
NettyKryoEncoder 是我们自定义的编码器。它负责处理"出站"消息,将消息格式转换为字节数组然后写入到字节数据的容器 ByteBuf 对象中。
/** * 自定义编码器。负责处理"出站"消息,将消息格式转换为字节数组然后写入到字节数据的容器 ByteBuf 对象中。 * <p> * 网络传输需要通过字节流来实现,ByteBuf 可以看作是 Netty 提供的字节数据的容器,使用它会让我们更加方便地处理字节数据。 * * @author Yuan * @createTime 2020年05月25日 19:43:00 */ @AllArgsConstructor public class NettyKryoEncoder extends MessageToByteEncoder<Object> { private final Serializer serializer; private final Class<?> genericClass; /** * 将对象转换为字节码然后写入到 ByteBuf 对象中 */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) { if (genericClass.isInstance(o)) { // 1. 将对象转换为byte byte[] body = serializer.serialize(o); // 2. 读取消息的长度 int dataLength = body.length; // 3.写入消息对应的字节数组长度,writerIndex 加 4 byteBuf.writeInt(dataLength); //4.将字节数组写入 ByteBuf 对象中 byteBuf.writeBytes(body); } } }5.2 自定义解码器
/** * 自定义解码器。负责处理"入站"消息,将消息格式转换为我们需要的业务对象 * * @author Yuan * @createTime 2020年05月25日 19:42:00 */ @AllArgsConstructor @Slf4j public class NettyKryoDecoder extends ByteToMessageDecoder { private final Serializer serializer; private final Class<?> genericClass; /** * Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部 */ private static final int BODY_LENGTH = 4; /** * 解码 ByteBuf 对象 * * @param ctx 解码器关联的 ChannelHandlerContext 对象 * @param in "入站"数据,也就是 ByteBuf 对象 * @param out 解码之后的数据对象需要添加到 out 对象里面 */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { //1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4, if (in.readableBytes() >= BODY_LENGTH) { //2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用 in.markReaderIndex(); //3.读取消息的长度 //注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法 int dataLength = in.readInt(); //4.遇到不合理的情况直接 return if (dataLength < 0 || in.readableBytes() < 0) { log.error("data length&nbs***bsp;byteBuf readableBytes is not valid"); return; } //5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } // 6.走到这里说明没什么问题了,可以序列化了 byte[] body = new byte[dataLength]; in.readBytes(body); // 将bytes数组转换为我们需要的对象 Object obj = serializer.deserialize(body, genericClass); out.add(obj); log.info("successful decode ByteBuf to Object"); } } }
5.3 自定义序列化接口
public interface Serializer { /** * 序列化 * * @param obj 要序列化的对象 * @return 字节数组 */ byte[] serialize(Object obj); /** * 反序列化 * * @param bytes 序列化后的字节数组 * @param clazz 类 * @param <T> * @return 反序列化的对象 */ <T> T deserialize(byte[] bytes, Class<T> clazz); }
5.4 实现序列化接口
public class KryoSerializer implements Serializer { /** * 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。 * 所以,使用 ThreadLocal 存放 Kryo 对象 */ private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo(); kryo.register(RpcResponse.class); kryo.register(RpcRequest.class); kryo.setReferences(true);//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true kryo.setRegistrationRequired(false);//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true return kryo; }); @Override public byte[] serialize(Object obj) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output output = new Output(byteArrayOutputStream)) { Kryo kryo = kryoThreadLocal.get(); // Object->byte:将对象序列化为byte数组 kryo.writeObject(output, obj); kryoThreadLocal.remove(); return output.toBytes(); } catch (Exception e) { throw new SerializeException("序列化失败"); } } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input input = new Input(byteArrayInputStream)) { Kryo kryo = kryoThreadLocal.get(); // byte->Object:从byte数组中反序列化出对对象 Object o = kryo.readObject(input, clazz); kryoThreadLocal.remove(); return clazz.cast(o); } catch (Exception e) { throw new SerializeException("反序列化失败"); } } }