C++ 线程池
2025-06-28 10:03:11

既然是线程池,就需要给不同任务提供接口,一般底层基于任务队列来实现,C++中 STL容器并非线程安全的,要用std::mutex来封装。

基本概念

  • 并发:单核,多个线程分配不同时间片 → “看起来共同执行”。
  • 并行:多核,真正的共同执行。
  • IO密集型:程序里面的指令涉及一些IO操作,比如设备、文件、网络操作(等待客户端连接IO操作是可以阻塞程序)。
  • CPU密集型:程序里面的指令都是做计算用的。
  • fixed模式线程池:线程池里面的线程个数是固定不变的,一般是ThreadPool创建时根据当前机器的CPU核心数量进行指定。
  • cached模式线程池:线程池里面的线程个数是可动态增长的,根据任务的数量动态的增加线程的数量

注:多线程程序一定好吗?不一定(对于多核系统而言,多线程程序对于IO密集型和CPU密集型都是适合的;对于单核系统而言,IO密集型程序是适合的,CPU密集型的程序不适合 - 线程的调度有额外开销。

注:线程越多越好吗?不一定(线程的创建和销毁都是非常“重”的操作,业务端频繁创建和销毁线程资源消耗较大;线程栈本身占用大量内存,每个线程都需要线程栈;线程的上下文切换要占用大量时间。

基于queue的阻塞队列

阻塞队列通常用于多线程环境中,尝试对队列执行某些操作,同时需要保证线程安全性,配合条件变量condition_variable保证线程间通信。
若队列已满,则向队列中添加元素的线程(生产者线程)将被阻塞,直至队列中有空间可以容纳新的元素为止;
若队列为空,则从队列中获取元素的线程(消费者线程)将被阻塞,直至队列中有空就可以容纳新的元素为止。

  • 可以只利用一把互斥锁std::mutex来保证一个队列的线程安全性;
  • 亦可以在底层维护两个队列,一个队列供生产者使用、一个队列供消费者使用在一定程度上减少生产者、消费者线程对同一把锁的竞争,消费者:not_empty_.wait(lock, [this]() { return !consumer_queue_.empty() || SwapQueue() != 0; });其中SwapQueue()主要是将生产者队列和消费者队列进行一个交换,消费者队列为空时,队列底层会尝试将两个队列进行交换,但此时依然有可能发生生产者和消费者线程对同一把锁的竞争!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#ifndef BLOCKINGQUEUEPRO_H
#define BLOCKINGQUEUEPRO_H

/*
阻塞队列:独立生产者队列和消费者队列 - 两把锁(一把给生产者用、一把给消费者用)
生产者和消费者的独立锁可以在一定程度上减少锁竞争
适用场景:高并发的生产者和消费者 - 队列操作频繁
*/

#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>

template <typename T>
class BlockingQueuePro
{
public:
// 构造函数
explicit BlockingQueuePro(size_t capacity = 100)
: capacity_{capacity} {}

// 禁止拷贝和赋值
BlockingQueuePro(const BlockingQueuePro &) = delete;
BlockingQueuePro &operator=(const BlockingQueuePro &) = delete;

// 向队列中推入元素
void Push(const T &item)
{
std::unique_lock<std::mutex> lock(producer_mutex_);
// 阻塞等待至队列不满
not_full_.wait(lock, [this]()
{ return producer_queue_.size() < capacity_; });

producer_queue_.emplace(item);
not_empty_.notify_one();
}

// 重载Push,支持右值引用,提高效率
void Push(T &&item)
{
std::unique_lock<std::mutex> lock(producer_mutex_);
not_full_.wait(lock, [this]()
{ return producer_queue_.size() < capacity_; });

producer_queue_.emplace(std::move(item));
not_empty_.notify_one();
}

// 从队列中弹出元素
T Pop()
{
std::unique_lock<std::mutex> lock(consumer_mutex_);
not_empty_.wait(lock, [this]()
{ return !consumer_queue_.empty() || SwapQueue() != 0; });

T front = std::move(consumer_queue_.front());
consumer_queue_.pop();
// 通知可能正在等待的生产者
not_full_.notify_one();
return front;
}

/*
非阻塞地尝试从队列中获取一个元素
阻塞的方式可能会导致某个线程一直等待在锁上,占用系统资源
非阻塞方式如果队列为空直接返回false!
当然阻塞方式也可以用wait_for设置阻塞时间!根据实际场景需求来确定即可!

== 系统长时间稳定运行 => 使用阻塞式Pop() 结合fixed模式线程池支撑系统稳定运行即可!
== 系统运行状态时急时缓 => 使用非阻塞式TryPop() 结合cached模式线程池动态适应系统需求!
*/
bool TryPop(T& item)
{
std::lock_guard<std::mutex> lock(consumer_mutex_);

if (consumer_queue_.empty() && SwapQueue() == 0)
{
return false; // 无任务,返回false
}

item = std::move(consumer_queue_.front());
consumer_queue_.pop();
// 通知可能正在等待的生产者
not_full_.notify_one();
return true;
}

// 返回队列大小 -> 这里返回的是生产者队列大小,可以自行调整!
size_t Size() const
{
std::lock_guard<std::mutex> lock(producer_mutex_);
return producer_queue_.size();
}

private:
// 消费者队列为空,交换消费者和生产者队列
int SwapQueue()
{
std::unique_lock<std::mutex> lock(producer_mutex_);
std::swap(producer_queue_, consumer_queue_);
return consumer_queue_.size();
}

std::queue<T> producer_queue_;
std::queue<T> consumer_queue_;
mutable std::mutex producer_mutex_;
mutable std::mutex consumer_mutex_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
size_t capacity_;
};

#endif

阻塞队列提供的Pop()接口一般设定为如果队列为空就一直阻塞,如果实际场景中生产者队列在某段时间内始终未向队列中添加新的任务,那么消费者线程就会阻塞在条件变量上,不利于系统性能。

一些优化思路:

  1. 设计bool TryPop(T& item)等尝试获取 / 尝试提交任务的接口,非阻塞式来配合线程池动态调整线程数量;
  2. 结合wait_for()也可以配合线程池动态调整!

基于阻塞队列的线程池

  • 基于C++11 cached模式线程池
  • 基于可变参模板编程支持用户提交各种不同类型的任务!

如何支撑用户提交各种不同类型的任务?

  1. 模板定义,使用尾置返回类型语法,返回一个std::future对象;
  2. 打包任务task,绑定任务函数及其参数,调用task->get_future异步获取任务的执行结果;
  3. 将任务放入任务队列,等待线程池的工作线程执行;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 基于可变参模板支持用户提交各种不同类型的任务
template <typename Func, typename... Args>
auto SubmitTask(Func &&func, Args... args) -> std::future<decltype(func(args...))>
{
// 打包任务,放入任务队列
using RType = decltype(func(args...));

auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...)); // 打包任务,将参数绑定至任务函数
std::future<RType> result = task->get_future();

// 通过匿名对象调用实际任务
task_queue_.Push([task]()
{ (*task)(); });

// ...

return result;
}

