深入浅出mediasoup—基础通信

libuv 是一个跨平台的异步事件驱动库,用于构建高性能和可扩展的网络应用程序。mediasoup 基于 libuv 构建了 Pipe 通信、Socket 通信、定时器和信号处理等一整套通信框架,具有异步、单线程、事件驱动的典型特征,这是构建高性能 WebRTC 流媒体服务器的重要基础,本文主要分析 mediasoup 是如何对 libuv 进行封装的。

1. Pipe 通信

mediasoup 的 node.js 进程与 worker 进程使用管道通信。node.js 进程通过管道向 worker 发送请求,从 worker 接收响应。worker 也可以主动向 node.js 进程发送通知消息。因此,需要实现双向管道通信。

1.1. 文件描述符

管道通信需要使用两个文件描述符,node.js 进程使用如下定义:

this.#channel = new Channel({
	producerSocket: this.#child.stdio[3],
	consumerSocket: this.#child.stdio[4],
	pid: this.#pid,
});

worker 进程使用如下定义:

static constexpr int ConsumerChannelFd{ 3 };
static constexpr int ProducerChannelFd{ 4 };

1.2. 静态结构

mediasoup 封装的管道通信看起来比较复杂,涉及到的类比较多,如下图所示。这里面糅合了几个逻辑,拆解以后就会更好理解:

1)UnixStreamSocketHandle ChannelSocket 封装了基于 libuv 的 pipe 通信能力。

2)ChannelSocket 内部包含 ConsumerSocket 和 ProducerSocket 对应管道通信的读和写两个方向。ChannelSocket 继承了 ConsumerSocket::Listener,从 ConsumerSocket 收到的管道消息,都会回调到 ChannelSocket。

2)全局只有一个 ChannelSocket对象,被 Worker 持有。Worker 继承了 ChannelSocekt::Listener,ChannelSocket 收到的所有管道消息都会回调 Worker。

3)Worker 包含了一个 Shared 对象,从名字上出,这是一个“共享对象”,可以认为是一个全局对象,只不过是通过传参的方式共享给各个对象。

4)Shared 包含两个对象:ChannelMessageRegistor 和 ChannelNotifier。ChannelMessageRegistor 用来注册管道消息处理器,Worker 根据注册信息分发收到的管道消息。ChannelNotifier 用来发送管道消息,其内部也是使用 ChannelSocket 来发送消息。

1.3. 数据流

管道通信的数据流如下图所示。接收到的管道消息一层层回调到 Worker 对象,Worker 先对消息进行过滤,如果是 Worker 自己关注的消息,则自己处理掉;如果不是自己关注的消息,则根据消息注册信息,将消息路由到对应的模块进行处理。各模块如果要发送管道消息,调用 ChannelNotifier::Emit 接口,最终是调用 libuv 接口发送出去。

2. Socket 通信

Socket 通信是指 mediasoup worker 与 WebRTC 客户端之间的媒体通信,mediasoup 支持使用 UDP 或 TCP 进行媒体通信。

2.1. 静态结构

2.1.1. UDP

1)UdpSocketHandle 封装了基于 libuv 的 UDP 通信能力。

2)UdpSocket 继承自 UdpSocketHandle,内部包含了一个数据监听对象,用来接收 UDP 消息。

2)PipeTransport、PlainTransport、WebRtcTransport 和 WebRtcServer 都支持 UDP 通信,它们内部都包含一个指向 UdpSocket 的指针,用来发送 UDP 消息。

2.1.2. TCP

1)TcpServerHandle 封装了基于 libuv 的 TCP 监听能力,管理从监听端口创建的 TCP 连接。

2)TcpConnectionHandle 封装了基于 libuv 的 TCP 通信能力,连接中断后会通过 OnTcpConnectionClosed 通知 TcpServerHandle。

3)TcpConnection 继承自 TcpConnectionHandle,收到 TCP 报文会回调连接监听者。

4)当前只有 WebRtcServer 和 WebRtcTransport 支持 TCP 通信。

【注】WebRtcServer 用来做端口聚合,其上可以承载多个 WebRtcTransport。

2.2. Socket 创建

