Unity 网络编程协议/会话层

初学者,自用笔记

协议层

基础信息

public class NetMessage
{
    public int MessageType;   // 消息类型ID
    public byte[] Payload;     // 业务序列化数据
}

// 消息类型注册表:类型ID 和 C# 类型互相映射
public interface IMessageTypeRegistry
{
    void Register<T>(int messageType) where T : class;
    int GetMessageType(Type type);
    Type GetType(int messageType);
}

// 消息类型注册表实现
public class MessageTypeRegistry : IMessageTypeRegistry
{
    private readonly Dictionary<Type, int> _typeToId = new Dictionary<Type, int>();
    private readonly Dictionary<int, Type> _idToType = new Dictionary<int, Type>();

    public void Register<T>(int messageType) where T : class
    {
        var type = typeof(T);
        _typeToId[type] = messageType;
        _idToType[messageType] = type;
    }

    public int GetMessageType(Type type)
    {
        return _typeToId.TryGetValue(type, out var id) ? id : 0;
    }

    public Type GetType(int messageType)
    {
        return _idToType.TryGetValue(messageType, out var type) ? type : null;
    }
}

TCP沾包/拆包

特殊字符法

原理

  • 在消息末尾加唯一分隔符(如 \r\n),按分隔符拆分消息
  • 消息体不能包含分隔符(需转义)、逐字节查找分隔符性能低、容错性差

包头包体法

原理

  • 包头:固定长度(通常 4 字节 int 类型),仅存储「包体的字节长度」,且统一采用大端序(网络字节序) 保证跨平台兼容;
  • 包体:实际的业务数据(文本 / 二进制 / 序列化后的对象等)。

优势

  • 无内容限制:包体可包含任意字符 / 二进制数据(如 Protobuf、图片),无分隔符冲突问题;
  • 高性能:按长度直接截取数据,无需逐字节遍历,时间复杂度 O (1);
  • 容错性强:可校验包体长度合理性,异常消息不影响后续解析。

字节序工具类

public static class EndianHelper
{
    // int 转 大端序字节数组(发送到网络时用)
    public static byte[] IntToBigEndianBytes(int value)
    {
        // 先把 int 转成本地字节序(可能是小端)
        byte[] bytes = BitConverter.GetBytes(value);
        // 如果当前系统是小端,就反转字节,变成大端
        if (BitConverter.IsLittleEndian)
            Array.Reverse(bytes);
        return bytes;
    }

    // 大端序字节数组 转 int(从网络接收后解析用)
    public static int BigEndianBytesToInt(byte[] bytes)
    {
        // 如果是小端系统,先反转回小端才能正确解析
        if (BitConverter.IsLittleEndian)
            Array.Reverse(bytes);
        return BitConverter.ToInt32(bytes, 0);
    }

    // short 转 大端序字节数组
    public static byte[] ShortToBigEndianBytes(short value)
    {
        byte[] bytes = BitConverter.GetBytes(value);
        if (BitConverter.IsLittleEndian)
            Array.Reverse(bytes);
        return bytes;
    }

    // 大端序字节数组 转 short
    public static short BigEndianBytesToShort(byte[] bytes)
    {
        if (BitConverter.IsLittleEndian)
            Array.Reverse(bytes);
        return BitConverter.ToInt16(bytes, 0);
    }
}

包头包体实现

// 协议处理接口:打包/解包
public interface ITcpProtocolHandler
{
    byte[] Pack(byte[] body);
    List<byte[]> Unpack(byte[] buffer, ref int bufferLen);
}
// 固定包头协议:4字节长度头 + 包体
public class FixedHeaderProtocol : ITcpProtocolHandler
{
    private const int HeaderSize = 4;
    private const int MaxBodySize = 1024 * 1024;

    public byte[] Pack(byte[] body)
    {
       // 1. 校验包体长度
        if (body == null || body.Length == 0)
            throw new ArgumentException("包体不能为空");
        if (body.Length > MaxBodySize)
            throw new ArgumentException("包体超长");

       // 2. 生成大端序包头(存储包体长度)
        byte[] header = EndianHelper.IntToBigEndian(body.Length);
        byte[] packet = new byte[HeaderSize + body.Length];

      // 3. 拼接头+体
        Array.Copy(header, 0, packet, 0, HeaderSize);
        Array.Copy(body, 0, packet, HeaderSize, body.Length);

        return packet;
    }

    public List<byte[]> Unpack(byte[] buffer, ref int bufferLen)
    {
        List<byte[]> list = new List<byte[]>();
        int remain = bufferLen;

        while (remain >= HeaderSize)
        {
            // 读包头
            byte[] headerBuf = new byte[HeaderSize];
            Array.Copy(buffer, 0, headerBuf, 0, HeaderSize);
            int bodyLen = EndianHelper.BigEndianToInt(headerBuf);

            // 长度校验
            if (bodyLen <= 0 || bodyLen > MaxBodySize)
                throw new Exception("非法包体长度");

            // 检查是否够一个完整包
            if (remain < HeaderSize + bodyLen)
                break;

            // 提取包体
            byte[] body = new byte[bodyLen];
            Array.Copy(buffer, HeaderSize, body, 0, bodyLen);
            list.Add(body);

            // 移动缓存
            int processed = HeaderSize + bodyLen;
            remain -= processed;
            Array.Copy(buffer, processed, buffer, 0, remain);
        }

        bufferLen = remain;
        return list;
    }
}