线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Thread.h
#ifndef THREAD_H
#define THREAD_H

#include <functional>

// 线程类型
class Thread
{
public:
// 线程函数对象类型
using ThreadFunc = std::function<void(int)>;

Thread(ThreadFunc func);
~Thread() = default;

// 启动线程
void Start();

// 获取线程id
int GetId() const;

private:
ThreadFunc func_;
static int generate_id_;
int thread_id_; // 保存线程id
};

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Thread.cc
#include "Thread.h"

#include <thread>

// 线程构造
Thread::Thread(ThreadFunc func)
: func_(func)
, thread_id_(generate_id_++)
{}

// 启动线程
void Thread::Start()
{
// 创建一个线程来执行一个线程函数
std::thread t(func_, thread_id_); // C++11 线程对象t 和线程函数func_
t.detach(); // 设置分离线程 → 防止线程对象出作用域自动析构
}

// 获取线程ID
int Thread::GetId() const
{
return thread_id_;
}

// 静态成员变量类外初始化
int Thread::generate_id_ = 0;

线程池类

ThreadPool提供接口:

  • 支持用户设置线程上限阈值void SetThreadSizeThresHold(int threshold);
  • 支持用户开启线程池void Start(int initThreadSize = std::thread::hardware_concurrency());
  • 支持用户手动关闭线程池void Shutdown();
  • 支持用户提交任务auto SubmitTask(Func &&func, Args... args) -> std::future<decltype(func(args...))>
  • 可以继续拓展!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// ThreadPool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include "Thread.h"
