C++ 多线程
2025-06-28 11:01:01

C++11多线程

  • “语言级别”→ 跨平台 - windows/linux/max
  • “语言层面”→ thread -> windows - createThread; linux - pthread_create(实质调用的仍是各系统的接口)

基本用法

  1. 如何创建启动一个线程 -> 传入线程所需的线程函数和参数,线程自动开启std::thread t(threadFunction);
  2. 子线程如何结束 -> 子线程函数在执行完成,线程就结束了
  3. 主线程如何处理子线程 -> (1)t.join():等待t线程结束,当前线程继续往下运行;(2)t.detach():将t线程设置为分离线程,主线程结束,整个线程结束,所有的子线程都自动结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <iostream>
#include <thread>
#include <chrono>

using namespace std;

void download()
{
// 模拟下载, 总共耗时500ms,阻塞线程500ms
this_thread::sleep_for(chrono::milliseconds(500));
cout << "child thread: " << this_thread::get_id() << ", download success...." << endl;
}

void func(int num, string str)
{
cout << "child thread: " << this_thread::get_id << endl;
cout << "num: " << num << ", str: " << str << endl;
}

int main()
{
// 打印主线程ID
cout << "main thread ID: " << this_thread::get_id() << endl;

// 创建子线程
thread t(download);

// join() -> 阻塞函数:若download()未完成,则主线程阻塞至任务执行完毕;若download()完成,则继续执行
t.join();

// 若程序要求固定顺序,则可用join()函数来控制 → 例如download()完成后再创建一个子线程
thread t1(func, 13, "lucky number");

// detach() -> 线程分离函数
t1.detach();

// 让主线程休眠, 等待子线程执行完毕 -> 防止线程分离后,func()未完成,主线程就退出并销毁子线程
this_thread::sleep_for(chrono::seconds(5));

// 获取当前计算机的CPU核心数
int num = thread::hardware_concurrency();
cout << "CPU number: " << num << endl;

return 0;
}

临界区互斥锁

  • 多线程程序执行的结果是一致的,不会随着CPU对线程的不同调用顺序而产生不同的运行结果
  • 若多线程程序会产生不同的运行结果,则称为竞态条件
  1. 当多个线程均涉及到临界区资源的修改时,可用mutex对临界区资源进行加锁解锁,以保证程序正常执行!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
using namespace std;

// C++ thread 模拟车站三个窗口卖票的程序

int ticketCount = 100; // 车站有100张车票,由三个窗口一起卖票
std::mutex mtx; // 全局互斥锁 -> 保证不会出现竞态条件

