C++春招项目-高性能RPC框架-支持负载均衡、加密、压缩和心跳检测

内容来自:程序员老廖

1. 总览与目标

  • 目标:提供基于 muduo 的轻量、高性能 RPC 框架,支持请求/响应、压缩、加密、序列化、负载均衡、服务注册、心跳与线程池,业务仅需处理类型化请求/响应。
  • 技术栈:C++17,muduo,protobuf/JSON,zlib/zstd,OpenSSL(AES-GCM)。
  • 设计原则:分层清晰、可插拔(压缩/加密/序列化)、类型安全、向后兼容。

图示:整体架构

视频讲解与源码领取:************************************

2. 协议与数据流

2.1 帧格式

整体帧结构

Header详细结构

字段说明/约束:

  • magic:0x5250 ('R''P'),便于快速过滤错误流量。
  • version:当前 1,预留向后兼容;如需扩展字段,建议 bump version 并保持旧字段兼容。
  • flags:保留位,当前未用(可用于分片/压测标记等)。
  • message_type:0=Request,1=Response,2=Heartbeat。
  • serialization/compression/encryption:枚举透传,驱动编解码/解压/解密流程。
  • request_id:由客户端递增生成,响应/心跳回传同一 id。
  • body_length:不含 Header 的体长度。
  • checksum:体的 CRC32,防止数据损坏(先压缩/加密后计算)。
  • reserved:保留 4 字节对齐,后向扩展。

注意:该协议头部与数值字段均为大端(网络序)。

Body消息类型结构

ASCII帧格式示意

┌───────────────────────────────────────────────────┐
│ Header (固定28字节)                                │
├─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────────┬
│magic│ ver │flags│type │ ser │ cmp │ enc │ request │
│ 2B  │ 1B  │ 1B  │ 1B  │ 1B  │ 1B  │ 1B  │ id 8B   │
├─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────────┴
│ body_length 4B │ checksum 4B │ reserved 4B        │
└───────────────────────────────────────────────────┘
                            │
                            ▼
┌───────────────────────────────────────────────────┐
│ Body (变长,根据message_type不同)                  │
├───────────────────────────────────────────────────┤
│                                                   │
│ type=0 (Request):                                 │
│   service_len│method_len│route_len│timeout_ms     │
│   service│method│route│payload                    │
│                                                   │
│ type=1 (Response):                                │
│   status_code│error_len│error│payload             │
│                                                   │
│ type=2 (Heartbeat):                               │
│   无body                                          │
└───────────────────────────────────────────────────┘

2.2 编码/解码流程

编码流程 (Encode)

解码流程 (Decode)

完整编解码管线

2.3 典型数据流

3. 线程模型

总体目标:IO 与业务解耦,防止单点阻塞;客户端最小化等待阻塞区域。

3.1 服务端线程模型

  • IO:muduo TcpServer,支持配置 IO 线程数;负责 accept、收包、拆帧、初步校验。
  • 业务:自研 ThreadPool(worker),执行 handler。
  • 流程:IO 线程 OnMessage -> 拆帧 -> DecodeFrame -> 将 Request 投递到 worker -> worker 执行业务 -> EncodeResponse -> 回到 IO 线程发送(runInLoop)。
  • 优点:IO 不被业务阻塞;业务线程数可独立扩缩容。
  • 调优:IO 线程数通常设为 CPU/2 ~ CPU;worker 线程与核心数相当或略高(视 CPU/IO 密度)。

3.2 客户端线程模型

  • 单独 EventLoopThread 驱动 TcpClient,负责收发、心跳。
  • 调用线程同步等待:使用条件变量等待 pending map 中的响应或超时。
  • 优点:简单、无额外线程池;缺点:高并发调用下等待队列竞争(可通过分片客户端实例缓解)。

3.3 ThreadPool 实现要点

  • 数据结构:vector<thread> + queue<function> + mutex + condvar。
  • 停机:stopping_ 标志 + 唤醒所有 worker,清空队列。
  • 异常防护:任务执行 try/catch,避免 worker 线程因异常退出。
  • 任务提交:若已停止则丢弃;Submit 后唤醒一个 worker。