2.2.1. WebRtcServer

WebRtcServer 根据传入的参数,决定创建 UdpSocket 还是 TcpServer。如果配置了端口范围则从端口范围中取一个可用端口;如果配置了端口,则使用指定端口;否则使用命令行参数指定的端口范围。

WebRtcServer::WebRtcServer(RTC::Shared* shared, const std::string& id,
  const flatbuffers::Vector<flatbuffers::Offset<Transport::ListenInfo>>* listenInfos)
  : id(id), shared(shared)
{
  ...
  // 遍历所有地址
  for (const auto* listenInfo : *listenInfos)
  {
    auto ip = listenInfo->ip()->str();
    ...
    // UDP 协议
    if (listenInfo->protocol() == FBS::Transport::Protocol::UDP)
    {
      RTC::UdpSocket* udpSocket;

      // 指定端口范围,从中选择一个
      if (listenInfo->portRange()->min() != 0 && listenInfo->portRange()->max() != 0)
      {
        uint64_t portRangeHash{ 0u };
        udpSocket = new RTC::UdpSocket(
          this,
          ip,
          listenInfo->portRange()->min(),
          listenInfo->portRange()->max(),
          flags,
          portRangeHash);
      }
      // 指定端口
      else if (listenInfo->port() != 0)
      {
        udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags);
      }
      // 未指定端口,使用配置中的端口
      else
      {
        uint64_t portRangeHash{ 0u };
        udpSocket = new RTC::UdpSocket(
          this,
          ip,
          Settings::configuration.rtcMinPort,
          Settings::configuration.rtcMaxPort,
          flags,
          portRangeHash);
      }
      ...
    }
    // TCP 协议
    else if (listenInfo->protocol() == FBS::Transport::Protocol::TCP)
    {
      RTC::TcpServer* tcpServer;
      // 指定端口范围
      if (listenInfo->portRange()->min() != 0 && listenInfo->portRange()->max() != 0)
      {
        uint64_t portRangeHash{ 0u };

        tcpServer = new RTC::TcpServer(
          this,
          this,
          ip,
          listenInfo->portRange()->min(),
          listenInfo->portRange()->max(),
          flags,
          portRangeHash);
      }
      // 指定端口
      else if (listenInfo->port() != 0)
      {
        tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags);
      }
      // 未指定端口,使用配置中的端口
      else
      {
        uint64_t portRangeHash{ 0u };
        tcpServer = new RTC::TcpServer(
          this,
          this,
          ip,
          Settings::configuration.rtcMinPort,
          Settings::configuration.rtcMaxPort,
          flags,
          portRangeHash);
      }
      ...
    }
  }
  ...
}

2.2.2. WebRtcTransport

如果创建 WebRtcTransport 时指定了 WebRtcServer,则 WebRtcTransport 不会再创建 Socket,而是复用 WebRtcServer 创建的 Socket。

WebRtcTransport::WebRtcTransport(...)
{
  ...
  
  // 加入到 WebRtcServer 的转发列表
  this->webRtcTransportListener->OnWebRtcTransportCreated(this);

  ...
}

如果不在 WebRtcServer上 创建 WebRtcTransport,则根据参数创建对应 Socket,处理逻辑与 WebRtcServer类似,不再赘述。

WebRtcTransport::WebRtcTransport(...)
{
  ...

  for (const auto* listenInfo : *listenInfos)
  {
    if (listenInfo->protocol() == FBS::Transport::Protocol::UDP)
    {
      ...
    }
    else if (listenInfo->protocol() == FBS::Transport::Protocol::TCP)
    {
      ...
    }
  }
  ...
}

2.2.3. PlainTransport

PlainTransport 用来对接像 FFMEPT 推流这种第三方编码器和工具, 只支持 UDP 协议。

