可以用在项目中的一个小组件:C++11 高性能线程池设计与实现

概述

这是一个基于C++11标准实现的高性能线程池,具有以下特点:

支持任意函数作为任务:可以执行普通函数、成员函数、lambda表达式等

支持获取返回值:通过 std::future 机制异步获取任务执行结果

类型安全:利用C++11模板推导确保类型安全

高效同步:使用条件变量和互斥锁实现高效的线程同步

灵活控制:支持线程池的启动、停止、等待等操作

视频讲解及源码领取:https://www.bilibili.com/video/BV1ywG5zhE8X/

核心设计思想

1. 生产者-消费者模式

用户线程(生产者) → 任务队列 工作线程(消费者)

2. 任务抽象化

将所有可执行的任务抽象为 std::function<void()> 类型,实现统一管理。

3. 异步结果获取

通过 std::packaged_task std::future 实现异步结果获取。

关键C++11新特性详解

1. 可变参数模板 (Variadic Templates)

template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>

核心概念:

  • class... Args :表示可以接受0到任意个数的模板参数
  • Args&&... args :参数包展开,支持完美转发
  • 使用示例:
threadpool.exec(func0); // 0个参数
threadpool.exec(func1, 10); // 1个参数
threadpool.exec(func2, 20, "darren"); // 2个参数

2. 完美转发 (Perfect Forwarding)

std::bind(std::forward<F>(f), std::forward<Args>(args)...)

作用:

保持参数的值类别(左值/右值)

避免不必要的拷贝操作

确保参数原样传递给目标函数

3. 自动类型推导

using RetType = decltype(f(args...)); // 推导返回值类型
auto task = std::make_shared<std::packaged_task<RetType()>>(...);

特点:

  • decltype :编译期类型推导
  • auto :自动类型推导,简化代码
  • using :类型别名,比typedef更强大

4. 移动语义 (Move Semantics)

task = std::move(_tasks.front()); // 避免拷贝,提高性能

优势:

  • 减少不必要的深拷贝
  • 提高大对象传递效率
  • 支持只移动类型(如unique_ptr

5. Lambda表达式

fPtr->_func = [task]() { // 捕获task,无参数
printf("do task-----------\n");
(*task)();
};

语法:

  • [task] :按值捕获task
  • () :参数列表
  • {} :函数体

6. 智能指针

typedef shared_ptr<TaskFunc> TaskFuncPtr;
auto task = std::make_shared<std::packaged_task<RetType()>>(...);

优势:

  • 自动内存管理
  • 线程安全的引用计数
  • 避免内存泄漏

7. 标准线程库

std::thread, std::mutex, std::condition_variable, std::atomic

特点:

  • 跨平台线程支持
  • 标准化的同步原语
  • 高效的线程间通信

架构设计分析

类图结构

ZERO_ThreadPool
├── _threads: vector<thread*> // 工作线程池
├── _tasks: queue<TaskFuncPtr> // 任务队列
├── _mutex: mutex // 互斥锁
├── _condition: condition_variable // 条件变量
├── _threadNum: size_t // 线程数量
├── _bTerminate: bool // 终止标志
└── _atomic: atomic<int> // 原子计数器

执行流程图

核心组件详解

1. 任务封装机制

struct TaskFunc {
std::function<void()> _func;
};

设计优势:

  • 统一的任务接口
  • 支持任意可调用对象
  • 便于队列管理

2. 任务提交流程(exec函数)

template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
   // 1. 类型推导
   using RetType = decltype(f(args...));

   // 2. 创建packaged_task
   auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
   );

   // 3. 包装为统一任务类型
   TaskFuncPtr fPtr = std::make_shared<TaskFunc>();
   fPtr->_func = [task]() {
         (*task)();
   };

   // 4. 加入队列并通知
   std::unique_lock<std::mutex> lock(_mutex);
   _tasks.push(fPtr);
   _condition.notify_one();

   // 5. 返回future
   return task->get_future();
}

关键步骤解析:

1. 类型推导decltype(f(args...)) 推导函数返回类型

2. 任务包装packaged_task 将函数包装为可存储的任务

3. 参数绑定std::bind 将参数与函数绑定

4. 统一接口lambda表达式将不同类型任务统一为 void()

5. 异步获取:通过 future 异步获取结果

3. 工作线程执行逻辑

void run()
{
   while (!isTerminate()) {
         TaskFuncPtr task;
         bool ok = get(task); // 获取任务
         if (ok) {
             ++_atomic; // 增加执行计数
             try {
                task->_func(); // 执行任务
             } catch (...) {
                // 异常处理
             }
             --_atomic; // 减少执行计数

            // 检查是否所有任务完成
            std::unique_lock<std::mutex> lock(_mutex);
            if (_atomic == 0 && _tasks.empty()) {
                _condition.notify_all(); // 通知waitForAllDone
            }
        }
    }
}

4. 任务获取机制