UDP分片协议

工具类

// UDP分片头
public struct UdpFragmentHeader
{
    public int MessageId;     // 完整消息ID
    public short FragIndex;    // 当前分片序号(从0开始)
    public short FragCount;    // 总分片数
    public int TotalLength;    // 完整消息总长度
}


public interface ITimeProvider
{
    float Now { get; }
}

// Unity 环境下的时间实现
public class UnityTimeProvider : ITimeProvider
{
    // 游戏运行到现在的真实时间(不受暂停影响)
    public float Now => Time.realtimeS***artup;
}

职责抽象

// 分片头 编码/解码器
// 负责:把 UdpFragmentHeader 结构体 和 字节数组 互相转换
public interface IUdpFragmentHeaderCodec
{
    // 结构体 → 字节数组(发送时用)
    byte[] Encode(UdpFragmentHeader header);
    // 字节数组 → 结构体(接收时用)
    UdpFragmentHeader Decode(byte[] packet);
    // 分片头固定长度(这里是12字节)
    int HeaderSize { get; }
}

// 分片打包器
// 作用:把一个很大的字节数组,切成多个符合MTU的UDP小包
public interface IUdpFragmentPacker
{
    // 输入:完整大消息
    // 输出:一堆带分片头的小包
    List<byte[]> Pack(byte[] body);
}

// 分片重组器
// 作用:接收一个个分片,缓存起来,收齐后拼成完整大消息
public interface IUdpFragmentReassembler
{
    // 传入一个收到的UDP包,尝试重组
    // 成功返回完整消息,失败返回null
    byte[] TryReassemble(byte[] packet);

    // 定时清理:超时没收到全部分片的缓冲
    void CleanupTimeout();
}

// UDP 分片对外统一服务
// 把打包+重组封装在一起,方便上层直接调用
public interface IUdpFragmentService
{
    List<byte[]> Pack(byte[] body);       // 分片
    byte[] TryReassemble(byte[] packet);  // 重组
    void CleanupTimeout();               // 清理超时
}

具体实现

// 分片头编解码实现(12字节)
// 实现 UdpFragmentHeader 和字节的互相转换
public class UdpFragmentHeaderCodec : IUdpFragmentHeaderCodec
{
    // 分片头固定长度:int(4) + short(2) + short(2) + int(4) = 12
    public int HeaderSize => 12;

    // 结构体 → 12字节数组
    public byte[] Encode(UdpFragmentHeader header)
    {
        byte[] bytes = new byte[HeaderSize];

        // 依次把4个字段按大端序写入字节数组
        Array.Copy(EndianHelper.IntToBigEndianBytes(header.MessageId), 0, bytes, 0, 4);
        Array.Copy(EndianHelper.ShortToBigEndianBytes(header.FragIndex), 0, bytes, 4, 2);
        Array.Copy(EndianHelper.ShortToBigEndianBytes(header.FragCount), 0, bytes, 6, 2);
        Array.Copy(EndianHelper.IntToBigEndianBytes(header.TotalLength), 0, bytes, 8, 4);

        return bytes;
    }

    // 12字节数组 → 结构体
    public UdpFragmentHeader Decode(byte[] packet)
    {
        return new UdpFragmentHeader
        {
            MessageId = EndianHelper.BigEndianBytesToInt(packet.AsSpan(0, 4).ToArray()),
            FragIndex = EndianHelper.BigEndianBytesToShort(packet.AsSpan(4, 2).ToArray()),
            FragCount = EndianHelper.BigEndianBytesToShort(packet.AsSpan(6, 2).ToArray()),
            TotalLength = EndianHelper.BigEndianBytesToInt(packet.AsSpan(8, 4).ToArray())
        };
    }
}

// 分片打包器实现
// 功能:把大消息切成多个 ≤1200 字节的小包
public class UdpFragmentPacker : IUdpFragmentPacker
{
    // UDP 安全包大小,避免被路由器分片/丢弃
    private const int MaxPacketSize = 1200;
    private readonly IUdpFragmentHeaderCodec _codec;
    private int _messageId = 1; // 自增消息ID,保证每个大消息唯一

    public UdpFragmentPacker(IUdpFragmentHeaderCodec codec)
    {
        _codec = codec;
    }

    // 大消息 → 分片列表
    public List<byte[]> Pack(byte[] body)
    {
        List<byte[]> packets = new List<byte[]>();
        int msgId = _messageId++;

        // 每个分片能装的有效数据长度 = 最大包长 - 分片头长度
        int payloadSize = MaxPacketSize - _codec.HeaderSize;
        // 计算总分片数(向上取整)
        int fragCount = (body.Length + payloadSize - 1) / payloadSize;

        // 循环切成每一片
        for (short i = 0; i < fragCount; i++)
        {
            // 当前分片在完整消息里的起始偏移
            int offset = i * payloadSize;
            // 当前分片实际数据长度(最后一片可能不足payloadSize)
            int len = Math.Min(payloadSize, body.Length - offset);

            // 最终要发送的UDP包 = 头 + 数据
            byte[] packet = new byte[_codec.HeaderSize + len];

            // 构造当前分片的头信息
            UdpFragmentHeader header = new UdpFragmentHeader
            {
                MessageId = msgId,
                FragIndex = i,
                FragCount = (short)fragCount,
                TotalLength = body.Length
            };

            // 写入头
            byte[] headerBytes = _codec.Encode(header);
            Array.Copy(headerBytes, 0, packet, 0, _codec.HeaderSize);
            // 写入分片数据
            Array.Copy(body, offset, packet, _codec.HeaderSize, len);

            packets.Add(packet);
        }

        return packets;
    }
}

