真正让你感到绝望的不是远方的高山,而是鞋底的沙子。

时间

C time.h

获取从 1970 年 1 月 1 日到当前经过的秒数

long t0 = time(NULL);

让程序休眠 3 秒

sleep(3);

当前时间的三秒后

long t1 = t0 + 3;

让程序休眠 3000000 微秒,也就是 3 秒

usleep(3000000);

C 语言的 API,没有类型区分,容易弄错单位,混淆时间点时间段

比如 t0 * 3 ,乘法对时间点而言根本是个无意义的计算,然而 C 语言把他们看做一样的 long 类型,从而容易让程序员犯错。

C++ std::chrono

利用 C++ 强类型的特点,明确区分时间点时间段,明确区分不同的时间单位。

  • 时间点例子:2022年1月8日 13点07分10秒

  • 时间段例子:1分30秒

  • 时间点类型:chrono::steady_clock::time_point

  • 时间段类型:chrono::millisecondschrono::secondschrono::minutes

  • 方便的运算符重载:时间点 + 时间段 = 时间点,时间点 - 时间点 = 时间段

auto t0 = chrono::steady_clock::now();   // 获取当前时间点
auto t1 = t0 + chrono::seconds(30);      // 当前时间点的 30 秒后
auto dt = t1 - t0;                       // 获取两个时间点的差(时间段)
int64_t sec = chrono::duration_cast<chrono::seconds>(dt).count();    // 时间差的秒数

计算花费的时间

#include <iostream>
#include <chrono>

int main() {
	auto t0 = std::chrono::steady_clock::now();
	for (volatile int i = 0; i < 10000000; ++i);
	auto t1 = std::chrono::steady_clock::now();
	auto dt = t1 - t0;

	int64_t ms = std::chrono::duration_cast<std::chrono::milliseconds>(dt).count();

	std::cout << "time elapsed: " << ms << " ms" << std::endl;
	return 0;
}

时间段:作为 double 类型

duration_cast 可以在任意的 duration 类型之间转换,duration<T, R> 表示用 T 类型,且时间单位是 R,R 省略不写就是秒,std::milli 就是毫秒,std::micro 就是微秒,seconds 是 duration<int64_t> 的类型别名,milliseconds 是 duration<int64_t, std::milli> 的类型别名,这里我们创建了 double_ms 作为 duration<double, std::milli> 的别名。

#include <iostream>
#include <thread>
#include <chrono>

int main() {
	auto t0 = std::chrono::steady_clock::now();
	for(volatile int i = 0; i < 10000000; ++i);
	auto t1 = std::chrono::steady_clock::now();
	auto dt = t1 - t0;
	using double_ms = std::chrono::duration<double, std::milli>;
	double ms = std::chrono::duration_cast<double_ms>(dt).count();
	std::cout << "time elapsed: " << ms << " ms" << std::endl;
	return 0;
}

跨平台的 sleep: std::this_thread::sleep_for

可以用 std::this_thread::sleep_for 替代 Unix 类操作系统专有的 usleep。他可以让当前线程休眠一段时间,然后继续。

而且单位也可以自己指定,比如这里 milliseconds 表示毫秒,也可以换成 microseconds 表示微妙,seconds 表示秒,chrono 的强类型让单位选择更自由。

#include <iostream>
#include <thread>
#include <chrono>

int main() {
	std::this_thread::sleep_for(std::chrono::milliseconds(400));
	return 0;
}

睡到时间点:std::this_thread::sleep_until

除了接受一个时间段的 sleep_for,还有接受一个时间点的 sleep_until,表示让当前线程休眠直到某个时间点。

#include <iostream>
#include <thread>
#include <chrono>

int main() {
	auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
	std::this_thread::sleep_until(t);
	return 0;
}

线程

std::thread

std::thread 构造函数的参数可以是任意 lambda 表达式。当那个线程启动时,就会执行这个 lambda 里的内容。

#include <iostream>
#include <thread>
#include <string>

void download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file
			<< " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

int main() {
	std::thread t1([] {
		download("hello.zip");
	});

	interact();
	return 0;
}

注意:在 linux 下可能会出现链接错误,找不到符号 pthread_create

std::thread 的实现背后是基于 pthread 的。

解决:CMakeLists.txt 里链接 Threads::Threads 即可:

cmake_minimum_required(VERSION 3.10)

set(CMAKE_CXX_STANDARD 17)

project(cpptest LANGUAGES CXX)

add_executable(cpptest main.cpp)

find_package(Threads REQUIRED)
target_link_libraries(cpptest PUBLIC Threads::Threads)

新的问题:在输入之后,用户交互所在的主线程退出后,文件下载所在的子线程,因为从属这个主线程,也被迫退出了。

主线程等待子线程结束:t1.join()

#include <iostream>
#include <thread>
#include <string>

void download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file << " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

int main() {
	std::thread t1([] {
		download("hello.zip");
	});

	interact();
	std::cout << "Waiting for child thread..." << std::endl;
	t1.join();
	std::cout << "Child thread exited!" << std::endl;
	return 0;
}

std::thread 的解构函数会销毁线程

作为一个 C++ 类,std::thread 同样遵循 RAII 思想和三五法则:因为管理着资源,它自定义了解构函数,删除了拷贝构造/赋值函数,但是提供了移动构造/赋值函数

因此当 t1 所在的函数退出时,就会调用 std::thread 的解构函数,这会销毁 t1 线程。

即线程示例 1 会出错。

1-tofj.webp

解构函数不再销毁线程:t1.detach()

解决方案:调用成员函数 detach() 分离该线程——意味着线程的生命周期不再由当前 std::thread 对象管理,而是在线程退出以后自动销毁自己。

