Disruptor—4.与Netty的简单应用
大纲
1.服务端代码最佳实践
2.客户端代码最佳实践
3.Netty的高性能核心问题分析
4.基于Disruptor异步化处理Netty的长链接业务
5.Disruptor核心池化封装实现
6.实现接入百万长链接
1.服务端代码最佳实践
(1)TCP握手原理
(2)Netty服务端代码
(1)TCP握手原理
服务端处理客户端的TCP连接请求时,系统底层会采用两个队列,这两个队列分别是SYNC队列和ACCEPT队列。这两个队列的处理分别对应创建Netty服务端时的两个EventLoopGroup,其中bossNioEventLoopGroup用来处理SYNC队列,workNioEventLoopGroup用来处理ACCEPT队列。
决定服务端可以接收百万级长链接的基础是:服务端的ulimit链接句柄数和backlog链接队列大小,其中backlog链接队列大小 = SYNC队列大小 + ACCEPT队列大小。
(2)Netty服务端代码
public class NettyServer { public NettyServer() { //1.创建两个工作线程组: 一个用于接受网络请求的线程组. 另一个用于实际处理业务的线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); //2.辅助类 ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //表示缓存区动态调配(自适应) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) //缓存区池化操作 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //设置使用日志 .handler(new LoggingHandler(LogLevel.INFO)) //设置workGroup的异步回调 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); //同步绑定端口 ChannelFuture cf = serverBootstrap.bind(8765).sync(); System.err.println("Server Startup..."); //同步等待请求连接 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //优雅停机 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); System.err.println("Sever ShutDown..."); } } } public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { TranslatorData request = (TranslatorData)msg; System.err.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage()); //数据库持久化操作 IO读写 ---> 交给一个线程池,去异步的调用执行 TranslatorData response = new TranslatorData(); response.setId("resp: " + request.getId()); response.setName("resp: " + request.getName()); response.setMessage("resp: " + request.getMessage()); //发送response响应信息 ctx.writeAndFlush(response); } }
2.客户端代码最佳实践
public class NettyClient { public static final String HOST = "127.0.0.1"; public static final int PORT = 8765; //扩展完善池化: ConcurrentHashMap<KEY -> String, Value -> Channel> private Channel channel; //1.创建工作线程组: 用于实际处理业务的线程组 private EventLoopGroup workGroup = new NioEventLoopGroup(); private ChannelFuture cf; public NettyClient() { this.connect(HOST, PORT); } private void connect(String host, int port) { //2.辅助类(注意Client和Server不一样) Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(workGroup) .channel(NioSocketChannel.class) //表示缓存区动态调配(自适应) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) //缓存区池化操作 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //设置日志处理器 .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); //同步绑定端口 this.cf = bootstrap.connect(host, port).sync(); System.err.println("Client connected..."); //接下来就进行数据的发送, 但是首需要获取channel: this.channel = cf.channel(); } catch (InterruptedException e) { e.printStackTrace(); } } public void sendData() { for (int i = 0; i < 10; i++) { TranslatorData request = new TranslatorData(); request.setId("" + i); request.setName("请求消息名称 " + i); request.setMessage("请求消息内容 " + i); this.channel.writeAndFlush(request); } } public void close() throws Exception { cf.channel().closeFuture().sync(); //优雅停机 workGroup.shutdownGracefully(); System.err.println("Sever ShutDown..."); } } public class ClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { TranslatorData response = (TranslatorData)msg; System.err.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage()); } finally { //一定要注意用完了缓存后要进行释放 ReferenceCountUtil.release(msg); } } }
3.Netty的高性能核心问题分析
如果ServerHandler的业务很复杂,耗时很长,那么就会影响Netty性能。所以在使用Netty进行接收处理数据时,不要在workGroup中处理业务。此时可利用异步机制,使用线程池异步处理来提升ServerHandler性能。如果使用线程池,那么又意味着需要使用阻塞队列。因此为消除线程池的阻塞队列影响性能,可使用Disruptor替换线程池。
4.基于Disruptor异步化处理Netty的长链接业务
基于Netty + Disruptor构建高性能Netty的核心架构图如下所示:
5.Disruptor核心池化封装实现
基于多生产者和多消费者模型,为了避免在ClientHandler和ServerHandler中不断创建生产者和消费者,可以将生产者对象和消费者对象通过池化的方式进行管理。
public class RingBufferWorkerPoolFactory { private static Map<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>(); private static Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>(); private RingBuffer<TranslatorDataWapper> ringBuffer; private SequenceBarrier sequenceBarrier; private WorkerPool<TranslatorDataWapper> workerPool; private RingBufferWorkerPoolFactory() { } public static RingBufferWorkerPoolFactory getInstance() { return SingletonHolder.instance; } public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) { //1.构建ringBuffer对象 this.ringBuffer = RingBuffer.create( type, new EventFactory<TranslatorDataWapper>() { public TranslatorDataWapper newInstance() { return new TranslatorDataWapper(); } }, bufferSize, waitStrategy ); //2.设置序号栅栏 this.sequenceBarrier = this.ringBuffer.newBarrier(); //3.设置工作池 this.workerPool = new WorkerPool<TranslatorDataWapper>( this.ringBuffer, this.sequenceBarrier, new EventExceptionHandler(), messageConsumers ); //4.把所构建的消费者置入池中 for (MessageConsumer mc : messageConsumers) { this.consumers.put(mc.getConsumerId(), mc); } //5.添加我们的sequences this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences()); //6.启动我们的工作池 this.workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2)); } public MessageProducer getMessageProducer(String producerId) { MessageProducer messageProducer = this.producers.get(producerId); if (null == messageProducer) { messageProducer = new MessageProducer(producerId, this.ringBuffer); this.producers.put(producerId, messageProducer); } return messageProducer; } private static class SingletonHolder { static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory(); } //异常静态类 static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> { public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) { } public void handleOnStartException(Throwable ex) { } public void handleOnShutdownException(Throwable ex) { } } } public class MessageProducer { private String producerId; private RingBuffer<TranslatorDataWapper> ringBuffer; public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) { this.producerId = producerId; this.ringBuffer = ringBuffer; } public void onData(TranslatorData data, ChannelHandlerContext ctx) { long sequence = ringBuffer.next(); try { TranslatorDataWapper wapper = ringBuffer.get(sequence); wapper.setData(data); wapper.setCtx(ctx); } finally { ringBuffer.publish(sequence); } } } //Netty的Client端和Server端分别具体实现 public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> { protected String consumerId; public MessageConsumer(String consumerId) { this.consumerId = consumerId; } public String getConsumerId() { return consumerId; } public void setConsumerId(String consumerId) { this.consumerId = consumerId; } }
6.实现接入百万长链接
(1)服务端处理客户端请求实现支持百万长链接
(2)客户端处理服务端响应实现支持百万长链接
(3)启动Netty服务端
(4)启动Netty客户端
(1)服务端处理客户端请求实现支持百万长链接
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { TranslatorData request = (TranslatorData) msg; //自已的应用服务应该有一个ID生成规则 String producerId = "code:sessionId:001"; MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId); //通过Disruptor的生产者来处理request,同时传入ctx messageProducer.onData(request, ctx); } } //通过Disruptor的消费者来处理request public class MessageConsumerImpl4Server extends MessageConsumer { public MessageConsumerImpl4Server(String consumerId) { super(consumerId); } public void onEvent(TranslatorDataWapper event) throws Exception { TranslatorData request = event.getData(); ChannelHandlerContext ctx = event.getCtx(); //1.对客户端请求的处理逻辑: System.err.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage()); //2.发送响应信息 TranslatorData response = new TranslatorData(); response.setId("resp: " + request.getId()); response.setName("resp: " + request.getName()); response.setMessage("resp: " + request.getMessage()); //写出response响应信息 ctx.writeAndFlush(response); } }
(2)客户端处理服务端响应实现支持百万长链接
public class ClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收服务端的响应 TranslatorData response = (TranslatorData) msg; String producerId = "code:seesionId:002"; MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId); messageProducer.onData(response, ctx); } } public class MessageConsumerImpl4Client extends MessageConsumer { public MessageConsumerImpl4Client(String consumerId) { super(consumerId); } public void onEvent(TranslatorDataWapper event) throws Exception { TranslatorData response = event.getData(); ChannelHandlerContext ctx = event.getCtx(); //对服务端返回响应的处理逻辑: try { System.err.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage()); } finally { ReferenceCountUtil.release(response); } } }
(3)启动Netty服务端
@SpringBootApplication public class NettyServerApplication { public static void main(String[] args) { //1.启动SpringBoot SpringApplication.run(NettyServerApplication.class, args); //2.准备Disruptor的消费者 MessageConsumer[] consumers = new MessageConsumer[4]; for (int i = 0; i < consumers.length; i++) { MessageConsumer messageConsumer = new MessageConsumerImpl4Server("code:serverId:" + i); consumers[i] = messageConsumer; } //3.启动Disruptor,使用多生产者多消费者模型 RingBufferWorkerPoolFactory.getInstance().initAndStart( ProducerType.MULTI, 1024 * 1024, //new YieldingWaitStrategy(), new BlockingWaitStrategy(), consumers ); //4.启动Netty new NettyServer(); } }
(4)启动Netty客户端
@SpringBootApplication public class NettyClientApplication { public static void main(String[] args) { //1.启动SpringBoot SpringApplication.run(NettyClientApplication.class, args); //2.准备Disruptor的消费者 MessageConsumer[] conusmers = new MessageConsumer[4]; for (int i = 0; i < conusmers.length; i++) { MessageConsumer messageConsumer = new MessageConsumerImpl4Client("code:clientId:" + i); conusmers[i] = messageConsumer; } //3.启动Disruptor,使用多生产者多消费者模型 RingBufferWorkerPoolFactory.getInstance().initAndStart( ProducerType.MULTI, 1024 * 1024, //new YieldingWaitStrategy(), new BlockingWaitStrategy(), conusmers ); //4.建立连接 并发送消息 new NettyClient().sendData(); } }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等