// 分片重组器实现
// 功能:接收分片 → 缓存 → 收齐后拼接 → 返回完整消息
public class UdpFragmentReassembler : IUdpFragmentReassembler
{
    // 内部类:同一个大消息的所有分片缓冲
    private class FragmentBuffer
    {
        public int MessageId;         // 消息ID
        public int TotalLength;       // 完整长度
        public short FragCount;       // 总分片数
        public byte[][] Fragments;    // 分片内容数组
        public bool[] Received;       // 标记每个分片是否收到
        public float LastReceiveTime; // 最后收到分片的时间(用于超时)
    }

    private readonly IUdpFragmentHeaderCodec _codec;
    private readonly ITimeProvider _timeProvider;

    // 按 MessageId 缓存正在拼接的消息
    private readonly Dictionary<int, FragmentBuffer> _buffers = new();

    // 分片超时时间:超过5秒没收完就丢弃
    private const float Timeout = 5f;

    public UdpFragmentReassembler(IUdpFragmentHeaderCodec codec, ITimeProvider timeProvider)
    {
        _codec = codec;
        _timeProvider = timeProvider;
    }

    // 接收一个UDP包,尝试重组
    public byte[] TryReassemble(byte[] packet)
    {
        // 包长度比头还小,非法
        if (packet.Length < _codec.HeaderSize)
            return null;

        // 先解析出分片头
        UdpFragmentHeader header = _codec.Decode(packet);

        // 提取分片真正的数据(去掉头)
        byte[] payload = new byte[packet.Length - _codec.HeaderSize];
        Array.Copy(packet, _codec.HeaderSize, payload, 0, payload.Length);

        // 如果这个消息还没有缓冲,就新建一个
        if (!_buffers.TryGetValue(header.MessageId, out var buf))
        {
            buf = new FragmentBuffer
            {
                MessageId = header.MessageId,
                TotalLength = header.TotalLength,
                FragCount = header.FragCount,
                Fragments = new byte[header.FragCount],
                Received = new bool[header.FragCount],
                LastReceiveTime = _timeProvider.Now
            };
            _buffers[header.MessageId] = buf;
        }

        // 分片序号越界,直接丢弃
        if (header.FragIndex < 0 || header.FragIndex >= header.FragCount)
            return null;

        // 如果这个分片还没收到,就存起来
        if (!buf.Received[header.FragIndex])
        {
            buf.Fragments[header.FragIndex] = payload;
            buf.Received[header.FragIndex] = true;
            buf.LastReceiveTime = _timeProvider.Now;
        }

        // 检查:是不是所有分片都收到了
        foreach (bool r in buf.Received)
        {
            if (!r)
                return null; // 没收完,返回null
        }

        // 收齐了,开始拼接成完整消息
        byte[] fullBody = new byte[buf.TotalLength];
        int offset = 0;
        foreach (byte[] seg in buf.Fragments)
        {
            Array.Copy(seg, 0, fullBody, offset, seg.Length);
            offset += seg.Length;
        }

        // 拼接完成,从缓冲中移除
        _buffers.Remove(header.MessageId);

        return fullBody;
    }

    // 清理超时消息:防止内存泄漏,丢包时不会一直占内存
    public void CleanupTimeout()
    {
        float now = _timeProvider.Now;
        // 找出所有超时的消息ID
        List<int> toRemove = new List<int>();
        foreach (var kv in _buffers)
        {
            if (now - kv.Value.LastReceiveTime > Timeout)
                toRemove.Add(kv.Key);
        }
        // 移除
        foreach (int id in toRemove)
            _buffers.Remove(id);
    }
}

// 统一分片服务
// 把打包、重组封装成一个简单接口,给 UDP 客户端/服务端使用
public class UdpFragmentService : IUdpFragmentService
{
    private readonly IUdpFragmentPacker _packer;
    private readonly IUdpFragmentReassembler _reassembler;

    public UdpFragmentService(IUdpFragmentPacker packer, IUdpFragmentReassembler reassembler)
    {
        _packer = packer;
        _reassembler = reassembler;
    }

    public List<byte[]> Pack(byte[] body) => _packer.Pack(body);
    public byte[] TryReassemble(byte[] packet) => _reassembler.TryReassemble(packet);
    public void CleanupTimeout() => _reassembler.CleanupTimeout();
}

服务端

职责抽象

// 协议服务端:对上只暴露对象级别发送/接收
// 作用:给上层(RPC/业务)提供 启动、对指定客户端发消息、停止、驱动更新
public interface IProtocolServer
{
    void Start(string ip, int port);
    void SendTo<T>(EndPoint clientEP, T msg) where T : class, new();
    void Stop();
    void Tick(float deltaTime);

