真正让你感到绝望的不是远方的高山,而是鞋底的沙子。
时间
C time.h
获取从 1970 年 1 月 1 日到当前经过的秒数
long t0 = time(NULL);
让程序休眠 3 秒
sleep(3);
当前时间的三秒后
long t1 = t0 + 3;
让程序休眠 3000000 微秒,也就是 3 秒
usleep(3000000);
C 语言的 API,没有类型区分,容易弄错单位,混淆时间点和时间段。
比如 t0 * 3
,乘法对时间点而言根本是个无意义的计算,然而 C 语言把他们看做一样的 long 类型,从而容易让程序员犯错。
C++ std::chrono
利用 C++ 强类型的特点,明确区分时间点与时间段,明确区分不同的时间单位。
时间点例子:2022年1月8日 13点07分10秒
时间段例子:1分30秒
时间点类型:
chrono::steady_clock::time_point
等时间段类型:
chrono::milliseconds
,chrono::seconds
,chrono::minutes
等方便的运算符重载:时间点 + 时间段 = 时间点,时间点 - 时间点 = 时间段
auto t0 = chrono::steady_clock::now(); // 获取当前时间点
auto t1 = t0 + chrono::seconds(30); // 当前时间点的 30 秒后
auto dt = t1 - t0; // 获取两个时间点的差(时间段)
int64_t sec = chrono::duration_cast<chrono::seconds>(dt).count(); // 时间差的秒数
计算花费的时间
#include <iostream>
#include <chrono>
int main() {
auto t0 = std::chrono::steady_clock::now();
for (volatile int i = 0; i < 10000000; ++i);
auto t1 = std::chrono::steady_clock::now();
auto dt = t1 - t0;
int64_t ms = std::chrono::duration_cast<std::chrono::milliseconds>(dt).count();
std::cout << "time elapsed: " << ms << " ms" << std::endl;
return 0;
}
时间段:作为 double 类型
duration_cast
可以在任意的 duration 类型之间转换,duration<T, R>
表示用 T 类型,且时间单位是 R,R 省略不写就是秒,std::milli
就是毫秒,std::micro
就是微秒,seconds 是 duration<int64_t>
的类型别名,milliseconds 是 duration<int64_t, std::milli>
的类型别名,这里我们创建了 double_ms 作为 duration<double, std::milli>
的别名。
#include <iostream>
#include <thread>
#include <chrono>
int main() {
auto t0 = std::chrono::steady_clock::now();
for(volatile int i = 0; i < 10000000; ++i);
auto t1 = std::chrono::steady_clock::now();
auto dt = t1 - t0;
using double_ms = std::chrono::duration<double, std::milli>;
double ms = std::chrono::duration_cast<double_ms>(dt).count();
std::cout << "time elapsed: " << ms << " ms" << std::endl;
return 0;
}
跨平台的 sleep: std::this_thread::sleep_for
可以用 std::this_thread::sleep_for
替代 Unix 类操作系统专有的 usleep
。他可以让当前线程休眠一段时间,然后继续。
而且单位也可以自己指定,比如这里 milliseconds 表示毫秒,也可以换成 microseconds 表示微妙,seconds 表示秒,chrono 的强类型让单位选择更自由。
#include <iostream>
#include <thread>
#include <chrono>
int main() {
std::this_thread::sleep_for(std::chrono::milliseconds(400));
return 0;
}
睡到时间点:std::this_thread::sleep_until
除了接受一个时间段的 sleep_for,还有接受一个时间点的 sleep_until,表示让当前线程休眠直到某个时间点。
#include <iostream>
#include <thread>
#include <chrono>
int main() {
auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
std::this_thread::sleep_until(t);
return 0;
}
线程
std::thread
std::thread
构造函数的参数可以是任意 lambda 表达式。当那个线程启动时,就会执行这个 lambda 里的内容。
#include <iostream>
#include <thread>
#include <string>
void download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
int main() {
std::thread t1([] {
download("hello.zip");
});
interact();
return 0;
}
注意:在 linux 下可能会出现链接错误,找不到符号 pthread_create
。
std::thread 的实现背后是基于 pthread 的。
解决:CMakeLists.txt 里链接 Threads::Threads 即可:
cmake_minimum_required(VERSION 3.10)
set(CMAKE_CXX_STANDARD 17)
project(cpptest LANGUAGES CXX)
add_executable(cpptest main.cpp)
find_package(Threads REQUIRED)
target_link_libraries(cpptest PUBLIC Threads::Threads)
新的问题:在输入之后,用户交互所在的主线程退出后,文件下载所在的子线程,因为从属这个主线程,也被迫退出了。
主线程等待子线程结束:t1.join()
#include <iostream>
#include <thread>
#include <string>
void download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file << " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
int main() {
std::thread t1([] {
download("hello.zip");
});
interact();
std::cout << "Waiting for child thread..." << std::endl;
t1.join();
std::cout << "Child thread exited!" << std::endl;
return 0;
}
std::thread 的解构函数会销毁线程
作为一个 C++ 类,std::thread
同样遵循 RAII 思想和三五法则:因为管理着资源,它自定义了解构函数,删除了拷贝构造/赋值函数,但是提供了移动构造/赋值函数。
因此当 t1 所在的函数退出时,就会调用 std::thread
的解构函数,这会销毁 t1 线程。
即线程示例 1 会出错。
解构函数不再销毁线程:t1.detach()
解决方案:调用成员函数 detach()
分离该线程——意味着线程的生命周期不再由当前 std::thread 对象管理,而是在线程退出以后自动销毁自己。
不过这样还是会在进程退出时候自动退出。
解构函数不再销毁线程:移动到全局线程池
但是 detach 的问题是进程退出时候不会等待所有子线程执行完毕。所以另一种解法是把 t1 对象移动到一个全局变量去,从而延长其生命周期到 myfunc 函数体外。
#include <iostream>
#include <thread>
#include <string>
#include <vector>
void download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
std::vector<std::thread> pool;
void myFunc() {
std::thread t1([] {
download("hello.zip");
});
pool.push_back(std::move(t1));
}
int main() {
myFunc();
interact();
for (auto& t : pool) t.join();
return 0;
}
main 函数退出后自动 join 全部线程
但是需要在 main 里面手动 join 全部线程还是有点麻烦,我们可以自定义一个类 ThreadPool,并用他创建一个全局变量,其解构函数会在 main 退出后自动调用。
#include <iostream>
#include <thread>
#include <string>
#include <vector>
void download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
class ThreadPool {
std::vector<std::thread> m_pool;
public:
void push_back(std::thread thr) {
m_pool.push_back(std::move(thr));
}
~ThreadPool() {
for (auto& t : m_pool) t.join();
}
};
ThreadPool tpool;
void myFunc() {
std::thread t1([] {
download("hello.zip");
});
tpool.push_back(std::move(t1));
}
int main() {
myFunc();
interact();
return 0;
}
std::jthread: 符合 RAII 思想,解构时自动 join()
C++20 引入了 std::jthread 类,和 std::thread 不同在于:他的解构函数里会自动调用 join() 函数,从而保证 pool 解构时会自动等待全部线程执行完毕。
#include <iostream>
#include <thread>
#include <string>
#include <vector>
void download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file << " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
std::vector<std::jthread> pool;
void myFunc() {
std::jthread t1([] {
download("hello.zip");
});
pool.push_back(std::move(t1));
}
int main() {
myFunc();
interact();
return 0;
}
异步
std::async
std::async
接受一个带返回值的 lambda,自身返回一个 std::future
对象。lambda 的函数体将会在另一个线程里执行。接下来你可以在 main 里面做一些别的事情,download 会持续在后台悄悄运行。最后调用 future 的 get()
方法,如果此时 download 还没完成,会等待 download 完成,并获取 download 的返回值。
#include <iostream>
#include <thread>
#include <string>
#include <future>
int download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
return 404;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
int main() {
std::future<int> fret = std::async([] {
return download("hello.zip");
});
interact();
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}
显示地等待:wait()
除了 get()
会等待线程执行完毕外,wait()
也可以等待他执行完,但是不会返回其值。
int main() {
std::future<int> fret = std::async([] {
return download("hello.zip");
});
interact();
std::cout << "Waiting for download complete..." << std::endl;
fret.wait();
std::cout << "Wait returned!" << std::endl;
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}
等待一段时间:wait_for()
只要线程没有执行完,wait()
会无限等下去。而 wait_for()
则可以指定一个最长等待时间,用 chrono 里的类表示单位。他会返回一个 std::future_status
表示等待是否成功。如果超过这个时间线程还没有执行完毕,则放弃等待,返回 future_status::timeout
。如果线程在指定的时间内执行完毕,则认为等待成功,返回 future_status::ready
。同理还有 wait_until()
其参数是一个时间点。
#include <iostream>
#include <thread>
#include <string>
#include <future>
int download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
return 404;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
int main() {
std::future<int> fret = std::async([] {
return download("hello.zip");
});
interact();
while (true) {
std::cout << "Waiting for download complete..." << std::endl;
auto stat = fret.wait_for(std::chrono::milliseconds(1000));
if (stat == std::future_status::ready) {
std::cout << "Future is ready!!" << std::endl;
break;
}
else {
std::cout << "Future not ready!!" << std::endl;
}
}
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}
另一种用法:std::launch::deferred 做参数
std::async
的第一个参数可以设为 std::launch::deferred
,这时不会创建一个线程来执行,他只会把 lambda 函数体内的运算推迟到 future 的 get()
被调用时。也就是 main 中的 interact 计算完毕后。
这种写法,download 的执行仍在主线程中,他只是函数式编程范式意义上的异步,而不涉及到真正的多线程。可以用这个实现惰性求值(lazy evaluation)之类。
int main() {
std::future<int> fret = std::async(std::launch::deferred, [] {
return download("hello.zip");
});
interact();
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}
std::async 的底层实现: std::promise
如果不想让 std::async
帮你自动创建线程,想要手动创建线程,可以直接用 std::promise
。
然后在线程返回的时候,用 set_value()
设置返回值。在主线程里,用 get_future()
获取其 std::future
对象,进一步 get()
可以等待并获取线程返回值。
int main() {
std::promise<int> pret;
std::thread t1([&] {
auto ret = download("hello.zip");
pret.set_value(ret);
});
std::future<int> fret = pret.get_future();
interact();
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
t1.join();
return 0;
}
std::future 小贴士
future 为了三五法则,删除了拷贝构造/赋值函数。如果需要浅拷贝,实现共享同一个 future 对象,可以用 std::shared_future
。
#include <iostream>
#include <thread>
#include <string>
#include <future>
void download(std::string file) {
for (int i = 0; i < 10; ++i) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}
void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}
int main() {
std::shared_future<void> fret = std::async([] {
download("hello.zip");
});
auto fret2 = fret;
auto fret3 = fret;
interact();
fret3.wait();
std::cout << "Download completed" << std::endl;
return 0;
}
如果不需要返回值,std::async
里 lambda 的返回类型可以为 void, 这时 future 对象的类型为 std::future<void>
。 同理有 std::promise<void>
,他的 set_value(
) 不接受参数,仅仅作为同步用,不传递任何实际的值。
互斥量
数据竞争(data-race)
多个线程同时访问一个 vector。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
int main() {
std::vector<int> arr;
std::thread t1([&] {
for(int i = 0; i < 1000; ++i) {
arr.push_back(1);
}
});
std::thread t2([&] {
for(int i = 0; i < 1000; ++i) {
arr.push_back(2);
}
});
t1.join();
t2.join();
return 0;
}
std::mutex
调用 std::mutex
的 lock()
时,会检测 mutex 是否已经上锁。如果没有锁定,则对 mutex 进行上锁。如果已经锁定,则陷入等待,直到 mutex 被另一个线程解锁后,才再次上锁。而调用 unlock()
则会进行解锁操作。这样,就可以保证 mtx.lock()
和 mtx.unlock()
之间的代码段,同一时间只有一个线程在执行,从而避免数据竞争。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
mtx.lock();
arr.push_back(1);
mtx.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
mtx.lock();
arr.push_back(2);
mtx.unlock();
}
});
t1.join();
t2.join();
return 0;
}
std::lock_guard
根据 RAII 思想,可将锁的持有视为资源,上锁视为锁的获取,解锁视为锁的释放。std::lock_guard
就是这样一个工具类,他的构造函数里会调用 mtx.lock()
,解构函数会调用 mtx.unlock()
。从而退出函数作用域时能够自动解锁,避免程序员粗心不小心忘记解锁。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
std::lock_guard grd(mtx);
arr.push_back(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
std::lock_guard grd(mtx);
arr.push_back(2);
}
});
t1.join();
t2.join();
return 0;
}
std::unique_lock
std::lock_guard
严格在解构时 unlock()
,但是有时候我们会希望提前 unlock()
。这时可以用 std::unique_lock
,他额外存储了一个 flag 表示是否已经被释放。他会在解构检测这个 flag,如果没有释放,则调用 unlock()
,否则不调用。
然后可以直接调用 unique_lock 的 unlock()
函数来提前解锁,但是即使忘记解锁也没关系,退出作用域时候他还会自动检查一遍要不要解锁。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
std::unique_lock grd(mtx);
arr.push_back(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
std::unique_lock grd(mtx);
arr.push_back(2);
grd.unlock();
printf("outside of lock\\n");
// grd.lock(); // 如果需要, 还可以重新上锁
}
});
t1.join();
t2.join();
return 0;
}
std::unique_lock: 用 std::defer_lock 作为参数
std::unique_lock
的构造函数还可以有一个额外参数,那就是 std::defer_lock
。指定了这个参数的话,std::unique_lock
不会在构造函数中调用 mtx.lock()
,需要之后再手动调用 grd.lock()
才能上锁。
好处依然是即使忘记 grd.unlock()
也能够自动调用 mtx.unlock()
。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
std::unique_lock grd(mtx);
arr.push_back(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
std::unique_lock grd(mtx, std::defer_lock);
printf("before the lock\\n");
grd.lock();
arr.push_back(2);
grd.unlock();
printf("outside of lock\\n");
}
});
t1.join();
t2.join();
return 0;
}
可以看一下 std::defer_lock_t
,是个空的类,其实这种用一个空 tag 类来区分不同构造函数的思想在 C++ 中很常见,包括 std::inplace
, std::piecewise_construct
等。
// msvc 中实现
struct defer_lock_t {
explicit defer_lock_t() = default;
}
inline constexpr defer_lock_t defer_lock{};
多个对象?每个对象一个 mutex 即可
mtx1 用来锁定 arr1,mtx2 用来锁定 arr2。不同的对象,各有一个 mutex,独立地上锁,可以避免不必要的锁定,提升高并发时的性能。还用了一个 {} 包住 std::lock_guard
,限制其变量的作用域,从而可以让他在 } 之前解构并调用 unlock()
,也避免了和下面一个 lock_guard 变量名冲突。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
int main() {
std::vector<int> arr1;
std::mutex mtx1;
std::vector<int> arr2;
std::mutex mtx2;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
{
std::lock_guard grd(mtx1);
arr1.push_back(1);
}
{
std::lock_guard grd(mtx2);
arr2.push_back(1);
}
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
{
std::lock_guard grd(mtx1);
arr1.push_back(2);
}
{
std::lock_guard grd(mtx2);
arr2.push_back(2);
}
}
});
t1.join();
t2.join();
return 0;
}
try_lock()
lock()
如果发现 mutex 已经上锁的话,会等待它直到它解锁。也可以用无阻塞的 try_lock()
,它在上锁失败时不会陷入等待,而是直接返回 false;如果上锁成功,则会返回 true。
#include <cstdio>
#include <mutex>
std::mutex mtx1;
int main() {
if (mtx1.try_lock())
printf("succeed\n");
else
printf("failed\n");
if (mtx1.try_lock())
printf("succeed\n");
else
printf("failed\n");
mtx1.unlock();
return 0;
}
try_lock_for()
try_lock()
碰到已经上锁的情况,会立即返回 false。 如果需要等待,但仅限一段时间,可以用 std::timed_mutex 的 try_lock_for() 函数,它的参数是最长等待时间,同样是由 chrono 指定时间单位。超过这个时间还没成功就会“不耐烦地”失败并返回 false;如果这个时间内上锁成功则返回 true。
同理还有接受时间点的 try_lock_until()
。
#include <cstdio>
#include <mutex>
std::timed_mutex mtx1;
int main() {
if (mtx1.try_lock_for(std::chrono::milliseconds(500)))
printf("succeed\\n");
else
printf("failed\\n");
if (mtx1.try_lock_for(std::chrono::milliseconds(500)))
printf("succeed\\n");
else
printf("failed\\n");
}
std::unique_lock: 用 std::try_to_lock 做参数
和无参数相比,他会调用 mtx1.try_lock()
而不是 mtx1.lock()
。之后,可以用 grd.owns_lock()
判断是否上锁成功。
#include <cstdio>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx;
std::thread t1([&] {
std::unique_lock grd(mtx, std::try_to_lock);
if (grd.owns_lock())
printf("t1 success\n");
else
printf("t1 failed\n");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
});
std::thread t2([&] {
std::unique_lock grd(mtx, std::try_to_lock);
if (grd.owns_lock())
printf("t2 success\n");
else
printf("t2 failed\n");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
});
t1.join();
t2.join();
return 0;
}
std::unique_lock: 用 std::adopt_lock 做参数
如果当前 mutex 已经上锁了,但是之后仍然希望用 RAII 思想在解构时候自动调用 unlock()
,可以用 std::adopt_lock
作为 std::unique_lock
或 std::lock_guard
的第二个参数,这时他们会默认 mtx 已经上锁。
#include <cstdio>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx;
std::thread t1([&] {
std::unique_lock grd(mtx);
printf("t1 owns the lock\\n");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
});
std::thread t2([&] {
mtx.lock();
std::unique_lock grd(mtx, std::adopt_lock);
printf("t2 owns the lock\\n");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
});
t1.join();
t2.join();
return 0;
}
std::unique_lock 和 std::mutex 具有同样的接口
其实 std::unique_lock
具有 mutex 的所有成员函数:lock()
, unlock()
, try_lock()
, try_lock_for()
等。除了他会在解构时按需自动调用 unlock()
。
因为 std::lock_guard
无非是调用其构造参数名为 lock()
的成员函数,所以 std::unique_lock
也可以作为 std::lock_guard
的构造参数! 这种只要具有某些指定名字的成员函数,就判断一个类是否满足某些功能的思想,在 Python 称为鸭子类型,而 C++ 称为 concept(概念)。比起虚函数和动态多态的接口抽象,concept 使实现和接口更加解耦合且没有性能损失。
#include <cstdio>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx;
std::thread t1([&] {
std::unique_lock grd(mtx, std::defer_lock);
std::lock_guard grd2(grd);
printf("t1 owns the lock\\n");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
});
std::thread t2([&] {
std::unique_lock grd(mtx, std::defer_lock);
std::lock_guard grd2(grd);
printf("t2 owns the lock\\n");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
});
t1.join();
t2.join();
return 0;
}
死锁
同时锁住多个 mutex
由于同时执行的两个线程,他们中发生的指令不一定是同步的,因此有可能出现这种情况:
t1 执行 mtx1.lock();
t2 执行 mtx2.lock();
t1 执行 mtx2.lock():失败,陷入等待;
t2 执行 mtx1.lock():失败,陷入等待。
双方都在等着对方释放锁,但是因为等待而无法释放锁,从而要无限制等下去。这种现象称为死锁(dead-lock)。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx1;
std::mutex mtx2;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
mtx2.lock();
mtx1.lock();
mtx1.unlock();
mtx2.unlock();
}
});
t1.join();
t2.join();
return 0;
}
解决 1: 永远不要同时持有两个锁
最为简单的方法,就是一个线程永远不要同时持有两个锁,分别上锁,这样也可以避免死锁。
因此这里双方都在 mtx1.unlock()
之后才 mtx2.lock()
,从而也不会出现一方等着对方的同时持有了对方等着的锁的情况。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx1;
std::mutex mtx2;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
mtx1.lock();
mtx1.unlock();
mtx2.lock();
mtx2.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
mtx2.lock();
mtx2.unlock();
mtx1.lock();
mtx1.unlock();
}
});
t1.join();
t2.join();
return 0;
}
解决 2: 保证双方上锁顺序一致
其实,只需保证双方上锁的顺序一致,即可避免死锁。因此这里调整 t2 也变为先锁 mtx1,再锁 mtx2。这时,无论实际执行顺序是怎样,都不会出现一方等着对方的同时持有了对方等着的锁的情况。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx1;
std::mutex mtx2;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
t1.join();
t2.join();
return 0;
}
解决 3: 用 std::lock 同时对多个锁上锁
如果没办法保证上锁顺序一致,可以用标准库的 std::lock(mtx1, mtx2, ...)
函数,一次性对多个 mutex 上锁。它接受任意多个 mutex 作为参数,并且他保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx1;
std::mutex mtx2;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
std::lock(mtx1, mtx2);
mtx1.unlock();
mtx2.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
std::lock(mtx2, mtx1);
mtx2.unlock();
mtx1.unlock();
}
});
t1.join();
t2.join();
return 0;
}
std::lock 的 RAII 版本: std::scoped_lock
和 std::lock_guard 相对应,std::lock 也有 RAII 的版本 std::scoped_lock。只不过它可以同时对多个 mutex 上锁。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
int main() {
std::mutex mtx1;
std::mutex mtx2;
std::thread t1([&] {
for (int i = 0; i < 1000; ++i) {
std::scoped_lock grd(mtx1, mtx2);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; ++i) {
std::scoped_lock grd(mtx2, mtx1);
}
});
t1.join();
t2.join();
return 0;
}
同一个线程重复调用 lock() 也会造成死锁
除了两个线程同时持有两个锁会造成死锁外,即使只有一个线程一个锁,如果 lock()
以后又调用 lock()
,也会造成死锁。
比如右边的 func 函数,上了锁之后,又调用了 other 函数,它也需要上锁。而 other 看到 mtx1 已经上锁,还以为是别的线程上的锁,于是陷入等待。殊不知是调用他的 func 上的锁,other 陷入等待后 func 里的 unlock() 永远得不到调用。
#include <iostream>
#include <mutex>
std::mutex mtx1;
void other() {
mtx1.lock();
mtx1.unlock();
}
void func() {
mtx1.lock();
other();
mtx1.unlock();
}
int main() {
func();
return 0;
}
解决1:other 里不要再上锁
遇到这种情况最好是把 other 里的 lock()
去掉,并在其文档中说明:”other 不是线程安全的,调用本函数之前需要保证某 mutex 已经上锁。”
#include <iostream>
#include <mutex>
std::mutex mtx1;
void other() {
}
void func() {
mtx1.lock();
other();
mtx1.unlock();
}
int main() {
func();
return 0;
}
解决2:改用 std::recursive_mutex
如果实在不能改的话,可以用 std::recursive_mutex
。他会自动判断是不是同一个线程 lock()
了多次同一个锁,如果是则让计数器加1,之后 unlock()
会让计数器减1,减到0时才真正解锁。但是相比普通的 std::mutex
有一定性能损失。
同理还有 std::recursive_timed_mutex
,如果你同时需要 try_lock_for()
的话。
#include <iostream>
#include <mutex>
std::recursive_mutex mtx1;
void other() {
mtx1.lock();
mtx1.unlock();
}
void func() {
mtx1.lock();
other();
mtx1.unlock();
}
int main() {
func();
return 0;
}
数据结构
封装一个线程安全的 vector
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
class MTVector {
std::vector<int> m_arr;
mutable std::mutex m_mtx;
public:
void push_back(int val) {
m_mtx.lock();
m_arr.push_back(val);
m_mtx.unlock();
}
size_t size() const {
m_mtx.lock();
size_t ret = m_arr.size();
m_mtx.unlock();
return ret;
}
};
int main() {
MTVector arr;
std::thread t1([&]() {
for (int i = 0; i < 1000; ++i) {
arr.push_back(i);
}
});
std::thread t2([&]() {
for (int i = 0; i < 1000; ++i) {
arr.push_back(1000 + i);
}
});
t1.join();
t2.join();
std::cout << arr.size() << std::endl;
return 0;
}
读写锁
读可以共享,写必须独占,且写和读不能共存。
针对这种更具体的情况,又发明了读写锁,他允许的状态有:
n个人读取,没有人写入;
1个人写入,没有人读取;
没有人读取,也没有人写入。
为此,标准库提供了 std::shared_mutex
,上锁时,要指定你的需求是写入还是读取,负责调度的读写锁会帮你判断要不要等待。这里 push_back()
需要修改数据,因需求此为写入,使用 lock()
和 unlock()
的组合。而 size()
则只要读取数据,不修改数据,因此可以和别人共享,使用 lock_shared()
和 unlock_shared()
的组合。
#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>
class MTVector {
std::vector<int> m_arr;
mutable std::shared_mutex m_mtx;
public:
void push_back(int val) {
m_mtx.lock();
m_arr.push_back(val);
m_mtx.unlock();
}
size_t size() const {
m_mtx.lock_shared();
size_t ret = m_arr.size();
m_mtx.unlock_shared();
return ret;
}
};
int main() {
MTVector arr;
std::thread t1([&]() {
for (int i = 0; i < 1000; ++i) {
arr.push_back(i);
}
});
std::thread t2([&]() {
for (int i = 0; i < 1000; ++i) {
arr.push_back(1000 + i);
}
});
t1.join();
t2.join();
std::cout << arr.size() << std::endl;
return 0;
}
符合 RAII 思想的 lock_shared(): std::shared_lock
std::unique_lock
针对 lock()
,也可以用 std::shared_lock
针对 lock_shared()
。这样就可以在函数体退出时自动调用 unlock_shared()
,更加安全了。 shared_lock
同样支持 defer_lock
做参数,owns_lock()
判断等
#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>
class MTVector {
std::vector<int> m_arr;
mutable std::shared_mutex m_mtx;
public:
void push_back(int val) {
std::unique_lock grd(m_mtx);
m_arr.push_back(val);
}
size_t size() const {
std::shared_lock grd(m_mtx);
return m_arr.size();
}
};
int main() {
MTVector arr;
std::thread t1([&]() {
for (int i = 0; i < 1000; ++i) {
arr.push_back(i);
}
});
std::thread t2([&]() {
for (int i = 0; i < 1000; ++i) {
arr.push_back(1000 + i);
}
});
t1.join();
t2.join();
std::cout << arr.size() << std::endl;
return 0;
}
访问者模式
Accessor 或者说 Viewer 模式,常用用于设计 GPU 容器的数据结构访问。
#include <vector>
#include <mutex>
#include <iostream>
class MTVector {
std::vector<int> m_arr;
std::mutex m_mtx;
public:
class Accessor {
MTVector& m_that;
std::unique_lock<std::mutex> m_guard;
public:
Accessor(MTVector &that)
: m_that(that), m_guard(that.m_mtx)
{}
void push_back(int val) const {
return m_that.m_arr.push_back(val);
}
size_t size() const {
return m_that.m_arr.size();
}
};
Accessor access() {
return { *this };
}
};
int main() {
MTVector arr;
std::thread t1([&]() {
auto axr = arr.access();
for (int i = 0; i < 1000; ++i) {
axr.push_back(i);
}
});
std::thread t2([&]() {
auto axr = arr.access();
for (int i = 0; i < 1000; ++i) {
axr.push_back(1000 + i);
}
});
t1.join();
t2.join();
std::cout << arr.access().size() << std::endl;
return 0;
}
条件变量
等待被唤醒
cv.wait(lck)
将会让当前线程陷入等待。在其他线程中调用 cv.notify_one()
则会唤醒那个陷入等待的线程。可以发现 std::condition_variable
必须和 std::unique_lock
,std::mutex
一起用,稍后会解释原因。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
int main() {
std::condition_variable cv;
std::mutex mtx;
std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t1 is awake" << std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::cout << "notifying..." << std::endl;
cv.notify_one(); // will awake t1
t1.join();
}
条件变量:等待某一条件成真
还可以额外指定一个参数,变成 cv.wait(lck, expr)
的形式,其中 expr 是个 lambda 表达式,只有其返回值为 true 时才会真正唤醒,否则继续等待。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
int main() {
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck, [&] { return ready; });
lck.unlock();
std::cout << "t1 is awake" << std::endl;
});
std::cout << "notifying not ready" << std::endl;
cv.notify_one();
ready = true;
std::cout << "notifying ready" << std::endl;
cv.notify_one();
t1.join();
return 0;
}
条件变量:多个等待者
cv.notify_one()
只会唤醒其中一个等待中的线程,而 cv.notify_all()
会唤醒全部。这就是为什么 wait()
需要一个 unique_lock 作为参数,因为要保证多个线程被唤醒时,只有一个能够被启动。如果不需要,在 wait()
返回后调用 lck.unlock()
即可。
顺便一提,wait()
的过程中会暂时 unlock()
这个锁。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
int main() {
std::condition_variable cv;
std::mutex mtx;
std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t1 is awake" << std::endl;
});
std::thread t2([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t2 is awake" << std::endl;
});
std::thread t3([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t3 is awake" << std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::cout << "notifying one" << std::endl;
cv.notify_one(); // 随机唤醒一个
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::cout << "notifying all" << std::endl;
cv.notify_all();
t1.join();
t2.join();
t3.join();
return 0;
}
条件变量:等待某一条件成真
还可以额外指定一个参数,变成 cv.wait(lck, expr)
的形式,其中 expr 是个 lambda 表达式,只有其返回值为 true 时才会真正唤醒,否则继续等待。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
int main() {
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck, [&] { return ready; });
lck.unlock();
std::cout << "t1 is awake" << std::endl;
});
std::cout << "notifying not ready" << std::endl;
cv.notify_one();
ready = true;
std::cout << "notifying ready" << std::endl;
cv.notify_one();
t1.join();
return 0;
}
条件变量:多个等待者
cv.notify_one()
只会唤醒其中一个等待中的线程,而 cv.notify_all()
会唤醒全部。这就是为什么 wait()
需要一个 unique_lock 作为参数,因为要保证多个线程被唤醒时,只有一个能够被启动。如果不需要,在 wait()
返回后调用 lck.unlock()
即可。
顺便一提,wait()
的过程中会暂时 unlock()
这个锁。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
int main() {
std::condition_variable cv;
std::mutex mtx;
std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t1 is awake" << std::endl;
});
std::thread t2([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t2 is awake" << std::endl;
});
std::thread t3([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t3 is awake" << std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::cout << "notifying one" << std::endl;
cv.notify_one(); // 随机唤醒一个
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::cout << "notifying all" << std::endl;
cv.notify_all();
t1.join();
t2.join();
t3.join();
return 0;
}
生成者-消费者模式
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
int main() {
std::condition_variable cv;
std::mutex mtx;
std::vector<int> foods;
std::thread t1([&] {
for (int i = 0; i < 2; ++i) {
std::unique_lock lck(mtx);
cv.wait(lck, [&] {
return foods.size() != 0;
});
auto food = foods.back();
foods.pop_back();
lck.unlock();
std::cout << "t1 got food:" << food << std::endl;
}
});
std::thread t2([&] {
for (int i = 0; i < 2; ++i) {
std::unique_lock lck(mtx);
cv.wait(lck, [&] {
return foods.size() != 0;
});
auto food = foods.back();
foods.pop_back();
lck.unlock();
std::cout << "t2 got food:" << food << std::endl;
}
});
foods.push_back(42);
cv.notify_one();
foods.push_back(233);
cv.notify_one();
foods.push_back(666);
foods.push_back(4399);
cv.notify_all();
t1.join();
t2.join();
return 0;
}
封装成类
#include <iostream>
#include <condition_variable>
#include <vector>
#include <mutex>
template <class T>
class MTQueue {
std::condition_variable m_cv;
std::mutex m_mtx;
std::vector<T> m_arr;
public:
T pop() {
std::unique_lock lck(m_mtx);
m_cv.wait(lck, [this] { return !m_arr.empty(); });
T ret = std::move(m_arr.back());
m_arr.pop_back();
return ret;
}
auto pop_hold() {
std::unique_lock lck(m_mtx);
m_cv.wait(lck, [this] { return !m_arr.empty(); });
T ret = std::move(m_arr.back());
m_arr.pop_back();
return std::pair(std::move(ret), std::move(lck));
}
void push(T val) {
std::unique_lock lck(m_mtx);
m_arr.push_back(std::move(val));
m_cv.notify_one();
}
void push_many(std::initializer_list<T> vals) {
std::unique_lock lck(m_mtx);
std::copy(
std::move_iterator(vals.begin()),
std::move_iterator(vals.end()),
std::back_insert_iterator(m_arr));
m_cv.notify_all();
}
};
int main() {
MTQueue<int> foods;
std::thread t1([&] {
for (int i = 0; i < 2; ++i) {
auto food = foods.pop();
std::cout << "t1 got food:" << food << std::endl;
}
});
std::thread t2([&] {
for (int i = 0; i < 2; ++i) {
auto food = foods.pop();
std::cout << "t2 got food:" << food << std::endl;
}
});
foods.push(42);
foods.push(233);
foods.push_many({ 666, 4399 });
t1.join();
t2.join();
return 0;
}
std::condition_variable 小贴士
std::condition_variable
仅仅支持std::unique_lockstd::mutex
作为 wait 的参数,如果需要用其他类型的 mutex 锁,可以用std::condition_variable_any
。wait_for()
和wait_until()
函数,分别接受 chrono 时间段和时间点作为参数。详见:https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for。
原子操作
多个线程修改同一个计数器
多个线程同时往一个 int 变量里累加,这样肯定会出错,因为 counter += i
在 CPU 看来会变成三个指令:
读取 counter 变量到 rax 寄存器;
rax 寄存器的值加上 1;
把 rax 写入到 counter 变量。
即使编译器优化成 add [counter], 1
也没用,因为现代 CPU 为了高效,使用了大量奇技淫巧,比如他会把一条汇编指令拆分成很多微指令 (micro-ops),三个甚至有点保守估计了。
#include <iostream>
#include <thread>
int main() {
int counter = 0;
std::thread t1([&] {
for (int i = 0; i < 10000; ++i) {
counter += 1;
}
});
std::thread t2([&] {
for (int i = 0; i < 10000; ++i) {
counter += 1;
}
});
t1.join();
t2.join();
std::cout << counter << std::endl;
return 0;
}
有多个线程同时运行,顺序是不确定的:
t1:读取 counter 变量,到 rax 寄存器;
t2:读取 counter 变量,到 rax 寄存器;
t1:rax 寄存器的值加上 1;
t2:rax 寄存器的值加上 1;
t1:把 rax 写入到 counter 变量;
t2:把 rax 写入到 counter 变量。
如果是这种顺序,最后 t1 的写入就被 t2 覆盖了,从而 counter 只增加了 1,而没有像预期的那样增加 2。
更不用说现代 CPU 还有高速缓存,乱序执行,指令级并行等优化策略,你根本不知道每条指令实际的先后顺序。
用 mutex 上锁
这样的确可以防止多个线程同时修改 counter 变量,从而不会冲突。
问题:mutex 太过重量级,他会让线程被挂起,从而需要通过系统调用,进入内核层,调度到其他线程执行,有很大的开销。
可我们只是想要修改一个小小的 int 变量而已,用昂贵的 mutex 严重影响了效率。
atomic:有专门的硬件指令加持
可以用更轻量级的 atomic,对他的 += 等操作,会被编译器转换成专门的指令。
lock xadd %eax, (%rdx)
CPU 识别到该指令时,会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子 (atomic) 的(取其不可分割之意),不会加法加到一半另一个线程插一脚进来。
对于程序员,只需把 int 改成 atomic<int> 即可,也不必像 mutex 那样需要手动上锁解锁,因此用起来也更直观。
#include <iostream>
#include <thread>
#include <atomic>
int main() {
std::atomic<int> counter = 0;
std::thread t1([&] {
for (int i = 0; i < 10000; ++i) {
counter += 1;
}
});
std::thread t2([&] {
for (int i = 0; i < 10000; ++i) {
counter += 1;
}
});
t1.join();
t2.join();
std::cout << counter << std::endl;
return 0;
}
注意:请用 +=
counter = counter + 1; // 错,不能保证原子性
counter += 1; // OK,能保证原子性
counter++; // OK,能保证原子性
fetch_add
除了用方便的运算符重载之外,还可以直接调用相应的函数名。
fetch_add 对应于 +=;
store 对应于 =;
load 用于读取其中的 int 值。
#include <iostream>
#include <thread>
#include <atomic>
int main() {
std::atomic<int> counter;
counter.store(0);
std::thread t1([&] {
for (int i = 0; i < 10000; ++i) {
counter.fetch_add(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 10000; ++i) {
counter.fetch_add(1);
}
});
t1.join();
t2.join();
std::cout << counter.load() << std::endl;
return 0;
}
fetch_add 会返回其旧值
int old = atm.fetch_add(val);
除了会导致 atm 的值增加 val 外,还会返回 atm 增加前的值,存储到 old。 这个特点使得他可以用于并行地往一个列表里追加数据:追加写入的索引就是 fetch_add 返回的旧值。
当然这里也可以 counter++,不过要追加多个的话还是得用到 counter.fetch_add(n)。
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
int main() {
std::atomic<int> counter;
counter.store(0);
std::vector<int> data(20000);
std::thread t1([&] {
for (int i = 0; i < 10000; ++i) {
int index = counter.fetch_add(1);
data[index] = i;
}
});
std::thread t2([&] {
for (int i = 0; i < 10000; ++i) {
int index = counter.fetch_add(1);
data[index] = i + 10000;
}
});
t1.join();
t2.join();
std::cout << data[10000] << std::endl;
return 0;
}
exchange:读取的同时写入
exchange(val) 会把 val 写入原子变量,同时返回其旧的值。
#include <iostream>
#include <atomic>
int main() {
std::atomic<int> counter;
counter.store(0);
int old = counter.exchange(3);
std::cout << "old=" << old << std::endl;
int now = counter.load();
std::cout << "cnt=" << now << std::endl;
return 0;
}
compare_exchange_strong: 读取,比较是否相等,相等则写入
#include <iostream>
#include <atomic>
int main() {
std::atomic<int> counter;
counter.store(2);
int old = 1;
bool equal = counter.compare_exchange_strong(old, 3);
std::cout << "equal=" << equal << std::endl; // false
std::cout << "old=" << old << std::endl; // 2
int now = counter.load();
std::cout << "cnt=" << now << std::endl; // 2
old = 2;
equal = counter.compare_exchange_strong(old, 3);
std::cout << "equal=" << equal << std::endl; // true
std::cout << "old=" << old << std::endl; // 2
now = counter.load();
std::cout << "cnt=" << now << std::endl; // 3
return 0;
}
伪代码
struct AtomicInt {
int cnt;
int store(int val) {
cnt = val;
}
int load() const {
return cnt;
}
int fetch_add(int val) {
int old = cnt;
cnt += val;
return old;
}
bool compare_exchange_strong(int& old, int val) {
if (cnt == old) {
cnt = val;
return true;
}
else {
old = cnt;
return false;
}
}
};
可以看到其中 compare_exchange_strong
的逻辑最为复杂,一般简称 CAS (compare-and-swap),他是并行编程最常用的原子操作之一。实际上任何 atomic 操作,包括 fetch_add,都可以基于 CAS 来实现。
https://www.bilibili.com/video/BV1Ya411q7y4/?spm_id_from=333.999.0.0&vd_source=22db36befcbe98b5c4cdb3d1d5f45f5d