#include "BlockingQueuePro.h"

#include <chrono>
#include <atomic>
#include <future>
#include <functional>
#include <unordered_map>

const int TASK_MAX_THRESHOLD = 3000; // 任务队列大小
const int THREAD_MAX_THRESHOLD = 20; // 线程数量上限
const int THREAD_MAX_IDLE_TIME = 10; // 空闲线程的最大存活时间(秒)

// 线程池类型
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();

ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;

// 支持用户设置线程上限阈值
void SetThreadSizeThresHold(int threshold);

void Start(int initThreadSize = std::thread::hardware_concurrency());

void Shutdown();

// 基于可变参模板支持用户提交各种不同类型的任务
template <typename Func, typename... Args>
auto SubmitTask(Func &&func, Args... args) -> std::future<decltype(func(args...))>
{
// 打包任务,放入任务队列
using RType = decltype(func(args...));

auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...)); // 打包任务,将参数绑定至任务函数
std::future<RType> result = task->get_future();

// 通过匿名对象调用实际任务
task_queue_.Push([task]()
{ (*task)(); });

// 动态扩展线程 -> 条件可以根据实际情况调整
if (cur_thread_size_ < thread_threshold_ && task_queue_.Size() > idle_thread_size_ * 10)
{
AddThread();
}

return result;
}

private:
// 添加新线程
void AddThread();

// 定义线程函数
void ThreadFunc(int thread_id);

// 检查pool的运行状态
bool CheckRunningState() const;

private:
using Task = std::function<void()>;

BlockingQueuePro<Task> task_queue_{TASK_MAX_THRESHOLD}; // 任务队列
std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表

std::atomic_bool running_; // 线程池运行状态
std::atomic_int cur_thread_size_; // 当前线程数量
std::atomic_int idle_thread_size_; // 当前空闲线程数量

int init_thread_size_; // 初始线程数量
int thread_threshold_; // 线程数量上限阈值

std::mutex idle_thread_mtx_; // 保护线程回收
};

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include "ThreadPool.h"

ThreadPool::ThreadPool() : running_{false}, cur_thread_size_{0}, idle_thread_size_{0}, thread_threshold_(THREAD_MAX_THRESHOLD)
{
}

ThreadPool::~ThreadPool()
{
Shutdown();
}

void ThreadPool::Shutdown()
{
running_ = false;

std::cout << "Shutdown()" << cur_thread_size_ << std::endl;
// for (int i = 0; i < cur_thread_size_; ++i)
// {
// task_queue_.Push([]() {}); // 插入空任务,唤醒线程
// }

threads_.clear();
cur_thread_size_ = 0;
idle_thread_size_ = 0;
}

// 支持用户设置线程上限阈值
void ThreadPool::SetThreadSizeThresHold(int threshold)
{
// 运行过程无法修改线程池状态
if (CheckRunningState())
return;

thread_threshold_ = threshold;
}

