多线程系统编程
2025-06-28 10:03:11

对象生命周期管理

C++11标准库引入shared_ptrwak_ptr

  • shared_ptr是强引用(可以改变资源的引用计数),控制对象的生命期:只要有一个指向对象的shared_ptr存在,该对象就不会析构。
  • weak_ptr是弱引用(无法改变资源的引用计数),不控制对象的生命期:它知道对象是否还存活;可通过线程安全lock()尝试提升为有效的shared_ptr
  • 智能指针解决交叉引用导致的资源泄露问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 智能指针解决多线程访问共享对象时的安全问题
#include <iostream>
#include <memory>
#include <thread>

using namespace std;

class A
{
public:
A() { cout << "A()" << endl; }
~A() { cout << "~A()" << endl; }
void testA() { cout << "testA()" << endl; }
};


// void handler(A *q)
void handler(weak_ptr<A> pw) // 弱智能指针不会引起对象的引用计数改变!
{
// std::this_thread::sleep_for(std::chrono::seconds(2));
shared_ptr<A> sp = pw.lock();
if (sp != nullptr) // 检测共享对象是否存活
{
sp->testA();
}
else
{
cout << "A 对象已经析构,不能再访问.." << endl;
}
}

int main()
{
// A *p = new A();
shared_ptr<A> p(new A());

thread t(handler, weak_ptr<A>(p));

t.detach(); // 分离线程

std::this_thread::sleep_for(std::chrono::seconds(2)); // 主线程睡眠2s

return 0;
}

线程同步精要

线程同步的四项原则:

  1. 首要原则是尽量最低限度地共享对象,减少需要同步的场合。
  2. 其次是使用高级的并发编程构件,如TaskQueueProducer-Consumer Queue等。
  3. 不得已使用底层同步原语时,只用非递归的互斥其和条件变量,慎用读写锁(与简单的mutex相比,它实际上降低了性能),不要用信号量。
  4. 除了使用atomic整数外,不用”内核级“同步原语。

MutexLock:封装临界区,是一个简单的资源类,用RAII对象创建时获取资源,对象销毁时释放资源)方式封装互斥器的创建与销毁。

MutexLockGuard:封装临界区的进入和退出,即加锁和解锁。(类似于C++11标准中的<mark>lock_guard</mark>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include "noncopyable.h"
#include "CurrentThread.h"

#include <assert.h>
#include <pthread.h>

class MutexLock : noncopyable
{
public:
MutexLock()
: holder_(0)
{
// 初始化互斥锁
pthread_mutex_init(&mutex_, NULL);
}

~MutexLock()
{
// 断言当前没有线程持有锁
assert(holder_ == 0);
// 销毁互斥锁
pthread_mutex_destroy(&mutex_);
}

bool isLockedByThisThread()
{
// 检查当前线程是否持有锁
return holder_ == CurrentThread::tid();
}

void assertLocked()
{
assert(isLockedByThisThread());
}

void lock() // 仅供MutexLockGuard调用,严禁用户代码调用
{
pthread_mutex_lock(&mutex_);
holder_ == CurrentThread::tid();
}

void unlock() // 仅供MutexLockGuard调用,严禁用户代码调用
{
holder_ = 0;
pthread_mutex_unlock(&mutex_);
}

pthread_mutex_t* getPthreadMutex() // 仅供Condition调用,严禁用户代码调用
{
return &mutex_;
}

private:
pthread_mutex_t mutex_; // 互斥锁
pid_t holder_; // 持有锁的线程ID
};

class MutexLockGuard : noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex)
: mutex_(mutex)
{
mutex_.lock();
}

~MutexLockGuard()
{
mutex_.unlock();
}

private:
MutexLock& mutex_;
};
  • 以上代码仅展示互斥锁的简单封装,并未达到工业强度(mutex创建为PTHREAD_MUTEX_DEFAULT类型而不是PTHREAD_MUTEX_NORMAL类型,严格应该指定;没有检查返回值,assert()在release build中是空语句。)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 用MutexLockGuard代替普通的互斥锁模拟车站卖票

MutexLock mutex_;
std::mutex mtx; // 全局互斥锁 -> 保证不会出现竞态条件

