dubbo 网络设计

1、网络 IO 模型

IO 多路复用常见的有 select, poll, epoll。
在 netty 中,要使用 epollo,需要使用 EpollEventLoopGroup, 要使用 select,需要使用 NioEventLoopGroup
下面,来看下 dubbo 在创建 EventLoopGroup 时,使用的是哪个类.
netty 在创建 EventLoopGroup 都是通过 NettyEventLoopFactory#eventLoopGroup(int, String) 去创建。

  1. 默认不开启 epollo
  2. netty.epoll.enable 为 true 且操作系统为 linux 时, epollo 才生效
private static boolean shouldEpoll() {
    Configuration configuration = ApplicationModel.getEnvironment().getConfiguration();
    if (configuration.getBoolean("netty.epoll.enable", false)) {
        String osName = configuration.getString("os.name");
        return osName.toLowerCase().contains("linux") && Epoll.isAvailable();
    }
    return false;
}

为什么默认不使用 epollo?

个人看法:一般而言在连接不多的情况下,select 会比 epollo 更高效。而对于大部分应用程序,不会有太多的节点,因此使用 select 比 epollo 更合理。

1.1、业务线程设计

netty 的网络模型使用的是 reactor 模式。在 reactor 模式中,将线程分为了 bossGroup、workGroup、业务线程。
bossGroup 用于处理连接。建立连接后,将连接交给 workGroup, workGroup 负责读写。读写处理完后,再将消息转发给业务线程。
在 dubbo 中,业务逻辑处理使用的是业务线程。编解码消息体默认使用 workGroup 线程,可通过 decode.in.io 配置进行修改。

那么在 dubbo 中,工作线程长什么样呢?

dubbo 允许用户通过 SPI 机制修改工作线程。默认提供的工作线程有如下:

  1. CachedThreadPool(cached)
    线程池模型为 Executors.newCachedThreadPool()
  2. EagerThreadPool(ager)
    核心线程数满时,创建新的线程去执行任务,而不是将任务放到阻塞队列中
  3. FixedThreadPool(默认-fixed)
    线程池模型为 Executors.newFixedThreadPool()
  4. LimitedThreadPool(limited)
    不会收缩的线程池

可以通过 dubbo.protocol.threadpool 修改默认行为

2、任务派发

我们知道 reactor 有分 IO 线程、工作线程。如果派发到工作线程,则还需要将任务提交给工作线程。如果在 IO 线程执行,则免去这一步骤, 效率更高。因为接收消息的就是 IO 线程。
dubbo 提供了以下几种分发策略

  • all 所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳等。(默认)
  • direct 所有消息都不派发到业务线程池,全部在 workGroup 线程上直接执行。
  • message 只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息,直接在 workGroup 线程上执行。
  • execution 只有请求消息派发到业务线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 workGroup 线程上执行。
  • connection 在 workGroup 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池。

3、粘包、半包处理

代码入口:InternalDecoder#decode()
粘包、半包的处理原则是:不停的读取数据流,直到数据流成为一个完整的包。至于如何判断是否为完整的包,则由程序决定。RPC 中,常用的是 固定的消息头大小 + 可变的消息体。并在消息头在声明消息体的大小。dubbo 采用的就是这种方式。

4、心跳机制

dubbo 没有重新设计心跳机制,而是使用 netty 自带的心跳机制。

4.1、client

功能为:客户端如果 60s 没有发送数据包,则发送一次心跳包。每次发完心跳包,都会检查连接是否已经失效。如果是,则关闭连接,并从本地缓存中删掉该连接

代码位置 NettyClient#doOpen()

ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(60000, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);

具体的处理机制, 代码位置 NettyClientHandler

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // send heartbeat when read idle.
    if (evt instanceof IdleStateEvent) {
        try {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            if (logger.isDebugEnabled()) {
                logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
            }
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setEvent(HEARTBEAT_EVENT);
            channel.send(req);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

4.2、server

功能为:如果 180s 没有任何的收发数据包,则关闭连接
代码位置 NettyServer#doOpen

ch.pipeline()
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);

代码位置 NettyServerHandler

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // server will close channel when server don't receive any heartbeat from client util timeout.
    if (evt instanceof IdleStateEvent) {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            logger.info("IdleStateEvent triggered, close channel " + channel);
            channel.close();
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
    super.userEventTriggered(ctx, evt);
}

5、client 重连机制

client 通过定时任务方式,定时检查 client 是否需要重连.

5.1、创建重连定时任务

代码入口:HeaderExchangeClient(Client, boolean)
默认每隔 180s 检测一次。即,心跳时间的 3 倍。
需要注意,大部分框架,都会将判断应用是否死亡时间,设置为心跳时间的 3倍。主要的原因是:可能出于网络原因,导致心跳包没有接收到。因此给了 client 2 次重试的机会。

image.png

5.2、是否触发重连判断

代码入口:ReconnectTimerTask#doTask(Channel)

image.png

如果应用失去连接,则直接重连,否则判断上次读的时间是否超过了空闲时间(180s),如果是也触发重连。
需要注意的是,在网络编程中,应用可连接不代表应用是存活正常的,程序有可能出于假死状态。因此需要触发重连。

6、总结

dubbo 对网络编程做了高度的封装,本质还是不离网络编程,例如 粘包、半包、协议设计、心跳、重连等网络开发中会遇到的常见问题。

全部评论

相关推荐

点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务