3.4 并发风险与注意

  • Handler 需自行保证共享资源线程安全。
  • Cluster 的客户端缓存目前未内置锁,默认单线程调用;多线程共用需外层加锁或加内部互斥。
  • RoundRobin LB 持有 service 维度锁;极高并发下可考虑分片 offset 或原子替代。

图示:线程与事件模型

4. 服务器端设计

4.1 核心处理流程

OnMessage:
  while buffer >= HeaderSize:
    peek header -> 长度校验 -> 取 frame
    DecodeFrame -> HandleDecodedFrame

HandleDecodedFrame:
  Request  -> HandleRequest(worker)
  Heartbeat-> 回空 Response

HandleRequest(worker):
  查 handler (typed/raw)
  执行业务 -> 填 RpcResponse
  EncodeResponse -> runInLoop 发送

4.2 泛型 Handler

  • RegisterHandlerTyped<ReqMsg, RespMsg>(service, method, handler)。
  • 框架:反序列化 payload -> 业务 handler(req_msg, resp_msg) -> 序列化 resp_msg -> 回填 response.payload。
  • 约束:ReqMsg/RespMsg 继承 google::protobuf::Message。
  • 兼容:原 RegisterHandler 保留,便于渐进迁移或自定义处理。

4.3 生命周期与资源管理

  • RpcServer 持有 TcpServer、ThreadPool、handlers 映射;Start 启动线程池与监听。
  • 发送响应使用 conn->getLoop()->runInLoop,确保在正确的 IO 线程发送。
  • Worker 中捕获业务异常,防止崩溃;失败返回 status_code=-2。

4.4 Handler 编写规范

  • 避免长时间阻塞(IO/磁盘/锁),必要时在业务内再分层异步。
  • 保证线程安全,尤其是共享状态/缓存。
  • 尽量使用 Typed Handler,减少手工解析/序列化错误。

4.5 错误处理策略

  • Decode header 失败/CRC 失败:关闭连接。
  • handler 未找到:status_code = -1,error_message = "handler not found"。
  • 业务返回 false:status_code = -2,error_message 由业务填写。
  • EncodeResponse 失败:直接丢弃本次响应(当前实现)。

5. 客户端设计

5.1 RpcClient 调用链

Call:
  WaitConnected -> request_id++
  EncodeRequest -> send
  WaitFor(timeout) -> 返回结果/超时

OnMessage:
  拆帧 -> DecodeFrame -> Fulfill(request_id)

Heartbeat:
  runEvery 发送 Heartbeat 帧

5.2 泛型调用封装

  • CallMessage:消息级封装。
  • CallTyped<ReqMsg, RespMsg>:自动序列化请求、自动反序列化响应,沿用调用时指定的序列化/压缩/加密配置。

5.3 RpcClientCluster

  • 依赖 ServiceRegistry + LoadBalancer(RoundRobin/Random),每次调用选节点。
  • 缓存/复用子 RpcClient;空列表返回错误。
  • 提供 CallTyped,与单客户端体验一致。

调用流程细化:

List(service) -> endpoints
if empty: error "no endpoint available"
chosen = lb.Select(service, endpoints)
client = cache[key(host:port)] 或新建 RpcClient
Call/CallTyped -> EncodeRequest -> send -> WaitFor

线程安全注意:

  • Cluster 内部缓存未加锁,默认单线程使用;多线程共享时需外部互斥。
  • RpcClient 自身针对 pending/连接状态有内部锁。

5.4 心跳与等待模型

  • 心跳:定时器 runEvery 发送 Heartbeat。
  • 等待:pending map + condvar;超时清理 pending 并返回超时错误。

超时策略:

  • WaitConnected 超时:返回 "connection not ready"。
  • WaitFor 响应超时:返回 "rpc call timeout",并清理 pending。
  • 业务错误:status_code != 0,error_message 透传。

6. 注册发现与负载均衡

6.1 ServiceRegistry

  • 数据结构:unordered_map<std::string, std::vector<Endpoint>>,以 service 维度存储 endpoints。
  • 线程安全:内部互斥;Register/Unregister 会加锁修改,List 会加锁并返回拷贝,避免长时间持锁。
  • 去重策略:Register 时查重,避免重复节点;Unregister 按 host/port 匹配删除。
  • 设计取舍:当前为内存实现,适合单进程 demo/bench;生产可替换外部注册中心(etcd/consul/zookeeper),由上层拉取后写入本地 registry。