不过这样还是会在进程退出时候自动退出。

2-ojex.webp

解构函数不再销毁线程:移动到全局线程池

但是 detach 的问题是进程退出时候不会等待所有子线程执行完毕。所以另一种解法是把 t1 对象移动到一个全局变量去,从而延长其生命周期到 myfunc 函数体外。

#include <iostream>
#include <thread>
#include <string>
#include <vector>

void download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file
			<< " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

std::vector<std::thread> pool;

void myFunc() {
	std::thread t1([] {
		download("hello.zip");
	});

	pool.push_back(std::move(t1));
}

int main() {
	myFunc();
	interact();
	for (auto& t : pool) t.join();
	return 0;
}

main 函数退出后自动 join 全部线程

但是需要在 main 里面手动 join 全部线程还是有点麻烦,我们可以自定义一个类 ThreadPool,并用他创建一个全局变量,其解构函数会在 main 退出后自动调用。

#include <iostream>
#include <thread>
#include <string>
#include <vector>

void download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file
			<< " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

class ThreadPool {
	std::vector<std::thread> m_pool;

public:
	void push_back(std::thread thr) {
		m_pool.push_back(std::move(thr));
	}

	~ThreadPool() {
		for (auto& t : m_pool) t.join();
	}
};

ThreadPool tpool;

void myFunc() {
	std::thread t1([] {
		download("hello.zip");
	});

	tpool.push_back(std::move(t1));
}

int main() {
	myFunc();
	interact();
	return 0;
}

std::jthread: 符合 RAII 思想,解构时自动 join()

C++20 引入了 std::jthread 类,和 std::thread 不同在于:他的解构函数里会自动调用 join() 函数,从而保证 pool 解构时会自动等待全部线程执行完毕。

#include <iostream>
#include <thread>
#include <string>
#include <vector>

void download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file << " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

std::vector<std::jthread> pool;

void myFunc() {
	std::jthread t1([] {
		download("hello.zip");
	});

	pool.push_back(std::move(t1));
}

int main() {
	myFunc();
	interact();
	return 0;
}

异步

std::async

std::async 接受一个带返回值的 lambda,自身返回一个 std::future 对象。lambda 的函数体将会在另一个线程里执行。接下来你可以在 main 里面做一些别的事情,download 会持续在后台悄悄运行。最后调用 future 的 get() 方法,如果此时 download 还没完成,会等待 download 完成,并获取 download 的返回值。

#include <iostream>
#include <thread>
#include <string>
#include <future>

int download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file
			<< " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
	return 404;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

int main() {
	std::future<int> fret = std::async([] {
		return download("hello.zip");
	});

	interact();
	int ret = fret.get();
	std::cout << "Download result: " << ret << std::endl;
	return 0;
}

3-ulgg.webp

显示地等待:wait()

除了 get() 会等待线程执行完毕外,wait() 也可以等待他执行完,但是不会返回其值。

int main() {
	std::future<int> fret = std::async([] {
		return download("hello.zip");
	});

	interact();
	std::cout << "Waiting for download complete..." << std::endl;
	fret.wait();
	std::cout << "Wait returned!" << std::endl;
	int ret = fret.get();
	std::cout << "Download result: " << ret << std::endl;
	return 0;
}

4-aczn.webp

等待一段时间:wait_for()

只要线程没有执行完,wait() 会无限等下去。而 wait_for() 则可以指定一个最长等待时间,用 chrono 里的类表示单位。他会返回一个 std::future_status 表示等待是否成功。如果超过这个时间线程还没有执行完毕,则放弃等待,返回 future_status::timeout。如果线程在指定的时间内执行完毕,则认为等待成功,返回 future_status::ready。同理还有 wait_until() 其参数是一个时间点。

#include <iostream>
#include <thread>
#include <string>
#include <future>

int download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file
			<< " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
	return 404;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

int main() {
	std::future<int> fret = std::async([] {
		return download("hello.zip");
	});

	interact();
	while (true) {
		std::cout << "Waiting for download complete..." << std::endl;
		auto stat = fret.wait_for(std::chrono::milliseconds(1000));

		if (stat == std::future_status::ready) {
			std::cout << "Future is ready!!" << std::endl;
			break;
		}
		else {
			std::cout << "Future not ready!!" << std::endl;
		}
	}

	int ret = fret.get();
	std::cout << "Download result: " << ret << std::endl;
	return 0;
}

另一种用法:std::launch::deferred 做参数

std::async 的第一个参数可以设为 std::launch::deferred,这时不会创建一个线程来执行,他只会把 lambda 函数体内的运算推迟到 future 的 get() 被调用时。也就是 main 中的 interact 计算完毕后。

这种写法,download 的执行仍在主线程中,他只是函数式编程范式意义上的异步,而不涉及到真正的多线程。可以用这个实现惰性求值(lazy evaluation)之类。

int main() {
	std::future<int> fret = std::async(std::launch::deferred, [] {
		return download("hello.zip");
	});

	interact();

	int ret = fret.get();
	std::cout << "Download result: " << ret << std::endl;
	return 0;
}

5-wbkq.webp

std::async 的底层实现: std::promise

如果不想让 std::async 帮你自动创建线程,想要手动创建线程,可以直接用 std::promise

然后在线程返回的时候,用 set_value() 设置返回值。在主线程里,用 get_future() 获取其 std::future 对象,进一步 get() 可以等待并获取线程返回值。