void sellTicket(int index) // 模拟卖票的线程函数
{
while (ticketCount > 0)
{
{
MutexLockGuard lock(mutex_);
// lock_guard<std::mutex> lock(mtx); // 构造时获取锁,出作用域{}析构锁(解锁)
if (ticketCount > 0) // 锁 + 双重判断 -> 防止三个线程在ticketCount为1时都再卖一次票
{
// 临界区代码段 -> 原子操作 -> 线程间互斥操作
cout << "窗口:" << index << "卖出第:" << ticketCount << "张票" << endl;
ticketCount--;
}
}
// mtx.lock(); // 上锁
// if (ticketCount > 0) // 锁 + 双重判断 -> 防止三个线程在ticketCount为1时都再卖一次票
// {
// // 临界区代码段 -> 原子操作 -> 线程间互斥操作
// cout << "窗口:" << index << "卖出第:" << ticketCount << "张票" << endl;
// ticketCount--;
// }
// mtx.unlock(); // 解锁
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
  • ticketCount为1时,可能多个线程都会进入while循环抢互斥锁,而此时只需卖1张票即可,故添加双重判断以避免可能产生的编程错误。

线程同步的四项原则,尽量用高层同步设施(线程池、队列、倒计时)。
使用普通互斥器和条件变量完成剩余的同步任务,采用RAII方式

多线程服务器

文中的“多线程服务器”是指运行在Linux操作系统上的独占式网络应用程序。
Linux下进程间通信(IPC)的方式很多,陈硕大神首选使用Socket(主要指TCP):可跨主机,具有伸缩性。

进程可理解为“内存中正在允许的程序”,每个进程都有自己独立的地址空间。

线程是CPU调度的最小单位,特点是共享地址空间,从而可以高效地共享数据。

陈硕大神推荐的C++多线程服务端编程模式为one loop per thread + thread pool。
one loop per thread:程序中的每个IO线程有一个event loop(Reactor),用于处理读写和定时事件。Event loop代表了线程的主循环,需要让哪个线程干活,就把timer或IO channel(如TCP连接)注册到哪个线程的loop里即可。
thread pool:对于没有IO而仅有计算任务的线程,使用event loop有点浪费,陈硕大神使用的是blocking queue实现的任务队列TaskQueue

在需要限制CPU占用率的场景下可以考虑采用单线程程序,而多线程程序的适用场景有:

  • 有多个CPU可用(单核机器上多线程无性能优势);
  • 线程间有共享数据,即内存中的全局状态;
  • 延迟latency和吞吐量throughput同样重要;
  • 具有可预测的性能。随着负载增加,性能缓慢下降,超过某个临界点之后会急速下降;
  • 多线程能有效地划分责任与功能,让每个线程的逻辑比较简单,任务单一。

多线程服务程序中的线程大致分为3类

  1. IO线程:主循环是IO multiplexing,阻塞地等待在select/poll/epoll_wait系统调用上。
  2. 计算线程:主循环是blocking queue,阻塞地等待在condition variable上。
  3. 第三方库所用的线程,比如logging或database connection。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 在Muduo网络库中陈硕大神基于MutexLockGuard和Condition
// 实现了线程安全的任务队列TaskQueue
// 在此基于C++11封装一个blocking的简单任务队列TaskQueue
// C++11提供的unique_ptr、lock_guard等非常方便封装vector、queue等
// 成为线程安全的容器。

#pragma once

#include "noncopyable.h"

#include <deque>
#include <mutex>
#include <memory>
#include <condition_variable>

template<typename T>
class BlockingQueue : noncopyable
{
public:
BlockingQueue()
: mutex_(),
queue_()
{}

void put(const T& x)
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.push_back(x);
notEmpty_.notify_all();
}

T take()
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait(lock);
}
T front(std::move(queue_.front())); // 取出队列前端的元素
queue_.pop_front();
return std::move(front);
}

size_t size() const
{
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}

private:
// // mutable修饰符用于C++中,表示即使在一个const成员函数中,该变量仍可修改
mutable std::mutex mutex_; // 互斥锁
std::deque<T> queue_;
std::condition_variable notEmpty_; // 用于表示队列非空
};

多线程编程精要

C++的标准库容器和std::string都不是线程安全的,一方面是避免不必要的性能开销,另一方面是单个成员函数的线程安全并不具备可组合性
pthread_t的值容易重复,在Linux系统中,陈硕大神建议使用gettid()系统调用的返回值作为线程id。

