Netty源码阅读实战(二)-Netty基本组件
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。
各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。
负责:
- 中央/分销预订系统性能优化
- 活动&券等营销中台建设
- 交易平台及数据中台等架构和开发设计
- 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
- LLM Agent应用开发
- 区块链应用开发
- 大数据开发挖掘经验
- 推荐系统项目
目前主攻市级软件项目设计、构建服务全社会的应用系统。
参考:
1 传统socket编程
1.1 实战
服务端:ServerBoot
/**
* @author JavaEdge
*/
public class ServerBoot {
private static final int PORT = 8000;
public static void main(String[] args) {
Server server = new Server(PORT);
server.start();
}
}
Server
/**
* @author JavaEdge
*/
public class Server {
private ServerSocket serverSocket;
public Server(int port) {
try {
this.serverSocket = new ServerSocket(port);
System.out.println("Server starts success,端口:" + port);
} catch (IOException exception) {
System.out.println("Server starts failed");
}
}
public void start() {
new Thread(() -> doStart()).start();
}
private void doStart() {
while (true) {
try {
Socket client = serverSocket.accept();
new ClientHandler(client).start();
} catch (IOException e) {
System.out.println("Server failure");
}
}
}
}
ClientHandler
/**
* @author JavaEdge
*/
public class ClientHandler {
public static final int MAX_DATA_LEN = 1024;
private final Socket socket;
public ClientHandler(Socket socket) {
this.socket = socket;
}
public void start() {
System.out.println("新客户端接入");
new Thread(() -> doStart()).start();
}
private void doStart() {
try {
InputStream inputStream = socket.getInputStream();
while (true) {
byte[] data = new byte[MAX_DATA_LEN];
int len;
while ((len = inputStream.read(data)) != -1) {
String message = new String(data, 0, len);
System.out.println("客户端传来消息: " + message);
socket.getOutputStream().write(data);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client
/**
* @author JavaEdge
*/
public class Client {
private static final String HOST = "127.0.0.1";
private static final int PORT = 8000;
private static final int SLEEP_TIME = 5000;
public static void main(String[] args) throws IOException {
final Socket socket = new Socket(HOST, PORT);
new Thread(() -> {
System.out.println("客户端启动成功!");
while (true) {
try {
String message = "hello world";
System.out.println("客户端发送数据: " + message);
socket.getOutputStream().write(message.getBytes());
} catch (Exception e) {
System.out.println("写数据出错!");
}
sleep();
}
}).start();
}
private static void sleep() {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
先后启动 ServerBoot、Client,输出
Server starts success,端口:8000
新客户端接入
客户端传来消息: hello worldhello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端启动成功!
客户端发送数据: hello world
客户端发送数据: hello world
客户端发送数据: hello world
1.2 传统HTTP服务器原理
- 创建一个
ServerSocket - 监听并绑定一个端口一系列客户端来请求这个端口服务器使用Accept,获得一个来自客户端的Socket连接对象
- 启动一个新线程处理连接读Socket,
- 得到字节流解码协议
- 得到Http请求对象处理Http请求
- 得到一个结果
- 封装成一个HttpResponse对象编码协议
- 将结果序列化字节流写Socket,
- 将字节流发给客户端
- 继续循环步骤3
1.3 C/S 交互流程

2 Netty版socket编程

3 Netty核心组件
3.1 NioEventLoop
① EventLoop
一个 EventLoop 就是一个 eventexecutor:
/**
* 一旦Channel注册了,将处理所有的 I/O 操作
* 一个 EventLoop 实例通常处理一个以上Channel
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
NioEventLoopGroup ,Netty 框架提供的一个基于 NIO 的实现。是一个处理 I/O 操作的多线程事件循环的组。Netty 为不同类型的传输提供各种 EventLoopGroup 实现。
示例代码中实现服务器端应用程序,将用两个NioEventLoopGroup:
-
boss,接受传入的连接。因为accept事件只需建立一次连接,连接可复用,所以accept只接受一次
-
work,在上司接受连接并登记到工作人员后,处理接受连接的流量。使用多少线程及咋映射到创建的通道
Netty的发动机:通常一个 NioEventLoop 会绑定到一个特定的 Thread。 NioEventLoop 的所有 I/O 操作和任务执行都会在这个绑定的 Thread 上进行。这确保了 NioEventLoop 的线程安全性,避免多线程并发访问的问题。
Server端:
private void doStart() {
while (true) {
try {
Socket client = serverSocket.accept();
new ClientHandler(client).start();
} catch (IOException e) {
System.out.println("Server failure");
}
}
}
ClientHandler:
private void doStart() {
try {
InputStream inputStream = socket.getInputStream();
// 对应一个 run 方法
while (true) {
byte[] data = new byte[MAX_DATA_LEN];
int len;
while ((len = inputStream.read(data)) != -1) {
String message = new String(data, 0, len);
System.out.println("客户端传来消息: " + message);
socket.getOutputStream().write(data);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
NioEventLoop#run
@Override
protected void run() {
for (;;) {
try {
...
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
// 打断点
processSelectedKeys();
}
...
}
}
netty的NIO,对IO事件处理是在NioEventLoop,事件的注册:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 先处理OP_WRITE,因为可写一些排队的缓冲区,从而释放内存
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 调用 forceFlush,一旦无剩余可写内容,它也将清除 OP_WRITE
// 可见,注册 OP_WRITE 事件,要执行的就是 flush 操作.
ch.unsafe().forceFlush();
}
// 还要校验 readOps 为0以解决可能的JDK bug,否则可能导致 spin loop
// 处理读请求(断开连接)或接入连接
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
...
}
private void processSelectedKeys() {
if (selectedKeys != null) {
// 不用 JDK 的 selector.selectedKeys,性能更好(%1-2%),GC更少
processSelectedKeysOptimized();
}
...
}
不同事件调用unsafe的不同方法,Netty对底层socket的操作都通过
unsafe
- NioMessageUnsafe
NioServerSocketChannel使用NioMessageUnsafe做socket操作 - NioByteUnsafe
NioSocketChannel使用NioByteUnsafe做socket操作
处理每个连接:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 数组中的空节点允许通道关闭后对其GC
// 在SelectedSelectionKeySet中使用单数组
// 动机:SelectedSelectionKeySet当前在内部使用2个数组,并且期望用户调用flip访问基础数组并切换活动数组。
// 但是,我们不能同时使用2个数组,如果在重置数组元素时格外小心,就可以摆脱使用单数组。
// 修改: 介绍包装了Selector的SelectedSelectionKeySetSelector并确保我们在选择之前重置基础的SelectedSelectionKeySet数据结构-
// NioEventLoop#processSelectedKeysOptimized中的循环边界可以更精确地定义,因为我们知道基础数组的实际大小
selectedKeys.keys[i] = null;
// attachment 就是 NioServerSocketChannel
final Object a = k.attachment();
// 打断点!!!
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
...
}
}
② EventExecutorGroup
负责:
- 经由其使用
next()方法,提供EventExecutor - 处理自己的生命周期,并允许在全局模式中关闭它们
③ EventExecutor
特殊的EventExecutorGroup,快捷方法查看是否有Thread在事件循环执行。
④ EventLoopGroup
public interface EventLoopGroup extends EventExecutorGroup
特殊的 EventExecutorGroup,允许注册 Channel,即事件循环期间可执行 channel 操作,得到处理,供以后选用。
/**
* EventExecutorGroup实现的抽象基类,它同时处理多个线程的任务
*/
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
// 底层数组存储
private final EventExecutor[] children;
3.2 Channel
Channel 是 Netty 对 Socket 的高层抽象。
检查 SelectionKey 的 readyOps 标志位,以确定是否有可读或可接受的事件发生。如果有,则调用 unsafe.read() 方法进行处理:
NioEventLoop.java
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
NioMessageUnsafe#read()处理是否有新连接进来:
@Override
public void read() {
...
int localRead = doReadMessages(readBuf);
具体子类NioServerSocketChannel#doReadMessages:
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
...
}
@Override
protected ServerSocketChannel javaChannel() {
// 把底层的 channel 封装成 NioSocketChannel
return (ServerSocketChannel) super.javaChannel();
}
}
3.3 ByteBuf
Netty 对字节数据的高层抽象,提供了更丰富的功能和更灵活的操作方式。

3.4 Pipeline
逻辑链(Logic Chain)是指一系列有序的处理步骤或逻辑单元,用于处理特定的任务或数据流。在 Netty 中,逻辑链通常由 Pipeline 中的 ChannelHandler 组成,每个 ChannelHandler 负责处理数据流的一部分。
netty 将其抽象成逻辑链,netty咋把每个 pipeline 加入到客户端连接的呢?
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
...
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new DefaultSocketChannelConfig(this, socket.socket());
}
...
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
...
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
final class DefaultChannelPipeline implements ChannelPipeline {
protected DefaultChannelPipeline(Channel channel) {
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
3.5 ChannelHandler
Logic链的一环。
DefaultChannelPipeline.java
// 主要入参的ChannelHandler
@Override
public final ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
addAfter0(ctx, newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
#笔记##读书笔记#
查看18道真题和解析