    event Action<object, EndPoint> OnMessageReceived; // 收到客户端消息
    event Action<EndPoint> OnClientConnected;          // 客户端连接
    event Action<EndPoint> OnClientDisconnected;       // 客户端断开
}

具体实现

// 服务端协议层:
// 负责:对象序列化、反序列化、TCP/UDP、多客户端消息分发
public class ProtocolServer : IProtocolServer
{
    private readonly ITransportRouterServer _router;
    private readonly IFullSerializer _serializer;
    private readonly IMessageTypeRegistry _typeRegistry;
    private readonly ITcpProtocolHandler _tcpProtocol;
    private readonly IUdpFragmentProtocol _udpProtocol;

    public event Action<object, EndPoint> OnMessageReceived;
    public event Action<EndPoint> OnClientConnected;
    public event Action<EndPoint> OnClientDisconnected;

    // 构造注入所有底层依赖
    public ProtocolServer(
        ITransportRouterServer router,
        IFullSerializer serializer,
        IMessageTypeRegistry typeRegistry,
        ITcpProtocolHandler tcpProtocol,
        IUdpFragmentProtocol udpProtocol)
    {
        _router = router;
        _serializer = serializer;
        _typeRegistry = typeRegistry;
        _tcpProtocol = tcpProtocol;
        _udpProtocol = udpProtocol;

        // 监听底层字节与连接事件
        _router.OnReliableBytesReceived += OnTcpBytesReceived;
        _router.OnUnreliableBytesReceived += OnUdpBytesReceived;
        _router.OnClientConnected += ep => OnClientConnected?.Invoke(ep);
        _router.OnClientDisconnected += ep => OnClientDisconnected?.Invoke(ep);
    }

    // 启动服务器
    public void Start(string ip, int port)
    {
        _router.Start(ip, port);
    }

    // 向指定客户端发送消息对象
    public void SendTo<T>(EndPoint clientEP, T msg) where T : class, new()
    {
        if (msg == null) return;

        // 获取类型ID
        int typeId = _typeRegistry.GetMessageType(msg.GetType());
        if (typeId == 0) return;

        // 序列化业务消息
        byte[] payload = _serializer.Serialize(msg);

        // 包装成网络消息
        var netMsg = new NetMessage
        {
            MessageType = typeId,
            Payload = payload
        };

        byte[] body = _serializer.Serialize(netMsg);

        // TCP:可靠发送
        if (msg is IReliableMessage)
        {
            byte[] packet = _tcpProtocol.Pack(body);
            _router.SendReliableTo(clientEP, packet);
        }
        // UDP:不可靠发送(分片)
        else if (msg is IUnreliableMessage)
        {
            List<byte[]> packets = _udpProtocol.Pack(body);
            foreach (var packet in packets)
            {
                _router.SendUnreliableTo(clientEP, packet);
            }
        }
    }

    // 停止服务器
    public void Stop()
    {
        _router.Stop();
    }

    // 驱动更新:清理UDP超时分片
    public void Tick(float deltaTime)
    {
        _udpProtocol.CleanupTimeout();
    }

    // TCP数据到达 → 解包
    private void OnTcpBytesReceived(byte[] bytes, EndPoint ep)
    {
        int len = bytes.Length;
        var bodies = _tcpProtocol.Unpack(bytes, ref len);

        foreach (var body in bodies)
        {
            HandleIncomingBody(body, ep);
        }
    }

    // UDP数据到达 → 重组
    private void OnUdpBytesReceived(byte[] bytes, EndPoint ep)
    {
        byte[] body = _udpProtocol.TryReassemble(bytes);
        if (body != null)
        {
            HandleIncomingBody(body, ep);
        }
    }

    // 反序列化 → 抛给上层业务
    private void HandleIncomingBody(byte[] body, EndPoint ep)
    {
        var netMsg = _serializer.Deserialize<NetMessage>(body);
        if (netMsg == null) return;

        Type msgType = _typeRegistry.GetType(netMsg.MessageType);
        if (msgType == null) return;

        var method = typeof(IFullSerializer)
            .GetMethod(nameof(IFullSerializer.Deserialize))
            ?.MakeGenericMethod(msgType);

        if (method == null) return;

        object msgObj = method.Invoke(_serializer, new object[] { netMsg.Payload });
        if (msgObj != null)
        {
            OnMessageReceived?.Invoke(msgObj, ep);
        }
    }
}

客户端

职责抽象

// 协议客户端:对上只暴露对象级别发送/接收
// 作用:给上层(RPC/业务)提供 连接、发送对象、关闭、驱动更新
public interface IProtocolClient
{
    void Connect(string ip, int port);
    void Send<T>(T msg) where T : class, new();
    void Close();
    void Tick(float deltaTime);

    event Action<object> OnMessageReceived; // 接收消息抛给上层
}

具体实现