1
2
3
4
5
6
7
8
safe_vector<int> vec; // 全局可见 -> 其每个成员函数都是线程安全的

if (!vec.empty()) // 未加锁保护
{
int x = vec[0]; // 在多线程情况下不安全
}

// 可能在if语句判空后,别的线程清空vec的元素,造成vec[0]失效

线程创建的几条原则:

  1. 程序库不应该在未提前告知的情况下创建自己的”背景线程“。
  2. 尽量使用相同的方式创建线程,例如Mudue::Thread
  3. 在进入main()函数之前不应该启动线程。
  4. 程序中线程的创建最好能在初始化阶段全部完成。

高效的多线程日志

”日志“,即文本的、供人阅读的日志,通常用于故障诊断和追踪,亦可用于性能分析。

muduo日志库采用的是双缓冲技术,基本思路是准备两块buffer:A和B,前端负责往buffer A填数据(日志信息),后端负责将buffer B的数据写入文件。当buffer A写满之后,交换A和B,让后端将buffer A的数据写入文件,而前端则往buffer B填入新的日志信息,如此往复。用两个buffer的好处是在新建日志消息的时候不必等待磁盘文件操作,也避免每条新日志消息都触发(唤醒)后端日志线程。

实际实现采用了四个缓冲区,可以进一步减少或避免日志前端的等待。

1
2
3
4
5
6
7
8
9
10
11
// muduo/base/AsyncLogging.h

// LargeBuffer大小为4MB,可以存至少1000条日志消息
typedef boost::ptr_vector<LargeBuffer> BufferVector;
// ptr_vector::auto_type类似于std::unique_ptr
typedef BufferVector::auto_type BufferPtr
muduo::MutexLock mutex_; // 用于保护后面的四个数据成员
muduo::Condition cond_;
BufferPtr currentBuffer_; // 当前缓冲
BufferPtr nextBuffer_; // 预备缓冲
BufferVector buffers_; // 将写入文件的已填满的缓冲
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// muduo/base/AsyncLogging.cc

// 前端在生成一条日志消息的时候会调用AsyncLogging::append()
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len)
{
// 当前缓冲剩余的空间足够大,直接把日志消息拷贝(追加)到当前缓冲中
currentBuffer_->append(logline, len);
}
else
{
// 否则,当前缓冲区已写满,把它送入(移入)buffers_
buffers_.push_back(std::move(currentBuffer_));

// 试图将预备缓冲移用为当前缓冲
if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
}
// 追加日志消息,并通知(唤醒)后端开始写入日志数据
currentBuffer_->append(logline, len);
cond_.notify();
}
}

// 接收方(后端)实现
void AsyncLogging::threadFunc()
{
// 准备好两块空闲的buffer,以备在临界区内交换
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
BufferVector buffersToWrite;
while (running_)
{
// swap out what need to be written, keep CS short
{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty())
{
cond_.waitForSeconds(flushInterval_);
}
// 条件满足时,将缓冲区currentBuffer_移入buffers_
buffers_.push_back(std::move(currentBuffer_));
// 并将空闲的newBuffer1移为当前缓冲
currentBuffer_ = std::move(newBuffer1);
// 将buffers_与buffersToWrite交换
buffersToWrite.swap(buffers_);
if (!nextBuffer_)
{
// 用newBuffer2替换nextBuffer_,保证前端始终有一个预备buffer可供调配
nextBuffer_ = std::move(newBuffer2);
}
}
// 临界区外安全的访问buffersToWrite,将日志数据写入文件
// 重新填充newBuffer1和newBuffer2
// 这样下次执行时还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲
}
// flush output
}

在前后端之间高效传递日志消息的办法不止一种,比方说使用常规的muduo::BlockingQueue<std::string>muduo::BoundedBlockingQueue<std::string>在前后端之间传递日志消息,其中每个std::string是一条消息。这种做法每条日志消息都要分配内存,特别是在前端线程分配的内存要由后端释放,因此对malloc的实现要求较高,需要针对多线程特别优化。
muduo库的异步日志实现用了一个全局锁。尽管临界区很小,但是如果线程数目较多,锁争用也可能影响性能。一种解决办法是像Java的ConcurrentHashMap那样用多个桶子bucket,前端写日志的时候再按线程id哈希到不同的bucket中,以减少contention。

Prev
2025-06-28 10:03:11
Next