int main() {
	std::promise<int> pret;
	std::thread t1([&] {
		auto ret = download("hello.zip");
		pret.set_value(ret);
	});
	std::future<int> fret = pret.get_future();

	interact();
	int ret = fret.get();
	std::cout << "Download result: " << ret << std::endl;
	
	t1.join();
	return 0;
}

std::future 小贴士

future 为了三五法则,删除了拷贝构造/赋值函数。如果需要浅拷贝,实现共享同一个 future 对象,可以用 std::shared_future

#include <iostream>
#include <thread>
#include <string>
#include <future>

void download(std::string file) {
	for (int i = 0; i < 10; ++i) {
		std::cout << "Downloading " << file
			<< " (" << i * 10 << "%)..." << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
	}

	std::cout << "Download complete: " << file << std::endl;
}

void interact() {
	std::string name;
	std::cin >> name;
	std::cout << "Hi, " << name << std::endl;
}

int main() {
	std::shared_future<void> fret = std::async([] {
		download("hello.zip");
	});

	auto fret2 = fret;
	auto fret3 = fret;
	interact();
	fret3.wait();
	std::cout << "Download completed" << std::endl;
	return 0;
}

如果不需要返回值,std::async 里 lambda 的返回类型可以为 void, 这时 future 对象的类型为 std::future<void>。 同理有 std::promise<void>,他的 set_value() 不接受参数,仅仅作为同步用,不传递任何实际的值。

互斥量

数据竞争(data-race)

多个线程同时访问一个 vector。

#include <iostream>
#include <string>
#include <thread>
#include <vector>

int main() {
	std::vector<int> arr;
	std::thread t1([&] {
		for(int i = 0; i < 1000; ++i) {
			arr.push_back(1);
		}
	});
	
	std::thread t2([&] {
		for(int i = 0; i < 1000; ++i) {
			arr.push_back(2);
		}
	});
	t1.join();
	t2.join();
	return 0;
}

6-nonf.webp

std::mutex

调用 std::mutexlock() 时,会检测 mutex 是否已经上锁。如果没有锁定,则对 mutex 进行上锁。如果已经锁定,则陷入等待,直到 mutex 被另一个线程解锁后,才再次上锁。而调用 unlock() 则会进行解锁操作。这样,就可以保证 mtx.lock()mtx.unlock() 之间的代码段,同一时间只有一个线程在执行,从而避免数据竞争。

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
	std::vector<int> arr;
	std::mutex mtx;
	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx.lock();
			arr.push_back(1);
			mtx.unlock();
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx.lock();
			arr.push_back(2);
			mtx.unlock();
		}
	});

	t1.join();
	t2.join();
	return 0;
}

std::lock_guard

根据 RAII 思想,可将锁的持有视为资源,上锁视为锁的获取,解锁视为锁的释放。std::lock_guard 就是这样一个工具类,他的构造函数里会调用 mtx.lock(),解构函数会调用 mtx.unlock()。从而退出函数作用域时能够自动解锁,避免程序员粗心不小心忘记解锁。

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
	std::vector<int> arr;
	std::mutex mtx;
	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			std::lock_guard grd(mtx);
			arr.push_back(1);
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			std::lock_guard grd(mtx);
			arr.push_back(2);
		}
	});

	t1.join();
	t2.join();
	return 0;
}

std::unique_lock

std::lock_guard 严格在解构时 unlock(),但是有时候我们会希望提前 unlock()。这时可以用 std::unique_lock,他额外存储了一个 flag 表示是否已经被释放。他会在解构检测这个 flag,如果没有释放,则调用 unlock(),否则不调用。

然后可以直接调用 unique_lock 的 unlock() 函数来提前解锁,但是即使忘记解锁也没关系,退出作用域时候他还会自动检查一遍要不要解锁。

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
	std::vector<int> arr;
	std::mutex mtx;
	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			std::unique_lock grd(mtx);
			arr.push_back(1);
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			std::unique_lock grd(mtx);
			arr.push_back(2);
			grd.unlock();
			printf("outside of lock\\n");
			// grd.lock();		// 如果需要, 还可以重新上锁
		}
	});

	t1.join();
	t2.join();
	return 0;
}

std::unique_lock: 用 std::defer_lock 作为参数

std::unique_lock 的构造函数还可以有一个额外参数,那就是 std::defer_lock。指定了这个参数的话,std::unique_lock 不会在构造函数中调用 mtx.lock(),需要之后再手动调用 grd.lock() 才能上锁。

好处依然是即使忘记 grd.unlock() 也能够自动调用 mtx.unlock()

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
	std::vector<int> arr;
	std::mutex mtx;
	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			std::unique_lock grd(mtx);
			arr.push_back(1);
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			std::unique_lock grd(mtx, std::defer_lock);
			printf("before the lock\\n");
			grd.lock();
			arr.push_back(2);
			grd.unlock();
			printf("outside of lock\\n");
		}
	});

	t1.join();
	t2.join();
	return 0;
}

可以看一下 std::defer_lock_t,是个空的类,其实这种用一个空 tag 类来区分不同构造函数的思想在 C++ 中很常见,包括 std::inplace, std::piecewise_construct 等。

// msvc 中实现
struct defer_lock_t {
	explicit defer_lock_t() = default;
}

inline constexpr defer_lock_t defer_lock{};

多个对象?每个对象一个 mutex 即可

