「Netty源码」EventLoopGroup初始化分析

本片大致内容为 NioEventLoopGroup初始化过程,有一些细的地方就没有深入了,这里只分析bossgroup




看了上面2个图,我就有几个疑问,

  1. 既然是实现了Executor,那么自然要看看它executor方法是如何实现的
  2. EventLoopGroup是如何封装SelectorProvider 这两个问题最后解答,先看大体代码分析

废话少说,直接就进入io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)这个方法,上面的我就略过了哈

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads"); //前面指定为null if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    } //bossGroup 为1 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建对应数量的EventLoop children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e);
        } finally { if (!success) { for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                } for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j]; try { while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break;
                    }
                }
            }
        }
    } //PowerOfTwoEventExecutorChooser //创建选择器PowerOfTwoEventExecutorChooser chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    }; //添加监听器,略 for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);//e.terminationFuture() DefaultPromise }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

首先先分析下executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());这一行 ThreadPerTaskExecutor就是一个执行器,它内部也很简单,就是创建先行并执行

public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    } @Override public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

而threadFactory则是由newDefaultThreadFactory()创建的线程工厂,简单的看一下,这个线程工厂为DefaultThreadFactory,既然是线程工厂,我们需要关心的也就创建线程方法,就是用FastThreadLocalThread封装下线程,里面还有个成员变量InternalThreadLocalMap,可以简单理解为ThreadLocal增强,这里如果想看可以结合FastThreadLocal。


下面就是newChild()这个方法,具体实现是NioEventLoopGroup实现,具体就是创建NioEventLoop

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    SelectorProvider selectorProvider = (SelectorProvider) args[0];
    SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
    RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
    EventLoopTaskQueueFactory taskQueueFactory = null;
    EventLoopTaskQueueFactory tailTaskQueueFactory = null; int argsLength = args.length; if (argsLength > 3) {
        taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
    } if (argsLength > 4) {
        tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
    } return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { //2个queue都是MpscUnboundedArrayQueue,多个生产者对单个消费者,无锁队列 super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector;
} protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
} protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}



可以看到它也实现自Executor,还记得开头那个问题吗,虽然不是NioEventLoopGroup但是也和那个有关系,先看NioEventLoop


在SingleThreadEventExecutor中就有对应实现了,多的酒省略了,大体调用关系可以将时序图和下面代码结合起来看,这里我总结下,NioEventLoop的executor方法呢,初始化的时候,会调用 NioEventLoopGroup传进来的Executor创建线程并死循环处理任务,新来的任务也事通过前面说过的队列来解耦

@Override public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
} private void execute(Runnable task, boolean immediate) { //比较thread,未启动过,thread为null,启动过后才会记录 boolean inEventLoop = inEventLoop();
    addTask(task);//需要执行的任务先添加进队列taskQueue if (!inEventLoop) { //主线程轮循,监听事件 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) {
                reject();
            }
        }
    } if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
} private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try {
                doStartThread();
                success = true;
            } finally { if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
} private void doStartThread() { assert thread == null;
    executor.execute(new Runnable() { //略 try {
            SingleThreadEventExecutor.this.run();
            success = true;
        }

    }
} protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar }
                    nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE);
                    } // fall through default:
                }
            } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e); continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally { // Ensure we always run tasks. ranTasks = runAllTasks();
                }
            } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try {
                    processSelectedKeys();
                } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime;

                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else { //执行前天添加的任务 ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0;
            }
        } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) { throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally { // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) {
                    closeAll(); if (confirmShutdown()) { return;
                    }
                }
            } catch (Error e) { throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

openSelector还是挺好玩的,这里觉得java自带的Selector不好用,用反射吧selectedKeys和publicSelectedKeys设置成了自己的SelectedSelectionKeySet

private SelectorTuple openSelector() { final Selector unwrappedSelector; try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) { throw new ChannelException("failed to open a new selector", e);
    } if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { //返回SelectorImpl.class return Class.forName( "sun.nio.ch.SelectorImpl", false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) { return cause;
            }
        }
    }); //判断是否获取到了这个类 if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        } return new SelectorTuple(unwrappedSelector);
    } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { //翻着获取字段 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null;
                    } // We could not retrieve the offset, lets try reflection as last-resort. }

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause;
                }

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null;
            } catch (NoSuchFieldException e) { return e;
            } catch (IllegalAccessException e) { return e;
            }
        }
    }); if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

看到这里肯定有点晕,不过不用怕,newChild()我想讲的已经讲完了,下面回到NioEventLoopGroup,接下来就是选择器chooser = chooserFactory.newChooser(children);其实也不用关心,就是返回选择的是哪个NioEventLoop来执行

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors;
    } @Override public EventExecutor next() { //executors 总数必须是2的幂次方 return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

回到开头的问题NioEventLoopGroup的executor是怎么执行的呢 它的实现是在AbstractEventExecutorGroup,其中next()的实现,就是通过选择器来执行,但是,debugger的时候,基本上也没有走到这,拿了eventLoop自己去玩了

@Override public void execute(Runnable command) {
    next().execute(command);
}

总结

  • 既然是实现了Executor,那么自然要看看它executor方法是如何实现的
    • 他封装了NioEventLoop,bossGroup是通过死循环处理网络和其他任务,具体实现,没分析,后面有机会再说
  • EventLoopGroup是如何封装SelectorProvider
    • NioEventLoop封装了,并且创建好Selector,还反射用了自己的集合管理SelectionKey
#Java##程序员#
全部评论

相关推荐

造车新势力 自动驾驶规控 29k * 13
点赞 评论 收藏
转发
点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务