Featured image of post C++11线程池

C++11线程池

根据c++11实现的一个线程池。

引言

在C++11之前,标准库并不提供线程相关的支持(如std::threadstd::mutex等),因此在C++11之前创建线程池需要依赖于操作系统的线程库或第三方库,如 POSIX线程(pthread)Boost.Thread。而在c++11之后,我们可以使用c++的标准库来实现线程池,从而写出跨平台的线程池。

线程池简介

线程池的定义

线程池是一种线程管理模式,用于复用一组固定数量的线程来执行任务,而不是为每个任务都单独创建和销毁线程。通过线程池,任务被放入任务队列中,由空闲的线程依次取出并执行,从而提升性能和资源利用率。


为什么需要线程池?

线程创建和销毁的开销

  • 每次创建和销毁线程都需要占用系统资源。
  • 在线程数量较多时,这些开销会变得显著。

系统资源限制

  • 一个进程中可以创建的线程数量是有限的(由系统资源决定)。
  • 如果频繁创建过多线程,可能会导致资源耗尽。

高效的任务调度

  • 使用线程池可以让多个任务由有限的线程处理,避免因线程切换导致的性能下降。
  • 空闲线程可以立即复用,减少等待时间。

控制并发量

  • 限制线程的数量,防止系统过载。

线程池的工作原理

初始化线程池

  • 创建一定数量的线程,并让它们处于等待(阻塞)状态。

任务提交

  • 新任务被添加到任务队列中。

任务执行

  • 空闲线程从任务队列中取出任务并执行。
  • 执行完任务后,线程继续等待下一个任务。

线程池销毁

  • 停止线程池,等待所有线程完成任务并退出。

线程池的主要组件

任务队列

  • 存放待执行的任务,可以是FIFO队列或优先级队列。
  • 每个任务通常是一个函数或可调用对象。

线程集合

  • 一组预创建的工作线程,用于执行任务。

同步机制

  • 使用锁(如std::mutexpthread_mutex_t)保护任务队列,防止多线程竞争条件。
  • 使用条件变量(如std::condition_variablepthread_cond_t)来通知线程任务的到来。

线程池管理器

  • 提供接口来提交任务、管理线程池大小、以及终止线程池。

线程池的优点

减少资源消耗

  • 复用线程,避免频繁创建和销毁线程。

提高系统性能

  • 避免过多线程竞争CPU资源。
  • 降低线程切换的开销。

方便任务管理

  • 可控制任务的执行顺序(如任务优先级)。
  • 可根据负载动态调整线程池的大小。

提高可扩展性

  • 在多核处理器上,线程池能够更好地利用多核资源,实现并行化处理。

线程池的使用场景

高并发服务器

  • 如Web服务器、数据库服务器等,需要处理大量短时间的任务。

CPU密集型任务

  • 通过固定数量的线程并行处理计算任务,提高CPU利用率。

I/O密集型任务

  • 处理网络请求或文件I/O时,通过线程池避免阻塞。

定时任务

  • 周期性任务调度,例如日志清理、数据备份等。

示例:线程池的任务调度流程

  1. 应用程序提交任务。
  2. 任务被放入任务队列。
  3. 线程池中一个空闲线程取出任务并执行。
  4. 执行完成后,线程返回线程池,等待下一任务。

基于c++11的线程池的实现

task_queue类的设计

  • 任务队列使用生产者消费者模型,将任务当作物品,每当将任务放到任务队列的时候,就会通知线程拿取任务,并让线程执行任务。
  • task_queue的接口如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#ifndef TASKQUEUE_H_
#define TASKQUEUE_H_

#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>

class TaskQueue{
    //表示任务的类型
    using ElemType = std::function<void()>;
 
public:
    explicit TaskQueue(int que_size);
    ~TaskQueue() = default;
    void Push(ElemType&& ptask);
    ElemType Pop();
    bool IsFull() const;
    bool IsEmpty() const;
    void WakeUp();

private:
    size_t que_size_;
    std::queue<ElemType> que_;			//用于存放任务,任务应当是一个void()的可调用对象
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    bool flag_;  //为了唤醒所有的工作线程,可以让while退出
};

#endif
  • task_queue类的实现如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "task_queue.h"

TaskQueue::TaskQueue(const int que_size) : 
que_size_(que_size),
mutex_(),
not_full_(),
not_empty_(),
flag_(true)
{   
}