mtx1 用来锁定 arr1,mtx2 用来锁定 arr2。不同的对象,各有一个 mutex,独立地上锁,可以避免不必要的锁定,提升高并发时的性能。还用了一个 {} 包住 std::lock_guard,限制其变量的作用域,从而可以让他在 } 之前解构并调用 unlock(),也避免了和下面一个 lock_guard 变量名冲突。

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
	std::vector<int> arr1;
	std::mutex mtx1;

	std::vector<int> arr2;
	std::mutex mtx2;

	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			{
				std::lock_guard grd(mtx1);
				arr1.push_back(1);
			}

			{
				std::lock_guard grd(mtx2);
				arr2.push_back(1);
			}
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			{
				std::lock_guard grd(mtx1);
				arr1.push_back(2);
			}
			{
				std::lock_guard grd(mtx2);
				arr2.push_back(2);
			}
		}
	});

	t1.join();
	t2.join();
	return 0;
}

try_lock()

lock() 如果发现 mutex 已经上锁的话,会等待它直到它解锁。也可以用无阻塞的 try_lock(),它在上锁失败时不会陷入等待,而是直接返回 false;如果上锁成功,则会返回 true。

#include <cstdio>
#include <mutex>

std::mutex mtx1;

int main() {
	if (mtx1.try_lock())
		printf("succeed\n");
	else
		printf("failed\n");

	if (mtx1.try_lock())
		printf("succeed\n");
	else
		printf("failed\n");
	mtx1.unlock();

	return 0;
}

7-xuhl.webp

try_lock_for()

try_lock() 碰到已经上锁的情况,会立即返回 false。 如果需要等待,但仅限一段时间,可以用 std::timed_mutex 的 try_lock_for() 函数,它的参数是最长等待时间,同样是由 chrono 指定时间单位。超过这个时间还没成功就会“不耐烦地”失败并返回 false;如果这个时间内上锁成功则返回 true。

同理还有接受时间点的 try_lock_until()

#include <cstdio>
#include <mutex>

std::timed_mutex mtx1;

int main() {
	if (mtx1.try_lock_for(std::chrono::milliseconds(500)))
		printf("succeed\\n");
	else
		printf("failed\\n");

	if (mtx1.try_lock_for(std::chrono::milliseconds(500)))
		printf("succeed\\n");
	else
		printf("failed\\n");
}

std::unique_lock: 用 std::try_to_lock 做参数

和无参数相比,他会调用 mtx1.try_lock() 而不是 mtx1.lock()。之后,可以用 grd.owns_lock() 判断是否上锁成功。

#include <cstdio>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx;
	std::thread t1([&] {
		std::unique_lock grd(mtx, std::try_to_lock);
		if (grd.owns_lock())
			printf("t1 success\n");
		else
			printf("t1 failed\n");
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	});

	std::thread t2([&] {
		std::unique_lock grd(mtx, std::try_to_lock);
		if (grd.owns_lock())
			printf("t2 success\n");
		else
			printf("t2 failed\n");
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	});

	t1.join();
	t2.join();
	return 0;
}

8-fzsm.webp

std::unique_lock: 用 std::adopt_lock 做参数

如果当前 mutex 已经上锁了,但是之后仍然希望用 RAII 思想在解构时候自动调用 unlock(),可以用 std::adopt_lock 作为 std::unique_lockstd::lock_guard 的第二个参数,这时他们会默认 mtx 已经上锁。

#include <cstdio>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx;
	std::thread t1([&] {
		std::unique_lock grd(mtx);
		printf("t1 owns the lock\\n");
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	});

	std::thread t2([&] {
		mtx.lock();
		std::unique_lock grd(mtx, std::adopt_lock);
		printf("t2 owns the lock\\n");
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	});

	t1.join();
	t2.join();
	return 0;
}

std::unique_lock 和 std::mutex 具有同样的接口

其实 std::unique_lock 具有 mutex 的所有成员函数:lock(), unlock(), try_lock(), try_lock_for() 等。除了他会在解构时按需自动调用 unlock()

因为 std::lock_guard 无非是调用其构造参数名为 lock() 的成员函数,所以 std::unique_lock 也可以作为 std::lock_guard 的构造参数! 这种只要具有某些指定名字的成员函数,就判断一个类是否满足某些功能的思想,在 Python 称为鸭子类型,而 C++ 称为 concept(概念)。比起虚函数和动态多态的接口抽象,concept 使实现和接口更加解耦合且没有性能损失。

#include <cstdio>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx;
	std::thread t1([&] {
		std::unique_lock grd(mtx, std::defer_lock);
		std::lock_guard grd2(grd);
		printf("t1 owns the lock\\n");
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	});

	std::thread t2([&] {
		std::unique_lock grd(mtx, std::defer_lock);
		std::lock_guard grd2(grd);
		printf("t2 owns the lock\\n");
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	});

	t1.join();
	t2.join();
	return 0;
}

死锁

同时锁住多个 mutex

由于同时执行的两个线程,他们中发生的指令不一定是同步的,因此有可能出现这种情况:

  1. t1 执行 mtx1.lock();

  2. t2 执行 mtx2.lock();

  3. t1 执行 mtx2.lock():失败,陷入等待;

  4. t2 执行 mtx1.lock():失败,陷入等待。

双方都在等着对方释放锁,但是因为等待而无法释放锁,从而要无限制等下去。这种现象称为死锁(dead-lock)。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx1;
	std::mutex mtx2;

	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx1.lock();
			mtx2.lock();
			mtx2.unlock();
			mtx1.unlock();
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx2.lock();
			mtx1.lock();
			mtx1.unlock();
			mtx2.unlock();
		}
	});
	t1.join();
	t2.join();

	return 0;
}

解决 1: 永远不要同时持有两个锁

最为简单的方法,就是一个线程永远不要同时持有两个锁,分别上锁,这样也可以避免死锁。

