异步
std::async
接受一个带返回值的 lambda,自身返回一个std::future
对象。lambda 的函数体将在另一个线程里执行。调用 future 的 get() 方法可以得到返回值。
std::async 的第一个参数可以设为 std::launch::deferred,这时不会创建一个线程来执行,他只会把 lambda 函数体内的运算推迟到 future 的 get() 被调用时。也就是 main 中的 interact 计算完毕后。
如果不想让std::async 帮你自动创建线程,想要手动创建线程,可以直接用std::promise,也就是std::async的底层实现。
读写锁
读可以共享,写必须独占,且写和读不能共存
原子操作
mutex 太过重量级,他会让线程被挂起,从而需要通过系统调用,进入内核层,调度到其他线程执行,有很大的开销。
因此可以用更轻量级的 atomic,对他的 += 等操作,会被编译器转换成专门的指令。
CPU 识别到该指令时,会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子 (atomic) 的(取其不可分割之意),不会加法加到一半另一个线程插一脚进来。
对于程序员,只需把 int 改成 atomic
线程池(Thread Pool)
- 线程池负责加入线程,析构函数进行线程的join,会在 main 退出后自动调用(RAII)。
可以通过创建线程池的方式来管理线程,会比手动管理线程效率要高,而且可以更好地控制线程数量
下面是线程池的一种实现:
#ifndef THREADPOOL_BASIC_HPP
#define THREADPOOL_BASIC_HPP
#include <cstdint>
#include <future>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
class ThreadPool {
private:
// storage for threads and tasks
std::vector<std::thread> threads;
std::queue<std::function<void(void)>> tasks; // first in first out
// primitives for signaling
std::mutex mutex;
std::condition_variable cv;
// the state of the thread, pool
bool stop_pool; // indicates whether the pool is still running
uint32_t active_threads; // number of active threads
const uint32_t capacity; // max number of executable threads
// custom task factory
template <
typename Func,
typename ... Args,
typename Rtrn=typename std::result_of<Func(Args...)>::type>
auto make_task(
Func && func,
Args && ...args) -> std::packaged_task<Rtrn(void)> {
auto aux = std::bind(std::forward<Func>(func),std::forward<Args>(args)...); // std::forward 完美转发
return std::packaged_task<Rtrn(void)>(aux);
}
// will be executed before execution of a task
void before_task_hook() {
active_threads++;
}
// will be executed after execution of a task
void after_task_hook() {
active_threads--;
}
public:
ThreadPool(uint64_t capacity_) :
stop_pool(false), // pool is running
active_threads(0), // no work to be done
capacity(capacity_) { // remember size
// this function is executed by the threads
auto wait_loop = [this] ( ) -> void {
// wait forever
while (true) {
// this is a placeholder task
std::function<void(void)> task;
{ // lock this section: 1. for conditional wait, 2. lock the task queue
std::unique_lock<std::mutex> unique_lock(mutex);
// ensures that no spurious wake-up occurs, actions must be performed on wake-up if
// (i) the thread pool has been stopped
// or
// (ii) there are still tasks to be processed
auto predicate = [this] ( ) -> bool {
return (stop_pool) || !(tasks.empty());
};
// wait to be waken up on aforementioned conditions
cv.wait(unique_lock, predicate);
// exit if thread pool stopped and no tasks to be performed
if (stop_pool && tasks.empty())
return;
// else extract task from queue
task = std::move(tasks.front());
tasks.pop();
before_task_hook();
} // here we release the lock
// execute the task in parallel
task();
{ // adjust the thread counter
std::lock_guard<std::mutex> lock_guard(mutex);
after_task_hook();
} // here we release the lock
}
};
// initially spawn capacity many threads
for (uint64_t id = 0; id < capacity; id++)
threads.emplace_back(wait_loop);
}
~ThreadPool() {
{ // acquire a scoped lock
std::lock_guard<std::mutex> lock_guard(mutex);
// alter the global state to stop
stop_pool = true;
} // here we release the lock
// signal all threads
cv.notify_all();
// finally join all threads
for (auto& thread : threads)
thread.join();
}
template <
typename Func,
typename ... Args,
typename Pair=Func(Args...),
typename Rtrn=typename std::result_of<Pair>::type>
auto enqueue(
Func && func,
Args && ... args) -> std::future<Rtrn> {
// create the task, get the future
// and wrap task in a shared pointer
auto task = make_task(func, args...);
auto future = task.get_future();
auto task_ptr = std::make_shared<decltype(task)>(std::move(task));
{ // lock the scope
std::lock_guard<std::mutex> lock_guard(mutex);
// you cannot reuse pool after being stopped
if(stop_pool)
throw std::runtime_error("enqueue on stopped ThreadPool");
// wrap the task in a generic void
// function void -> void
auto payload = [task_ptr] ( ) -> void {
// basically call task()
task_ptr->operator()();
};
// append the task to the queue
tasks.emplace(payload);
}
// tell one thread to wake-up
cv.notify_one();
return future;
}
};
#endif
线程安全
一个函数被称为线程安全的(thread-safe),当且仅当被多个并发线程反复调用时,它会一直产生正确的结果。
能够定义出四个(不相交的)线程不安全函数类
-
不保护共享变量的函数
-
保持跨越多个调用的状态的函数
rand
函数是线程不安全的,因为当前调用的结果依赖前次调用的中间结果。当调用srand
为rand
设置了一个种子后,我们从一个单线程中反复地调用rand
,能够预期得到一个可重复的随机数字序列。但是,如果多线程调用rand
函数,这种假设就不再成立了。
unsigned int next = 1; int rand(void) { next = next * 1103515245 + 12345; return (unsigned int)(next/65536) % 32768; } void srand(unsigned int seed) { next = seed; }
-
返回指向静态变量的指针的函数
-
调用线程不安全函数的函数
大多数Unix都是线程安全的,只有一小部分是例外,下表是常见的例外
线程不安全函数 | 线程不安全 | 类unix线程安全版本 |
---|---|---|
rand | 2 | rand_r |
strtok | 2 | strtok_r |
asctime | 3 | asctime_r |
gethostbyaddr | 3 | gethostbyaddr_r |
geyhostbyname | 3 | geyhostbyname_r |
inet_ntoa | 3 | |
localtime | 3 | localtime_r |