6.2 LoadBalancer 抽象

接口:Endpoint Select(const std::string& service, const std::vector<Endpoint>& endpoints)

现有实现:

  • RoundRobin:为每个 service 维护独立 offset(unordered_map + mutex)。选取 endpoints[offset % N],再 offset++。优点:分发均匀、可重复;缺点:持锁粒度在 service 层,极高并发时可能有轻微争用。
  • Random:mt19937 + 均匀分布,适合突发但可能产生抖动;无共享状态、无锁。

线程安全:

  • RoundRobin 内部加锁保护 offset;Random 无共享状态。
  • 若后续扩展到多实例 LB 或跨线程共用,可以将 offset 存为原子或使用分片锁。

可扩展:可替换为权重、一致性哈希、最小连接数、就近策略,只需实现同一接口。

6.3 Cluster 调用链

endpoints = registry.List(service)
if empty -> error "no endpoint available"
chosen = lb.Select(service, endpoints)
client = GetOrCreateClient(chosen)   // key=host:port,缓存 RpcClient
client->Call / CallTyped(...)
  • 客户端缓存:unordered_map<std::string, std::unique_ptr<RpcClient>>,生命周期随 Cluster。
  • 并发注意:当前 Cluster 未在内部加锁;默认单线程/上层串行使用。如需多线程共用 Cluster,应在调用外层加锁,或对 Cluster 自身加互斥。
  • 连接复用:同一 endpoint 复用单个 RpcClient,避免重复建连;Stop 时统一清理。

6.4 失败与健康

现状:无内建健康检查/摘除;节点不可用时调用可能失败但仍会被选择。

建议扩展:

  • 失败计数 + 冷却:连续失败超过阈值将节点标记为"冷却",在一定时间内不被选择;冷却期后重试成功则恢复。
  • 定期心跳/探活:Cluster 侧周期性对节点做轻量探测(或复用业务心跳)。
  • 失败快速重试:选择下一个可用节点重试(注意幂等/重复语义)。

7. 压缩与加密

7.1 压缩设计

触发策略:编码端依据 compression 字段与 CodecOptions::compression_threshold 决定是否压缩;小于阈值直接透传,避免小包放大 CPU。

支持算法:zlib(默认启用)、zstd(编译宏 SPARK_RPC_HAS_ZSTD 控制)。

实现细节:

  • MaybeCompress:根据类型与阈值决定是否压缩,失败返回错误;MaybeDecompress 解码端按 header.compression 解压。
  • zlib:使用 compress2 / uncompress,解压时按 4x 预估并动态扩容。
  • zstd:ZSTD_compress / ZSTD_decompress,通过 frame size 估算输出;若库不可用返回错误"zstd not available"。
  • 保持 compression 字段在头部透传,便于端到端一致。

调优建议:

  • 小消息量大时,将阈值调高或关闭压缩减轻 CPU。
  • 大消息/跨机房时开启 zstd(若可用)可降低带宽占用。

7.2 加密设计

算法:AES-GCM(认证加密),key 长度支持 128/192/256 位,tag 长度 16 字节。

参数来源:CodecOptions::aes_key / aes_iv;若开启加密且 key/iv 为空,直接报错。

实现细节:

  • EncryptIfNeeded / DecryptIfNeeded:在编码时加密 body,解码时先解密再解压。
  • GCM 流程:EncryptAesGcm 生成密文 + tag(附在密文尾部),DecryptAesGcm 校验 tag,失败返回错误。
  • 头部的 encryption 字段指示是否需要解密,保持端到端一致。

安全注意:

  • IV 必须唯一不可重用,同一 key 下重复 IV 会破坏 GCM 安全性;当前示例使用固定 iv,生产应使用随机/计数器并随帧携带或协商。
  • Key/IV 管理:框架不管理密钥分发,需业务侧或配置侧保证安全下发与轮换。
  • 若需更高安全性,可增加 TLS 传输层加密(当前仅提供报文体 AES-GCM)。

