C++11 实现线程池

oneNeko 于 2022-05-18 发布

线程池是什么

线程池(Thread Pool)管理一个任务队列,一个线程队列,然后每次把一个任务分配给一个线程去做,循环往复

为什么要用线程池

如何构建线程池

线程池一般是要复用线程,所以如果是取一个task分配给某一个thread,执行完之后再重新分配,在语言层面这是基本不能实现的:C++的thread都是执行一个固定的task函数,执行完之后线程也就结束了。所以该如何实现task和thread的分配呢?

让每一个thread创建后,就去执行调度函数:循环获取task,然后执行。

这个循环该什么时候停止呢?

很简单,当线程池停止使用时,循环停止。

这样一来,就保证了thread函数的唯一性,而且复用线程执行task。

总结一下,我们的线程池的主要组成部分有二:

线程池中的线程会不断查询任务队列有无可用任务,当多个线程试图同时查询同一个任务时,这会引起难以估计的灾难。因而我们需要对C++的std::queue进行包装,实现一个线程安全的SafeQueue

实现一个线程安全的SafeQueue原理很简单,利用mutex来限制并发访问即可。我们可以在SafeQueue类中定义一个std::mutex类型的成员变量,并在相应的操作接口(如入队接口enqueue())中利用互斥体包装器来管理这个mutex,确保没有其他人正在访问该资源。

template<typename T>
class SafeQueue {
private:
	std::queue<T> m_queue;
	std::mutex m_mutex; //访问互斥信号量
public:
	SafeQueue() {};
	SafeQueue(SafeQueue&& other) {};
	~SafeQueue() {};

	bool empty() {
		std::unique_lock<std::mutex> lock(m_mutex);
		return m_queue.empty();
	}


	int size() {
		std::unique_lock<std::mutex> lock(m_mutex);
		return m_queue.size();
	}

	void enqueue(T& t) {
		std::unique_lock<std::mutex> lock(m_mutex);
		m_queue.emplace(t);
	}

	bool dequeue(T& t) {
		std::unique_lock<std::mutex> lock(m_mutex);

		if (m_queue.empty()) {
			return false;
		}

		t = std::move(m_queue.front());
		m_queue.pop();
		return true;
	}
};

提交函数

线程池最重要的方法就是负责向任务队列添加任务。我们的提交函数应该做到以下两点:

完整的提交函数如下所示:

// 向线程池提交要执行的函数 ①
template <typename F, typename... Args>
auto submit(F&& f, Args &&...args)->std::future<decltype(f(args...))> {
    
    // 将函数和参数绑定 ②
    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

    // 封装函数 ③
    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

    std::function<void()> wrapper_func = [task_ptr]() {
        (*task_ptr)();
    };

    // 队列通用安全封包函数,并压入安全队列
    m_queue.enqueue(wrapper_func);

    // 唤醒一个等待中的线程
    m_conditional_lock.notify_one();

    // 返回先前注册的任务指针
    return task_ptr->get_future();
}

① 函数声明

submit()是一个模板函数,这很明显。template<typename F, typename... Args>中的typename... Args是C++11引入的可变模参variadic templates

首先来看长得奇奇怪怪的函数头部分,auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>,这里函数类型的定义用到了叫做尾返回类型推导的技巧。按照标准,auto关键字不能用于函数形参的类型推导,在C++14以前,也不能直接用auto func()的形式来推导函数的返回类型。

因此传统C++中我们必须这么写:

template<typename R, typename T, typename U>
R add(T x, U y) {
 return x+y;
}

这样存在很明显的缺陷:事实上很多时候我们并不知道add()这个函数会进行什么操作,获取什么样的返回类型。最终在C++11中这个问题得到了解决。C++11关键字decltype解决了auto关键字只能对变量类型进行类型推导的缺陷。但是为了利用decltype来推导函数的返回类型,我们并不能直接写出这种形式的代码:

decltype(x+y) add(T x, U y)

因为编译器在读到decltype(x+y)时,xy尚未定义。而这个问题的解决方案,正是尾返回类型推导。C++11引入了一个尾返回类型trailing return type,利用auto关键字将返回类型后置:

template<typename T, typename U>
auto add2(T x, U y) -> decltype(x+y){
    return x + y;
}

至此,看起来奇奇怪怪的函数头中关于函数的返回类型的定义已经清楚明了:该函数的返回值将从std::future<decltype(f(args...))>中自动推导得出。

