Since this is a thread pool, it needs an interface for submitting different kinds of tasks. The implementation is usually built on a task queue; because STL containers in C++ are not thread-safe, the queue must be wrapped with std::mutex.
Basic concepts
Concurrency: on a single core, multiple threads share time slices, so they only appear to run simultaneously.
Parallelism: on multiple cores, threads genuinely run at the same time.
IO-bound: the program spends its time on IO operations such as device, file, or network access (e.g. blocking while waiting for a client connection).
CPU-bound: the program's instructions are mostly computation.
Fixed-mode thread pool: the number of threads is fixed, usually chosen from the machine's CPU core count when the ThreadPool is created.
Cached-mode thread pool: the number of threads grows dynamically with the number of pending tasks.
Note: is a multithreaded program always better? Not necessarily. On a multi-core system, multithreading suits both IO-bound and CPU-bound programs; on a single core it suits IO-bound programs but not CPU-bound ones, because thread scheduling adds overhead.
Note: are more threads always better? Not necessarily. Creating and destroying threads are very "heavy" operations, so frequently doing so on the business side wastes resources; each thread needs its own stack, which consumes significant memory; and context switching between many threads costs significant time.
A blocking queue based on std::queue
A blocking queue is typically used in multithreaded code: operations on the queue must be thread-safe, and a std::condition_variable handles communication between threads. If the queue is full, a producer thread trying to add an element blocks until space becomes available; if the queue is empty, a consumer thread trying to take an element blocks until one arrives.
One option is to protect a single queue with one std::mutex.
Another option is to maintain two queues internally, one used by producers and one by consumers, which reduces (to some degree) contention between producer and consumer threads on the same lock. The consumer then waits with: not_empty_.wait(lock, [this]() { return !consumer_queue_.empty() || SwapQueue() != 0; });
Here SwapQueue() swaps the producer and consumer queues: when the consumer queue is empty, the queue tries to swap the two. Note that at that moment producer and consumer threads may still contend on the same lock!
```cpp
#ifndef BLOCKINGQUEUEPRO_H
#define BLOCKINGQUEUEPRO_H

#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>

template <typename T>
class BlockingQueuePro {
public:
    explicit BlockingQueuePro(size_t capacity = 100) : capacity_{capacity} {}

    BlockingQueuePro(const BlockingQueuePro &) = delete;
    BlockingQueuePro &operator=(const BlockingQueuePro &) = delete;

    // Producer side: guarded by producer_mutex_ only.
    void Push(const T &item) {
        std::unique_lock<std::mutex> lock(producer_mutex_);
        not_full_.wait(lock, [this]() { return producer_queue_.size() < capacity_; });
        producer_queue_.emplace(item);
        not_empty_.notify_one();
    }

    void Push(T &&item) {
        std::unique_lock<std::mutex> lock(producer_mutex_);
        not_full_.wait(lock, [this]() { return producer_queue_.size() < capacity_; });
        producer_queue_.emplace(std::move(item));
        not_empty_.notify_one();
    }

    // Consumer side: guarded by consumer_mutex_; SwapQueue() refills
    // the consumer queue from the producer queue when it runs dry.
    T Pop() {
        std::unique_lock<std::mutex> lock(consumer_mutex_);
        not_empty_.wait(lock, [this]() {
            return !consumer_queue_.empty() || SwapQueue() != 0;
        });
        T front = std::move(consumer_queue_.front());
        consumer_queue_.pop();
        not_full_.notify_one();
        return front;
    }

    // Non-blocking variant: returns false if both queues are empty.
    bool TryPop(T &item) {
        std::lock_guard<std::mutex> lock(consumer_mutex_);
        if (consumer_queue_.empty() && SwapQueue() == 0) {
            return false;
        }
        item = std::move(consumer_queue_.front());
        consumer_queue_.pop();
        not_full_.notify_one();
        return true;
    }

    // Only counts the producer-side queue.
    size_t Size() const {
        std::lock_guard<std::mutex> lock(producer_mutex_);
        return producer_queue_.size();
    }

private:
    // Swap the two queues; returns the number of elements now
    // available on the consumer side.
    int SwapQueue() {
        std::unique_lock<std::mutex> lock(producer_mutex_);
        std::swap(producer_queue_, consumer_queue_);
        return consumer_queue_.size();
    }

    std::queue<T> producer_queue_;
    std::queue<T> consumer_queue_;
    mutable std::mutex producer_mutex_;
    mutable std::mutex consumer_mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    size_t capacity_;
};

#endif
```
The Pop() interface of a blocking queue is usually specified to block indefinitely while the queue is empty. In practice, if producers add no new tasks for some stretch of time, consumer threads end up parked on the condition variable, which hurts system performance.
Some optimization ideas:
Provide non-blocking interfaces such as bool TryPop(T& item) (and a matching try-push), which let the thread pool adjust its thread count dynamically;
A timed wait with wait_for() can serve the same purpose.
A thread pool based on the blocking queue
A C++11 cached-mode thread pool.
Variadic templates let users submit tasks of arbitrary types!
How does it support submitting tasks of different types?
The function template uses trailing-return-type syntax and returns a std::future object;
the task is packaged with std::packaged_task, binding the task function and its arguments; task->get_future() retrieves the task's result asynchronously;
the task is pushed into the task queue, where it waits for a worker thread to execute it.
```cpp
template <typename Func, typename... Args>
auto SubmitTask(Func &&func, Args... args) -> std::future<decltype(func(args...))> {
    using RType = decltype(func(args...));
    auto task = std::make_shared<std::packaged_task<RType()>>(
        std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    std::future<RType> result = task->get_future();
    task_queue_.Push([task]() { (*task)(); });
    return result;
}
```
The thread class