7.3 压缩与加密处理顺序

8. 心跳与连接健壮性

  • 客户端定期发送 Heartbeat;服务端收到 Heartbeat 回空 Response。
  • 当前未实现自动重连/摘除,可扩展:心跳超时触发重连,Cluster 侧暂时屏蔽故障节点。

9. 示例(Echo / EchoMeta)

  • proto:echo.proto 含 Echo 与 EchoMeta(可选 uppercase,返回 size)。
  • 服务端:bench/bench_server.cpp 用 RegisterHandlerTyped 注册 Echo/EchoMeta。
  • 客户端:spark_rpc_bench、spark_rpc_bench_multi 使用 CallTyped;单线程 bench 末尾演示 EchoMeta。

9.1 负载均衡示例(演示用)

可在应用侧快速构造内存 registry 与 RoundRobin 负载均衡器,复用 RpcClientCluster

#include "spark_rpc/rpc_client.h"
#include "spark_rpc/load_balancer.h"
#include "spark_rpc/registry.h"
#include "echo.pb.h"

using namespace sparkpush::rpc;

int main() {
    ServiceRegistry registry;
    // 注册两个后端节点(示例:同机不同端口)
    registry.Register("EchoService", {"127.0.0.1", 9000});
    registry.Register("EchoService", {"127.0.0.1", 9001});

    RoundRobinLoadBalancer rr;
    CodecOptions options;
    RpcClientCluster cluster(registry, rr, options);

    bench::EchoRequest req;
    req.set_payload("hello lb");
    bench::EchoReply resp;
    std::string err;
    bool ok = cluster.CallTyped("EchoService", "Echo", req, resp,
                                SerializationType::kProtobuf,
                                CompressionType::kNone,
                                EncryptionType::kNone,
                                3000, &err);
    if (!ok) {
        fprintf(stderr, "call failed: %s\n", err.c_str());
        return 1;
    }
    printf("reply payload=%s\n", resp.payload().c_str());
    return 0;
}

在 bench 场景中,可将多实例 server 分别监听 9000/9001,使用上述示例验证轮询分发。

10. 性能与开销分析

  • 泛型封装不增加序列化次数;额外开销为模板/类型检查,可忽略。
  • 主成本:网络 IO、压缩/解压、加解密、序列化。
  • 优化:调压缩阈值;优先 protobuf;必要时 zstd;复用 protobuf 对象或 arena;减少拷贝。

11. 构建与依赖

依赖:muduo、protobuf、OpenSSL、ZLIB、可选 ZSTD。

CMake 选项:

  • SPARK_RPC_ENABLE_BENCH:开启基准工具(默认 ON)。
  • SPARK_RPC_PROTO_FILE:proto 路径,默认 proto/echo.proto,自动生成 spark_push_proto。
  • SPARK_RPC_USE_BUNDLED_MUDUO:使用内置 muduo(默认 OFF)。

构建示例:

mkdir build
cd build
cmake ..
make -j

12. 测试与验证

  • 单元/集成:编解码、压缩/解压、加密/解密、序列化/反序列化。
  • 回归:Echo/EchoMeta 正常与错误路径;超时;心跳收发。
  • 压测:spark_rpc_bench_server + spark_rpc_bench/bench_multi,关注 QPS、延迟、CPU。

13. 监控与可观测性(建议)

  • 指标:QPS、P95/P99、失败率、超时率、重连次数、压缩命中率、加解密失败数。
  • 日志:编解码失败、handler not found、超时、心跳异常。
  • 钩子:发送前/后、业务前/后可埋点(便于接入 Prometheus/Stats)。

14. 扩展与演进方向

  • 自动重连、健康检查、故障摘除/恢复。
  • 负载均衡:权重、一致性哈希、最小连接数。
  • Streaming/异步接口,支持 pipeline/multiplex。
  • 安全:TLS/证书校验;密钥轮换。
  • 内存:protobuf arena、对象池、零拷贝 Buffer。

源码领取:************************************

#一人推荐一个值得做的项目##春招##校招##C++##简历中的项目经历要怎么写#
全部评论

相关推荐

评论
1
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务