NioEventLoop Source Code Analysis in Netty

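I'm trying to catch the spring internship recruiting season and have only just started learning Netty. The projects felt simple, so I wanted to read the source. As someone without a CS degree, this is tough going... These are a few short notes, all summarized by standing on the shoulders of giants (cribbed, really). Let's go!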

Prerequisites

You should know the basic operations of Java's Executor.

Demo

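import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

// ChatRoomServerInitializer is my own ChannelInitializer and is not shown in these notes
public class NettyServer {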
    int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            bootstrap.group(boss, work)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChatRoomServerInitializer());

            ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();
            System.out.println("http server started. port : " + port);
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        NettyServer server = new NettyServer(8080); // 8080 is the listening port
        server.start();
    }
}

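The code above is a fairly simple Netty server. We mainly care about these two lines:

EventLoopGroup boss = new NioEventLoopGroup(1); // group that accepts new connections, size 1
EventLoopGroup work = new NioEventLoopGroup(); // group that handles channel I/O events and tasks

NioEventLoopGroup initialization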

Following the constructors above, we eventually reach the constructor of MultithreadEventLoopGroup:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

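The argument is the group size. If it is 0 (as with the no-argument new NioEventLoopGroup()), the default DEFAULT_EVENT_LOOP_THREADS is used, which is Runtime.getRuntime().availableProcessors() * 2, i.e. twice the number of CPU cores.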
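The default size can be sketched in a few lines. This is my own simplified version, not Netty code: DefaultThreads is a hypothetical class, and it assumes (as in Netty 4.1) that the default can be overridden with the io.netty.eventLoopThreads system property and never drops below 1.

```java
// Sketch only: DefaultThreads and its methods are hypothetical names, not Netty API.
final class DefaultThreads {
    // twice the CPU count, overridable via -Dio.netty.eventLoopThreads, never below 1
    static int defaultEventLoopThreads() {
        int cpuBased = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", cpuBased));
    }

    // nThreads == 0 means "use the default", as in MultithreadEventLoopGroup
    static int effectiveThreads(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("new NioEventLoopGroup(1) -> " + effectiveThreads(1) + " thread(s)");
        System.out.println("new NioEventLoopGroup()  -> " + effectiveThreads(0) + " thread(s)");
    }
}
```

So the boss group in the demo always has exactly one event loop, while the work group scales with the machine.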

Following it further, we arrive at:

# MultithreadEventExecutorGroup.java
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        // this executor belongs to the group; later it will create one thread for each eventLoop in the group
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // create the eventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // if creating any eventLoop failed, shut down all the ones created before it
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // create the chooser
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

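As the class name suggests, this is a multi-threaded executor group. The constructor mainly does three things:

  • new ThreadPerTaskExecutor() [thread creator]
  • for(){ newChild() } [construct the NioEventLoops]
  • chooserFactory.newChooser() [thread chooser]

Thread creator

As the name says, ThreadPerTaskExecutor creates a new thread for every task it is given. It is the executor built in the group's constructor, which the group passes to every child it creates. Its main job is to create one thread for each NioEventLoop, a 1:1 mapping.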

# ThreadPerTaskExecutor.java
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        // the thread factory passed in
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // every execute() call asks the thread factory for a new thread and start()s it
        threadFactory.newThread(command).start();
    }
}

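In other words, the group's executor is just a wrapper around a thread factory: later, each call to its execute() creates the thread for one NioEventLoop, giving the one-to-one mapping.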
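To convince myself that every execute() really gets a fresh thread, here is a small stand-alone sketch with the same shape as ThreadPerTaskExecutor. PerTaskExecutor, PerTaskDemo and the thread names are made up for this demo; only java.util.concurrent is used.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Same shape as Netty's ThreadPerTaskExecutor: every execute() starts a brand-new thread.
final class PerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    PerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

final class PerTaskDemo {
    // Runs n tasks through PerTaskExecutor and collects the names of the threads that ran them.
    static Set<String> runTasks(int n) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "demo-loop-" + counter.incrementAndGet());
        Executor executor = new PerTaskExecutor(factory);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3)); // three different thread names
    }
}
```

In the group this executor is only called once per NioEventLoop (when the loop first starts), which is why "one thread per task" turns into "one thread per event loop".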

Before going further, let's look at NioEventLoop's class hierarchy:

[Figure: NioEventLoop class hierarchy (image unavailable)]

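So NioEventLoop (through its parents SingleThreadEventLoop and SingleThreadEventExecutor) is itself a single-threaded EventExecutor, which is why the group stores its event loops in an EventExecutor array: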

children = new EventExecutor[nThreads];

Each EventLoop is actually instantiated in newChild().

Constructing NioEventLoop

Let's step into newChild(), which constructs the NioEventLoop:

# NioEventLoopGroup.java
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

Following it further brings us to NioEventLoop's constructor:

# NioEventLoop.java
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // single-threaded parent constructor; executor is the group's executor
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector(); // open the selector (event poller) for this NioEventLoop
        selectStrategy = strategy;
    }

First, the parent constructor:

# SingleThreadEventLoop.java
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks); // create an MPSC (multi-producer, single-consumer) task queue
    }

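Besides delegating to its own parent constructor, this creates tailTasks through newTaskQueue(), an MPSC (multi-producer, single-consumer) queue. Netty's task queues follow this pattern: any thread may add tasks, but only the event loop's own thread takes them out and runs them, so the tasks execute thread-safely without locks. (tailTasks holds tasks to run at the end of each loop iteration; ordinary tasks submitted through execute() go into taskQueue, which is created in the next constructor.)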
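A quick way to see why the single-consumer rule matters: below, four producer threads submit tasks that increment a plain, unsynchronized int, and one consumer thread runs them all. ConcurrentLinkedQueue is my stand-in for Netty's MPSC queue (Netty uses its own implementation), and MpscDemo is a hypothetical class.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Multi-producer, single-consumer in miniature (hypothetical class, not Netty code).
final class MpscDemo {
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private int counter; // plain int: safe only because a single thread runs every task

    // may be called from any thread (multi-producer)
    void submit(Runnable task) {
        taskQueue.offer(task);
    }

    // called only by the one consumer thread; returns how many tasks it ran
    int drain() {
        int ran = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    static int run(int producers, int tasksPerProducer) {
        MpscDemo loop = new MpscDemo();
        int expected = producers * tasksPerProducer;
        // the single consumer plays the event loop thread; it spins until every task has run
        Thread consumer = new Thread(() -> {
            int ran = 0;
            while (ran < expected) {
                ran += loop.drain();
            }
        });
        consumer.start();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    loop.submit(() -> loop.counter++);
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            consumer.join(); // join() also makes the consumer's writes to counter visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loop.counter;
    }
}
```

The count always comes out exact because only the consumer thread ever touches counter. The consumer busy-spins to keep the sketch short; a real NioEventLoop blocks in select() instead.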

# SingleThreadEventExecutor.java
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        // the executor held by this NioEventLoop
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks); // create the ordinary task queue (an MPSC queue in NioEventLoop)
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

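The blogs I read say the wakeup-related code here is fairly old, so I'll skip it. 😁

So the constructor chain bottoms out in SingleThreadEventExecutor, a single-threaded executor. Besides creating the task queue, it stores executor in the current NioEventLoop; this is the same group-level executor passed all the way down. As a reminder:

children[i] = newChild(executor, args);

The executor is kept so that, the first time NioEventLoop.execute() is called, the loop can create its own thread through this passed-in executor, i.e. via executor.execute() (see doStartThread() below).

Next, the event poller (the selector) is created: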

selector = openSelector();
# NioEventLoop.java
// only the relevant part is shown
    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
          ... ...
    }

Here provider is a java.nio SelectorProvider, so this simply asks the JDK to open a Selector, which is then bound to the NioEventLoop.

Thread chooser

# MultithreadEventExecutorGroup.java
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

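In this constructor, the chooser factory passed along is the default factory instance, a singleton.

chooser = chooserFactory.newChooser(children);

The children array is handed to the chooser factory. Stepping into chooserFactory.newChooser():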

# DefaultEventExecutorChooserFactory.java
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    // singleton
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    // check whether the length is a power of two
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    // used when the length is a power of two
    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1]; // bitwise AND instead of %
        }
    }
    // otherwise, the generic modulo-based chooser
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

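The chooser works round-robin: it hands out children one after another, cycling through the whole group to bind each new connection to a NioEventLoop. It is actually invoked from MultithreadEventExecutorGroup.next().

There is also a small optimization:

  • First check whether the length of the children array is a power of two.
  • If it is, use PowerOfTowEventExecutorChooser, which replaces % with a bitwise AND (idx & (length - 1)); this is cheaper.
  • Otherwise, use GenericEventExecutorChooser.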
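Both choosers can be reduced to index arithmetic. This sketch mirrors the two Netty classes above but returns array indices instead of EventExecutors; ChooserSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Round-robin index choosers mirroring PowerOfTowEventExecutorChooser and
// GenericEventExecutorChooser (sketch, not Netty code).
final class ChooserSketch {
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static IntSupplier newChooser(int length) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            int mask = length - 1;
            return () -> idx.getAndIncrement() & mask;          // bitmask, no division
        }
        return () -> Math.abs(idx.getAndIncrement() % length);  // modulo
    }

    public static void main(String[] args) {
        IntSupplier four = newChooser(4);
        IntSupplier three = newChooser(3);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append(four.getAsInt()).append('/').append(three.getAsInt()).append(' ');
        }
        System.out.println(sb.toString().trim()); // 0/0 1/1 2/2 3/0 0/1 1/2
    }
}
```

For non-negative x, x & (n - 1) equals x % n exactly when n is a power of two, so both choosers give the same round-robin order; the mask version just avoids a division. Math.abs in the generic chooser keeps the index in range after the AtomicInteger overflows and turns negative.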

NioEventLoop startup

What triggers a NioEventLoop to start:

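  • The server binds its port at startup
  • A new connection is bound to a NioEventLoop through the chooser

Let's trace startup using port binding as the example. The overall flow first:

  • bind() -> execute(task) [entry point]

    • startThread() -> doStartThread() [create the thread]
      • ThreadPerTaskExecutor.execute()
        • thread = Thread.currentThread()
        • NioEventLoop.run() [start the loop]

First, the entry point, the bind method: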

# AbstractBootstrap.java
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });

The port binding is wrapped in a Runnable and passed to NioEventLoop.execute(), which is implemented in a parent class:

# SingleThreadEventExecutor.java
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

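Keep an eye on inEventLoop(): it checks whether the calling thread is this loop's own thread, and it is the basis of Netty's lock-free serial execution, where all work for a loop runs on that one thread.

Now step into startThread(), in the same class: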

    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // not started yet: CAS the state to started, then actually start the thread
                doStartThread();
            }
        }
    }
    // only the relevant code is shown
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread(); // record the thread the executor just created as this loop's thread (main only submitted the bind task)
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run(); // enter the event loop: an endless for loop handling I/O and queued tasks
                    success = true;
                } catch (Throwable t) {
                    ... ...
                }
                ... ...
            }
        });
    }

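The executor here is the one held by the NioEventLoop. From the construction process above, it is the ThreadPerTaskExecutor created by the NioEventLoopGroup, so this execute() call creates a brand-new thread through the thread factory, and the event loop runs on that thread from then on.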
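To see how inEventLoop(), the CAS in startThread() and the injected executor fit together, here is a toy single-threaded executor I wrote for these notes. MiniEventLoop and MiniEventLoopDemo are hypothetical; the loop body is just a blocking queue standing in for select/processSelectedKeys/runAllTasks, and unlike Netty it has no shutdown or wakeup logic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy single-threaded executor (hypothetical, not Netty API).
final class MiniEventLoop implements Executor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Executor executor; // creates the loop thread, like ThreadPerTaskExecutor
    private volatile Thread thread;  // set once, by the loop thread itself

    MiniEventLoop(Executor executor) {
        this.executor = executor;
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    @Override
    public void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // only outside callers may need to start the loop
        }
        taskQueue.offer(task);
    }

    private void startThread() {
        // the CAS guarantees the thread is started at most once, even with racing callers
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            executor.execute(this::runLoop);
        }
    }

    private void runLoop() {
        thread = Thread.currentThread(); // the new thread, not the caller of execute()
        try {
            for (;;) {
                taskQueue.take().run(); // stands in for select / processSelectedKeys / runAllTasks
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // let the toy loop end
        }
    }
}

final class MiniEventLoopDemo {
    // Returns {inEventLoop() on the caller, loop thread name, inEventLoop() in a nested task, threads created}
    static String[] run() {
        AtomicInteger created = new AtomicInteger();
        Executor perTask = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r, "mini-loop");
            t.setDaemon(true); // the toy loop never exits on its own
            t.start();
        };
        MiniEventLoop loop = new MiniEventLoop(perTask);
        String[] seen = new String[4];
        CountDownLatch done = new CountDownLatch(1);
        seen[0] = String.valueOf(loop.inEventLoop());
        for (int i = 0; i < 10; i++) {
            loop.execute(() -> { }); // many outside submissions, still one thread
        }
        loop.execute(() -> {
            seen[1] = Thread.currentThread().getName();
            loop.execute(() -> { // submitted from the loop thread: no startThread() call
                seen[2] = String.valueOf(loop.inEventLoop());
                done.countDown();
            });
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen[3] = String.valueOf(created.get());
        return seen;
    }
}
```

The main thread sees inEventLoop() == false, tasks run on the single "mini-loop" thread, a task resubmitted from inside the loop sees inEventLoop() == true, and eleven outside submissions still create only one thread.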

NioEventLoop execution flow

The overall logic first:

  • run() -> for(;;)
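    • select() [check for I/O events]
    • processSelectedKeys() [handle the I/O events]
    • runAllTasks() [run the queued tasks]

run() contains an endless for loop that does three things per iteration: select the I/O events of the channels registered with the selector, call processSelectedKeys() to handle the selected events, and call runAllTasks() to run the tasks that other threads put into taskQueue.

Step into SingleThreadEventExecutor.this.run(), which NioEventLoop overrides: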

# NioEventLoop.java
    @Override
    protected void run() {
        for (;;) {
            try {
                // pick the select strategy based on whether there are pending tasks
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT: // SelectStrategy.SELECT == -1: the task queue is empty
                        // blocking select
                        select(wakenUp.getAndSet(false));
                           ... ...
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio; // percentage of each iteration to spend on I/O versus queued tasks
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys(); // handle I/O on the ready channels
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks(); // run the queued tasks
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

ioRatio defaults to 50, so the time allowed for tasks equals the time just spent on I/O, a 1:1 split.
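The budget passed to runAllTasks(timeout) is plain arithmetic, so it can be checked in isolation. IoRatioBudget is a hypothetical name; the formula is the one from run() above, and ioRatio == 100 is excluded because run() handles it separately with an unlimited runAllTasks().

```java
// The task-time budget from run(), isolated (sketch, not Netty code).
final class IoRatioBudget {
    // ioTimeNanos: time spent in processSelectedKeys(); returns the timeout for runAllTasks()
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio < 100)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long io = 10_000_000L; // suppose processSelectedKeys() took 10 ms
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio " + ratio + " -> task budget "
                    + taskBudgetNanos(io, ratio) / 1_000_000.0 + " ms");
        }
    }
}
```

With 10 ms of I/O, ioRatio 20 allows 40 ms of tasks, 50 allows 10 ms, and 80 allows only 2.5 ms: the higher the ratio, the more the loop favors I/O.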

Next, let's look at each of the three steps.

How select() works

This step corresponds to the following code:

  switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
      continue;
    case SelectStrategy.SELECT:
      select(wakenUp.getAndSet(false));

Step into the main method, select(), in the same class:

   private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                // time left until the deadline, rounded to the nearest millisecond
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                // if the deadline has passed, a scheduled task is due: if nothing has been selected yet, do one non-blocking selectNow(), set the counter to 1, and break
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // if the task queue has tasks, do a non-blocking selectNow() and break so the tasks can run
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                // no due scheduled task and no queued task: block in select() for up to timeoutMillis (1 s when there is no scheduled task)
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                // stop selecting if: keys were selected | woken up by another thread | tasks pending | scheduled tasks pending
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                // reaching here means a blocking select() returned with nothing to do
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    // the full timeout elapsed, so this was a normal blocking select: reset the counter to 1
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // otherwise select() returned before the timeout with nothing selected (an empty poll); after enough of these in a row, rebuildSelector()
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

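Notes:

delayNanos(currentTimeNanos): how long until the first task in the scheduled-task queue is due (1 second when there is no scheduled task).

selector.selectNow(): non-blocking select.

A supplement from other blogs: selectNow() checks all registered channels once and immediately returns the number of ready ones, possibly 0, while select(timeout) blocks until at least one channel is ready, the selector is woken up, the thread is interrupted, or the timeout expires, whichever comes first.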
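Both kinds of selection can be tried on a bare JDK Selector with nothing registered, so nothing can ever become ready. SelectDemo is a hypothetical class and the timings are approximate. The last step relies on documented JDK behavior: a wakeup() issued while no select is in progress makes the next select return immediately, which is what Netty's wakeup logic builds on.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Blocking vs non-blocking selection on an empty JDK Selector (sketch for these notes).
final class SelectDemo {
    // Returns {selectNow() result, ms spent in select(50), ms spent in select(5000) after wakeup()}
    static long[] run() {
        try (Selector selector = Selector.open()) {
            long[] out = new long[3];
            out[0] = selector.selectNow();            // non-blocking: returns immediately

            long t0 = System.nanoTime();
            selector.select(50);                      // blocks until the 50 ms timeout expires
            out[1] = (System.nanoTime() - t0) / 1_000_000;

            selector.wakeup();                        // pending wakeup: next select returns at once
            long t1 = System.nanoTime();
            selector.select(5_000);
            out[2] = (System.nanoTime() - t1) / 1_000_000;
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("selectNow=" + r[0] + ", select(50) took ~" + r[1]
                + " ms, select(5000) after wakeup() took ~" + r[2] + " ms");
    }
}
```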

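select() mainly does three things:

First, handling the deadline and interleaving with tasks:

It first computes the deadline, i.e. when the next scheduled task is due, and from it timeoutMillis, the time left until that task starts. If timeoutMillis <= 0, a scheduled task is due, so it performs one non-blocking select (if it has not selected yet) and breaks out: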

  // if the deadline has passed, a scheduled task is due: if nothing has been selected yet, do one non-blocking selectNow(), set the counter to 1, and break
  if (timeoutMillis <= 0) {
    if (selectCnt == 0) {
      selector.selectNow();
      selectCnt = 1;
    }
    break;
  }
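The timeoutMillis expression rounds to the nearest millisecond by adding half a millisecond (500000 ns) before the integer division. Pulled out on its own (SelectTimeout is a hypothetical name):

```java
// The timeoutMillis expression from select(), isolated (sketch, not Netty code).
final class SelectTimeout {
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        long[] remainingNanos = {1_000_000_000L, 1_500_000L, 1_499_999L, 499_999L, -2_000_000L};
        for (long remaining : remainingNanos) {
            long t = timeoutMillis(now + remaining, now);
            // t <= 0 goes to the selectNow() branch instead of select(t)
            System.out.println(remaining + " ns left -> timeoutMillis " + t
                    + (t <= 0 ? " (selectNow)" : " (select(" + t + "))"));
        }
    }
}
```

Any deadline less than half a millisecond away becomes 0 and is treated as already due. That matters because, per the Selector javadoc, select(0) blocks indefinitely, so a zero timeout must never reach selector.select(timeoutMillis).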

Second, the blocking select:

  // no due scheduled task and no queued task: block in select() for up to timeoutMillis (1 s when there is no scheduled task)
  int selectedKeys = selector.select(timeoutMillis);
  selectCnt ++;

  // stop selecting if: keys were selected | woken up by another thread | tasks pending | scheduled tasks pending
  if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    // - Selected something,
    // - waken up by user, or
    // - the task queue has a pending task.
    // - a scheduled task is ready for processing
    break;
  }

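Third, working around the JDK empty-poll bug (select() keeps returning immediately with nothing selected, spinning the CPU):

  // reaching here means a blocking select() returned with nothing to do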
  long time = System.nanoTime();
  if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    // timeoutMillis elapsed without anything selected.
    // the full timeout elapsed, so this was a normal blocking select: reset the counter to 1
    selectCnt = 1;
  } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
             selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    // otherwise select() returned before the timeout with nothing selected (an empty poll); after enough of these in a row, rebuildSelector()
    // The selector returned prematurely many times in a row.
    // Rebuild the selector to work around the problem.
    logger.warn(
      "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
      selectCnt, selector);

    rebuildSelector();
    selector = this.selector;

    // Select again to populate selectedKeys.
    selector.selectNow();
    selectCnt = 1;
    break;
  }

SELECTOR_AUTO_REBUILD_THRESHOLD defaults to 512, so selectCnt is essentially a counter of consecutive empty polls.

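This workaround is somewhat optimistic: it does not fix the root cause. It just calls rebuildSelector(), which opens a new Selector, re-registers every channel on it and closes the old one, and hopes the new selector will not spin.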
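The selectCnt bookkeeping can be pulled out into a small model with no real Selector involved. EmptyPollDetector is hypothetical; each call represents one blocking select() that came back with nothing selected. In Netty the threshold is read from the io.netty.selectorAutoRebuildThreshold system property (512 by default), and the rebuild itself is not modeled here.

```java
import java.util.concurrent.TimeUnit;

// A model of the selectCnt logic in select() (sketch, not Netty code).
final class EmptyPollDetector {
    private final int threshold; // plays the role of SELECTOR_AUTO_REBUILD_THRESHOLD
    private int selectCnt;

    EmptyPollDetector(int threshold) {
        this.threshold = threshold;
    }

    // Returns true when the selector should be rebuilt.
    boolean onEmptySelect(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            selectCnt = 1; // waited the whole timeout: an ordinary idle select, not a spin
            return false;
        }
        if (threshold > 0 && selectCnt >= threshold) {
            selectCnt = 1; // Netty calls rebuildSelector() here and resets the counter
            return true;
        }
        return false; // returned early with nothing selected: one more suspected empty poll
    }

    public static void main(String[] args) {
        EmptyPollDetector detector = new EmptyPollDetector(512);
        int calls = 0;
        boolean rebuild = false;
        while (!rebuild) {
            calls++;
            // select(1000) came back after only 1 ms with nothing selected
            rebuild = detector.onEmptySelect(0L, 1_000_000L, 1000L);
        }
        System.out.println("rebuild triggered after " + calls + " premature returns"); // 512
    }
}
```

A single select that really waits out its timeout resets the count, so only an unbroken run of premature returns triggers a rebuild.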

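How processSelectedKeys() works

The second step of NioEventLoop is processing the I/O events that were detected.

First, let's look at the small trick Netty plays with the selected keys.

Each select adds the ready keys to the selector's underlying HashSet. Netty uses reflection to replace that HashSet with an array-backed set, so adding a key is always O(1). The details: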

# NioEventLoop.java
    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // if the optimization is disabled, return the plain JDK selector (DISABLE_KEYSET_OPTIMIZATION defaults to false)
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
      ... ...
    }

The optimization happens in the selector-creation code seen earlier: the underlying key set is replaced with a SelectedSelectionKeySet:

# SelectedSelectionKeySet.java
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }

        return true;
    }
      ...
}

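Only one method of this data structure really matters: add(), which just stores the key in an array and bumps the size counter. By contrast, HashSet.add() is O(1) only on average and O(n) in the worst case. The rest of openSelector() then swaps this set into the selector via reflection: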

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // look up the private fields selectedKeys and publicSelectedKeys
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    selectedKeysField.setAccessible(true); // make them writable
                    publicSelectedKeysField.setAccessible(true);

                    selectedKeysField.set(selector, selectedKeySet); // swap in the array-backed set
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                } catch (RuntimeException e) {
                    // JDK 9 can throw an inaccessible object exception here; since Netty compiles
                    // against JDK 7 and this exception was only added in JDK 9, we have to weakly
                    // check the type
                    if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {
                        return e;
                    } else {
                        throw e;
                    }
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        } else {
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", selector);
        }

        return selector;

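Still in the same method: once the array-backed set has been built, reflection replaces the selector's selectedKeys and publicSelectedKeys fields with selectedKeySet (an array dressed up as a Set).

At the end, openSelector() also stores selectedKeySet in the NioEventLoop field selectedKeys (or sets that field to null if the reflection failed).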
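The array trick is easy to reproduce outside Netty. Below is a simplified sketch of the idea behind SelectedSelectionKeySet: ArrayBackedSet is a hypothetical class, it is generic rather than SelectionKey-specific, and its iterator() is left unsupported because the sketch never needs it.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;

// Simplified take on SelectedSelectionKeySet (sketch): add() appends to an array, and
// flip() hands the filled array to the consumer while later adds go to a spare one.
final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] active = new Object[4];
    private Object[] spare = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        active[size++] = e; // O(1): no hashing, no per-entry node
        if (size == active.length) {
            active = Arrays.copyOf(active, size << 1); // grow, like doubleCapacityA()
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    // Like the keysA/keysB flip(): return the filled array, null-terminated, and swap arrays.
    Object[] flip() {
        Object[] filled = active;
        filled[size] = null; // sentinel: the consumer stops at the first null
        active = spare;
        spare = filled;
        size = 0;
        return filled;
    }

    @Override
    public Iterator<E> iterator() {
        throw new UnsupportedOperationException(); // not needed for this sketch
    }
}
```

The consumer side then looks just like processSelectedKeysOptimized(): walk the returned array until the first null, nulling out each slot as you go.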

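With the selected-key set optimized, the ready events can now be processed. Step into processSelectedKeys(), called from run(); when the optimization is active, it calls processSelectedKeysOptimized():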

# NioEventLoop.java
    private void processSelectedKeys() {
        if (selectedKeys != null) { // non-null only when the key-set optimization succeeded
            processSelectedKeysOptimized(selectedKeys.flip()); 
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

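selectedKeys.flip() returns the underlying array that has just been filled (keysA or keysB), writes a null after the last key, and switches subsequent adds to the other array; that null is why the loop below stops at the first null entry. Continuing: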

  private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) { 
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null; // helps GC

            final Object a = k.attachment(); // the attachment is the Netty channel registered with this key

            if (a instanceof AbstractNioChannel) { // a Netty channel
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

Next, step into processSelectedKey(k, (AbstractNioChannel) a):

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // each channel has exactly one unsafe
        if (!k.isValid()) { // invalid key: the channel may need to be closed
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // dispatch on the ready event types
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // mainly read and accept events
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

看了别人博客的总结,这段逻辑就是处理就绪 channel 的 IO 事件的逻辑:

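  1. Check whether the current SelectionKey is still valid. If it is not, stop processing and close the channel (unless the channel has already been moved to another event loop, as the comment in the code explains).

  2. Dispatch on the channel's ready events: read the SelectionKey's readyOps, which holds the event types that are currently ready. All of the checks below use cheap bitwise operations.

  3. (readyOps & SelectionKey.OP_CONNECT) != 0: connect-ready event

    The server side never registers interest in this event; only a client connecting to a server does.

    Once the connection is ready, the SelectionKey's current interestOps is read, OP_CONNECT is cleared from it (otherwise Selector.select() would keep returning immediately), and unsafe.finishConnect() is called to complete the connection to the server.

  4. (readyOps & SelectionKey.OP_WRITE) != 0: write-ready event

    The channel registered interest in write readiness (typically because an earlier write could not flush everything), and the socket can now accept data, so unsafe.forceFlush() writes the queued data into the socket send buffer. forceFlush() also clears OP_WRITE once nothing is left to write.

  5. (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0: read-ready or accept-ready event

    unsafe.read() is called directly to read data from the socket receive buffer into user space (for a server channel, "reading" means accepting new connections). The readyOps == 0 case is handled as well; per the code comment, this works around a possible JDK bug that could otherwise cause a spin loop. In that case there is no ready data, so the read does not actually read anything.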
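To make the bit tests concrete, here is a small self-contained sketch that replays the same order of checks on a plain int. It uses only the JDK's java.nio.channels.SelectionKey constants; the class ReadyOpsDispatch and its methods are hypothetical and just return the names of the actions Netty would take (finishConnect, forceFlush, read) instead of touching a real channel.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the order of checks in processSelectedKey().
// It records which action would run instead of operating on a channel.
class ReadyOpsDispatch {

    // Clear OP_CONNECT from the interest set, as done before finishConnect().
    static int clearConnectInterest(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 also leads to a read, matching the JDK-bug workaround
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [forceFlush, read]
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT)); // [read]
        System.out.println(actionsFor(0)); // [read]
        System.out.println(clearConnectInterest(SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
                == SelectionKey.OP_READ); // true
    }
}
```

A combined value such as OP_READ | OP_WRITE triggers both the flush and the read in one pass, which is why the checks are independent ifs rather than an if/else chain.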

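So what is this unsafe? It is not the JDK's sun.misc.Unsafe class (the one that gives Java C-pointer-like direct access to memory). Here it is Netty's own Channel.Unsafe (NioUnsafe for NIO channels), an internal interface that performs the actual transport operations such as connect, read, flush, and close; user code is not supposed to call it. I'll look into it in more detail once I understand more.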

runAllTasks() execution logic

It does three things: classifying and adding tasks, aggregating tasks, and running tasks.

First, classifying and adding tasks:

When we looked at the execute method earlier, the code was:

# SingleThreadEventExecutor.java
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop(); // true if the caller is this event loop's own thread
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

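It first checks whether execute was called from an external thread (inEventLoop() returns false). If so, it calls startThread() first, which starts the event loop thread if it has not been started yet, and then adds the task to the task queue (an MpscQueue) that was created together with the NioEventLoop. A task submitted from the event loop thread itself is added to the queue directly.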
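A toy version of this flow may help. This is a sketch under simplifying assumptions, not Netty's implementation: ToyEventLoop is a hypothetical class, a LinkedBlockingQueue stands in for the MPSC queue, the loop blocks on take() instead of a Selector, and there is no shutdown or rejection logic. It shows the two branches of execute(): an external caller goes through startThread(), which uses a CAS so the thread is created only once, while a task submitted from the loop thread itself is simply enqueued.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded executor sketching the execute() flow.
class ToyEventLoop implements Executor {
    // Thread-safe multi-producer queue standing in for Netty's MpscQueue.
    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    final AtomicInteger threadStarts = new AtomicInteger();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // external caller: make sure the loop thread exists
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // compareAndSet ensures the thread is created at most once, even under races.
        if (started.compareAndSet(false, true)) {
            threadStarts.incrementAndGet();
            Thread t = new Thread(this::runLoop, "toy-event-loop");
            t.setDaemon(true); // lets the JVM exit; this toy has no shutdown logic
            thread = t;
            t.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run(); // block until a task arrives, then run it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits n tasks from the caller's thread; each one re-submits an inner task
    // from inside the loop (the inEventLoop() == true branch). Returns the names
    // of the threads that ran the inner tasks.
    static Set<String> runDemo(ToyEventLoop loop, int n) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(() -> loop.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            }));
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        ToyEventLoop loop = new ToyEventLoop();
        System.out.println(runDemo(loop, 10)); // [toy-event-loop]
        System.out.println(loop.threadStarts.get()); // 1
    }
}
```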

Besides the normal task queue, there is also a scheduled task queue:

# AbstractScheduledEventExecutor.java
    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

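The same check is made for scheduled tasks: if the caller is an external thread, the add is wrapped in a Runnable and passed to execute(), which (as above) starts the thread first if needed and then enqueues the task. This guarantees that only one thread, the NioEventLoop's own thread, ever touches the scheduled task queue.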

Why? Because scheduledTaskQueue is not thread-safe; it is a plain PriorityQueue:

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
        }
        return scheduledTaskQueue;
    }
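The same idea can be shown with plain JDK classes. In this hypothetical sketch a single-thread ExecutorService plays the NioEventLoop, and a PriorityQueue<Long> of deadlines plays scheduledTaskQueue. External threads never call add() themselves; they hand the add over to the loop thread, so the unsynchronized PriorityQueue is only ever touched by one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a non-thread-safe queue kept consistent by confinement
// to a single "event loop" thread.
class ToyScheduler {
    // Not thread-safe on purpose, like Netty's scheduledTaskQueue.
    private final PriorityQueue<Long> scheduledDeadlines = new PriorityQueue<>();
    private volatile Thread loopThread;
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-loop");
        loopThread = t; // remember the loop thread for inEventLoop()
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void schedule(long deadlineNanos) {
        if (inEventLoop()) {
            scheduledDeadlines.add(deadlineNanos); // already on the loop thread
        } else {
            // External thread: pass the add to the loop thread instead of taking a lock.
            loop.execute(() -> scheduledDeadlines.add(deadlineNanos));
        }
    }

    // Schedules from several threads at once, then reads the size on the loop thread.
    static int scheduleConcurrently(int threads, int perThread) {
        ToyScheduler s = new ToyScheduler();
        try {
            List<Thread> producers = new ArrayList<>();
            for (int p = 0; p < threads; p++) {
                Thread t = new Thread(() -> {
                    for (int i = 0; i < perThread; i++) {
                        s.schedule(System.nanoTime() + i);
                    }
                });
                producers.add(t);
                t.start();
            }
            for (Thread t : producers) {
                t.join();
            }
            Callable<Integer> size = s.scheduledDeadlines::size;
            return s.loop.submit(size).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            s.loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(scheduleConcurrently(4, 1_000)); // 4000
    }
}
```

Calling scheduledDeadlines.add() directly from the four producer threads would not be safe: the PriorityQueue documentation says multiple threads must not modify an instance concurrently.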

Second, aggregating tasks:

# SingleThreadEventExecutor.java
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue(); // aggregate tasks
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        ... ...

        return true;
    }
    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime); // poll a scheduled task that is due to run now
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) { 
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }

In the while loop above, as long as there is a due scheduled task:

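  1. Add the scheduled task to the normal task queue.
  2. If that fails (the task queue is full), put the task back into the scheduled task queue, since poll already removed it, and return false.
  3. If it succeeds, keep polling the next due scheduled task.
  4. When the while loop ends, every scheduled task that is due has been moved into the normal task queue, which completes the aggregation.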
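The steps above can be replayed in a small standalone sketch. ToyAggregation and its ScheduledTask are hypothetical, a plain long stands in for the nanoTime deadline, and a bounded ArrayBlockingQueue is used only so that offer() can actually fail and exercise the put-back branch.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of fetchFromScheduledTaskQueue(): move every due
// scheduled task into the normal task queue, putting it back if that is full.
class ToyAggregation {
    static final class ScheduledTask implements Runnable, Comparable<ScheduledTask> {
        final long deadline;
        ScheduledTask(long deadline) { this.deadline = deadline; }
        @Override public void run() { }
        @Override public int compareTo(ScheduledTask o) { return Long.compare(deadline, o.deadline); }
    }

    final PriorityQueue<ScheduledTask> scheduledTaskQueue = new PriorityQueue<>();
    final Queue<Runnable> taskQueue;

    ToyAggregation(int taskQueueCapacity) {
        taskQueue = new ArrayBlockingQueue<>(taskQueueCapacity);
    }

    // Returns the earliest scheduled task if it is due at time `now`, otherwise null.
    private ScheduledTask pollScheduledTask(long now) {
        ScheduledTask head = scheduledTaskQueue.peek();
        if (head == null || head.deadline > now) {
            return null;
        }
        return scheduledTaskQueue.poll();
    }

    boolean fetchFromScheduledTaskQueue(long now) {
        ScheduledTask task = pollScheduledTask(now);
        while (task != null) {
            if (!taskQueue.offer(task)) {
                // Task queue is full: put the task back so it is picked up next time.
                scheduledTaskQueue.add(task);
                return false;
            }
            task = pollScheduledTask(now);
        }
        return true;
    }

    public static void main(String[] args) {
        ToyAggregation agg = new ToyAggregation(2);
        for (long d : new long[] {10, 1, 3, 2}) {
            agg.scheduledTaskQueue.add(new ScheduledTask(d));
        }
        System.out.println(agg.fetchFromScheduledTaskQueue(5)); // false
        System.out.println(agg.taskQueue.size() + " " + agg.scheduledTaskQueue.size()); // 2 2
    }
}
```

With a capacity of 2, the tasks due at 1 and 2 are moved, the task due at 3 is put back, and the task due at 10 is not due yet, so the method returns false and the remainder is picked up on a later call.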

Third, running tasks:

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask(); // take a task from the normal task queue
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // compute the deadline from the timeout
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task); // run the task, i.e. Runnable.run()

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask(); // take the next task
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime(); 
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime; 
        return true;
    }

For the first half of the code, see the comments; the key part is the check on the number of tasks run:

    // Check timeout every 64 tasks because nanoTime() is relatively expensive.
    // XXX: Hard-coded value - will make it configurable if it is really a problem.
    if ((runTasks & 0x3F) == 0) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        if (lastExecutionTime >= deadline) {
            break;
        }
    }

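This is another bitwise AND: 0x3F is 63, and for a non-negative count x & 63 equals x % 64, so the timeout is checked once every 64 tasks. There is a timeout at all because task execution has to share time with IO processing; once the deadline has passed, the loop exits so the thread can go back to handling IO.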

Why not check after every task? As the English comment says, nanoTime() is relatively expensive.
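Both points can be seen in a stripped-down version of the loop. This sketch is hypothetical: ToyRunAllTasks injects the clock as a LongSupplier instead of calling ScheduledFutureTask.nanoTime(), so the deadline is deterministic and we can count how often the clock is read, and afterRunningAllTasks() is omitted.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical sketch of the time-boxed loop in runAllTasks(timeoutNanos).
class ToyRunAllTasks {
    long clockReads; // how many times the (expensive) clock was consulted

    int runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos, LongSupplier clock) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = read(clock) + timeoutNanos;
        int runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // (runTasks & 0x3F) == 0 is equivalent to runTasks % 64 == 0 here
            if ((runTasks & 0x3F) == 0 && read(clock) >= deadline) {
                break; // time budget used up: go back to IO
            }
            task = taskQueue.poll();
            if (task == null) {
                read(clock); // corresponds to recording lastExecutionTime
                break;
            }
        }
        return runTasks;
    }

    private long read(LongSupplier clock) {
        clockReads++;
        return clock.getAsLong();
    }

    static Queue<Runnable> noOpTasks(int n) {
        Queue<Runnable> q = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            q.add(() -> { });
        }
        return q;
    }

    public static void main(String[] args) {
        ToyRunAllTasks r = new ToyRunAllTasks();
        // A frozen clock never reaches the deadline, so all 200 tasks run.
        System.out.println(r.runAllTasks(noOpTasks(200), 1_000L, () -> 0L)); // 200
        // Start, after tasks 64/128/192, and at the end: 5 reads instead of 201.
        System.out.println(r.clockReads); // 5
    }
}
```

With a timeout of 0 the same loop stops right after the 64th task, which is the earliest point at which the deadline is ever looked at.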

Interview questions

Q: How does Netty achieve asynchronous, serial, lock-free execution?

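  • Each NioEventLoop wraps one thread. This IO thread handles client connection events and read/write events, and runs the tasks in the queue. Every NioEventLoop has its own task queue, created together with the NioEventLoop; Netty relies heavily on tasks. The task queue is a multi-producer single-consumer (MPSC) queue, so any thread can add to it safely while only the event loop thread takes from it. inEventLoop() decides the path: if the caller is an external thread rather than our own IO thread, its Runnable is put into the MPSC queue instead of being run directly. Scheduled tasks work the same way: an external thread's scheduled task is handed over through the normal task queue, so only the IO thread touches the scheduled task queue. Because a loop's state is only ever accessed by its single IO thread, no synchronization between threads is needed.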
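The core of this answer (many producers, one consumer, no lock on the state) can be demonstrated with JDK classes alone. This is a sketch under stated assumptions, not Netty code: a single-thread ExecutorService stands in for a NioEventLoop, and SerialLockFreeDemo is a hypothetical class whose counter field is a plain int with no volatile, lock, or atomic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of serial, lock-free execution via a single consumer thread.
class SerialLockFreeDemo {
    private int counter; // plain int: no volatile, no lock, no atomic

    static int run(int producers, int incrementsEach) {
        SerialLockFreeDemo state = new SerialLockFreeDemo();
        ExecutorService loop = Executors.newSingleThreadExecutor(); // the "event loop"
        try {
            List<Thread> threads = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                Thread t = new Thread(() -> {
                    for (int i = 0; i < incrementsEach; i++) {
                        loop.execute(() -> state.counter++); // enqueue; never touch counter here
                    }
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
            // Read on the loop thread too; Future.get() safely publishes the value.
            Future<Integer> result = loop.submit(() -> state.counter);
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(8, 10_000)); // 80000
    }
}
```

If the producer threads incremented counter directly, increments could be lost; routing every update through the single loop thread makes the result exact without any lock.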

Q: By default, how many threads does a Netty server start, and when are they started?

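  • By default, a NioEventLoopGroup has twice as many threads as there are CPU cores. Each thread is started lazily: execute() checks whether it is being called from the loop's own thread. If so, the thread is obviously already running; if it is called from an external thread, startThread() checks whether the thread has been started and starts it if not.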
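For reference, the default size can be computed as follows. This is a sketch using plain JDK calls; Netty itself reads the same io.netty.eventLoopThreads system property through its internal SystemPropertyUtil and NettyRuntime helpers, and DefaultThreads is a hypothetical class name.

```java
// Hypothetical sketch of how DEFAULT_EVENT_LOOP_THREADS is derived:
// 2 * CPU cores, overridable by a system property, never below 1.
class DefaultThreads {
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger returns the fallback if the property is unset or not a number.
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    public static void main(String[] args) {
        System.out.println(defaultEventLoopThreads());
    }
}
```

Passing an explicit size, as the demo does with new NioEventLoopGroup(1) for the boss group, bypasses this default entirely.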

Q: How does Netty work around the JDK empty-polling bug?

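  • With the JDK empty-polling bug, a blocking select(timeoutMillis) returns before timeoutMillis has elapsed even though no keys are ready; each such return counts as one empty poll. Netty counts these consecutive empty polls, and once the count reaches a threshold (512 by default), it calls rebuildSelector() to create a new Selector and move all the previously registered keys over to it, thereby avoiding the bug.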
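The counting idea can be sketched without a real Selector. SpinDetector is hypothetical and simplified: each call reports the result of one select(timeout), and only the threshold of 512 is taken from the answer above. Netty's actual loop tracks the count slightly differently, so treat this as an illustration of the idea rather than its implementation.

```java
// Hypothetical, simplified sketch of empty-poll (spin) detection.
class SpinDetector {
    static final int REBUILD_THRESHOLD = 512; // threshold from the answer above
    private int selectCnt; // consecutive early, empty select() returns
    int rebuilds;          // how many times the selector would have been rebuilt

    // Report one select(timeout) call: how many keys were ready, how long it
    // actually blocked, and how long it was asked to block.
    void onSelectReturned(int readyKeys, long blockedNanos, long timeoutNanos) {
        if (readyKeys != 0 || blockedNanos >= timeoutNanos) {
            selectCnt = 0; // real events or a genuine timeout: not an empty poll
            return;
        }
        selectCnt++; // returned early with nothing ready: count it
        if (selectCnt >= REBUILD_THRESHOLD) {
            rebuildSelector();
        }
    }

    private void rebuildSelector() {
        // Netty opens a new Selector here and moves every registered key to it;
        // this sketch only records that a rebuild happened.
        rebuilds++;
        selectCnt = 0;
    }

    public static void main(String[] args) {
        SpinDetector d = new SpinDetector();
        for (int i = 0; i < 512; i++) {
            d.onSelectReturned(0, 10L, 1_000_000L); // 512 early, empty returns
        }
        System.out.println(d.rebuilds); // 1
    }
}
```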

Q: Briefly describe NioEventLoop.

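  • NioEventLoops are created when the user creates the boss/worker EventLoopGroup; by default a group creates 2 * (number of CPU cores) NioEventLoops. The group's chooser hands out NioEventLoops (for example when a new channel is registered) in round-robin order, using a bitwise AND instead of modulo when the number of loops is a power of two. Each NioEventLoop creates its Selector and task queue during construction; when creating the Selector, Netty uses reflection to replace the selectedKeys HashSet with an array-backed set. The first execute() call on a NioEventLoop creates and starts a FastThreadLocalThread and stores it in a member field, binding loop and thread one-to-one. The execution logic lives in the run() method: detect IO events, process IO events, and run the task queue.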
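The "AND instead of modulo" trick in the chooser can be sketched like this. ToyChooser is a hypothetical class following the same idea as Netty's default chooser: for n > 0, idx & (n - 1) equals idx % n when n is a power of two, and (n & -n) == n tests for a power of two without a loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser over n event loops.
class ToyChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final int n;
    private final boolean powerOfTwo;

    ToyChooser(int n) {
        this.n = n;
        this.powerOfTwo = (n & -n) == n; // true for 1, 2, 4, 8, ...
    }

    int next() {
        if (powerOfTwo) {
            return idx.getAndIncrement() & (n - 1); // cheap mask instead of a division
        }
        return Math.abs(idx.getAndIncrement() % n); // general case
    }

    public static void main(String[] args) {
        ToyChooser four = new ToyChooser(4);
        ToyChooser three = new ToyChooser(3);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            sb.append(four.next()).append('/').append(three.next()).append(' ');
        }
        System.out.println(sb.toString().trim()); // 0/0 1/1 2/2 3/0 0/1
    }
}
```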

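PS: I've spent two days on this. As a non-CS-major my level really isn't there yet, so please go easy on me. This is just a record for myself; suggestions on learning Netty are very welcome -;-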