void ThreadPool::Start(int initThreadSize)
{
running_ = true;
init_thread_size_ = initThreadSize;

for (int i = 0; i < init_thread_size_; ++i)
{
AddThread();
}
}

void ThreadPool::AddThread()
{
// auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFunc, this, std::placeholders::_1)); // C++14
std::unique_ptr<Thread> ptr(new Thread(std::bind(&ThreadPool::ThreadFunc, this, std::placeholders::_1)));
int thread_id = ptr->GetId();
std::cout << "Thread: " << ptr->GetId() << " start work." << std::endl;
threads_.emplace(thread_id, std::move(ptr));
// 启动线程
threads_[thread_id]->Start();
// 修改线程相关数量
cur_thread_size_++;
idle_thread_size_++;
}

void ThreadPool::ThreadFunc(int thread_id)
{
std::chrono::steady_clock::time_point lastActiveTime = std::chrono::steady_clock::now();

for (;;)
{
if (!running_)
{
std::cout << "Thread: " << thread_id << " exit!" << std::endl;
return; // 关闭线程池线程退出~
}

Task task;
// 非阻塞式获取任务失败
if (!task_queue_.TryPop(task))
{
// 检查空闲时间并决定是否退出
auto now = std::chrono::steady_clock::now();
auto idleDuration = std::chrono::duration_cast<std::chrono::seconds>(now - lastActiveTime).count();
// std::cout << "idleDuration: " << idleDuration << std::endl;
// if (idleDuration >= THREAD_MAX_IDLE_TIME && cur_thread_size_ > init_thread_size_)
// {
// // 线程退出前更新线程池状态
// {
// std::lock_guard<std::mutex> lock(idle_thread_mtx_);
// threads_.erase(thread_id);
// std::cout << "pixel" << threads_.size() << std::endl;
// std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
// cur_thread_size_--;
// idle_thread_size_--;
// }

// std::cout << std::this_thread::get_id() << std::endl;
// std::cout << "cur_thread_size_ " << cur_thread_size_ << std::endl;
// std::cout << "idle_thread_size_ " << idle_thread_size_ << std::endl;

// return; // 退出线程
// }

if (idleDuration >= THREAD_MAX_IDLE_TIME)
{
// 尝试退出前检查是否还有足够的线程
bool exiting_ = false;
{
std::lock_guard<std::mutex> lock(idle_thread_mtx_);
if (cur_thread_size_ > init_thread_size_)
{
// std::cout << "pixel" << threads_.size() << std::endl;
threads_.erase(thread_id);
cur_thread_size_--;
idle_thread_size_--;
exiting_ = true;
}
}

if (exiting_)
{
std::cout << "Thread: " << thread_id << " exit!" << std::endl;
return; // 退出线程
}
}
}
else // 获取任务成功
{
idle_thread_size_--;
task(); // 执行任务
idle_thread_size_++;

// 更新最后活跃时间
lastActiveTime = std::chrono::steady_clock::now();
}
}
}

bool ThreadPool::CheckRunningState() const
{
return running_;
}

调用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "ThreadPool.h"

#include <thread>

void ExampleTask(int taskId)
{
// std::cout << "Task " << taskId << " is running on thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟任务耗时
}

int Add(int a, int b)
{
return a + b;
}

int main()
{
ThreadPool pool;
pool.Start(4); // 初始化线程池,4个线程

// 提交多个任务
for (int i = 0; i < 100; ++i)
{
pool.SubmitTask(ExampleTask, i);
}

std::this_thread::sleep_for(std::chrono::seconds(30)); // 等待任务完成

for (int i = 0; i < 10000; ++i)
{
auto result = pool.SubmitTask(Add, i, i);
// std::cout << result.get() << std::endl;
}

std::this_thread::sleep_for(std::chrono::seconds(30)); // 等待任务完成

// pool.Shutdown(); // 按需手动关闭线程池
return 0;
}
Prev
2025-06-28 10:03:11
Next