接着谈函数头。这里我们谈一下std::future,它提供了一个访问异步操作结果的途径。我们可以使用std::future的wait()方法来设置屏障,阻塞线程,实现线程同步。并最终使用std::future的get()方法来获得执行结果。

② 绑定函数和参数

这里我们使用了std::function进行包装从而产生了一个特殊函数,这个特殊函数使用std::bind将函数f和参数args绑定起来。

简单来说,std::function可以对多个相似的函数进行包装(即通用的描述方法)。std::function可以hold住任何可以通过()来调用的对象,包括:

std::bind可以将调用函数时的部分参数先制定好,留下一部分在真正调用时确定。(当然,你也可以直接指定全部参数,在调用时不再指定。)

这里我们会注意到,std::bind中,出现了一个std::forward()的特殊方法。std::forward()又被称作完美转发。简单来说,std::forward()将会完整保留参数的引用类型进行转发。如果参数是左值引用(lvalue),该方法会将参数保留左值引用的形式进行转发,如果参数是右值引用(rvalue),该方法会将参数保留右值引用的形式进行转发。而我们这里为什么要使用这个方法呢?

我们会对为什么使用std::forward()方法产生疑惑,可能是因为我们看到了函数头中的F&& fArgs&&... args,这难道不已经指明这个函数接收的参数类型应为右值引用吗?其实不然。这里的F&& fArgs&&... args中的&&并非是右值引用意思,而是一种特殊现象,这个现象被称作万能引用(universal reference)。 万能引用可以简单理解为,当T是模板参数时,T&&的作用主要是保持值类别进行转发。然而,一个绑定到universial reference上的对象可能具有lvaluesness或者rvalueness,正是因为有这种二义性,所以产生了std::forward

总的来说,②会产生一个以函数f(arg...)返回类型为返回类型、不含参数的特殊函数包装func

③ 封装函数

这里我们使用std::make_shared<>()方法,声明了一个std::packaged_task<decltype(f(args...))()>类型的智能指针,并将前面std::function方法声明的特殊函数包装func传入作为std::packaged_task的实例化参数。智能指针将更方便我们对该std::packaged_task对象进行管理。

std::packaged_task可以用来封装任何可以调用的目标,从而用于实现异步的调用。

④ 唤醒线程

这里条件变量会通知一个处于wait状态的线程,该线程将会从任务队列中取得任务并执行。

这里简要介绍一下条件变量(std::condition_variable):

条件变量std::condition_variable是为了解决死锁而生,当互斥操作不够用而引入的。比如,线程可能需要等待某个条件为真才能继续执行,而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。所以,condition_variable实例被创建出现主要就是用于唤醒等待线程从而避免死锁。std::condition_variablenotify_one()用于唤醒一个线程;notify_all()则是通知所有线程。

线程工作类

本文在线程池中设立私有成员类ThreadWoker作为内置线程工作类,执行真正的工作。

class ThreadWorker // 内置线程工作类
{
private:
    int m_id; // 工作id
​
    ThreadPool *m_pool; // 所属线程池
public:
    // 构造函数
    ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
    {
    }
​
    // 重载()操作
    void operator()()
    {
        std::function<void()> func; // 定义基础函数类func
​
        bool dequeued; // 是否正在取出队列中元素
        
        // 判断线程池是否关闭,没有关闭则从任务队列中循环提取任务
        while (!m_pool->m_shutdown)
        {
            {
                // 为线程环境加锁,互访问工作线程的休眠和唤醒
                std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
​
                // 如果任务队列为空,阻塞当前线程
                if (m_pool->m_queue.empty())
                {
                    m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
                }
​
                // 取出任务队列中的元素
                dequeued = m_pool->m_queue.dequeue(func);
            }
​
            // 如果成功取出,执行工作函数
            if (dequeued)
                func();
        }
    }
};

线程池

完整线程池代码

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>

template<typename T>
class SafeQueue {
private:
	std::queue<T> m_queue;
	std::mutex m_mutex; //访问互斥信号量
public:
	SafeQueue() {};
	SafeQueue(SafeQueue&& other) {};
	~SafeQueue() {};

	bool empty() {
		std::unique_lock<std::mutex> lock(m_mutex);
		return m_queue.empty();
	}


	int size() {
		std::unique_lock<std::mutex> lock(m_mutex);
		return m_queue.size();
	}

	void enqueue(T& t) {
		std::unique_lock<std::mutex> lock(m_mutex);
		m_queue.emplace(t);
	}