因此这里双方都在 mtx1.unlock() 之后才 mtx2.lock(),从而也不会出现一方等着对方的同时持有了对方等着的锁的情况。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx1;
	std::mutex mtx2;

	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx1.lock();
			mtx1.unlock();
			mtx2.lock();
			mtx2.unlock();
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx2.lock();
			mtx2.unlock();
			mtx1.lock();
			mtx1.unlock();
		}
	});
	t1.join();
	t2.join();

	return 0;
}

解决 2: 保证双方上锁顺序一致

其实,只需保证双方上锁的顺序一致,即可避免死锁。因此这里调整 t2 也变为先锁 mtx1,再锁 mtx2。这时,无论实际执行顺序是怎样,都不会出现一方等着对方的同时持有了对方等着的锁的情况。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx1;
	std::mutex mtx2;

	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx1.lock();
			mtx2.lock();
			mtx2.unlock();
			mtx1.unlock();
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			mtx1.lock();
			mtx2.lock();
			mtx2.unlock();
			mtx1.unlock();
		}
	});
	t1.join();
	t2.join();

	return 0;
}

解决 3: 用 std::lock 同时对多个锁上锁

如果没办法保证上锁顺序一致,可以用标准库的 std::lock(mtx1, mtx2, ...) 函数,一次性对多个 mutex 上锁。它接受任意多个 mutex 作为参数,并且他保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx1;
	std::mutex mtx2;

	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			std::lock(mtx1, mtx2);
			mtx1.unlock();
			mtx2.unlock();
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			std::lock(mtx2, mtx1);
			mtx2.unlock();
			mtx1.unlock();
		}
	});

	t1.join();
	t2.join();
	return 0;
}

std::lock 的 RAII 版本: std::scoped_lock

和 std::lock_guard 相对应,std::lock 也有 RAII 的版本 std::scoped_lock。只不过它可以同时对多个 mutex 上锁。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
	std::mutex mtx1;
	std::mutex mtx2;

	std::thread t1([&] {
		for (int i = 0; i < 1000; ++i) {
			std::scoped_lock grd(mtx1, mtx2);
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 1000; ++i) {
			std::scoped_lock grd(mtx2, mtx1);
		}
	});

	t1.join();
	t2.join();
	return 0;
}

同一个线程重复调用 lock() 也会造成死锁

除了两个线程同时持有两个锁会造成死锁外,即使只有一个线程一个锁,如果 lock() 以后又调用 lock(),也会造成死锁。

比如右边的 func 函数,上了锁之后,又调用了 other 函数,它也需要上锁。而 other 看到 mtx1 已经上锁,还以为是别的线程上的锁,于是陷入等待。殊不知是调用他的 func 上的锁,other 陷入等待后 func 里的 unlock() 永远得不到调用。

#include <iostream>
#include <mutex>

std::mutex mtx1;

void other() {
	mtx1.lock();

	mtx1.unlock();
}

void func() {
	mtx1.lock();
	other();
	mtx1.unlock();
}

int main() {
	func();
	return 0;
}

解决1:other 里不要再上锁

遇到这种情况最好是把 other 里的 lock() 去掉,并在其文档中说明:”other 不是线程安全的,调用本函数之前需要保证某 mutex 已经上锁。”

#include <iostream>
#include <mutex>

std::mutex mtx1;

void other() {

}

void func() {
	mtx1.lock();
	other();
	mtx1.unlock();
}

int main() {
	func();
	return 0;
}

解决2:改用 std::recursive_mutex

如果实在不能改的话,可以用 std::recursive_mutex。他会自动判断是不是同一个线程 lock() 了多次同一个锁,如果是则让计数器加1,之后 unlock() 会让计数器减1,减到0时才真正解锁。但是相比普通的 std::mutex 有一定性能损失。

同理还有 std::recursive_timed_mutex,如果你同时需要 try_lock_for() 的话。

#include <iostream>
#include <mutex>

std::recursive_mutex mtx1;

void other() {
	mtx1.lock();

	mtx1.unlock();
}

void func() {
	mtx1.lock();
	other();
	mtx1.unlock();
}

int main() {
	func();
	return 0;
}

数据结构

封装一个线程安全的 vector

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

class MTVector {
	std::vector<int> m_arr;
	mutable std::mutex m_mtx;

public:
	void push_back(int val) {
		m_mtx.lock();
		m_arr.push_back(val);
		m_mtx.unlock();
	}

	size_t size() const {
		m_mtx.lock();
		size_t ret = m_arr.size();
		m_mtx.unlock();
		return ret;
	}
};

int main() {
	MTVector arr;

	std::thread t1([&]() {
		for (int i = 0; i < 1000; ++i) {
			arr.push_back(i);
		}
	});

	std::thread t2([&]() {
		for (int i = 0; i < 1000; ++i) {
			arr.push_back(1000 + i);
		}
	});

	t1.join();
	t2.join();

	std::cout << arr.size() << std::endl;
	return 0;
}

读写锁

读可以共享,写必须独占,且写和读不能共存。

针对这种更具体的情况,又发明了读写锁,他允许的状态有:

  1. n个人读取,没有人写入;

  2. 1个人写入,没有人读取;

  3. 没有人读取,也没有人写入。

为此,标准库提供了 std::shared_mutex,上锁时,要指定你的需求是写入还是读取,负责调度的读写锁会帮你判断要不要等待。这里 push_back() 需要修改数据,因需求此为写入,使用 lock()unlock() 的组合。而 size() 则只要读取数据,不修改数据,因此可以和别人共享,使用 lock_shared()unlock_shared() 的组合。

#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>