// 客户端协议层:
// 负责:对象序列化、反序列化、TCP/UDP 分包/粘包处理、消息分发
public class ProtocolClient : IProtocolClient
{
    // 路由:负责底层 TCP/UDP 收发字节
    private readonly ITransportRouterClient _router;
    // 序列化器:对象 ↔ 字节
    private readonly IFullSerializer _serializer;
    // 消息类型注册表:消息类型 <-> 数字ID 映射
    private readonly IMessageTypeRegistry _typeRegistry;
    // TCP 协议处理器:处理粘包、分包
    private readonly ITcpProtocolHandler _tcpProtocol;
    // UDP 协议处理器:处理分片、重组、超时
    private readonly IUdpFragmentProtocol _udpProtocol;

    // 上层监听:收到完整消息对象
    public event Action<object> OnMessageReceived;

    // 构造注入:所有底层依赖由外部传入(符合DI架构)
    public ProtocolClient(
        ITransportRouterClient router,
        IFullSerializer serializer,
        IMessageTypeRegistry typeRegistry,
        ITcpProtocolHandler tcpProtocol,
        IUdpFragmentProtocol udpProtocol)
    {
        _router = router;
        _serializer = serializer;
        _typeRegistry = typeRegistry;
        _tcpProtocol = tcpProtocol;
        _udpProtocol = udpProtocol;

        // 监听底层字节到达
        _router.OnReliableBytesReceived += OnTcpBytesReceived;
        _router.OnUnreliableBytesReceived += OnUdpBytesReceived;
    }

    // 连接服务器
    public void Connect(string ip, int port)
    {
        _router.Connect(ip, port);
    }

    // 发送消息对象(自动识别TCP/UDP、序列化、分包)
    public void Send<T>(T msg) where T : class, new()
    {
        if (msg == null) return;

        // 获取消息类型ID
        int typeId = _typeRegistry.GetMessageType(msg.GetType());
        if (typeId == 0) return;

        // 序列化业务消息体
        byte[] payload = _serializer.Serialize(msg);

        // 包装成统一网络消息结构(typeId + body)
        var netMsg = new NetMessage
        {
            MessageType = typeId,
            Payload = payload
        };

        // 序列化NetMessage
        byte[] body = _serializer.Serialize(netMsg);

        // 可靠消息走TCP(粘包处理)
        if (msg is IReliableMessage)
        {
            byte[] packet = _tcpProtocol.Pack(body);
            _router.SendReliable(packet);
        }
        // 不可靠消息走UDP(分片处理)
        else if (msg is IUnreliableMessage)
        {
            List<byte[]> packets = _udpProtocol.Pack(body);
            foreach (var packet in packets)
            {
                _router.SendUnreliable(packet);
            }
        }
    }

    // 关闭连接
    public void Close()
    {
        _router.Close();
    }

    // 驱动更新:清理UDP超时分片
    public void Tick(float deltaTime)
    {
        _udpProtocol.CleanupTimeout();
    }

    // TCP字节流到达 → 解包 → 处理消息
    private void OnTcpBytesReceived(byte[] bytes)
    {
        int len = bytes.Length;
        var bodies = _tcpProtocol.Unpack(bytes, ref len);

        foreach (var body in bodies)
        {
            HandleIncomingBody(body);
        }
    }

    // UDP字节流到达 → 重组 → 处理消息
    private void OnUdpBytesReceived(byte[] bytes)
    {
        byte[] body = _udpProtocol.TryReassemble(bytes);
        if (body != null)
        {
            HandleIncomingBody(body);
        }
    }

    // 处理完整消息体(反序列化 → 抛给上层)
    private void HandleIncomingBody(byte[] body)
    {
        // 先反序列化为统一网络消息
        var netMsg = _serializer.Deserialize<NetMessage>(body);
        if (netMsg == null) return;

        // 根据类型ID找到真实消息类型
        Type msgType = _typeRegistry.GetType(netMsg.MessageType);
        if (msgType == null) return;

        // 反射调用反序列化,生成真实消息对象
        var method = typeof(IFullSerializer)
            .GetMethod(nameof(IFullSerializer.Deserialize))
            ?.MakeGenericMethod(msgType);

        if (method == null) return;

        object msgObj = method.Invoke(_serializer, new object[] { netMsg.Payload });
        if (msgObj != null)
        {
            OnMessageReceived?.Invoke(msgObj);
        }
    }
}

会话层

连接管理

职责抽象

// 连接状态枚举
public enum ConnectionState
{
    Disconnected,   // 未连接
    Connecting,     // 正在连接
    Connected,      // 已连接
    Reconnecting,   // 正在重连
    Closing         // 正在关闭
}

// 状态机接口:规定要实现什么功能
public interface IConnectionStateMachine
{
    ConnectionState State { get; }          // 当前状态
    void SetState(ConnectionState state);    // 修改状态
    bool IsState(ConnectionState state);     // 判断是否处于某个状态
    event Action<ConnectionState> OnStateChanged; // 状态变化事件
}

具体实现

// 状态机实现
public class ConnectionStateMachine : IConnectionStateMachine
{
    public ConnectionState State { get; private set; } // 当前状态
    public event Action<ConnectionState> OnStateChanged; // 状态变了就通知外部

    // 设置状态,如果和当前状态一样就不处理
    public void SetState(ConnectionState state)
    {
        if (State == state) return;
        State = state;
        OnStateChanged?.Invoke(State);
    }