PipeTransport::PipeTransport(
  RTC::Shared* shared,
  const std::string& id,
  RTC::Transport::Listener* listener,
  const FBS::PipeTransport::PipeTransportOptions* options)
  : RTC::Transport::Transport(shared, id, listener, options->base())
{
  ...

  // 指定端口范围
  if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0)
  {
    uint64_t portRangeHash{ 0u };

    this->udpSocket = new RTC::UdpSocket(
      this,
      this->listenInfo.ip,
      this->listenInfo.portRange.min,
      this->listenInfo.portRange.max,
      this->listenInfo.flags,
      portRangeHash);
  }
  // 指定端口
  else if (this->listenInfo.port != 0)
  {
    this->udpSocket = new RTC::UdpSocket(
      this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
  }
  // 未指定端口,使用配置
  else
  {
    uint64_t portRangeHash{ 0u };

    this->udpSocket = new RTC::UdpSocket(
      this,
      this->listenInfo.ip,
      Settings::configuration.rtcMinPort,
      Settings::configuration.rtcMaxPort,
      this->listenInfo.flags,
      portRangeHash);
  }
  
  ...
}

2.2.4. PipeTransport

PipeTransport 用来从服务端进行对接。PipeTransport 也只支持 UDP 协议。

PipeTransport::PipeTransport(
  RTC::Shared* shared,
  const std::string& id,
  RTC::Transport::Listener* listener,
  const FBS::PipeTransport::PipeTransportOptions* options)
  : RTC::Transport::Transport(shared, id, listener, options->base())
{
  ...

  if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0)
  {
    uint64_t portRangeHash{ 0u };

    this->udpSocket = new RTC::UdpSocket(
      this,
      this->listenInfo.ip,
      this->listenInfo.portRange.min,
      this->listenInfo.portRange.max,
      this->listenInfo.flags,
      portRangeHash);
  }
  else if (this->listenInfo.port != 0)
  {
    this->udpSocket = new RTC::UdpSocket(
      this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
  }
  else
  {
    uint64_t portRangeHash{ 0u };

    this->udpSocket = new RTC::UdpSocket(
      this,
      this->listenInfo.ip,
      Settings::configuration.rtcMinPort,
      Settings::configuration.rtcMaxPort,
      this->listenInfo.flags,
      portRangeHash);
  }

  ...
}

2.3. 数据流

2.3.1. UDP

2.3.1.1. 接收数据

以 WebRtcServer 为例,libuv 收到 UDP 消息会回调 UdpSocketHandle::OnUvRecv,UdpSocketHandle 再回调 UdpSocket::UserOnUdpDatagramReceived,UdpSocket 将消息回调给数据监听者。

2.3.1.2. 发送数据

需要发送数据的模块持有 TransportTuple 对象,发送 UDP 数据时,调用 TransportTuple:: Send 方法,内部会调用 UdpSocketHandle::Send,最后调用 libuv 接口将数据发送到网络。

需要注意,UDP 报文发送有一个特殊机制,如果由于各种原因导致报文发送失败,比如网络缓冲区溢出,mediasoup 会调用 libuv 的异步发送接口进行发送,而不会直接返回错误。

void UdpSocketHandle::Send(const uint8_t* data, size_t len, const struct sockaddr* addr,   UdpSocketHandle::onSendCallback* cb)
{
  ...

  // 使用待发送送数据初始化一块uv缓冲区
  uv_buf_t buffer = uv_buf_init(
    reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);

  // 调用同步接口发送
  const int sent  = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);
  // 所有数据都发送完成
  if (sent == static_cast<int>(len))
  {
    // Update sent bytes.
    this->sentBytes += sent;
    if (cb)
    {
      (*cb)(true); // 回调返回成功
      delete cb;
    }
    return;
  }
  // 发送了部分数据
  else if (sent >= 0)
  {
    this->sentBytes += sent;
    if (cb)
    {
      (*cb)(false); // 回调返回失败
      delete cb;
    }
    return;
  }
  // 出错了,可能是网络繁忙,使用异步接口uv_udp_send发送
  else if (sent != UV_EAGAIN)
  {
    MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
  }

  // 创建一个异步处理数据结构
  auto* sendData = new UvSendData(len);
  // 作为自定义数据挂载到uv数据结构中
  sendData->req.data = static_cast<void*>(sendData);
  // 拷贝待发送数据
  std::memcpy(sendData->store, data, len);
  // 保存回调函数指针
  sendData->cb = cb;

  // 使用待发送数据的拷贝初始化uv缓冲区
  buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);

  // 调用异步接口发送,设置回调接口onSend
  const int err = uv_udp_send(&sendData->req, this->uvHandle, &buffer, 1, addr, 
    static_cast<uv_udp_send_cb>(onSend));
  if (err != 0)
  {
    if (cb)
    {
      (*cb)(false);
    }
    delete sendData;
  }
  else
  {
    this->sentBytes += len;
  }
}