class MTVector {
	std::vector<int> m_arr;
	mutable std::shared_mutex m_mtx;

public:
	void push_back(int val) {
		m_mtx.lock();
		m_arr.push_back(val);
		m_mtx.unlock();
	}

	size_t size() const {
		m_mtx.lock_shared();
		size_t ret = m_arr.size();
		m_mtx.unlock_shared();
		return ret;
	}
};

int main() {
	MTVector arr;

	std::thread t1([&]() {
		for (int i = 0; i < 1000; ++i) {
			arr.push_back(i);
		}
	});

	std::thread t2([&]() {
		for (int i = 0; i < 1000; ++i) {
			arr.push_back(1000 + i);
		}
	});

	t1.join();
	t2.join();

	std::cout << arr.size() << std::endl;
	return 0;
}

符合 RAII 思想的 lock_shared(): std::shared_lock

std::unique_lock 针对 lock(),也可以用 std::shared_lock 针对 lock_shared()。这样就可以在函数体退出时自动调用 unlock_shared(),更加安全了。 shared_lock 同样支持 defer_lock 做参数,owns_lock() 判断等

#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>

class MTVector {
	std::vector<int> m_arr;
	mutable std::shared_mutex m_mtx;

public:
	void push_back(int val) {
		std::unique_lock grd(m_mtx);
		m_arr.push_back(val);
	}

	size_t size() const {
		std::shared_lock grd(m_mtx);
		return m_arr.size();
	}
};

int main() {
	MTVector arr;

	std::thread t1([&]() {
		for (int i = 0; i < 1000; ++i) {
			arr.push_back(i);
		}
	});

	std::thread t2([&]() {
		for (int i = 0; i < 1000; ++i) {
			arr.push_back(1000 + i);
		}
	});

	t1.join();
	t2.join();

	std::cout << arr.size() << std::endl;
	return 0;
}

访问者模式

Accessor 或者说 Viewer 模式,常用用于设计 GPU 容器的数据结构访问。

#include <vector>
#include <mutex>
#include <iostream>

class MTVector {
	std::vector<int> m_arr;
	std::mutex m_mtx;

public:
	class Accessor {
		MTVector& m_that;
		std::unique_lock<std::mutex> m_guard;

	public:
		Accessor(MTVector &that)
			: m_that(that), m_guard(that.m_mtx)
		{}

		void push_back(int val) const {
			return m_that.m_arr.push_back(val);
		}

		size_t size() const {
			return m_that.m_arr.size();
		}
	};

	Accessor access() {
		return { *this };
	}
};

int main() {
	MTVector arr;

	std::thread t1([&]() {
		auto axr = arr.access();
		for (int i = 0; i < 1000; ++i) {
			axr.push_back(i);
		}
	});

	std::thread t2([&]() {
		auto axr = arr.access();
		for (int i = 0; i < 1000; ++i) {
			axr.push_back(1000 + i);
		}
	});

	t1.join();
	t2.join();

	std::cout << arr.access().size() << std::endl;
	return 0;
}

条件变量

等待被唤醒

cv.wait(lck) 将会让当前线程陷入等待。在其他线程中调用 cv.notify_one() 则会唤醒那个陷入等待的线程。可以发现 std::condition_variable 必须和 std::unique_lockstd::mutex一起用,稍后会解释原因。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

int main() {
	std::condition_variable cv;
	std::mutex mtx;

	std::thread t1([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);

		std::cout << "t1 is awake" << std::endl;
	});

	std::this_thread::sleep_for(std::chrono::milliseconds(400));

	std::cout << "notifying..." << std::endl;
	cv.notify_one();		// will awake t1

	t1.join();
}

9-furx.webp

条件变量:等待某一条件成真

还可以额外指定一个参数,变成 cv.wait(lck, expr) 的形式,其中 expr 是个 lambda 表达式,只有其返回值为 true 时才会真正唤醒,否则继续等待。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

int main() {
	std::condition_variable cv;
	std::mutex mtx;
	bool ready = false;

	std::thread t1([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck, [&] { return ready; });
		lck.unlock();

		std::cout << "t1 is awake" << std::endl;
	});

	std::cout << "notifying not ready" << std::endl;
	cv.notify_one();

	ready = true;
	std::cout << "notifying ready" << std::endl;
	cv.notify_one();

	t1.join();
	return 0;
}

条件变量:多个等待者

cv.notify_one() 只会唤醒其中一个等待中的线程,而 cv.notify_all() 会唤醒全部。这就是为什么 wait() 需要一个 unique_lock 作为参数,因为要保证多个线程被唤醒时,只有一个能够被启动。如果不需要,在 wait() 返回后调用 lck.unlock() 即可。

顺便一提,wait() 的过程中会暂时 unlock() 这个锁。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

int main() {
	std::condition_variable cv;
	std::mutex mtx;

	std::thread t1([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);
		std::cout << "t1 is awake" << std::endl;
	});

	std::thread t2([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);
		std::cout << "t2 is awake" << std::endl;
	});

	std::thread t3([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);
		std::cout << "t3 is awake" << std::endl;
	});

	std::this_thread::sleep_for(std::chrono::milliseconds(400));

	std::cout << "notifying one" << std::endl;
	cv.notify_one();   // 随机唤醒一个

	std::this_thread::sleep_for(std::chrono::milliseconds(400));

	std::cout << "notifying all" << std::endl;
	cv.notify_all();

	t1.join();
	t2.join();
	t3.join();
	return 0;
}

10-jhsx.webp

条件变量:等待某一条件成真

还可以额外指定一个参数,变成 cv.wait(lck, expr) 的形式,其中 expr 是个 lambda 表达式,只有其返回值为 true 时才会真正唤醒,否则继续等待。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