```cpp
#ifndef THREAD_H
#define THREAD_H

#include <functional>

class Thread {
public:
    using ThreadFunc = std::function<void(int)>;

    Thread(ThreadFunc func);
    ~Thread() = default;

    void Start();
    int GetId() const;

private:
    ThreadFunc func_;
    static int generate_id_;
    int thread_id_;
};

#endif
```
```cpp
#include "Thread.h"

#include <thread>

Thread::Thread(ThreadFunc func)
    : func_(func), thread_id_(generate_id_++) {}

void Thread::Start() {
    std::thread t(func_, thread_id_);
    t.detach();
}

int Thread::GetId() const {
    return thread_id_;
}

int Thread::generate_id_ = 0;
```
The thread pool class
ThreadPool provides the following interface:
setting the thread-count threshold: void SetThreadSizeThresHold(int threshold);
starting the pool: void Start(int initThreadSize = std::thread::hardware_concurrency());
shutting the pool down manually: void Shutdown();
submitting a task: auto SubmitTask(Func &&func, Args... args) -> std::future<decltype(func(args...))>;
and it can be extended further!
```cpp
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include "Thread.h"
#include "BlockingQueuePro.h"

#include <chrono>
#include <atomic>
#include <future>
#include <functional>
#include <memory>
#include <unordered_map>

const int TASK_MAX_THRESHOLD = 3000;
const int THREAD_MAX_THRESHOLD = 20;
const int THREAD_MAX_IDLE_TIME = 10;  // seconds

class ThreadPool {
public:
    ThreadPool();
    ~ThreadPool();

    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    void SetThreadSizeThresHold(int threshold);
    void Start(int initThreadSize = std::thread::hardware_concurrency());
    void Shutdown();

    template <typename Func, typename... Args>
    auto SubmitTask(Func &&func, Args... args) -> std::future<decltype(func(args...))> {
        using RType = decltype(func(args...));
        auto task = std::make_shared<std::packaged_task<RType()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        std::future<RType> result = task->get_future();
        task_queue_.Push([task]() { (*task)(); });
        // Cached mode: grow the pool when the backlog outpaces idle threads.
        if (cur_thread_size_ < thread_threshold_ &&
            task_queue_.Size() > idle_thread_size_ * 10) {
            AddThread();
        }
        return result;
    }

private:
    void AddThread();
    void ThreadFunc(int thread_id);
    bool CheckRunningState() const;

private:
    using Task = std::function<void()>;

    BlockingQueuePro<Task> task_queue_{TASK_MAX_THRESHOLD};
    std::unordered_map<int, std::unique_ptr<Thread>> threads_;
    std::atomic_bool running_;
    std::atomic_int cur_thread_size_;
    std::atomic_int idle_thread_size_;
    int init_thread_size_;
    int thread_threshold_;
    std::mutex idle_thread_mtx_;
};

#endif
```
```cpp
#include "ThreadPool.h"

ThreadPool::ThreadPool()
    : running_{false},
      cur_thread_size_{0},
      idle_thread_size_{0},
      thread_threshold_(THREAD_MAX_THRESHOLD) {}

ThreadPool::~ThreadPool() {
    Shutdown();
}

void ThreadPool::Shutdown() {
    running_ = false;
    std::cout << "Shutdown() " << cur_thread_size_ << std::endl;
    threads_.clear();
    cur_thread_size_ = 0;
    idle_thread_size_ = 0;
}

void ThreadPool::SetThreadSizeThresHold(int threshold) {
    if (CheckRunningState()) return;  // only configurable before Start()
    thread_threshold_ = threshold;
}

void ThreadPool::Start(int initThreadSize) {
    running_ = true;
    init_thread_size_ = initThreadSize;
    for (int i = 0; i < init_thread_size_; ++i) {
        AddThread();
    }
}

void ThreadPool::AddThread() {
    std::unique_ptr<Thread> ptr(new Thread(
        std::bind(&ThreadPool::ThreadFunc, this, std::placeholders::_1)));
    int thread_id = ptr->GetId();
    std::cout << "Thread: " << thread_id << " start work." << std::endl;
    threads_.emplace(thread_id, std::move(ptr));
    threads_[thread_id]->Start();
    cur_thread_size_++;
    idle_thread_size_++;
}

void ThreadPool::ThreadFunc(int thread_id) {
    auto lastActiveTime = std::chrono::steady_clock::now();
    for (;;) {
        if (!running_) {
            std::cout << "Thread: " << thread_id << " exit!" << std::endl;
            return;
        }
        Task task;
        if (!task_queue_.TryPop(task)) {
            auto now = std::chrono::steady_clock::now();
            auto idleDuration = std::chrono::duration_cast<std::chrono::seconds>(
                now - lastActiveTime).count();
            if (idleDuration >= THREAD_MAX_IDLE_TIME) {
                bool exiting = false;
                {
                    std::lock_guard<std::mutex> lock(idle_thread_mtx_);
                    // Only extra (dynamically added) threads are reclaimed.
                    if (cur_thread_size_ > init_thread_size_) {
                        threads_.erase(thread_id);
                        cur_thread_size_--;
                        idle_thread_size_--;
                        exiting = true;
                    }
                }
                if (exiting) {
                    std::cout << "Thread: " << thread_id << " exit!" << std::endl;
                    return;
                }
            }
        } else {
            idle_thread_size_--;
            task();
            idle_thread_size_++;
            lastActiveTime = std::chrono::steady_clock::now();
        }
    }
}

bool ThreadPool::CheckRunningState() const {
    return running_;
}
```
Example usage

```cpp
#include "ThreadPool.h"

#include <thread>

void ExampleTask(int taskId) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

int Add(int a, int b) {
    return a + b;
}

int main() {
    ThreadPool pool;
    pool.Start(4);

    for (int i = 0; i < 100; ++i) {
        pool.SubmitTask(ExampleTask, i);
    }
    std::this_thread::sleep_for(std::chrono::seconds(30));

    for (int i = 0; i < 10000; ++i) {
        auto result = pool.SubmitTask(Add, i, i);
    }
    std::this_thread::sleep_for(std::chrono::seconds(30));

    return 0;
}
```