bool get(TaskFuncPtr& task)
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (_tasks.empty()) {
        // 等待任务或终止信号
        _condition.wait(lock, [this] {
           return _bTerminate || !_tasks.empty();
        });
    }

    if (_bTerminate) return false;

    if (!_tasks.empty()) {
        task = std::move(_tasks.front()); // 移动语义
        _tasks.pop();
        return true;
    }
    return false;
}

设计亮点:

  • 使用条件变量避免忙等待
  • lambda谓词确保等待条件准确
  • 移动语义提高性能

5. 优雅关闭机制

void stop()
{
   {
     std::unique_lock<std::mutex> lock(_mutex);
     _bTerminate = true; // 设置终止标志
     _condition.notify_all(); // 唤醒所有等待线程
   }

   // 等待所有线程结束
   for (auto& thread : _threads) {
     if (thread->joinable()) {
     thread->join();
     }
     delete thread;
   }

   _threads.clear();
}

使用方法与示例

1. 基本使用流程

ZERO_ThreadPool threadpool;
threadpool.init(4); // 初始化4个工作线程
threadpool.start(); // 启动线程池

// 提交任务
threadpool.exec(func0);
threadpool.exec(func1, 10);
threadpool.exec(func2, 20, "hello");

threadpool.waitForAllDone(); // 等待所有任务完成
threadpool.stop(); // 停止线程池

2. 获取返回值

// 函数有返回值
int compute(int a, int b) {
    return a + b;
}

// 提交任务并获取结果
auto future = threadpool.exec(compute, 10, 20);
int result = future.get(); // 阻塞等待结果
cout << "Result: " << result << endl; // 输出: Result: 30

3. 类成员函数调用

class Calculator {
public:
    int multiply(int a, int b) {
        return a * b;
    }
};

Calculator calc;
auto future = threadpool.exec(
    std::bind(&Calculator::multiply, &calc, std::placeholders::_1,
std::placeholders::_2),
   5, 6
);
cout << "Result: " << future.get() << endl; // 输出: Result: 30

4. Lambda表达式任务

auto future = threadpool.exec([](int x) -> int {
   cout << "Processing: " << x << endl;
   return x * x;
}, 5);

cout << "Square: " << future.get() << endl; // 输出: Square: 25

性能优化点

1. 内存管理优化

  • 使用智能指针自动管理内存
  • 移动语义减少拷贝开销
  • 对象池可进一步优化内存分配

2. 锁竞争优化

  • 使用条件变量减少忙等待
  • 最小化临界区大小
  • 可考虑无锁队列进一步优化

3. 缓存友好性

  • 任务队列使用连续内存(std::queue基于deque
  • 原子操作减少缓存一致性开销

4. 线程亲和性

// 可扩展:设置CPU亲和性
#ifdef __linux__
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif

注意事项与最佳实践

1. 线程数量设置

// CPU密集型任务
size_t cpu_threads = std::thread::hardware_concurrency();

// IO密集型任务
size_t io_threads = cpu_threads * 2; // 经验值

threadpool.init(cpu_threads);

2. 异常安全

try {
    auto future = threadpool.exec(risky_function);
    auto result = future.get(); // 可能抛出异常
} catch (const std::exception& e) {
   // 处理任务执行中的异常
}

3. 资源管理

// RAII模式管理线程池
class ThreadPoolManager {
    ZERO_ThreadPool pool;
public:
    ThreadPoolManager(size_t threads) {
       pool.init(threads);
       pool.start();
    }

    ~ThreadPoolManager() {
       pool.stop(); // 自动清理
    }

    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args) {
       return pool.exec(std::forward<F>(f), std::forward<Args>(args)...);
    }
};

4. 性能监控

// 添加性能统计
class EnhancedThreadPool : public ZERO_ThreadPool {
    std::atomic<uint64_t> tasks_completed{0};
    std::atomic<uint64_t> total_execution_time{0};

public:
    void printStats() {
       cout << "Tasks completed: " << tasks_completed << endl;
       cout << "Average execution time: "
            << (total_execution_time / tasks_completed) << "ms" << endl;
    }
};

5. 扩展建议

  • 1. 优先级队列:支持任务优先级
  • 2. 动态调整:根据负载动态调整线程数
  • 3. 任务超时:支持任务执行超时
  • 4. 统计信息:添加性能监控和统计
  • 5. 异常处理:更完善的异常处理机制

总结

这个C++11线程池实现展示了现代C++的强大特性:

技术亮点:

  • 利用可变参数模板实现类型安全的通用接口
  • 通过完美转发避免不必要的拷贝
  • 使用future/promise模式实现异步结果获取
  • 采用条件变量实现高效的线程同步
  • 智能指针确保内存安全

应用价值:

  • 📈 显著提高多核CPU利用率
  • 🔧 简化并发编程复杂度
  • 支持各种类型的异步任务
  • 🛡 提供类型安全和异常安全保证
#实习##项目##后端##c++##牛客创作赏金赛#
全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务