int main() {
	std::condition_variable cv;
	std::mutex mtx;
	bool ready = false;

	std::thread t1([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck, [&] { return ready; });
		lck.unlock();

		std::cout << "t1 is awake" << std::endl;
	});

	std::cout << "notifying not ready" << std::endl;
	cv.notify_one();

	ready = true;
	std::cout << "notifying ready" << std::endl;
	cv.notify_one();

	t1.join();
	return 0;
}

条件变量:多个等待者

cv.notify_one() 只会唤醒其中一个等待中的线程,而 cv.notify_all() 会唤醒全部。这就是为什么 wait() 需要一个 unique_lock 作为参数,因为要保证多个线程被唤醒时,只有一个能够被启动。如果不需要,在 wait() 返回后调用 lck.unlock() 即可。

顺便一提,wait() 的过程中会暂时 unlock() 这个锁。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

int main() {
	std::condition_variable cv;
	std::mutex mtx;

	std::thread t1([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);
		std::cout << "t1 is awake" << std::endl;
	});

	std::thread t2([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);
		std::cout << "t2 is awake" << std::endl;
	});

	std::thread t3([&] {
		std::unique_lock lck(mtx);
		cv.wait(lck);
		std::cout << "t3 is awake" << std::endl;
	});

	std::this_thread::sleep_for(std::chrono::milliseconds(400));

	std::cout << "notifying one" << std::endl;
	cv.notify_one();   // 随机唤醒一个

	std::this_thread::sleep_for(std::chrono::milliseconds(400));

	std::cout << "notifying all" << std::endl;
	cv.notify_all();

	t1.join();
	t2.join();
	t3.join();
	return 0;
}

11-rahs.webp

生成者-消费者模式

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

int main() {
	std::condition_variable cv;
	std::mutex mtx;

	std::vector<int> foods;

	std::thread t1([&] {
		for (int i = 0; i < 2; ++i) {
			std::unique_lock lck(mtx);
			cv.wait(lck, [&] {
				return foods.size() != 0;
			});

			auto food = foods.back();
			foods.pop_back();
			lck.unlock();

			std::cout << "t1 got food:" << food << std::endl;
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 2; ++i) {
			std::unique_lock lck(mtx);
			cv.wait(lck, [&] {
				return foods.size() != 0;
			});

			auto food = foods.back();
			foods.pop_back();
			lck.unlock();

			std::cout << "t2 got food:" << food << std::endl;
		}
	});

	foods.push_back(42);
	cv.notify_one();
	foods.push_back(233);
	cv.notify_one();

	foods.push_back(666);
	foods.push_back(4399);
	cv.notify_all();

	t1.join();
	t2.join();

	return 0;
}

12-jrpb.webp

封装成类

#include <iostream>
#include <condition_variable>
#include <vector>
#include <mutex>

template <class T>
class MTQueue {
	std::condition_variable m_cv;
	std::mutex m_mtx;
	std::vector<T> m_arr;

public:
	T pop() {
		std::unique_lock lck(m_mtx);
		m_cv.wait(lck, [this] { return !m_arr.empty(); });
		T ret = std::move(m_arr.back());
		m_arr.pop_back();
		return ret;
	}

	auto pop_hold() {
		std::unique_lock lck(m_mtx);
		m_cv.wait(lck, [this] { return !m_arr.empty(); });
		T ret = std::move(m_arr.back());
		m_arr.pop_back();
		return std::pair(std::move(ret), std::move(lck));
	}

	void push(T val) {
		std::unique_lock lck(m_mtx);
		m_arr.push_back(std::move(val));
		m_cv.notify_one();
	}

	void push_many(std::initializer_list<T> vals) {
		std::unique_lock lck(m_mtx);
		std::copy(
			std::move_iterator(vals.begin()),
			std::move_iterator(vals.end()),
			std::back_insert_iterator(m_arr));
		m_cv.notify_all();
	}
};

int main() {
	MTQueue<int> foods;

	std::thread t1([&] {
		for (int i = 0; i < 2; ++i) {
			auto food = foods.pop();
			std::cout << "t1 got food:" << food << std::endl;
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 2; ++i) {
			auto food = foods.pop();
			std::cout << "t2 got food:" << food << std::endl;
		}
	});

	foods.push(42);
	foods.push(233);
	foods.push_many({ 666, 4399 });

	t1.join();
	t2.join();

	return 0;
}

13-rvtp.webp

std::condition_variable 小贴士

  1. std::condition_variable 仅仅支持 std::unique_lockstd::mutex 作为 wait 的参数,如果需要用其他类型的 mutex 锁,可以用 std::condition_variable_any

  2. wait_for()wait_until() 函数,分别接受 chrono 时间段和时间点作为参数。详见:https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for。

原子操作

多个线程修改同一个计数器

多个线程同时往一个 int 变量里累加,这样肯定会出错,因为 counter += i 在 CPU 看来会变成三个指令:

  1. 读取 counter 变量到 rax 寄存器;

  2. rax 寄存器的值加上 1;

  3. 把 rax 写入到 counter 变量。

即使编译器优化成 add [counter], 1 也没用,因为现代 CPU 为了高效,使用了大量奇技淫巧,比如他会把一条汇编指令拆分成很多微指令 (micro-ops),三个甚至有点保守估计了。

#include <iostream>
#include <thread>

int main() {
	int counter = 0;

	std::thread t1([&] {
		for (int i = 0; i < 10000; ++i) {
			counter += 1;
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 10000; ++i) {
			counter += 1;
		}
	});

	t1.join();
	t2.join();

	std::cout << counter << std::endl;
	return 0;
}

