死锁(Dead Lock)
当一个操作需要使用两个互斥锁的时候,可能会发生死锁,如下面的例子:
#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <fstream>
using namespace std;
class LogFile {
std::mutex _mu;
std::mutex _mu2;
ofstream f;
public:
LogFile() {
f.open("log.txt");
}
~LogFile() {
f.close();
}
void shared_print(string msg, int id) {
std::lock_guard<std::mutex> guard(_mu);
std::lock_guard<std::mutex> guard2(_mu2);
f << msg << id << endl;
cout << msg << id << endl;
}
void shared_print2(string msg, int id) {
std::lock_guard<std::mutex> guard(_mu2);
std::lock_guard<std::mutex> guard2(_mu);
f << msg << id << endl;
cout << msg << id << endl;
}
};
void function_1(LogFile& log) {
for(int i=0; i>-100; i--)
log.shared_print2(string("From t1: "), i);
}
int main()
{
LogFile log;
std::thread t1(function_1, std::ref(log));
for(int i=0; i<100; i++)
log.shared_print(string("From main: "), i);
t1.join();
return 0;
}
运行之后,你会发现程序会卡住,这就是发生死锁了。程序运行可能会发生类似下面的情况:
Thread A Thread B
_mu.lock() _mu2.lock()
//死锁 //死锁
_mu2.lock() _mu.lock()
避免死锁
- 一个线程永远不要同时持有两个锁
-
不要在互斥锁保护的区域使用用户自定义的代码,因为用户的代码可能操作了其他的互斥锁。
{ std::lock_guard<std::mutex> guard(_mu2); user_function(); // never do this!!! f << msg << id << endl; }
-
如果想同时对多个互斥锁上锁,要使用
std::lock()
。它接受任意多个 mutex 作为参数,并且保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。 -
给锁定义顺序(使用层次锁,或者比较地址等),每次以同样的顺序进行上锁。
unique_lock
互斥锁保证了线程间的同步,但是却将并行操作变成了串行操作,这对性能有很大的影响,所以我们要尽可能的减小锁定的区域,也就是使用细粒度锁。
lock_guard
不够灵活,lock_guard
只能保证在析构的时候执行解锁操作,lock_guard
本身并没有提供加锁和解锁的接口。unique_lock
提供了lock()
和unlock()
接口,能记录现在处于上锁还是没上锁状态,在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard
就一定会解锁)。在无需加锁的操作时,可以先临时释放锁,然后需要继续保护的时候,可以继续上锁。当然这也是有代价的,因为它内部需要维护锁的状态,所以效率要比lock_guard
低一点。
class LogFile {
std::mutex _mu;
ofstream f;
public:
LogFile() {
f.open("log.txt");
}
~LogFile() {
f.close();
}
void shared_print(string msg, int id) {
std::unique_lock<std::mutex> guard(_mu);
//do something 1
guard.unlock(); //临时解锁
//do something 2
guard.lock(); //继续上锁
// do something 3
f << msg << id << endl;
cout << msg << id << endl;
// 结束时析构guard会临时解锁
// 这句话可要可不要,不写,析构的时候也会自动执行
// guard.ulock();
}
};
另外,请注意,unique_lock
和lock_guard
都不能复制,lock_guard
不能移动,但是unique_lock
可以!
// unique_lock 可以移动,不能复制
std::unique_lock<std::mutex> guard1(_mu);
std::unique_lock<std::mutex> guard2 = guard1; // error
std::unique_lock<std::mutex> guard2 = std::move(guard1); // ok
// lock_guard 不能移动,不能复制
std::lock_guard<std::mutex> guard1(_mu);
std::lock_guard<std::mutex> guard2 = guard1; // error
std::lock_guard<std::mutex> guard2 = std::move(guard1); // error
条件变量
互斥锁std::mutex
是一种最常见的线程间同步的手段,但是在有些情况下不太高效。
假设想实现一个简单的消费者生产者模型,一个线程往队列中放入数据,一个线程往队列中取数据,取数据前需要判断一下队列中确实有数据,由于这个队列是线程间共享的,所以,需要使用互斥锁进行保护,一个线程在往队列添加数据的时候,另一个线程不能取,反之亦然。用互斥锁实现如下:
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
std::deque<int> q;
std::mutex mu;
// 生产者
void function_1() {
int count = 10;
while (count > 0) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(count);
locker.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
count--;
}
}
// 消费者
void function_2() {
int data = 0;
while ( data != 1) {
std::unique_lock<std::mutex> locker(mu);
if (!q.empty()) {
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
} else {
locker.unlock();
}
}
}
int main() {
std::thread t1(function_1);
std::thread t2(function_2);
t1.join();
t2.join();
return 0;
}
//输出结果
//t2 got a value from t1: 10
//t2 got a value from t1: 9
//t2 got a value from t1: 8
//t2 got a value from t1: 7
//t2 got a value from t1: 6
//t2 got a value from t1: 5
//t2 got a value from t1: 4
//t2 got a value from t1: 3
//t2 got a value from t1: 2
//t2 got a value from t1: 1
在function_1()
往队列添加数据的同时,如果std::mutex是busy waiting的实现,那么function_2()
会持续不断地检查互斥锁的状态,直到获得锁为止,这种方式会浪费大量的CPU时间。C++标准并没有规定std::mutex的具体实现方式,因此在编写代码时应该尽量避免依赖于特定的实现细节。如果需要确保互斥锁使用sleep-waiting机制,可以考虑使用std::condition_variable配合std::unique_lock或std::lock_guard来实现。
c++11
中提供了#include <condition_variable>
头文件,其中有两个重要的接口,notify_one()
和wait()
,wait()
可以让线程陷入休眠状态,在消费者生产者模型中,如果生产者发现队列中没有东西,就可以让自己休眠,notify_one()
就是唤醒处于wait
中的其中一个条件变量(可能当时有很多条件变量都处于wait
状态)。那什么时刻使用notify_one()
比较好呢,当然是在生产者往队列中放数据的时候了,队列中有数据,就可以赶紧叫醒等待中的线程起来干活了。
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void function_1() {
int count = 10;
while (count > 0) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(count);
locker.unlock();
cond.notify_one(); // Notify one waiting thread, if there is one.
std::this_thread::sleep_for(std::chrono::seconds(1));
count--;
}
}
void function_2() {
int data = 0;
while ( data != 1) {
std::unique_lock<std::mutex> locker(mu);
while(q.empty())
cond.wait(locker); // Unlock mu and wait to be notified
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
int main() {
std::thread t1(function_1);
std::thread t2(function_2);
t1.join();
t2.join();
return 0;
}
上面的代码有三个注意事项:
- 在
function_2
中,在判断队列是否为空的时候,使用的是while(q.empty())
,而不是if(q.empty())
,这是因为wait()
从阻塞到返回,不一定就是由于notify_one()
函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒,如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续wait()
阻塞。 - 在管理互斥锁的时候,使用的是
std::unique_lock
而不是std::lock_guard
,而且事实上也不能使用std::lock_guard
,这需要先解释下wait()
函数所做的事情。可以看到,在wait()
函数之前,使用互斥锁保护了,如果wait
的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。而lock_guard
没有lock
和unlock
接口,而unique_lock
提供了。这就是必须使用unique_lock
的原因。 - 使用细粒度锁,尽量减小锁的范围,在
notify_one()
的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁unlock()
。
除了notify_one()
函数,c++
还提供了notify_all()
函数,可以同时唤醒所有处于wait
状态的条件变量。