C++春招项目-高性能RPC框架-支持负载均衡、加密、压缩和心跳检测
内容来自:程序员老廖
1. 总览与目标
- 目标:提供基于 muduo 的轻量、高性能 RPC 框架,支持请求/响应、压缩、加密、序列化、负载均衡、服务注册、心跳与线程池,业务仅需处理类型化请求/响应。
- 技术栈:C++17,muduo,protobuf/JSON,zlib/zstd,OpenSSL(AES-GCM)。
- 设计原则:分层清晰、可插拔(压缩/加密/序列化)、类型安全、向后兼容。
图示:整体架构
视频讲解与源码领取:************************************
2. 协议与数据流
2.1 帧格式
整体帧结构
Header详细结构
字段说明/约束:
- magic:0x5250 ('R''P'),便于快速过滤错误流量。
- version:当前 1,预留向后兼容;如需扩展字段,建议 bump version 并保持旧字段兼容。
- flags:保留位,当前未用(可用于分片/压测标记等)。
- message_type:0=Request,1=Response,2=Heartbeat。
- serialization/compression/encryption:枚举透传,驱动编解码/解压/解密流程。
- request_id:由客户端递增生成,响应/心跳回传同一 id。
- body_length:不含 Header 的体长度。
- checksum:体的 CRC32,防止数据损坏(先压缩/加密后计算)。
- reserved:保留 4 字节对齐,后向扩展。
注意:该协议头部与数值字段均为大端(网络序)。
Body消息类型结构
ASCII帧格式示意
┌───────────────────────────────────────────────────┐
│ Header (固定28字节) │
├─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────────┬
│magic│ ver │flags│type │ ser │ cmp │ enc │ request │
│ 2B │ 1B │ 1B │ 1B │ 1B │ 1B │ 1B │ id 8B │
├─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────────┴
│ body_length 4B │ checksum 4B │ reserved 4B │
└───────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ Body (变长,根据message_type不同) │
├───────────────────────────────────────────────────┤
│ │
│ type=0 (Request): │
│ service_len│method_len│route_len│timeout_ms │
│ service│method│route│payload │
│ │
│ type=1 (Response): │
│ status_code│error_len│error│payload │
│ │
│ type=2 (Heartbeat): │
│ 无body │
└───────────────────────────────────────────────────┘
2.2 编码/解码流程
编码流程 (Encode)
解码流程 (Decode)
完整编解码管线
2.3 典型数据流
3. 线程模型
总体目标:IO 与业务解耦,防止单点阻塞;客户端最小化等待阻塞区域。
3.1 服务端线程模型
- IO:muduo TcpServer,支持配置 IO 线程数;负责 accept、收包、拆帧、初步校验。
- 业务:自研 ThreadPool(worker),执行 handler。
- 流程:IO 线程 OnMessage -> 拆帧 -> DecodeFrame -> 将 Request 投递到 worker -> worker 执行业务 -> EncodeResponse -> 回到 IO 线程发送(runInLoop)。
- 优点:IO 不被业务阻塞;业务线程数可独立扩缩容。
- 调优:IO 线程数通常设为 CPU/2 ~ CPU;worker 线程与核心数相当或略高(视 CPU/IO 密度)。
3.2 客户端线程模型
- 单独 EventLoopThread 驱动 TcpClient,负责收发、心跳。
- 调用线程同步等待:使用条件变量等待 pending map 中的响应或超时。
- 优点:简单、无额外线程池;缺点:高并发调用下等待队列竞争(可通过分片客户端实例缓解)。
3.3 ThreadPool 实现要点
- 数据结构:vector<thread> + queue<function> + mutex + condvar。
- 停机:stopping_ 标志 + 唤醒所有 worker,清空队列。
- 异常防护:任务执行 try/catch,避免 worker 线程因异常退出。
- 任务提交:若已停止则丢弃;Submit 后唤醒一个 worker。
3.4 并发风险与注意
- Handler 需自行保证共享资源线程安全。
- Cluster 的客户端缓存目前未内置锁,默认单线程调用;多线程共用需外层加锁或加内部互斥。
- RoundRobin LB 持有 service 维度锁;极高并发下可考虑分片 offset 或原子替代。
图示:线程与事件模型
4. 服务器端设计
4.1 核心处理流程
OnMessage:
while buffer >= HeaderSize:
peek header -> 长度校验 -> 取 frame
DecodeFrame -> HandleDecodedFrame
HandleDecodedFrame:
Request -> HandleRequest(worker)
Heartbeat-> 回空 Response
HandleRequest(worker):
查 handler (typed/raw)
执行业务 -> 填 RpcResponse
EncodeResponse -> runInLoop 发送
4.2 泛型 Handler
- RegisterHandlerTyped<ReqMsg, RespMsg>(service, method, handler)。
- 框架:反序列化 payload -> 业务 handler(req_msg, resp_msg) -> 序列化 resp_msg -> 回填 response.payload。
- 约束:ReqMsg/RespMsg 继承 google::protobuf::Message。
- 兼容:原 RegisterHandler 保留,便于渐进迁移或自定义处理。
4.3 生命周期与资源管理
- RpcServer 持有 TcpServer、ThreadPool、handlers 映射;Start 启动线程池与监听。
- 发送响应使用 conn->getLoop()->runInLoop,确保在正确的 IO 线程发送。
- Worker 中捕获业务异常,防止崩溃;失败返回 status_code=-2。
4.4 Handler 编写规范
- 避免长时间阻塞(IO/磁盘/锁),必要时在业务内再分层异步。
- 保证线程安全,尤其是共享状态/缓存。
- 尽量使用 Typed Handler,减少手工解析/序列化错误。
4.5 错误处理策略
- Decode header 失败/CRC 失败:关闭连接。
- handler 未找到:status_code = -1,error_message = "handler not found"。
- 业务返回 false:status_code = -2,error_message 由业务填写。
- EncodeResponse 失败:直接丢弃本次响应(当前实现)。
5. 客户端设计
5.1 RpcClient 调用链
Call: WaitConnected -> request_id++ EncodeRequest -> send WaitFor(timeout) -> 返回结果/超时 OnMessage: 拆帧 -> DecodeFrame -> Fulfill(request_id) Heartbeat: runEvery 发送 Heartbeat 帧
5.2 泛型调用封装
- CallMessage:消息级封装。
- CallTyped<ReqMsg, RespMsg>:自动序列化请求、自动反序列化响应,沿用调用时指定的序列化/压缩/加密配置。
5.3 RpcClientCluster
- 依赖 ServiceRegistry + LoadBalancer(RoundRobin/Random),每次调用选节点。
- 缓存/复用子 RpcClient;空列表返回错误。
- 提供 CallTyped,与单客户端体验一致。
调用流程细化:
List(service) -> endpoints if empty: error "no endpoint available" chosen = lb.Select(service, endpoints) client = cache[key(host:port)] 或新建 RpcClient Call/CallTyped -> EncodeRequest -> send -> WaitFor
线程安全注意:
- Cluster 内部缓存未加锁,默认单线程使用;多线程共享时需外部互斥。
- RpcClient 自身针对 pending/连接状态有内部锁。
5.4 心跳与等待模型
- 心跳:定时器 runEvery 发送 Heartbeat。
- 等待:pending map + condvar;超时清理 pending 并返回超时错误。
超时策略:
- WaitConnected 超时:返回 "connection not ready"。
- WaitFor 响应超时:返回 "rpc call timeout",并清理 pending。
- 业务错误:status_code != 0,error_message 透传。
6. 注册发现与负载均衡
6.1 ServiceRegistry
- 数据结构:unordered_map<std::string, std::vector<Endpoint>>,以 service 维度存储 endpoints。
- 线程安全:内部互斥;Register/Unregister 会加锁修改,List 会加锁并返回拷贝,避免长时间持锁。
- 去重策略:Register 时查重,避免重复节点;Unregister 按 host/port 匹配删除。
- 设计取舍:当前为内存实现,适合单进程 demo/bench;生产可替换外部注册中心(etcd/consul/zookeeper),由上层拉取后写入本地 registry。
6.2 LoadBalancer 抽象
接口:Endpoint Select(const std::string& service, const std::vector<Endpoint>& endpoints)。
现有实现:
- RoundRobin:为每个 service 维护独立 offset(unordered_map + mutex)。选取 endpoints[offset % N],再 offset++。优点:分发均匀、可重复;缺点:持锁粒度在 service 层,极高并发时可能有轻微争用。
- Random:mt19937 + 均匀分布,适合突发但可能产生抖动;无共享状态、无锁。
线程安全:
- RoundRobin 内部加锁保护 offset;Random 无共享状态。
- 若后续扩展到多实例 LB 或跨线程共用,可以将 offset 存为原子或使用分片锁。
可扩展:可替换为权重、一致性哈希、最小连接数、就近策略,只需实现同一接口。
6.3 Cluster 调用链
endpoints = registry.List(service) if empty -> error "no endpoint available" chosen = lb.Select(service, endpoints) client = GetOrCreateClient(chosen) // key=host:port,缓存 RpcClient client->Call / CallTyped(...)
- 客户端缓存:unordered_map<std::string, std::unique_ptr<RpcClient>>,生命周期随 Cluster。
- 并发注意:当前 Cluster 未在内部加锁;默认单线程/上层串行使用。如需多线程共用 Cluster,应在调用外层加锁,或对 Cluster 自身加互斥。
- 连接复用:同一 endpoint 复用单个 RpcClient,避免重复建连;Stop 时统一清理。
6.4 失败与健康
现状:无内建健康检查/摘除;节点不可用时调用可能失败但仍会被选择。
建议扩展:
- 失败计数 + 冷却:连续失败超过阈值将节点标记为"冷却",在一定时间内不被选择;冷却期后重试成功则恢复。
- 定期心跳/探活:Cluster 侧周期性对节点做轻量探测(或复用业务心跳)。
- 失败快速重试:选择下一个可用节点重试(注意幂等/重复语义)。
7. 压缩与加密
7.1 压缩设计
触发策略:编码端依据 compression 字段与 CodecOptions::compression_threshold 决定是否压缩;小于阈值直接透传,避免小包放大 CPU。
支持算法:zlib(默认启用)、zstd(编译宏 SPARK_RPC_HAS_ZSTD 控制)。
实现细节:
- MaybeCompress:根据类型与阈值决定是否压缩,失败返回错误;MaybeDecompress 解码端按 header.compression 解压。
- zlib:使用 compress2 / uncompress,解压时按 4x 预估并动态扩容。
- zstd:ZSTD_compress / ZSTD_decompress,通过 frame size 估算输出;若库不可用返回错误"zstd not available"。
- 保持 compression 字段在头部透传,便于端到端一致。
调优建议:
- 小消息量大时,将阈值调高或关闭压缩减轻 CPU。
- 大消息/跨机房时开启 zstd(若可用)可降低带宽占用。
7.2 加密设计
算法:AES-GCM(认证加密),key 长度支持 128/192/256 位,tag 长度 16 字节。
参数来源:CodecOptions::aes_key / aes_iv;若开启加密且 key/iv 为空,直接报错。
实现细节:
- EncryptIfNeeded / DecryptIfNeeded:在编码时加密 body,解码时先解密再解压。
- GCM 流程:EncryptAesGcm 生成密文 + tag(附在密文尾部),DecryptAesGcm 校验 tag,失败返回错误。
- 头部的 encryption 字段指示是否需要解密,保持端到端一致。
安全注意:
- IV 必须唯一不可重用,同一 key 下重复 IV 会破坏 GCM 安全性;当前示例使用固定 iv,生产应使用随机/计数器并随帧携带或协商。
- Key/IV 管理:框架不管理密钥分发,需业务侧或配置侧保证安全下发与轮换。
- 若需更高安全性,可增加 TLS 传输层加密(当前仅提供报文体 AES-GCM)。
7.3 压缩与加密处理顺序
8. 心跳与连接健壮性
- 客户端定期发送 Heartbeat;服务端收到 Heartbeat 回空 Response。
- 当前未实现自动重连/摘除,可扩展:心跳超时触发重连,Cluster 侧暂时屏蔽故障节点。
9. 示例(Echo / EchoMeta)
- proto:echo.proto 含 Echo 与 EchoMeta(可选 uppercase,返回 size)。
- 服务端:bench/bench_server.cpp 用 RegisterHandlerTyped 注册 Echo/EchoMeta。
- 客户端:spark_rpc_bench、spark_rpc_bench_multi 使用 CallTyped;单线程 bench 末尾演示 EchoMeta。
9.1 负载均衡示例(演示用)
可在应用侧快速构造内存 registry 与 RoundRobin 负载均衡器,复用 RpcClientCluster:
#include "spark_rpc/rpc_client.h"
#include "spark_rpc/load_balancer.h"
#include "spark_rpc/registry.h"
#include "echo.pb.h"
using namespace sparkpush::rpc;
int main() {
ServiceRegistry registry;
// 注册两个后端节点(示例:同机不同端口)
registry.Register("EchoService", {"127.0.0.1", 9000});
registry.Register("EchoService", {"127.0.0.1", 9001});
RoundRobinLoadBalancer rr;
CodecOptions options;
RpcClientCluster cluster(registry, rr, options);
bench::EchoRequest req;
req.set_payload("hello lb");
bench::EchoReply resp;
std::string err;
bool ok = cluster.CallTyped("EchoService", "Echo", req, resp,
SerializationType::kProtobuf,
CompressionType::kNone,
EncryptionType::kNone,
3000, &err);
if (!ok) {
fprintf(stderr, "call failed: %s\n", err.c_str());
return 1;
}
printf("reply payload=%s\n", resp.payload().c_str());
return 0;
}
在 bench 场景中,可将多实例 server 分别监听 9000/9001,使用上述示例验证轮询分发。
10. 性能与开销分析
- 泛型封装不增加序列化次数;额外开销为模板/类型检查,可忽略。
- 主成本:网络 IO、压缩/解压、加解密、序列化。
- 优化:调压缩阈值;优先 protobuf;必要时 zstd;复用 protobuf 对象或 arena;减少拷贝。
11. 构建与依赖
依赖:muduo、protobuf、OpenSSL、ZLIB、可选 ZSTD。
CMake 选项:
- SPARK_RPC_ENABLE_BENCH:开启基准工具(默认 ON)。
- SPARK_RPC_PROTO_FILE:proto 路径,默认 proto/echo.proto,自动生成 spark_push_proto。
- SPARK_RPC_USE_BUNDLED_MUDUO:使用内置 muduo(默认 OFF)。
构建示例:
mkdir build cd build cmake .. make -j
12. 测试与验证
- 单元/集成:编解码、压缩/解压、加密/解密、序列化/反序列化。
- 回归:Echo/EchoMeta 正常与错误路径;超时;心跳收发。
- 压测:spark_rpc_bench_server + spark_rpc_bench/bench_multi,关注 QPS、延迟、CPU。
13. 监控与可观测性(建议)
- 指标:QPS、P95/P99、失败率、超时率、重连次数、压缩命中率、加解密失败数。
- 日志:编解码失败、handler not found、超时、心跳异常。
- 钩子:发送前/后、业务前/后可埋点(便于接入 Prometheus/Stats)。
14. 扩展与演进方向
- 自动重连、健康检查、故障摘除/恢复。
- 负载均衡:权重、一致性哈希、最小连接数。
- Streaming/异步接口,支持 pipeline/multiplex。
- 安全:TLS/证书校验;密钥轮换。
- 内存:protobuf arena、对象池、零拷贝 Buffer。
源码领取:************************************
#一人推荐一个值得做的项目##春招##校招##C++##简历中的项目经历要怎么写#