引言
在C++11之前,标准库并不提供线程相关的支持(如std::thread、std::mutex等),因此在C++11之前创建线程池需要依赖于操作系统的线程库或第三方库,如 POSIX线程(pthread) 或 Boost.Thread。而在c++11之后,我们可以使用c++的标准库来实现线程池,从而写出跨平台的线程池。
线程池简介
线程池的定义
线程池是一种线程管理模式,用于复用一组固定数量的线程来执行任务,而不是为每个任务都单独创建和销毁线程。通过线程池,任务被放入任务队列中,由空闲的线程依次取出并执行,从而提升性能和资源利用率。
为什么需要线程池?
线程创建和销毁的开销:
- 每次创建和销毁线程都需要占用系统资源。
- 在线程数量较多时,这些开销会变得显著。
系统资源限制:
- 一个进程中可以创建的线程数量是有限的(由系统资源决定)。
- 如果频繁创建过多线程,可能会导致资源耗尽。
高效的任务调度:
- 使用线程池可以让多个任务由有限的线程处理,避免因线程切换导致的性能下降。
- 空闲线程可以立即复用,减少等待时间。
控制并发量:
线程池的工作原理
初始化线程池:
- 创建一定数量的线程,并让它们处于等待(阻塞)状态。
任务提交:
任务执行:
- 空闲线程从任务队列中取出任务并执行。
- 执行完任务后,线程继续等待下一个任务。
线程池销毁:
线程池的主要组件
任务队列:
- 存放待执行的任务,可以是FIFO队列或优先级队列。
- 每个任务通常是一个函数或可调用对象。
线程集合:
同步机制:
- 使用锁(如
std::mutex或pthread_mutex_t)保护任务队列,防止多线程竞争条件。
- 使用条件变量(如
std::condition_variable或pthread_cond_t)来通知线程任务的到来。
线程池管理器:
- 提供接口来提交任务、管理线程池大小、以及终止线程池。
线程池的优点
减少资源消耗:
提高系统性能:
- 避免过多线程竞争CPU资源。
- 降低线程切换的开销。
方便任务管理:
- 可控制任务的执行顺序(如任务优先级)。
- 可根据负载动态调整线程池的大小。
提高可扩展性:
- 在多核处理器上,线程池能够更好地利用多核资源,实现并行化处理。
线程池的使用场景
高并发服务器:
- 如Web服务器、数据库服务器等,需要处理大量短时间的任务。
CPU密集型任务:
- 通过固定数量的线程并行处理计算任务,提高CPU利用率。
I/O密集型任务:
定时任务:
示例:线程池的任务调度流程
- 应用程序提交任务。
- 任务被放入任务队列。
- 线程池中一个空闲线程取出任务并执行。
- 执行完成后,线程返回线程池,等待下一任务。
基于c++11的线程池的实现
task_queue类的设计
- 任务队列使用生产者消费者模型,将任务当作物品,每当将任务放到任务队列的时候,就会通知线程拿取任务,并让线程执行任务。
- task_queue的接口如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#ifndef TASKQUEUE_H_
#define TASKQUEUE_H_
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
class TaskQueue{
//表示任务的类型
using ElemType = std::function<void()>;
public:
explicit TaskQueue(int que_size);
~TaskQueue() = default;
void Push(ElemType&& ptask);
ElemType Pop();
bool IsFull() const;
bool IsEmpty() const;
void WakeUp();
private:
size_t que_size_;
std::queue<ElemType> que_; //用于存放任务,任务应当是一个void()的可调用对象
std::mutex mutex_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
bool flag_; //为了唤醒所有的工作线程,可以让while退出
};
#endif
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
#include "task_queue.h"
TaskQueue::TaskQueue(const int que_size) :
que_size_(que_size),
mutex_(),
not_full_(),
not_empty_(),
flag_(true)
{
}
void TaskQueue::Push(ElemType&& ptask) {
std::unique_lock lock(mutex_);
while(IsFull()) {
//等待not_full_来唤醒
not_full_.wait(lock);
}
que_.push(std::move(ptask));
not_empty_.notify_one();
}
TaskQueue::ElemType TaskQueue::Pop() {
std::unique_lock lock(mutex_);
while(IsEmpty() && flag_) {
//等待not_empty_来唤醒
not_empty_.wait(lock);
}
if (flag_) {
ElemType task = que_.front();
que_.pop();
not_full_.notify_one();
return task;
} else {
return nullptr;
}
}
bool TaskQueue::IsFull() const {
return que_.size() == que_size_;
}
bool TaskQueue::IsEmpty() const {
return que_.empty();
}
void TaskQueue::WakeUp() {
flag_ = false;
not_empty_.notify_all();
}
|
thread_pool类的设计
- thread_pool用于创建多个线程,并可以添加和获取任务,让线程去执行任务。其中有一个DoTask函数,当作线程函数,让线程执行任务。DoTask在执行任务前应当获取任务,如果没有任务,线程就会阻塞等待任务。当任务到来时,通过条件变量唤醒线程,并获取到任务task(一个可调用对象),这时候就可以直接调用task()去执行任务。
- thread_pool的接口如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_
#include <memory>
#include <vector>
#include <functional>
#include <thread>
#include "task_queue.h"
class ThreadPool {
using Task = std::function<void()>;
public:
ThreadPool(size_t thread_num, size_t que_size);
~ThreadPool() = default;
void Start();
void Stop();
void AddTask(Task&& task);
private:
Task GetTask();
void DoTask();
private:
size_t thread_num_;
size_t que_size_;
std::vector<std::unique_ptr<std::thread>> threads_;
TaskQueue task_que_;
bool is_exit_;
};
#endif
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>
#include "thread_pool.h"
ThreadPool::ThreadPool(const size_t thread_num, const size_t que_size) :
thread_num_(thread_num),
que_size_(que_size),
task_que_(que_size_),
is_exit_(false) {
threads_.reserve(thread_num_);
}
void ThreadPool::Start() {
for (size_t i = 0; i < thread_num_; i++) {
threads_.push_back(std::make_unique<std::thread>(std::bind(&ThreadPool::DoTask,this)));
}
}
void ThreadPool::Stop() {
//确保任务队列里的任务可以执行完
while(!task_que_.IsEmpty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(2));
is_exit_ = true;
task_que_.WakeUp();
for (size_t i = 0; i < thread_num_; i++) {
threads_[i]->join();
}
}
void ThreadPool::AddTask(Task&& task) {
if (task) {
task_que_.Push(std::move(task));
}
}
ThreadPool::Task ThreadPool::GetTask() {
return task_que_.Pop();
}
void ThreadPool::DoTask() {
while(!is_exit_) {
if (Task task = GetTask()) {
task();
}
}
}
|
设计上的细节
- 在终止线程池之前,要确保任务队列中的任务被全部取出。
- 当线程池终止的时候,会将退出标志设置为true,并且会唤醒所有睡着的线程,并告知退出,此时线程将不再会进入循环。
使用线程池
- 定义MyTask类,使用process()成员函数作为任务的具体执行过程,任务所需要的变量可以用MyTask的成员变量来表示。注意任务应当是void()类型的可调用对象,所以thread_pool添加任务的时候,需要绑定一下this指针。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
#include <iostream>
#include <vector>
#include <functional>
#include "thread_pool.h"
//自己实现任务逻辑交给线程池去执行
class MyTask {
public:
explicit MyTask(const int num) : num_(num) {}
void process() const {
std::cout << num_ << std::endl;
}
char num_;
};
int main() {
ThreadPool pool(4, 10);
std::vector<MyTask> tasks;
for (int i = 0; i < 40; i++) {
tasks.emplace_back(i+'A');
}
pool.Start();
for (int i = 0; i < 40; i++) {
pool.AddTask(std::bind(&MyTask::process,&tasks[i]));
}
pool.Stop();
}
|