    // 判断当前是不是某个状态
    public bool IsState(ConnectionState state) => State == state;
}

心跳管理

职责抽象

// 心跳接口
public interface IHeartbeatManager
{
    void OnConnected();                     // 连接成功时重置计时
    void OnHeartbeatSent();                 // 心跳发送成功
    void OnHeartbeatReceived();             // 收到对方心跳
    bool ShouldSendHeartbeat(float now);    // 到时间该发心跳了吗
    bool IsTimeout(float now);              // 心跳超时了吗
}

具体实现

// 心跳实现
public class HeartbeatManager : IHeartbeatManager
{
    private readonly float _interval;   // 心跳发送间隔
    private readonly float _timeout;    // 超时时间
    private float _lastSendTime;        // 上次发送心跳时间
    private float _lastReceiveTime;     // 上次收到心跳时间

    public HeartbeatManager(float interval, float timeout)
    {
        _interval = interval;
        _timeout = timeout;
    }

    // 连接成功,重置时间
    public void OnConnected()
    {
        _lastSendTime = _lastReceiveTime = Time.realtimeS***artup;
    }

    // 发送心跳后记录时间
    public void OnHeartbeatSent() => _lastSendTime = Time.realtimeS***artup;

    // 收到对方心跳,刷新超时时间
    public void OnHeartbeatReceived() => _lastReceiveTime = Time.realtimeS***artup;

    // 判断:距离上次发送够间隔了吗 → 够了就该发
    public bool ShouldSendHeartbeat(float now) => now - _lastSendTime >= _interval;

    // 判断:多久没收到消息 → 超时了
    public bool IsTimeout(float now) => now - _lastReceiveTime >= _timeout;
}

重连管理

职责抽象

// 重连接口
public interface IReconnectManager
{
    void OnDisconnected();                // 断线时触发
    void OnConnected();                   // 重连成功
    bool ShouldReconnect(float now);      // 现在可以重试吗
    bool CanReconnect();                 // 还能继续重连吗(没超次数)
    void MarkReconnectAttempt(float now); // 记录一次重连尝试
    void Reset();                        // 重置计数
}

具体实现

// 重连实现
public class ReconnectManager : IReconnectManager
{
    private readonly float _interval;     // 重连间隔
    private readonly int _maxAttempts;    // 最大重试次数
    private float _lastAttemptTime;      // 上次重试时间
    private int _attempts;               // 当前重试次数

    public ReconnectManager(float interval, int maxAttempts)
    {
        _interval = interval;
        _maxAttempts = maxAttempts;
    }

    public void OnDisconnected() { }
    public void OnConnected() => Reset(); // 连上就清空计数

    // 够不够重连间隔
    public bool ShouldReconnect(float now) => now - _lastAttemptTime >= _interval;

    // 有没有超过最大次数
    public bool CanReconnect() => _attempts < _maxAttempts;

    // 记录一次重连
    public void MarkReconnectAttempt(float now)
    {
        _lastAttemptTime = now;
        _attempts++;
    }

    // 重置
    public void Reset()
    {
        _attempts = 0;
        _lastAttemptTime = 0;
    }
}

发送队列

职责抽象

// 发送队列接口
public interface ISendQueue
{
    void Enqueue(byte[] data);      // 加入消息
    bool TryDequeue(out byte[] data); // 取出一条消息
    void Clear();                   // 清空队列
    int Count { get; }              // 消息数量
}

具体实现

// 队列实现(线程安全)
public class SendQueue : ISendQueue
{
    private readonly Queue<byte[]> _queue = new Queue<byte[]>();
    private readonly object _lockObj = new object(); // 锁,保证多线程安全
    private readonly int _capacity; // 队列最大容量

    public SendQueue(int capacity) => _capacity = capacity;

    // 队列里有多少条消息
    public int Count { get { lock (_lockObj) return _queue.Count; } }

    // 加入消息,满了就丢掉最早的
    public void Enqueue(byte[] data)
    {
        if (data == null) return;
        lock (_lockObj)
        {
            if (_queue.Count >= _capacity) _queue.Dequeue();
            _queue.Enqueue(data);
        }
    }

    // 取出一条消息
    public bool TryDequeue(out byte[] data)
    {
        lock (_lockObj)
        {
            if (_queue.Count > 0)
            {
                data = _queue.Dequeue();
                return true;
            }
        }
        data = null;
        return false;
    }

    // 清空
    public void Clear() { lock (_lockObj) _queue.Clear(); }
}

udp客户端活跃管理

职责抽象

// UDP 客户端在线状态接口
public interface IUdpClientActivityTracker
{
    void OnPacketReceived(EndPoint clientEP); // 收到包,标记在线
    bool IsActive(EndPoint clientEP);         // 判断是否在线
    void Tick(float now);                     // 每帧检查超时
    event Action<EndPoint> OnClientActive;     // 客户端上线
    event Action<EndPoint> OnClientTimeout;    // 客户端超时
}

具体实现

// 实现
public class UdpClientActivityTracker : IUdpClientActivityTracker
{
    // 内部记录一个客户端的最后活跃时间
    private class ClientActivity
    {
        public EndPoint EndPoint;
        public float LastSeenTime;
    }