有多个线程同时运行,顺序是不确定的:

  1. t1:读取 counter 变量,到 rax 寄存器;

  2. t2:读取 counter 变量,到 rax 寄存器;

  3. t1:rax 寄存器的值加上 1;

  4. t2:rax 寄存器的值加上 1;

  5. t1:把 rax 写入到 counter 变量;

  6. t2:把 rax 写入到 counter 变量。

如果是这种顺序,最后 t1 的写入就被 t2 覆盖了,从而 counter 只增加了 1,而没有像预期的那样增加 2。

更不用说现代 CPU 还有高速缓存,乱序执行,指令级并行等优化策略,你根本不知道每条指令实际的先后顺序。

用 mutex 上锁

这样的确可以防止多个线程同时修改 counter 变量,从而不会冲突。

问题:mutex 太过重量级,他会让线程被挂起,从而需要通过系统调用,进入内核层,调度到其他线程执行,有很大的开销。

可我们只是想要修改一个小小的 int 变量而已,用昂贵的 mutex 严重影响了效率。

atomic:有专门的硬件指令加持

可以用更轻量级的 atomic,对他的 += 等操作,会被编译器转换成专门的指令。

lock xadd %eax, (%rdx)

CPU 识别到该指令时,会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子 (atomic) 的(取其不可分割之意),不会加法加到一半另一个线程插一脚进来。

对于程序员,只需把 int 改成 atomic<int> 即可,也不必像 mutex 那样需要手动上锁解锁,因此用起来也更直观。

#include <iostream>
#include <thread>
#include <atomic>

int main() {
	std::atomic<int> counter = 0;

	std::thread t1([&] {
		for (int i = 0; i < 10000; ++i) {
			counter += 1;
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 10000; ++i) {
			counter += 1;
		}
	});

	t1.join();
	t2.join();

	std::cout << counter << std::endl;
	return 0;
}

注意:请用 +=

counter = counter + 1;  // 错,不能保证原子性
counter += 1;           // OK,能保证原子性
counter++;              // OK,能保证原子性

fetch_add

除了用方便的运算符重载之外,还可以直接调用相应的函数名。

fetch_add 对应于 +=;

store 对应于 =;

load 用于读取其中的 int 值。

#include <iostream>
#include <thread>
#include <atomic>

int main() {
	std::atomic<int> counter;
	counter.store(0);

	std::thread t1([&] {
		for (int i = 0; i < 10000; ++i) {
			counter.fetch_add(1);
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 10000; ++i) {
			counter.fetch_add(1);
		}
	});

	t1.join();
	t2.join();

	std::cout << counter.load() << std::endl;
	return 0;
}

fetch_add 会返回其旧值

int old = atm.fetch_add(val);

除了会导致 atm 的值增加 val 外,还会返回 atm 增加前的值,存储到 old。 这个特点使得他可以用于并行地往一个列表里追加数据:追加写入的索引就是 fetch_add 返回的旧值。

当然这里也可以 counter++,不过要追加多个的话还是得用到 counter.fetch_add(n)。

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

int main() {
	std::atomic<int> counter;
	counter.store(0);
	std::vector<int> data(20000);

	std::thread t1([&] {
		for (int i = 0; i < 10000; ++i) {
			int index = counter.fetch_add(1);
			data[index] = i;
		}
	});

	std::thread t2([&] {
		for (int i = 0; i < 10000; ++i) {
			int index = counter.fetch_add(1);
			data[index] = i + 10000;
		}
	});

	t1.join();
	t2.join();

	std::cout << data[10000] << std::endl;
	return 0;
}

exchange:读取的同时写入

exchange(val) 会把 val 写入原子变量,同时返回其旧的值。

#include <iostream>
#include <atomic>

int main() {
	std::atomic<int> counter;
	counter.store(0);

	int old = counter.exchange(3);
	std::cout << "old=" << old << std::endl;

	int now = counter.load();
	std::cout << "cnt=" << now << std::endl;
	
	return 0;
}

compare_exchange_strong: 读取,比较是否相等,相等则写入

#include <iostream>
#include <atomic>

int main() {
	std::atomic<int> counter;

	counter.store(2);

	int old = 1;
	bool equal = counter.compare_exchange_strong(old, 3);
	std::cout << "equal=" << equal << std::endl;		// false
	std::cout << "old=" << old << std::endl;				// 2

	int now = counter.load();
	std::cout << "cnt=" << now << std::endl;				// 2

	old = 2;
	equal = counter.compare_exchange_strong(old, 3);
	std::cout << "equal=" << equal << std::endl;		// true
	std::cout << "old=" << old << std::endl;				// 2

	now = counter.load();
	std::cout << "cnt=" << now << std::endl;				// 3
	
	return 0;
}

伪代码

struct AtomicInt {
	int cnt;

	int store(int val) {
		cnt = val;
	}

	int load() const {
		return cnt;
	}

	int fetch_add(int val) {
		int old = cnt;
		cnt += val;
		return old;
	}

	bool compare_exchange_strong(int& old, int val) {
		if (cnt == old) {
			cnt = val;
			return true;
		}
		else {
			old = cnt;
			return false;
		}
	}
};

可以看到其中 compare_exchange_strong 的逻辑最为复杂,一般简称 CAS (compare-and-swap),他是并行编程最常用的原子操作之一。实际上任何 atomic 操作,包括 fetch_add,都可以基于 CAS 来实现。

https://www.bilibili.com/video/BV1Ya411q7y4/?spm_id_from=333.999.0.0&vd_source=22db36befcbe98b5c4cdb3d1d5f45f5d