对象生命周期管理
C++11标准库引入shared_ptr
和wak_ptr
。
shared_ptr
是强引用(可以改变资源的引用计数),控制对象的生命期:只要有一个指向对象的shared_ptr
存在,该对象就不会析构。
weak_ptr
是弱引用(无法改变资源的引用计数),不控制对象的生命期:它知道对象是否还存活;可通过线程安全的lock()
尝试提升为有效的shared_ptr
。
- 智能指针解决交叉引用导致的资源泄露问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #include <iostream> #include <memory> #include <thread>
using namespace std;
class A { public: A() { cout << "A()" << endl; } ~A() { cout << "~A()" << endl; } void testA() { cout << "testA()" << endl; } };
void handler(weak_ptr<A> pw) { shared_ptr<A> sp = pw.lock(); if (sp != nullptr) { sp->testA(); } else { cout << "A 对象已经析构,不能再访问.." << endl; } }
int main() { shared_ptr<A> p(new A());
thread t(handler, weak_ptr<A>(p));
t.detach();
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0; }
|
线程同步精要
线程同步的四项原则:
- 首要原则是尽量最低限度地共享对象,减少需要同步的场合。
- 其次是使用高级的并发编程构件,如
TaskQueue
、Producer-Consumer Queue
等。
- 不得已使用底层同步原语时,只用非递归的互斥其和条件变量,慎用读写锁(与简单的mutex相比,它实际上降低了性能),不要用信号量。
- 除了使用
atomic
整数外,不用”内核级“同步原语。
MutexLock
:封装临界区,是一个简单的资源类,用RAII
(对象创建时获取资源,对象销毁时释放资源)方式封装互斥器的创建与销毁。
MutexLockGuard
:封装临界区的进入和退出,即加锁和解锁。(类似于C++11标准中的<mark>lock_guard</mark>
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| #include "noncopyable.h" #include "CurrentThread.h"
#include <assert.h> #include <pthread.h>
class MutexLock : noncopyable { public: MutexLock() : holder_(0) { pthread_mutex_init(&mutex_, NULL); }
~MutexLock() { assert(holder_ == 0); pthread_mutex_destroy(&mutex_); }
bool isLockedByThisThread() { return holder_ == CurrentThread::tid(); }
void assertLocked() { assert(isLockedByThisThread()); }
void lock() { pthread_mutex_lock(&mutex_); holder_ == CurrentThread::tid(); }
void unlock() { holder_ = 0; pthread_mutex_unlock(&mutex_); }
pthread_mutex_t* getPthreadMutex() { return &mutex_; }
private: pthread_mutex_t mutex_; pid_t holder_; };
class MutexLockGuard : noncopyable { public: explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex) { mutex_.lock(); }
~MutexLockGuard() { mutex_.unlock(); } private: MutexLock& mutex_; };
|
- 以上代码仅展示互斥锁的简单封装,并未达到工业强度(
mutex
创建为PTHREAD_MUTEX_DEFAULT
类型而不是PTHREAD_MUTEX_NORMAL
类型,严格应该指定;没有检查返回值,assert()
在release build中是空语句。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
MutexLock mutex_; std::mutex mtx;
void sellTicket(int index) { while (ticketCount > 0) { { MutexLockGuard lock(mutex_); if (ticketCount > 0) { cout << "窗口:" << index << "卖出第:" << ticketCount << "张票" << endl; ticketCount--; } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }
|
- 当
ticketCount
为1时,可能多个线程都会进入while
循环抢互斥锁,而此时只需卖1张票即可,故添加双重判断以避免可能产生的编程错误。
线程同步的四项原则,尽量用高层同步设施(线程池、队列、倒计时)。
使用普通互斥器和条件变量完成剩余的同步任务,采用RAII
方式
多线程服务器
文中的“多线程服务器”是指运行在Linux操作系统上的独占式网络应用程序。
Linux下进程间通信(IPC)的方式很多,陈硕大神首选使用Socket(主要指TCP):可跨主机,具有伸缩性。
进程可理解为“内存中正在允许的程序”,每个进程都有自己独立的地址空间。
线程是CPU调度的最小单位,特点是共享地址空间,从而可以高效地共享数据。
陈硕大神推荐的C++多线程服务端编程模式为one loop per thread + thread pool。
one loop per thread:程序中的每个IO线程有一个event loop(Reactor),用于处理读写和定时事件。Event loop代表了线程的主循环,需要让哪个线程干活,就把timer或IO channel(如TCP连接)注册到哪个线程的loop里即可。
thread pool:对于没有IO而仅有计算任务的线程,使用event loop有点浪费,陈硕大神使用的是blocking queue实现的任务队列TaskQueue
。
在需要限制CPU占用率的场景下可以考虑采用单线程程序,而多线程程序的适用场景有:
- 有多个CPU可用(单核机器上多线程无性能优势);
- 线程间有共享数据,即内存中的全局状态;
- 延迟latency和吞吐量throughput同样重要;
- 具有可预测的性能。随着负载增加,性能缓慢下降,超过某个临界点之后会急速下降;
- 多线程能有效地划分责任与功能,让每个线程的逻辑比较简单,任务单一。
多线程服务程序中的线程大致分为3类
- IO线程:主循环是IO multiplexing,阻塞地等待在
select/poll/epoll_wait
系统调用上。
- 计算线程:主循环是blocking queue,阻塞地等待在
condition variable
上。
- 第三方库所用的线程,比如logging或database connection。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
#pragma once
#include "noncopyable.h"
#include <deque> #include <mutex> #include <memory> #include <condition_variable>
template<typename T> class BlockingQueue : noncopyable { public: BlockingQueue() : mutex_(), queue_() {}
void put(const T& x) { std::unique_lock<std::mutex> lock(mutex_); queue_.push_back(x); notEmpty_.notify_all(); }
T take() { std::unique_lock<std::mutex> lock(mutex_); while (queue_.empty()) { notEmpty_.wait(lock); } T front(std::move(queue_.front())); queue_.pop_front(); return std::move(front); }
size_t size() const { std::lock_guard<std::mutex> lock(mutex_); return queue_.size(); }
private: mutable std::mutex mutex_; std::deque<T> queue_; std::condition_variable notEmpty_; };
|
多线程编程精要
C++的标准库容器和std::string
都不是线程安全的,一方面是避免不必要的性能开销,另一方面是单个成员函数的线程安全并不具备可组合性。
pthread_t
的值容易重复,在Linux系统中,陈硕大神建议使用gettid()
系统调用的返回值作为线程id。
1 2 3 4 5 6 7 8
| safe_vector<int> vec;
if (!vec.empty()) { int x = vec[0]; }
|
线程创建的几条原则:
- 程序库不应该在未提前告知的情况下创建自己的”背景线程“。
- 尽量使用相同的方式创建线程,例如
Mudue::Thread
。
- 在进入
main()
函数之前不应该启动线程。
- 程序中线程的创建最好能在初始化阶段全部完成。
高效的多线程日志
”日志“,即文本的、供人阅读的日志,通常用于故障诊断和追踪,亦可用于性能分析。
muduo日志库采用的是双缓冲技术,基本思路是准备两块buffer:A和B,前端负责往buffer A填数据(日志信息),后端负责将buffer B的数据写入文件。当buffer A写满之后,交换A和B,让后端将buffer A的数据写入文件,而前端则往buffer B填入新的日志信息,如此往复。用两个buffer的好处是在新建日志消息的时候不必等待磁盘文件操作,也避免每条新日志消息都触发(唤醒)后端日志线程。
实际实现采用了四个缓冲区,可以进一步减少或避免日志前端的等待。
1 2 3 4 5 6 7 8 9 10 11
|
typedef boost::ptr_vector<LargeBuffer> BufferVector;
typedef BufferVector::auto_type BufferPtr muduo::MutexLock mutex_; muduo::Condition cond_; BufferPtr currentBuffer_; BufferPtr nextBuffer_; BufferVector buffers_;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
void AsyncLogging::append(const char* logline, int len) { muduo::MutexLockGuard lock(mutex_); if (currentBuffer_->avail() > len) { currentBuffer_->append(logline, len); } else { buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_) { currentBuffer_ = std::move(nextBuffer_); } else { currentBuffer_.reset(new Buffer); } currentBuffer_->append(logline, len); cond_.notify(); } }
void AsyncLogging::threadFunc() { BufferPtr newBuffer1(new Buffer); BufferPtr newBuffer2(new Buffer); BufferVector buffersToWrite; while (running_) { { muduo::MutexLockGuard lock(mutex_); if (buffers_.empty()) { cond_.waitForSeconds(flushInterval_); } buffers_.push_back(std::move(currentBuffer_)); currentBuffer_ = std::move(newBuffer1); buffersToWrite.swap(buffers_); if (!nextBuffer_) { nextBuffer_ = std::move(newBuffer2); } } } }
|
在前后端之间高效传递日志消息的办法不止一种,比方说使用常规的muduo::BlockingQueue<std::string>
或muduo::BoundedBlockingQueue<std::string>
在前后端之间传递日志消息,其中每个std::string
是一条消息。这种做法每条日志消息都要分配内存,特别是在前端线程分配的内存要由后端释放,因此对malloc
的实现要求较高,需要针对多线程特别优化。
muduo库的异步日志实现用了一个全局锁。尽管临界区很小,但是如果线程数目较多,锁争用也可能影响性能。一种解决办法是像Java的ConcurrentHashMap
那样用多个桶子bucket,前端写日志的时候再按线程id哈希到不同的bucket中,以减少contention。