void TaskQueue::Push(ElemType&& ptask) {
    std::unique_lock lock(mutex_);
    while(IsFull()) {
        //等待not_full_来唤醒
        not_full_.wait(lock);
    }
    que_.push(std::move(ptask));
    not_empty_.notify_one();
}

TaskQueue::ElemType TaskQueue::Pop() {
    std::unique_lock lock(mutex_);
    while(IsEmpty() && flag_) {
        //等待not_empty_来唤醒
        not_empty_.wait(lock);
    }
    if (flag_) {
        ElemType task = que_.front();
        que_.pop();
        not_full_.notify_one();
        return task;
    } else {
        return nullptr;
    }
}

bool TaskQueue::IsFull() const {
    return que_.size() == que_size_;
}

bool TaskQueue::IsEmpty() const {
    return que_.empty();
}

void TaskQueue::WakeUp() {
    flag_ = false;
    not_empty_.notify_all();
}

thread_pool类的设计

  • thread_pool用于创建多个线程,并可以添加和获取任务,让线程去执行任务。其中有一个DoTask函数,当作线程函数,让线程执行任务。DoTask在执行任务前应当获取任务,如果没有任务,线程就会阻塞等待任务。当任务到来时,通过条件变量唤醒线程,并获取到任务task(一个可调用对象),这时候就可以直接调用task()去执行任务。
  • thread_pool的接口如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_

#include <memory>
#include <vector>
#include <functional>
#include <thread>

#include "task_queue.h"

class ThreadPool {
    using Task = std::function<void()>;
public:
    ThreadPool(size_t thread_num, size_t que_size);
    ~ThreadPool() = default;
    void Start();
    void Stop();
    void AddTask(Task&& task);

private:
    Task GetTask();
    void DoTask();

private:
    size_t thread_num_;
    size_t que_size_; 
    std::vector<std::unique_ptr<std::thread>> threads_;
    TaskQueue task_que_;
    bool is_exit_;
};

#endif
  • thread_pool的实现如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>

#include "thread_pool.h"

ThreadPool::ThreadPool(const size_t thread_num, const size_t que_size) :
thread_num_(thread_num),
que_size_(que_size),
task_que_(que_size_),
is_exit_(false) {
    threads_.reserve(thread_num_);
}

void ThreadPool::Start() {
    for (size_t i = 0; i < thread_num_; i++) {
        threads_.push_back(std::make_unique<std::thread>(std::bind(&ThreadPool::DoTask,this)));
    }
}

void ThreadPool::Stop() {
    //确保任务队列里的任务可以执行完
    while(!task_que_.IsEmpty()) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::this_thread::sleep_for(std::chrono::seconds(2));
    is_exit_ = true;
    task_que_.WakeUp();
    for (size_t i = 0; i < thread_num_; i++) {
        threads_[i]->join();
    }
}

void ThreadPool::AddTask(Task&& task) {
    if (task) {
        task_que_.Push(std::move(task));
    }
} 

ThreadPool::Task ThreadPool::GetTask() {
    return task_que_.Pop();
}

void ThreadPool::DoTask() {
    while(!is_exit_) {
        if (Task task = GetTask()) {
            task();
        }
    } 
}

设计上的细节

  • 在终止线程池之前,要确保任务队列中的任务被全部取出。
  • 当线程池终止的时候,会将退出标志设置为true,并且会唤醒所有睡着的线程,并告知退出,此时线程将不再会进入循环。

使用线程池

  • 定义MyTask类,使用process()成员函数作为任务的具体执行过程,任务所需要的变量可以用MyTask的成员变量来表示。注意任务应当是void()类型的可调用对象,所以thread_pool添加任务的时候,需要绑定一下this指针。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <vector>
#include <functional>

#include "thread_pool.h"

//自己实现任务逻辑交给线程池去执行
class MyTask {
public:
    explicit MyTask(const int num) : num_(num) {}
    void process() const {
        std::cout << num_ << std::endl;
    }
    char num_;
};

int main() {
    ThreadPool pool(4, 10);
    std::vector<MyTask> tasks;
    for (int i = 0; i < 40; i++) {
        tasks.emplace_back(i+'A');
    }
    pool.Start();
    for (int i = 0; i < 40; i++) {
        pool.AddTask(std::bind(&MyTask::process,&tasks[i]));
    }
    pool.Stop();
}