    private readonly Dictionary<string, ClientActivity> _clients = new();
    private readonly float _timeout; // 超时时间

    public event Action<EndPoint> OnClientActive;
    public event Action<EndPoint> OnClientTimeout;

    public UdpClientActivityTracker(float timeout) => _timeout = timeout;

    // 收到包 → 更新最后活跃时间
    public void OnPacketReceived(EndPoint clientEP)
    {
        string key = clientEP.ToString();
        if (!_clients.TryGetValue(key, out var act))
        {
            act = new ClientActivity { EndPoint = clientEP };
            _clients[key] = act;
        }
        act.LastSeenTime = Time.realtimeS***artup;
        OnClientActive?.Invoke(clientEP);
    }

    // 判断客户端是否在线
    public bool IsActive(EndPoint clientEP)
    {
        return _clients.TryGetValue(clientEP.ToString(), out var act)
            && Time.realtimeS***artup - act.LastSeenTime <= _timeout;
    }

    // 每帧检查:超时的客户端踢掉
    public void Tick(float now)
    {
        var expired = _clients.Where(p => now - p.Value.LastSeenTime > _timeout).ToList();
        foreach (var e in expired)
        {
            OnClientTimeout?.Invoke(e.Value.EndPoint);
            _clients.Remove(e.Key);
        }
    }
}

客户端

职责抽象

// 客户端网络会话接口
public interface IClientNetworkSession
{
    ConnectionState State { get; }
    void Connect(string ip, int port);
    void Close();
    void Tick(float deltaTime);
    void Send<T>(T msg) where T : class, new();

    event Action OnConnected;
    event Action OnDisconnected;
    event Action<object> OnMessageReceived;
}

具体实现

// 客户端会话:管理连接状态、心跳、重连、发送
public class ClientNetworkSession : IClientNetworkSession
{
    private readonly IProtocolClient _protocol;                 // 协议层
    private readonly IConnectionStateMachine _stateMachine;     // 状态机
    private readonly IHeartbeatManager _heartbeat;             // 心跳
    private readonly IReconnectManager _reconnect;             // 重连
    private readonly ISendQueue _sendQueue;                     // 发送队列

    public ConnectionState State => _stateMachine.State;

    public event Action OnConnected;
    public event Action OnDisconnected;
    public event Action<object> OnMessageReceived;

    public ClientNetworkSession(
        IProtocolClient protocol,
        IConnectionStateMachine stateMachine,
        IHeartbeatManager heartbeat,
        IReconnectManager reconnect,
        ISendQueue sendQueue)
    {
        _protocol = protocol;
        _stateMachine = stateMachine;
        _heartbeat = heartbeat;
        _reconnect = reconnect;
        _sendQueue = sendQueue;

        // 协议层收到对象后转发出去
        _protocol.OnMessageReceived += msg => OnMessageReceived?.Invoke(msg);
    }

    // 主动连接
    public void Connect(string ip, int port)
    {
        if (_stateMachine.IsState(ConnectionState.Connected) ||
            _stateMachine.IsState(ConnectionState.Connecting))
            return;

        _stateMachine.SetState(ConnectionState.Connecting);
        _protocol.Connect(ip, port);
    }

    // 主动关闭
    public void Close()
    {
        _stateMachine.SetState(ConnectionState.Closing);
        _protocol.Close();
        _sendQueue.Clear();
        _reconnect.Reset();
        _stateMachine.SetState(ConnectionState.Disconnected);
        OnDisconnected?.Invoke();
    }

    // 发送消息:先入队,Tick 时统一发
    public void Send<T>(T msg) where T : class, new()
    {
        if (!_stateMachine.IsState(ConnectionState.Connected)) return;
        _sendQueue.Enqueue(msg);
    }

    // 每帧更新
    public void Tick(float deltaTime)
    {
        float now = Time.realtimeSinceStartup;

        _protocol.Tick(deltaTime);

        // 1. 发送心跳
        if (_stateMachine.IsState(ConnectionState.Connected) && _heartbeat.ShouldSendHeartbeat(now))
        {
            _protocol.Send(new HeartbeatMessage());
            _heartbeat.OnHeartbeatSent();
        }

        // 2. 心跳超时
        if (_stateMachine.IsState(ConnectionState.Connected) && _heartbeat.IsTimeout(now))
        {
            HandleDisconnected();
        }

        // 3. 把发送队列真正交给协议层
        while (_sendQueue.TryDequeue(out var msg))
        {
            _protocol.Send(msg);
        }

        // 4. 自动重连
        if (_stateMachine.IsState(ConnectionState.Disconnected))
        {
            if (_reconnect.CanReconnect() && _reconnect.ShouldReconnect(now))
            {
                _reconnect.MarkReconnectAttempt(now);
                _stateMachine.SetState(ConnectionState.Reconnecting);
                _protocol.Connect("127.0.0.1", 8888);
            }
        }
    }

    // 协议层收到第一条消息时,认为连接成功
    public void OnProtocolConnected()
    {
        _stateMachine.SetState(ConnectionState.Connected);
        _heartbeat.OnConnected();
        _reconnect.OnConnected();
        OnConnected?.Invoke();
    }