	bool dequeue(T& t) {
		std::unique_lock<std::mutex> lock(m_mutex);

		if (m_queue.empty()) {
			return false;
		}

		t = std::move(m_queue.front());
		m_queue.pop();
		return true;
	}
};

class ThreadPool {
private:

	// 线程工作类
	class ThreadWorker {
		int m_id; //线程id

		ThreadPool* m_pool;//所属线程池
	public:
		ThreadWorker(ThreadPool* pool, const int id) :m_pool(pool), m_id(id)
		{	}

		void operator()() {
			std::function<void()> func;//定义基础函数类func
			bool dequeued;//是否正在取出队列元素

			while (!m_pool->m_shutdown) {
				{
					// 为线程环境加锁,互访问工作线程的休眠和唤醒
					std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
					
					// 如果任务队列为空,阻塞当前线程
					if (m_pool->m_queue.empty()) {
						m_pool->m_conditional_lock.wait(lock);
					}
					
					// 取出任务
					dequeued = m_pool->m_queue.dequeue(func);
				}

				// 如果成功取出,执行工作函数
				if (dequeued) {
					func();
				}
			}
		}
	};

private:
	bool m_shutdown;// 线程池是否关闭
	SafeQueue<std::function<void()>> m_queue;//任务队列
	std::vector<std::thread> m_threads;// 工作线程队列
	std::mutex m_conditional_mutex;// 线程休眠锁互斥变量
	std::condition_variable m_conditional_lock;// 线程环境锁,可以让线程处于休眠或者唤醒状态
public:
	ThreadPool(const int n_threads = 4) :
		m_threads(std::vector<std::thread>(n_threads)),
		m_shutdown(false) {
	}
	ThreadPool(const ThreadPool&) = delete;
	ThreadPool(ThreadPool&&) = delete;
	ThreadPool& operator=(const ThreadPool&) = delete;
	ThreadPool& operator=(ThreadPool&&) = delete;

	// 初始化线程池
	void init() {
		for (int i = 0; i < m_threads.size(); i++) {
			m_threads.at(i) = std::thread(ThreadWorker(this, i));
		}
	}

	// 等待线程完成当前工作,关闭线程池
	void shutdown() {
		m_shutdown = true;
		m_conditional_lock.notify_all();// 唤醒所有工作线程

		for (int i = 0; i < m_threads.size(); i++) {
			if (m_threads.at(i).joinable()) {
				m_threads.at(i).join();// 并入主线程
			}
		}
	}

	// 向线程池提交要执行的函数
	template <typename F, typename... Args>
	auto submit(F&& f, Args &&...args)->std::future<decltype(f(args...))> {
		
		// 将函数和参数绑定
		std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

		// 封装函数,方便管理
		auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

		std::function<void()> wrapper_func = [task_ptr]() {
			(*task_ptr)();
		};

		// 队列通用安全封包函数,并压入安全队列
		m_queue.enqueue(wrapper_func);

		// 唤醒一个等待中的线程
		m_conditional_lock.notify_one();

		// 返回先前注册的任务指针
		return task_ptr->get_future();
	}
};

#endif

结合注释应该能很轻松的理解线程池剩余的代码。

注意一下init()函数和shutdown()函数:

在线程池初始化函数init()中,我们声明并分配工作线程,将工作线程放入工作线程队列m_threads中。

在线程池关闭函数shutdown()中,我们唤醒所有工作线程,并等待期完成所有工作后关闭线程池。

这里我们也可以改进一下代码,将shutdown()函数中的工作转移到ThreadPool的析构函数中,从而更便利日后的使用。

示例

#include "main.h"

#include <string>
#include <regex>
#include <iostream>

#include "ThreadPool.h"
using namespace std;


void multiply(const int a, const int b) {
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	cout << a << " * " << b << " = " << a * b << endl;
}

void multiply_output(int& out, const int a, const int b) {
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	cout << a << " * " << b << " = " << a * b << endl;
}

int main() {
	ThreadPool pool(4);
	pool.init();

	for (int i = 0; i < 3; i++) {
		for (int j = 0; j < 10; j++) {
			pool.submit(multiply, i, j);
		}
	}
	auto f1=pool.submit(multiply, 5, 10);
	f1.get();

	//std::this_thread::sleep_for(std::chrono::milliseconds(2000));

	pool.shutdown();
	return 0;
}

参考

基于C++实现线程池