UvSendData 定义如下

struct UvSendData
{
  uv_udp_send_t req{};
  uint8_t* store{ nullptr };
  UdpSocketHandle::onSendCallback* cb{ nullptr };
};

libuv 发送完成后会回调 onSend,在 onSend 函数中处理善后事宜。

inline static void onSend(uv_udp_send_t* req, int status)
{
  auto* sendData = static_cast<UdpSocketHandle::UvSendData*>(req->data);
  auto* handle   = req->handle;
  auto* socket   = static_cast<UdpSocketHandle*>(handle->data);
  const auto* cb = sendData->cb;

  if (socket)
  {
    socket->OnUvSend(status, cb);
  }

  // Delete the UvSendData struct (it will delete the store and cb too).
  delete sendData;
}

2.3.2. TCP

2.3.2.1. 监听连接

TcpServer 调用 libuv 接口建立监听,客户端与服务器完成三次握手后,libuv 会回调 TcpServerHandle::OnUvConnection, TcpServerHandle 回调 TcpServer::UserOnTcpConnectionAlloc,TcpServer 创建 TcpConnection 并调用 AcceptTcpConnection 告知要接受这个连接。TcpServerHandle 对 TcpConnection 进行初始化,调用 libuv 的 uv_accpet 方法完成新连接的创建,最后,调用 TcpConnectionHandle::Start 开始接收数据。

2.3.2.2. 接收数据

以 WebRtcServer 为例,libuv 收到数据后层层回调到 WebRtcServer。

2.3.2.3. 发送数据

发送数据的逻辑与 UDP 基本一样,层层调用 libuv 接口将数据发送到网络。

3. 定时器

定时器在很多地方都会被用到,mediasoup 使用 TimerHanlde 封装 libuv 的定时器能力,需要使用定时器的类需要继承 TimerHandle::Listener,实现 OnTimer 虚拟方法。然后创建一个 TimerHandle 对象,传入 this 指针,调用 TimerHandle::Start 方法即可。

4. 信号处理

信号是进程间通信的一种机制,也是操作系统用来通知进程有关系统事件或异常状况的重要手段。信号可以由系统内核发送给进程,也可以由一个进程发送给另一个进程。在 Worker 进程中,Worker 类是唯一处理 signal 的类,它继承 SignalHandle::Listener,实现 OnSignal 虚拟方法,进程接收的所有信号都会回调给 Worker 处理。

void Worker::OnSignal(SignalHandle* /*signalHandle*/, int signum)
{
  if (this->closed)
  {
    return;
  }

  switch (signum)
  {
    case SIGINT:
    {
      if (this->closed)
      {
        return;
      }
      Close();
      break;
    }
    case SIGTERM:
    {
      if (this->closed)
      {
        return;
      }
      Close();
      break;
    }
    default:
    {
      MS_WARN_DEV("received a non handled signal [signum:%d]", signum);
    }
  }
}

5. 总结

本文简要描述了 mediasoup 是如何封装 libuv 的,覆盖了 pipe、socket、signal 等不同通信方式,重点分析了 Socket 通信的静态结构和数据流,针对性的分析了 UDP 报文的异步发送机制,为进一步阅读 mediasoup 源码打下了基础。

深入浅出mediasoup 文章被收录于专栏

深入分析 mediasoup 实现原理

全部评论

相关推荐

点赞 评论 收藏
分享
后来123321:别着急,我学院本大二,投了1100份,两个面试,其中一个还是我去线下招聘会投的简历,有时候这东西也得看运气
点赞 评论 收藏
分享
白火同学:大二有这水平很牛了,可以适当对关键信息加粗一点,比如关键技术、性能指标之类的。
点赞 评论 收藏
分享
评论
1
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务