当我谈线程池时我谈些什么?
最近跟着 purecpp 社区里的两篇博文复习了一下两种线程池的 C++实现(单任务队列、多任务队列),相比于之前那篇 C++11 实现线程池的博文,语言版本进行了一点小更新(C++14-C++17)。在这里结合代码记录一下学习心得。
单任务队列线程池
用现代的 C++标准库(线程+锁+条件变量)实现一个单任务队列的线程池非常简单。就像之前那篇博文里面讲的一样,原理非常简单,对新手而言最复杂的其实就是 C++11 众多的新语言特性罢了。
一个简易的单任务队列线程池的实现思路:在线程池构造时初始化线程数,在析构时停止线程池。对外也只需要提供提交任务的接口就够了。
接口设计
返回类型
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()); // 构造函数 template<typename F, typename... Args> auto enqueue(F &&f, Args &&...args); // 入队接口
入队接口 enqueue()
这个模板函数返回值使用了 auto
关键字进行推导,实际上的返回值其实是一个 future。future 的类型如下:
using return_type = std::invoke_result_t<F, Args...>; std::future<return_type> res;
也就是说这个 future 返回的类型实际上是任务的返回类型。
输入参数
输入参数是一个可调用对象和它的参数,这里利用了 C++11 的可变参数模板来实现传递任意数量的可调用对象的参数。
实现
class ThreadPool { public: explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()); template<typename F, typename... Args> auto enqueue(F &&f, Args &&...args); ~ThreadPool(); private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; };
这个简易单任务队列线程池的成员只有一个线程组,一个任务队列。为了保证任务队列的线程安全,我们还提供了一个锁。同时我们还提供了一个条件变量,利用锁和条件变量,我们可以实现线程通知机制。线程通知机制指的是,刚开始时线程池中是没有任务的,所有的线程都等待任务的到来,当一个任务进入到线程池中,就会通知一个线程去处理到来的任务。
同时我们又提供了一个 stop 变量,用来在析构的时候停止和清理任务和线程。因为懒(高情商:RAII 风格线程池,生命周期基本上与应用的生命周期一致),没有提供 stop 接口。
下面是具体实现:
namespace Diana { // * 简易多线程单任务队列线程池,使用线程安全队列,接口更人性化。 class ThreadPool { public: explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { // 根据 threads 数量创建多个线程 for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this]() { for (;;) {// 工作线程就是一个死循环,不停查询任务队列并取出任务执行 std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this]() { return this->stop || !this->tasks.empty(); // 条件变量等待线程池不为空或者 stop }); if (this->stop && this->tasks.empty()) // 线程池为空且 stop,证明线程池结束,退出线程 return; task = std::move(this->tasks.front()); // 取出任务 this->tasks.pop(); } task(); // 执行任务 } }); // lambda 表达式构建 } } template<typename F, typename... Args> auto enqueue(F &&f, Args &&...args) { using return_type = std::invoke_result_t<F, Args...>; auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); // 完美转发,构造任务仿函数的指针 std::future<return_type> res = task->get_future(); // 获得函数执行的 future 返回 { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) { throw std::runtime_error("enqueue on stopped Thread pool"); } tasks.emplace([task = std::move(task)]() { (*task)(); });// 塞入任务队列 } // 入队列后即可解锁 condition.notify_one(); // 仅唤醒一个线程,避免无意义的竞争 return res; } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); // 唤醒所有线程,清理任务 for (std::thread &worker: workers) worker.join(); // 阻塞,等待所有线程执行结束 } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; }; // namespace Diana
细节上的东西,注释已经写完了。
注意在有队伍进入队列时,仅需要 notify_one(),避免无意义的线程竞争;在停止线程池时,要 notify_all()唤醒所有进程。由于线程在等待 stop 标志,所以当唤醒之后才会把队列中的任务取出执行直到队列为空的时候才会退出线程。
重构:分离队列代码,编写线程安全任务队列
这里进行一次重构,像之前那片博文一样,封装一个线程安全队列:
namespace Diana { // * 线程安全队列 template<typename T> class SafeQueue { public: void push(const T &item) { { std::scoped_lock lock(mtx_); queue_.push(item); } cond_.notify_one(); } void push(T &&item) {// 两个 push 方法,此处不是万能引用而是单纯右值 { std::scoped_lock lock(mtx_); queue_.push(std::move(item)); } cond_.notify_one(); } bool pop(T &item) { std::unique_lock lock(mtx_); cond_.wait(lock, [&]() { return !queue_.empty() || stop_; }); if (queue_.empty()) return false; item = std::move(queue_.front()); queue_.pop(); return true; } std::size_t size() const { std::scoped_lock lock(mtx_); return queue_.size(); } bool empty() const { std::scoped_lock lock(mtx_); return queue_.empty(); } void stop() { { std::scoped_lock lock(mtx_); stop_ = true; } cond_.notify_all(); } private: std::condition_variable cond_; mutable std::mutex mtx_; std::queue<T> queue_; bool stop_ = false; }; };// namespace Diana
线程安全任务队列需要注意的细节不多。注意一下这里写了两个 push 方法,原因是在该模板类特化后,T 已经是一个具体的类型,T&&就不再是泛型编程中常见的万能引用,而是一个单纯的右值了。我们可以通过再添加一个模板成员函数来合并两个 push:
template <typename U> void push(U&& item) { { static_assert(std::is_same<U,T>::value==true); std::scoped_lock lock(mtx_); queue_.push(std::forward(item)); } cond_.notify_one(); }
要注意的是,记得判断一下 U 和 T 的类型是一样的。
利用这个安全队列,我们可以重写一下单任务队列的线程池:
namespace Diana { using WorkItem = std::function<void()>; // * 简易多线程单任务队列线程池,使用 SafeQueue 线程安全队列。 class SimplePool { public: explicit SimplePool(size_t threads = std::thread::hardware_concurrency()) { for (size_t i = 0; i < threads; ++i) { workers_.emplace_back([this]() { for (;;) { std::function<void()> task; if (!queue_.pop(task)) return; if (task) task(); } }); } } void enqueue(WorkItem item) { queue_.push(std::move(item)); } ~SimplePool() { queue_.stop(); for (auto& thd: workers_) thd.join(); } private: SafeQueue<WorkItem> queue_; std::vector<std::thread> workers_; }; };// namespace Diana
入队接口 enqueue()史诗级简化。当然,这种简化也带来一些不便:使用 std::function<void()>作为参数,不想前面的那个传入可调用对象及其参数,在实际使用时需要用户进行包装,这个会在后面功能测试的时候讲。
多任务队列线程池
其实多任务队列的线程池的设计思路也是很简单的:每个线程对应着一个自己的任务队列。因为前面对任务队列进行了抽取重构,编写一个多任务队列的线程池也非常简单。
我们对原本单任务队列的线程池的入队接口进行改造:
int schedule_by_id(WorkItem fn, size_t id = -1)
当提交一个任务时,我们可以指定它放到任意一个线程的任务队列中。在用户没有指定任务队列时,就为该任务随机选择一个线程所对应的任务队列。
实现
简单多任务队列线程池实现如下:
namespace Diana { using WorkItem = std::function<void()>; // * 简易多线程多任务队列线程池,使用 SafeQueue 线程安全队列。 class MultiplePool { public: explicit MultiplePool(size_t thread_num = std::thread::hardware_concurrency()) : queues_(thread_num), thread_num_(thread_num) { auto worker = [this](size_t id) { while (true) { WorkItem task{}; if (!queues_[id].pop(task)) break; if (task) task(); } }; workers_.reserve(thread_num_); for (size_t i = 0; i < thread_num_; ++i) { workers_.emplace_back(worker, i); } } int schedule_by_id(WorkItem fn, size_t id = -1) { if (fn == nullptr) return -1; if (id == -1) { id = rand() % thread_num_;// 随机插入到一个线程的任务队列中 queues_[id].push(std::move(fn)); } else { assert(id < thread_num_);// 插入指定线程的任务队列 queues_[id].push(std::move(fn)); } return 0; } ~MultiplePool() { for (auto& queue: queues_) { queue.stop();// 停止每一个任务队列 } for (auto& worker: workers_) { worker.join();// 阻塞,等待每个线程执行结束 } } private: std::vector<Diana::SafeQueue<WorkItem>> queues_;// 每个线程对应一个任务队列 size_t thread_num_; std::vector<std::thread> workers_; }; };// namespace Diana
需要注意的细节也在代码注释中写明了。
测试
我们编写如下测试代码:
void test_thread_pool() { std::cout << "test_thread_pool()" << std::endl; Diana::ThreadPool threadPool; threadPool.enqueue([] { std::cout << "hello\n"; }); auto future = threadPool.enqueue([](std::string str) { return "hello" + str; }, "world"); std::cout << future.get() << std::endl; } std::string funA(std::string str) { return "hello" + str; } void test_simple_thread_pool() { std::cout << "test_simple_thread_pool()" << std::endl; Diana::SimplePool threadPool; threadPool.enqueue([] { std::cout << "hello\n"; }); // * 此处必须使用 shared_ptr 进行包装, // * 否则在 std::function<void()>中会尝试生成 std::packaged_task 的拷贝构造函数, // ! std::packaged_task 禁止拷贝操作 auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world")); std::future<std::string> res = task->get_future(); threadPool.enqueue([task = std::move(task)] { (*task)(); }); // ! 以下实现方法是错误的 // auto task = std::packaged_task<std::string()>(std::bind(funA, "world")); // std::future<std::string> res = task.get_future(); // threadPool.enqueue(std::move(task)); std::cout << res.get() << std::endl; } void test_multiple_thread_pool() { std::cout << "test_multiple_thread_pool" << std::endl; Diana::MultiplePool threadPool; threadPool.schedule_by_id([] { std::cout << "hello\n"; }); auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world")); std::future<std::string> res = task->get_future(); threadPool.schedule_by_id([task = std::move(task)] { (*task)(); }); std::cout << res.get() << std::endl; }
我们分别为三个版本的线程池(未重构的单任务队列线程池+重构后的单任务队列线程池+多任务队列线程池)编写了接口测试。
未重构的单任务队列线程池,因为接口简单,没有什么需要特别注意的东西。
重构后的单任务队列线程池和多任务队列线程池,可以看到我写了一些注释,还注释了一些错误的任务提交方式。还记得前面重构时,我们把提交任务的接口参数改成了 std::function<void()>嘛?为了配合这个参数格式,以及利用 future 进行异步编程,我们需要对异步获取结果的方法进行包装(之前的版本在入队接口中为用户做了这些事情)。
比较需要注意的一点是我们需要用一个 shared_ptr 智能指针来对 packaged_task 进行包装,这是因为在 std::function<void()>中会尝试生成 std::packaged_task 的拷贝构造函数,而 std::packaged_task 是禁止进行拷贝操作的,这会引起编译器的报错(感谢群里的大佬帮我整明白了这个事情)。
类模板
std::function
是通用多态函数包装器。std::function
的实例能存储、复制及调用任何 可复制构造 (CopyConstructible) 的 可调用 (Callable) 目标——函数、 lambda 表达式 、 bind 表达式 或其他函数对象,还有指向成员函数指针和指向数据成员指针。存储的可调用对象被称为
std::function
的目标。若std::function
不含目标,则称它为空。调用空std::function
的目标导致抛出 std::bad_function_call 异常。
std::function
满足 可复制构造 (CopyConstructible) 和 可复制赋值 (CopyAssignable) 。
cppreference-std::packaged_task 构造函数:
构造新的
std::packaged_task
对象。复制构造函数被删除,
std::packaged_task
仅可移动。
性能比较
根据性能测试,重构和未重构版本的单任务队列线程池有着细微的性能差别(未重构版本略优)。而在数据量足够大的情况下,因为多任务队列的设计,多任务队列线程池性能甚至达到了单任务队列线程池的两倍。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论