// 模拟卖票的线程函数
void sellTicket(int index)
{
while (ticketCount > 0)
{
mtx.lock(); // 上锁
if (ticketCount > 0) // 锁 + 双重判断 -> 防止三个线程在ticketCount为1时都再卖一次票
{
// 临界区代码段 -> 原子操作 -> 线程间互斥操作
cout << "窗口:" << index << "卖出第:" << ticketCount << "张票" << endl;
ticketCount--;
}
mtx.unlock(); // 解锁
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

int main()
{
list<std::thread> tlist;
// 创建线程 -> 模拟车站卖票
for (int i = 0; i < 3; ++i)
{
tlist.push_back(std::thread(sellTicket, i));
}

for (std::thread& t : tlist)
{
t.join();
}

cout << "所有窗口卖票结束..." << endl;

return 0;
}
  1. 一般加锁,若程序在临界区中return,则可能无法解锁,造成死锁的问题。lock_guard是在作用域中加锁,而出作用域会自动解锁,若临界区中return相当于出了作用域,故会自动解锁,不会因return造成死锁问题!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 模拟卖票的线程函数
void sellTicket(int index)
{
while (ticketCount > 0)
{
{
lock_guard<std::mutex> lock(mtx); // 构造时获取锁,出作用域{}析构锁(解锁)
if (ticketCount > 0) // 锁 + 双重判断 -> 防止三个线程在ticketCount为1时都再卖一次票
{
// 临界区代码段 -> 原子操作 -> 线程间互斥操作
cout << "窗口:" << index << "卖出第:" << ticketCount << "张票" << endl;
ticketCount--;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
  1. lock_guard定义处会调用构造函数加锁,离开定义域的话lock_guard会被销毁,调用析构函数解锁 → 若作用域较大,则锁的颗粒度较大,很大程度上影响效率。通信中常用unique_lock!
  • 一般加锁、解锁不涉及return时,可以考虑使用mutex!
  • lock_guard不可能用在函数参数传递或返回过程中,只能用于简单代码段的互斥操作中!
  • unique_lock不仅可以用于简单代码段的互斥操作,亦可用在函数的调用过程中!

同步通信

多线程编有两个问题:

  1. 线程间的互斥(竞态条件 -> 临界区代码段“加锁”
  2. 线程间的同步通信 -> 生产者、消费者模型

若生产者还未生产“产品”,消费者无法消费 -> 故需进行线程同步(“线程间通话”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono> // 包含 chrono 以支持 sleep_for

using namespace std;

mutex mtx; // 互斥锁
condition_variable cv; // 条件变量

class Queue
{
public:
Queue() : max_size(5) {} // 假设队列的最大容量为 5

void put(int val)
{
unique_lock<mutex> lck(mtx);
// 队列未满时生产者生产 -> 否则等待
while (que.size() >= max_size)
{
cv.wait(lck);
}
que.push(val);

cv.notify_all(); // 生产完通知其它线程
cout << "生产者生产了:" << val << " 号物品" << endl;
}

int get()
{
unique_lock<mutex> lck(mtx);
// 队列非空时消费者消费 -> 否则等待
while (que.empty())
{
cv.wait(lck);
}
int val = que.front();
que.pop();

cv.notify_all(); // 消费完通知其它线程
cout << "消费者消费了:" << val << " 号物品" << endl;
return val;
}

private:
queue<int> que;
int max_size; // 队列的最大容量
};

void producer(Queue* que)
{
for (int i = 1; i <= 10; ++i)
{
que->put(i);
this_thread::sleep_for(chrono::milliseconds(100));
}
}

void consumer(Queue* que)
{
for (int i = 1; i <= 10; ++i)
{
que->get();
this_thread::sleep_for(chrono::milliseconds(100));
}
}

int main()
{
Queue que; // 线程共享的队列

thread t1(producer, &que);
thread t2(consumer, &que);

t1.join();
t2.join();

return 0;
}
  • cv.notify_all() -> 通知其它在cv上等待的线程
  • 其它在cv上等待的线程一旦收到通知:等待状态 -> 阻塞状态 -> 获取互斥锁 -> 继续执行程序!

无锁队列CAS

  • 若临界区代码段较大,用互斥锁可能会比较“重”,不利于效率提升
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic> // 包含原子类型
#include <list>
#include <chrono>
using namespace std;

// volative -> 防止多线程对共享变量进行缓存 -> 访问的都是原始内存的变量
volatile std::atomic_bool isReady = false;
volatile std::atomic_int m_count = 0;

void task()
{
while (!isReady)
{
std::this_thread::yield(); // 线程让出当前时间片,等待下一次调度
}

for (int i = 0; i < 100; ++i)
{
m_count++;
}
}

int main()
{
list <std::thread> tlist;
for (int i = 0; i < 10; ++i)
{
tlist.push_back(std::thread(task));
}

std::this_thread::sleep_for(std::chrono::seconds(3));
isReady = true; // 等待子线程“工作”完后,置true,防止子线程一直调度

for (std::thread& t : tlist)
{
t.join();
}
cout << "m_count:" << m_count << endl;

return 0;
}
  • 多线程缓存可以加快线程运行的效率
  • volatile -> 无需多线程缓存共享变量 -> 某线程修改共享变量后及时反应!

Linux多线程

线程是轻量级的进程(LWP:light weight process),在Linux环境下线程的本质仍是进程。可理解为:进程是资源分配的最小单位,线程是操作系统调度执行的最小单位。

基本函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>

// 返回线程ID的函数
pthread_t pthread_self(void); // ID类型为pthread_t -> 无符号长整型数

/*
线程创建函数 -> 一旦调用,即可获得一个子线程
thread -> 传出参数,if 创建成功,则将线程ID存入指针指向的内存
attr -> 线程属性,一般情况默认NULL即可
start_routine -> 函数指针,创建出的子线程的处理动作
arg -> 作为实参传递到start_routine指针指向的函数内部
线程创建成功则返回0
*/
int pthread_create(pthread_t *thread, const pthread_attrt_t *attr, void *(*start_routine)(void *), void *arg));

/*
线程退出函数 -> 一旦调用,线程退出
不会导致虚拟地址空间的释放(主要针对主线程)
retval -> 线程退出时携带的数据,若不需携带则指定为NULL
*/
void pthread_exit(void *retval);

/*
线程回收函数 -> 每次只能回收一个子线程
thread -> 要被回收的子线程的线程ID
retval -> 传出参数,二级指针,指向一级指针的地址,该地址中存储了pthread_exit()传递出的数据,若不需要,则指定为NULL
线程回收成功则返回0
*/
int pthread_join(pthread_t thread, void **retval);

/*
线程分离函数 -> 子线程和主线程分离后,其退出时占用的内核资源由其他进程接管并回收
分离后主线程中的线程回收函数无法回收子线程资源
thread -> 要与主线程分离的子线程ID
线程分离成功则返回0
*/
int pthread_detach(pthread_t thread);

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// sample.c -> 主线程接收子线程传出的数据
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<pthread.h>

void* callback(void* arg)
{
printf("子线程:%ld\n", pthread_self()); // 打印子线程的线程ID

int *a = (int*) arg; // 强制类型转换
*a = 100;

pthread_exit(&a); // 线程退出,并将地址传给主线程的pthread_join()函数

return NULL;
}

int main()
{
pthread_t tid; // 线程tid

int a; // 测试变量a

pthread_create(&tid, NULL, callback, &a); // 创建子线程

printf("主线程:%ld\n", pthread_self()); // 打印主线程的线程ID

pthread_join(tid, NULL); // 利用线程回收函数

printf("a = %d\n", a);

pthread_exit(NULL); // 线程退出函数

return 0;
}
/*
linux下执行命令
gcc sample.c -lpthread -o ./bin/app
./bin/app
*/

线程同步的4种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 4种同步方式 -> linux环境
#include <pthread.h>

// 互斥锁 -> 一般情况下,每一个共享资源对应一把互斥锁,锁的个数与线程的个数无关
pthread_mutex_t mutex; // 互斥锁的类型
/*
互斥锁初始化函数和释放函数
restrict mutex 和 mutex -> 互斥锁变量的地址
attr -> 属性,默认指定为NULL
函数调用成功会返回0
*/
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
/*
pthread_mutex_init(&mutex, NULL);
...
pthread_mutex_destroy(&mutex);
*/
int pthread_mutex_lock(pthread_mutex_t *mutex); // 加锁函数
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 解锁函数

// 读写锁 -> if 所有进程都是读操作,那么读是并行的(但是使用互斥锁,读操作也是串行的)
pthread_rwlock_t rwlock; // 读写锁的类型

// 条件变量 -> 生产者 & 消费者模型,实质:线程阻塞
pthread_cond_t cond; // 条件变量类型
/*
线程阻塞函数 -> 线程调用则被阻塞,被唤醒后继续执行
restrict cond -> 条件变量的地址
restrict mutex -> 互斥锁的地址
线程唤醒函数 -> 唤醒阻塞在条件变量上的线程,至少一个解除阻塞
cond -> 条件变量的地址
*/
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_signal(pthread_cond_t *cond);

// 信号量函数 -> 生产者 & 消费者
#include<semaphore.h>
sem_t sem; // 信号量类型
/*
信号量初始化和释放函数
sem -> 信号量变量的地址
pshared -> 0:线程同步;非0:进程同步
value -> 初始化信号量拥有的资源数
*/
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
// wait和post函数
int sem_wait(sem_t *sem); // 检测是否需要等待,若有资源可用,则资源数 — 1;否则阻塞
int sem_post(sem_t *sem); // 一般工作完使用,资源数 + 1
Prev
2025-06-28 11:01:01
Next