Disruptor - 4. A Simple Integration with Netty

Outline

1. Server-Side Code Best Practices

2. Client-Side Code Best Practices

3. Analyzing Netty's Core Performance Issues

4. Asynchronous Handling of Netty's Persistent-Connection Business with Disruptor

5. Pooled Encapsulation of Disruptor's Core Objects

6. Scaling to One Million Persistent Connections

1. Server-Side Code Best Practices

(1) How the TCP Handshake Works

(2) Netty Server Code

(1) How the TCP Handshake Works

When the server handles a client's TCP connection request, the kernel maintains two queues underneath: the SYN queue (half-open connections still mid-handshake) and the ACCEPT queue (fully established connections waiting for accept()). These relate to the two EventLoopGroups of a Netty server as follows: the kernel itself completes the handshake and moves connections from the SYN queue to the ACCEPT queue; the boss NioEventLoopGroup then accepts connections, draining the ACCEPT queue, while the worker NioEventLoopGroup handles I/O on the accepted channels.

Whether the server can sustain persistent connections at the million scale rests on two limits: the ulimit file-descriptor limit (every connection consumes one handle) and the connection backlog. Note that on Linux since kernel 2.2 the backlog argument caps only the ACCEPT queue, while the SYN queue is sized separately via net.ipv4.tcp_max_syn_backlog; the older rule of thumb "backlog = SYN queue size + ACCEPT queue size" applied only to early kernels.
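The interaction between the application's backlog setting and the kernel cap can be sketched in plain Java (the numeric values below are illustrative assumptions, not measurements): the usable accept-queue depth is the smaller of SO_BACKLOG and the kernel's net.core.somaxconn.

```java
public class BacklogSizing {
    // Effective accept-queue capacity is bounded by both the application's
    // SO_BACKLOG and the kernel cap (net.core.somaxconn on Linux).
    static int effectiveBacklog(int soBacklog, int somaxconn) {
        return Math.min(soBacklog, somaxconn);
    }

    public static void main(String[] args) {
        // Assumed example values: SO_BACKLOG=1024 (as in the server code below),
        // somaxconn=128 (a common default on older kernels).
        System.out.println(effectiveBacklog(1024, 128)); // prints 128
    }
}
```

This is why tuning only ChannelOption.SO_BACKLOG is not enough for very high connection rates: the kernel-side sysctl must be raised as well.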

(2) Netty Server Code

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyServer {
    public NettyServer() {
        //1. Create two event-loop groups: one accepts incoming connections, the other handles I/O on accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        //2. Server bootstrap helper
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                //Adaptive (self-tuning) receive-buffer sizing
                .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                //Pooled buffer allocation
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //Log server-channel events at INFO level
                .handler(new LoggingHandler(LogLevel.INFO))
                //Pipeline for each accepted child channel, run on workGroup
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });
            //Bind the port synchronously
            ChannelFuture cf = serverBootstrap.bind(8765).sync();
            System.err.println("Server Startup...");
            //Block until the server channel is closed
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //Graceful shutdown
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            System.err.println("Server Shutdown...");
        }
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        TranslatorData request = (TranslatorData) msg;
        System.err.println("Server: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage());

        //Slow work such as database persistence or other I/O should be handed off to a thread pool and executed asynchronously
        TranslatorData response = new TranslatorData();
        response.setId("resp: " + request.getId());
        response.setName("resp: " + request.getName());
        response.setMessage("resp: " + request.getMessage());

        //Send the response
        ctx.writeAndFlush(response);
    }
}

2. Client-Side Code Best Practices

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyClient {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8765;
    //A fuller implementation would pool channels: ConcurrentHashMap<String /*key*/, Channel /*value*/>
    private Channel channel;
    //1. Create the worker event-loop group that handles I/O
    private EventLoopGroup workGroup = new NioEventLoopGroup();
    private ChannelFuture cf;

    public NettyClient() {
        this.connect(HOST, PORT);
    }

    private void connect(String host, int port) {
        //2. Client bootstrap helper (note: Bootstrap, not ServerBootstrap)
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workGroup)
                .channel(NioSocketChannel.class)
                //Adaptive (self-tuning) receive-buffer sizing
                .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                //Pooled buffer allocation
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        //Bootstrap.handler() replaces any previously set handler, so the
                        //LoggingHandler is added to the pipeline here instead of via a second handler() call
                        sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });
            //Connect synchronously
            this.cf = bootstrap.connect(host, port).sync();
            System.err.println("Client connected...");

            //To send data we first need the channel:
            this.channel = cf.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void sendData() {
        for (int i = 0; i < 10; i++) {
            TranslatorData request = new TranslatorData();
            request.setId("" + i);
            request.setName("request name " + i);
            request.setMessage("request message " + i);
            this.channel.writeAndFlush(request);
        }
    }

    public void close() throws Exception {
        cf.channel().closeFuture().sync();
        //Graceful shutdown
        workGroup.shutdownGracefully();
        System.err.println("Client Shutdown...");
    }
}

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       try {
           TranslatorData response = (TranslatorData) msg;
           System.err.println("Client: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage());
       } finally {
           //Always release the message when done (a no-op for plain POJOs, required for ByteBuf and other ReferenceCounted messages)
           ReferenceCountUtil.release(msg);
       }
    }
}

3. Analyzing Netty's Core Performance Issues

If the business logic in ServerHandler is complex and slow, it degrades Netty's throughput: channelRead runs on a workGroup event-loop thread, and while that thread is busy it cannot serve any of its other channels. So when receiving data with Netty, do not run business logic on the workGroup; instead, hand the work off to a thread pool and process it asynchronously. A conventional thread pool, however, passes tasks through a blocking queue, and under high load the locking in that queue becomes the next bottleneck. To eliminate that handoff cost, the thread pool can be replaced with a Disruptor ring buffer.
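The offloading step described above can be sketched without Netty at all (a JDK-only illustration; the method and pool names are invented for the sketch): the simulated I/O thread only enqueues the message and returns immediately, and the pool's internal blocking queue is precisely the lock-based handoff that Disruptor later replaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {
    // The pool's internal work queue is the lock-based handoff
    // that the Disruptor ring buffer is meant to replace.
    static final ExecutorService BIZ_POOL = Executors.newFixedThreadPool(4);

    // Called on the (simulated) I/O thread: submit and return immediately,
    // so the I/O thread never blocks on slow business logic.
    static CompletableFuture<String> channelRead(String msg) {
        return CompletableFuture.supplyAsync(() -> "resp: " + msg, BIZ_POOL);
    }

    public static void main(String[] args) {
        System.out.println(channelRead("hello").join()); // prints "resp: hello"
        BIZ_POOL.shutdown();
    }
}
```

In the real handler, the response would be written back with ctx.writeAndFlush from the pool thread once the work completes.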

4. Asynchronous Handling of Netty's Persistent-Connection Business with Disruptor

The core architecture built on Netty + Disruptor (architecture diagram omitted): the Netty handlers on both client and server act as Disruptor producers that publish inbound messages into a ring buffer, and a pool of worker consumers processes those events off the event loop.

5. Pooled Encapsulation of Disruptor's Core Objects

The design uses the multi-producer, multi-consumer model. To avoid creating fresh producer and consumer objects inside ClientHandler and ServerHandler on every message, the producer and consumer instances are managed through a pool.
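The pooling idea can be sketched race-free with ConcurrentHashMap.computeIfAbsent (JDK-only; the Producer class here is an invented stand-in for MessageProducer): the same id always yields the same cached instance, even under concurrent lookups.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProducerPool {
    // Stand-in for MessageProducer: one instance per logical producer id.
    static final class Producer {
        final String id;
        Producer(String id) { this.id = id; }
    }

    private static final Map<String, Producer> POOL = new ConcurrentHashMap<>();

    // computeIfAbsent is atomic, so two threads asking for the same id
    // can never create two Producer instances (unlike a get-then-put check).
    static Producer get(String producerId) {
        return POOL.computeIfAbsent(producerId, Producer::new);
    }

    public static void main(String[] args) {
        System.out.println(get("code:sessionId:001") == get("code:sessionId:001")); // prints true
    }
}
```

A plain get()-then-put() sequence, by contrast, can momentarily hand two callers two different producer objects for the same id.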

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class RingBufferWorkerPoolFactory {
    private static final Map<String, MessageProducer> producers = new ConcurrentHashMap<>();
    private static final Map<String, MessageConsumer> consumers = new ConcurrentHashMap<>();
    private RingBuffer<TranslatorDataWapper> ringBuffer;
    private SequenceBarrier sequenceBarrier;
    private WorkerPool<TranslatorDataWapper> workerPool;

    private RingBufferWorkerPoolFactory() {
    }

    public static RingBufferWorkerPoolFactory getInstance() {
        return SingletonHolder.instance;
    }

    public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
        //1. Build the RingBuffer (bufferSize must be a power of two)
        this.ringBuffer = RingBuffer.create(
            type,
            TranslatorDataWapper::new,
            bufferSize,
            waitStrategy
        );
        //2. Create the sequence barrier
        this.sequenceBarrier = this.ringBuffer.newBarrier();
        //3. Create the worker pool
        this.workerPool = new WorkerPool<>(
            this.ringBuffer,
            this.sequenceBarrier,
            new EventExceptionHandler(),
            messageConsumers
        );
        //4. Register the consumers in the pool
        for (MessageConsumer mc : messageConsumers) {
            consumers.put(mc.getConsumerId(), mc);
        }
        //5. Add the workers' sequences as gating sequences so producers cannot overrun slow consumers
        this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
        //6. Start the worker pool (at least one thread, even on a single-core machine)
        this.workerPool.start(Executors.newFixedThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() / 2)));
    }

    public MessageProducer getMessageProducer(String producerId) {
        //computeIfAbsent is atomic, avoiding the race of a separate get() and put()
        return producers.computeIfAbsent(producerId, id -> new MessageProducer(id, this.ringBuffer));
    }

    private static class SingletonHolder {
        static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
    }

    //Exception handler for consumer-side failures
    static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
        @Override
        public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
            //At minimum, log the failed event here
        }

        @Override
        public void handleOnStartException(Throwable ex) {
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
        }
    }
}
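One detail worth noting about initAndStart's bufferSize parameter: Disruptor's RingBuffer requires a power-of-two size so that a slot index can be derived from a sequence with a bitmask rather than a modulo. A JDK-only sketch of that check and mapping (illustrative; this mirrors the idea, not Disruptor's actual source):

```java
public class RingIndexSketch {
    // RingBuffer.create rejects non-power-of-two sizes; this mirrors that check.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // With a power-of-two capacity, slot = sequence & (capacity - 1),
    // which is cheaper than sequence % capacity and never negative.
    static int slotFor(long sequence, int capacity) {
        return (int) (sequence & (capacity - 1));
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(1024 * 1024));        // prints true
        System.out.println(slotFor(1_048_580L, 1024 * 1024)); // prints 4 (wrapped around)
    }
}
```

This is why the startup code below passes 1024 * 1024 rather than, say, one million exactly.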

public class MessageProducer {
    private String producerId;
    private RingBuffer<TranslatorDataWapper> ringBuffer;
    
    public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
        this.producerId = producerId;
        this.ringBuffer = ringBuffer;
    }
    
    public void onData(TranslatorData data, ChannelHandlerContext ctx) {
        long sequence = ringBuffer.next();
        try {
            TranslatorDataWapper wapper = ringBuffer.get(sequence);
            wapper.setData(data);
            wapper.setCtx(ctx);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
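The onData method above follows Disruptor's two-phase publish protocol: claim a sequence, mutate the pre-allocated slot in place, then publish in a finally block so a failed write cannot wedge later producers. A minimal single-threaded, JDK-only imitation of that slot-reuse idea (an invented sketch, not the real RingBuffer, and without its memory-visibility machinery):

```java
public class TwoPhaseSketch {
    static final class Slot { String data; }

    private final Slot[] slots;
    private long nextSeq = 0;

    TwoPhaseSketch(int capacityPow2) {
        slots = new Slot[capacityPow2];
        for (int i = 0; i < capacityPow2; i++) slots[i] = new Slot(); // pre-allocate: no per-message garbage
    }

    // Phase 1: claim a sequence; Phase 2: fill the reused slot in place.
    long publish(String data) {
        long seq = nextSeq++;                                 // claim (like ringBuffer.next())
        Slot slot = slots[(int) (seq & (slots.length - 1))];  // like ringBuffer.get(seq)
        slot.data = data;                                     // mutate the pre-allocated slot
        return seq;                                           // ringBuffer.publish(seq) would make it visible here
    }

    String get(long seq) { return slots[(int) (seq & (slots.length - 1))].data; }

    public static void main(String[] args) {
        TwoPhaseSketch ring = new TwoPhaseSketch(8);
        long seq = ring.publish("hello");
        System.out.println(ring.get(seq)); // prints "hello"
    }
}
```

Because events are mutated in place rather than allocated per message, a busy ring produces essentially no garbage, which is a large part of Disruptor's performance advantage over a blocking queue of boxed tasks.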

//Subclassed separately by the Netty client and server with their concrete implementations
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> {
    protected String consumerId;
    
    public MessageConsumer(String consumerId) {
        this.consumerId = consumerId;
    }
    
    public String getConsumerId() {
        return consumerId;
    }
    
    public void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }
}

6. Scaling to One Million Persistent Connections

(1) Server Side: Handling Client Requests at Million-Connection Scale

(2) Client Side: Handling Server Responses at Million-Connection Scale

(3) Starting the Netty Server

(4) Starting the Netty Client

(1) Server Side: Handling Client Requests at Million-Connection Scale

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        TranslatorData request = (TranslatorData) msg;
        //A real application should derive this from its own ID-generation scheme
        String producerId = "code:sessionId:001";
        MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
        //Publish the request (together with its ctx) through a Disruptor producer
        messageProducer.onData(request, ctx);
    }
}

//The Disruptor consumer that actually processes the request
public class MessageConsumerImpl4Server extends MessageConsumer {
    public MessageConsumerImpl4Server(String consumerId) {
        super(consumerId);
    }

    @Override
    public void onEvent(TranslatorDataWapper event) throws Exception {
        TranslatorData request = event.getData();
        ChannelHandlerContext ctx = event.getCtx();
        //1. Business logic for the client's request:
        System.err.println("Server: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage());

        //2. Build the response
        TranslatorData response = new TranslatorData();
        response.setId("resp: " + request.getId());
        response.setName("resp: " + request.getName());
        response.setMessage("resp: " + request.getMessage());
        //Write the response back to the client
        ctx.writeAndFlush(response);
    }
}

(2) Client Side: Handling Server Responses at Million-Connection Scale

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Receive the server's response and publish it to the Disruptor
        TranslatorData response = (TranslatorData) msg;
        String producerId = "code:sessionId:002";
        MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
        messageProducer.onData(response, ctx);
    }
}

public class MessageConsumerImpl4Client extends MessageConsumer {
    public MessageConsumerImpl4Client(String consumerId) {
        super(consumerId);
    }

    @Override
    public void onEvent(TranslatorDataWapper event) throws Exception {
        TranslatorData response = event.getData();
        ChannelHandlerContext ctx = event.getCtx();
        //Business logic for the server's response:
        try {
            System.err.println("Client: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage());
        } finally {
            //Release the message once processed
            ReferenceCountUtil.release(response);
        }
    }
}

(3) Starting the Netty Server

@SpringBootApplication
public class NettyServerApplication {
    public static void main(String[] args) {
        //1. Start Spring Boot
        SpringApplication.run(NettyServerApplication.class, args);

        //2. Prepare the Disruptor consumers
        MessageConsumer[] consumers = new MessageConsumer[4];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new MessageConsumerImpl4Server("code:serverId:" + i);
        }

        //3. Start the Disruptor with the multi-producer, multi-consumer model
        RingBufferWorkerPoolFactory.getInstance().initAndStart(
            ProducerType.MULTI,
            1024 * 1024,
            //new YieldingWaitStrategy(),
            new BlockingWaitStrategy(),
            consumers
        );

        //4. Start Netty (after the Disruptor, since the handlers publish into it)
        new NettyServer();
    }
}

(4) Starting the Netty Client

@SpringBootApplication
public class NettyClientApplication {
    public static void main(String[] args) {
        //1. Start Spring Boot
        SpringApplication.run(NettyClientApplication.class, args);

        //2. Prepare the Disruptor consumers
        MessageConsumer[] consumers = new MessageConsumer[4];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new MessageConsumerImpl4Client("code:clientId:" + i);
        }

        //3. Start the Disruptor with the multi-producer, multi-consumer model
        RingBufferWorkerPoolFactory.getInstance().initAndStart(
            ProducerType.MULTI,
            1024 * 1024,
            //new YieldingWaitStrategy(),
            new BlockingWaitStrategy(),
            consumers
        );

        //4. Connect and send messages
        new NettyClient().sendData();
    }
}