    // 断线处理
    private void HandleDisconnected()
    {
        _protocol.Close();
        _stateMachine.SetState(ConnectionState.Disconnected);
        _reconnect.OnDisconnected();
        OnDisconnected?.Invoke();
    }
}

服务端

职责抽象

// 服务端客户端会话接口
public interface IServerClientSession
{
    EndPoint ClientEP { get; }
    bool IsActive { get; }
    void OnPacketReceived();
    void Send<T>(T msg) where T : class, new();
    void Tick(float deltaTime);
}

具体实现

// 单个客户端会话:维护一个客户端的心跳和发送
public class ServerClientSession : IServerClientSession
{
    public EndPoint ClientEP { get; }

    private readonly IProtocolServer _protocol;       // 协议层
    private readonly IHeartbeatManager _heartbeat;    // 心跳
    private readonly ISendQueue _sendQueue;           // 发送队列

    public bool IsActive => !_heartbeat.IsTimeout(Time.realtimeSinceStartup);

    public ServerClientSession(
        EndPoint clientEP,
        IProtocolServer protocol,
        IHeartbeatManager heartbeat,
        ISendQueue sendQueue)
    {
        ClientEP = clientEP;
        _protocol = protocol;
        _heartbeat = heartbeat;
        _sendQueue = sendQueue;
    }

    // 收到客户端消息
    public void OnPacketReceived()
    {
        _heartbeat.OnHeartbeatReceived();
    }

    // 发消息给这个客户端
    public void Send<T>(T msg) where T : class, new()
    {
        _sendQueue.Enqueue(msg);
    }

    // 每帧处理发送
    public void Tick(float deltaTime)
    {
        while (_sendQueue.TryDequeue(out var msg))
        {
            _protocol.SendTo(ClientEP, msg);
        }
    }
}

服务端会话管理

职责抽象

// 服务端会话管理接口
public interface IServerSessionManager
{
    void Start(string ip, int port);
    void Stop();
    void Tick(float deltaTime);

    void Broadcast<T>(T msg) where T : class, new();

    event Action<EndPoint> OnClientConnected;
    event Action<EndPoint> OnClientDisconnected;
    event Action<object, EndPoint> OnMessageReceived;
}

具体实现

// 服务端会话管理器:管理所有客户端会话
public class ServerSessionManager : IServerSessionManager
{
    private readonly IProtocolServer _protocol; // 协议层
    private readonly ISerializer _serializer;   // 如果后续还要做缓存/日志,可保留
    private readonly Dictionary<EndPoint, ServerClientSession> _sessions = new Dictionary<EndPoint, ServerClientSession>();

    public event Action<EndPoint> OnClientConnected;
    public event Action<EndPoint> OnClientDisconnected;
    public event Action<object, EndPoint> OnMessageReceived;

    public ServerSessionManager(IProtocolServer protocol, ISerializer serializer)
    {
        _protocol = protocol;
        _serializer = serializer;

        // 订阅协议层事件
        _protocol.OnClientConnected += OnProtocolClientConnected;
        _protocol.OnClientDisconnected += OnProtocolClientDisconnected;
        _protocol.OnMessageReceived += OnProtocolMessageReceived;
    }

    // 启动服务
    public void Start(string ip, int port)
    {
        _protocol.Start(ip, port);
    }

    // 停止服务
    public void Stop()
    {
        _protocol.Stop();
        _sessions.Clear();
    }

    // 每帧更新所有会话
    public void Tick(float deltaTime)
    {
        _protocol.Tick(deltaTime);

        foreach (var session in _sessions.Values)
        {
            session.Tick(deltaTime);
        }

        ClearTimeoutSessions();
    }

    // 广播消息
    public void Broadcast<T>(T msg) where T : class, new()
    {
        foreach (var session in _sessions.Values)
        {
            session.Send(msg);
        }
    }

  // 单播
    public void SendTo(int clientId, NetMessage msg)
    {
        // 如果当前没有 clientId 到 EndPoint 的映射,
        // 可以按实际项目结构补一个映射表
    }
  
  
    // 协议层通知:有客户端连接
    private void OnProtocolClientConnected(EndPoint ep)
    {
        if (_sessions.ContainsKey(ep)) return;

        var session = new ServerClientSession(
            ep,
            _protocol,
            new HeartbeatManager(1f, 5f),
            new SendQueue(50));

        _sessions[ep] = session;
        OnClientConnected?.Invoke(ep);
    }

    // 协议层通知:有客户端断开
    private void OnProtocolClientDisconnected(EndPoint ep)
    {
        if (_sessions.Remove(ep))
        {
            OnClientDisconnected?.Invoke(ep);
        }
    }

    // 协议层收到消息
    private void OnProtocolMessageReceived(object msg, EndPoint ep)
    {
        if (_sessions.TryGetValue(ep, out var session))
        {
            session.OnPacketReceived();
        }

        OnMessageReceived?.Invoke(msg, ep);
    }

    // 清理超时会话
    private void ClearTimeoutSessions()
    {
        var timeoutList = _sessions.Where(kvp => !kvp.Value.IsActive).ToList();
        foreach (var item in timeoutList)
        {
            _sessions.Remove(item.Key);
            OnClientDisconnected?.Invoke(item.Key);
        }
    }
}
全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务