线程池是什么
线程池(Thread Pool)管理一个任务队列,一个线程队列,然后每次把一个任务分配给一个线程去做,循环往复
为什么要用线程池
- 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。而线程池缓存线程,利用闲置的线程来执行任务,避免频繁创建销毁线程的开销
- 线程并发数量过多,抢占系统资源从而导致阻塞。而线程池可以控制线程并发数,避免线程过多造成阻塞
如何构建线程池
线程池一般是要复用线程,所以如果是取一个task分配给某一个thread,执行完之后再重新分配,在语言层面这是基本不能实现的:C++的thread都是执行一个固定的task函数,执行完之后线程也就结束了。所以该如何实现task和thread的分配呢?
让每一个thread创建后,就去执行调度函数:循环获取task,然后执行。
这个循环该什么时候停止呢?
很简单,当线程池停止使用时,循环停止。
这样一来,就保证了thread函数的唯一性,而且复用线程执行task。
总结一下,我们的线程池的主要组成部分有二:
- 任务队列(
Task Queue
) - 线程池(
Thread Pool
) 线程池与任务队列之间的匹配操作,是典型的生产者-消费者模型,本模型使用了两个工具:一个mutex
+ 一个条件变量。mutex
就是锁,保证任务的添加和移除(获取)的互斥性;一个条件变量保证多个线程获取task
的同步性:当任务队列为空时,线程应该等待(阻塞)。任务队列(
Task Queue
)我们会理所当然地希望任务以发送它相同的顺序来逐个执行,因此队列是最适合的数据结构。
线程池中的线程会不断查询任务队列有无可用任务,当多个线程试图同时查询同一个任务时,这会引起难以估计的灾难。因而我们需要对C++的std::queue
进行包装,实现一个线程安全的SafeQueue
。
实现一个线程安全的SafeQueue原理很简单,利用mutex来限制并发访问即可。我们可以在SafeQueue类中定义一个std::mutex类型的成员变量,并在相应的操作接口(如入队接口enqueue())中利用互斥体包装器来管理这个mutex,确保没有其他人正在访问该资源。
template<typename T>
class SafeQueue {
private:
std::queue<T> m_queue;
std::mutex m_mutex; //访问互斥信号量
public:
SafeQueue() {};
SafeQueue(SafeQueue&& other) {};
~SafeQueue() {};
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
void enqueue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
bool dequeue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
};
提交函数
线程池最重要的方法就是负责向任务队列添加任务。我们的提交函数应该做到以下两点:
- 接收任何参数的任何函数。(普通函数,Lambda,成员函数……)
- 立即返回“东西”,避免阻塞主线程。这里返回的“东西”或者说“对象”应该包含任务结束的结果。
完整的提交函数如下所示:
// 向线程池提交要执行的函数 ①
template <typename F, typename... Args>
auto submit(F&& f, Args &&...args)->std::future<decltype(f(args...))> {
// 将函数和参数绑定 ②
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// 封装函数 ③
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> wrapper_func = [task_ptr]() {
(*task_ptr)();
};
// 队列通用安全封包函数,并压入安全队列
m_queue.enqueue(wrapper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
① 函数声明
submit()
是一个模板函数,这很明显。template<typename F, typename... Args>
中的typename... Args
是C++11引入的可变模参variadic templates
首先来看长得奇奇怪怪的函数头部分,auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
,这里函数类型的定义用到了叫做尾返回类型推导
的技巧。按照标准,auto
关键字不能用于函数形参的类型推导,在C++14以前,也不能直接用auto func()
的形式来推导函数的返回类型。
因此传统C++中我们必须这么写:
template<typename R, typename T, typename U>
R add(T x, U y) {
return x+y;
}
这样存在很明显的缺陷:事实上很多时候我们并不知道add()
这个函数会进行什么操作,获取什么样的返回类型。最终在C++11中这个问题得到了解决。C++11关键字decltype
解决了auto
关键字只能对变量类型进行类型推导的缺陷。但是为了利用decltype来推导函数的返回类型,我们并不能直接写出这种形式的代码:
decltype(x+y) add(T x, U y)
因为编译器在读到decltype(x+y)
时,x
和y
尚未定义。而这个问题的解决方案,正是尾返回类型推导。C++11引入了一个尾返回类型trailing return type
,利用auto
关键字将返回类型后置:
template<typename T, typename U>
auto add2(T x, U y) -> decltype(x+y){
return x + y;
}
至此,看起来奇奇怪怪的函数头中关于函数的返回类型的定义已经清楚明了:该函数的返回值将从std::future<decltype(f(args...))>
中自动推导得出。
接着谈函数头。这里我们谈一下std::future,它提供了一个访问异步操作结果的途径。我们可以使用std::future的wait()方法来设置屏障,阻塞线程,实现线程同步。并最终使用std::future的get()方法来获得执行结果。
② 绑定函数和参数
这里我们使用了std::function
进行包装从而产生了一个特殊函数,这个特殊函数使用std::bind
将函数f
和参数args
绑定起来。
简单来说,std::function
可以对多个相似的函数进行包装(即通用的描述方法)。std::function
可以hold
住任何可以通过()
来调用的对象,包括:
- 普通函数
- 成员函数
- lambda
- std::bind
而std::bind
可以将调用函数时的部分参数先制定好,留下一部分在真正调用时确定。(当然,你也可以直接指定全部参数,在调用时不再指定。)
这里我们会注意到,std::bind
中,出现了一个std::forward()
的特殊方法。std::forward()
又被称作完美转发。简单来说,std::forward()
将会完整保留参数的引用类型进行转发。如果参数是左值引用(lvalue
),该方法会将参数保留左值引用的形式进行转发,如果参数是右值引用(rvalue
),该方法会将参数保留右值引用的形式进行转发。而我们这里为什么要使用这个方法呢?
我们会对为什么使用std::forward()
方法产生疑惑,可能是因为我们看到了函数头中的F&& f
和Args&&... args
,这难道不已经指明这个函数接收的参数类型应为右值引用吗?其实不然。这里的F&& f
和Args&&... args
中的&&
并非是右值引用意思,而是一种特殊现象,这个现象被称作万能引用(universal reference
)。
万能引用可以简单理解为,当T
是模板参数时,T&&
的作用主要是保持值类别进行转发。然而,一个绑定到universial reference
上的对象可能具有lvaluesness
或者rvalueness
,正是因为有这种二义性,所以产生了std::forward
。
总的来说,②会产生一个以函数f(arg...)返回类型
为返回类型、不含参数的特殊函数包装func
③ 封装函数
这里我们使用std::make_shared<>()
方法,声明了一个std::packaged_task<decltype(f(args...))()>
类型的智能指针,并将前面std::function
方法声明的特殊函数包装func
传入作为std::packaged_task
的实例化参数。智能指针将更方便我们对该std::packaged_task
对象进行管理。
std::packaged_task
可以用来封装任何可以调用的目标,从而用于实现异步的调用。
④ 唤醒线程
这里条件变量会通知一个处于wait
状态的线程,该线程将会从任务队列中取得任务并执行。
这里简要介绍一下条件变量(std::condition_variable
):
条件变量std::condition_variable
是为了解决死锁而生,当互斥操作不够用而引入的。比如,线程可能需要等待某个条件为真才能继续执行,而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。所以,condition_variable
实例被创建出现主要就是用于唤醒等待线程从而避免死锁。std::condition_variable
的notify_one()
用于唤醒一个线程;notify_all()
则是通知所有线程。
线程工作类
本文在线程池中设立私有成员类ThreadWoker作为内置线程工作类,执行真正的工作。
class ThreadWorker // 内置线程工作类
{
private:
int m_id; // 工作id
ThreadPool *m_pool; // 所属线程池
public:
// 构造函数
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
{
}
// 重载()操作
void operator()()
{
std::function<void()> func; // 定义基础函数类func
bool dequeued; // 是否正在取出队列中元素
// 判断线程池是否关闭,没有关闭则从任务队列中循环提取任务
while (!m_pool->m_shutdown)
{
{
// 为线程环境加锁,互访问工作线程的休眠和唤醒
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
// 如果任务队列为空,阻塞当前线程
if (m_pool->m_queue.empty())
{
m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
}
// 取出任务队列中的元素
dequeued = m_pool->m_queue.dequeue(func);
}
// 如果成功取出,执行工作函数
if (dequeued)
func();
}
}
};
线程池
完整线程池代码
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
template<typename T>
class SafeQueue {
private:
std::queue<T> m_queue;
std::mutex m_mutex; //访问互斥信号量
public:
SafeQueue() {};
SafeQueue(SafeQueue&& other) {};
~SafeQueue() {};
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
void enqueue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
bool dequeue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
};
class ThreadPool {
private:
// 线程工作类
class ThreadWorker {
int m_id; //线程id
ThreadPool* m_pool;//所属线程池
public:
ThreadWorker(ThreadPool* pool, const int id) :m_pool(pool), m_id(id)
{ }
void operator()() {
std::function<void()> func;//定义基础函数类func
bool dequeued;//是否正在取出队列元素
while (!m_pool->m_shutdown) {
{
// 为线程环境加锁,互访问工作线程的休眠和唤醒
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
// 如果任务队列为空,阻塞当前线程
if (m_pool->m_queue.empty()) {
m_pool->m_conditional_lock.wait(lock);
}
// 取出任务
dequeued = m_pool->m_queue.dequeue(func);
}
// 如果成功取出,执行工作函数
if (dequeued) {
func();
}
}
}
};
private:
bool m_shutdown;// 线程池是否关闭
SafeQueue<std::function<void()>> m_queue;//任务队列
std::vector<std::thread> m_threads;// 工作线程队列
std::mutex m_conditional_mutex;// 线程休眠锁互斥变量
std::condition_variable m_conditional_lock;// 线程环境锁,可以让线程处于休眠或者唤醒状态
public:
ThreadPool(const int n_threads = 4) :
m_threads(std::vector<std::thread>(n_threads)),
m_shutdown(false) {
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
// 初始化线程池
void init() {
for (int i = 0; i < m_threads.size(); i++) {
m_threads.at(i) = std::thread(ThreadWorker(this, i));
}
}
// 等待线程完成当前工作,关闭线程池
void shutdown() {
m_shutdown = true;
m_conditional_lock.notify_all();// 唤醒所有工作线程
for (int i = 0; i < m_threads.size(); i++) {
if (m_threads.at(i).joinable()) {
m_threads.at(i).join();// 并入主线程
}
}
}
// 向线程池提交要执行的函数
template <typename F, typename... Args>
auto submit(F&& f, Args &&...args)->std::future<decltype(f(args...))> {
// 将函数和参数绑定
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// 封装函数,方便管理
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> wrapper_func = [task_ptr]() {
(*task_ptr)();
};
// 队列通用安全封包函数,并压入安全队列
m_queue.enqueue(wrapper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
};
#endif
结合注释应该能很轻松的理解线程池剩余的代码。
注意一下init()
函数和shutdown()
函数:
在线程池初始化函数init()
中,我们声明并分配工作线程,将工作线程放入工作线程队列m_threads
中。
在线程池关闭函数shutdown()
中,我们唤醒所有工作线程,并等待期完成所有工作后关闭线程池。
这里我们也可以改进一下代码,将shutdown()
函数中的工作转移到ThreadPool
的析构函数中,从而更便利日后的使用。
示例
#include "main.h"
#include <string>
#include <regex>
#include <iostream>
#include "ThreadPool.h"
using namespace std;
void multiply(const int a, const int b) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cout << a << " * " << b << " = " << a * b << endl;
}
void multiply_output(int& out, const int a, const int b) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cout << a << " * " << b << " = " << a * b << endl;
}
int main() {
ThreadPool pool(4);
pool.init();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 10; j++) {
pool.submit(multiply, i, j);
}
}
auto f1=pool.submit(multiply, 5, 10);
f1.get();
//std::this_thread::sleep_for(std::chrono::milliseconds(2000));
pool.shutdown();
return 0;
}