C++多线程之三

目录

异步

std::async 接受一个带返回值的 lambda,自身返回一个std::future 对象。lambda 的函数体将在另一个线程里执行。调用 future 的 get() 方法可以得到返回值。

std::async 的第一个参数可以设为 std::launch::deferred,这时不会创建一个线程来执行,他只会把 lambda 函数体内的运算推迟到 future 的 get() 被调用时。也就是 main 中的 interact 计算完毕后。

如果不想让std::async 帮你自动创建线程,想要手动创建线程,可以直接用std::promise,也就是std::async的底层实现。

读写锁

读可以共享,写必须独占,且写和读不能共存

原子操作

mutex 太过重量级,他会让线程被挂起,从而需要通过系统调用,进入内核层,调度到其他线程执行,有很大的开销。

因此可以用更轻量级的 atomic,对他的 += 等操作,会被编译器转换成专门的指令。

CPU 识别到该指令时,会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子 (atomic) 的(取其不可分割之意),不会加法加到一半另一个线程插一脚进来。

对于程序员,只需把 int 改成 atomic 即可,也不必像 mutex 那样需要手动上锁解锁,因此用起来也更直观。

线程池(Thread Pool)

  • 线程池负责加入线程,析构函数进行线程的join,会在 main 退出后自动调用(RAII)。

可以通过创建线程池的方式来管理线程,会比手动管理线程效率要高,而且可以更好地控制线程数量

下面是线程池的一种实现:

#ifndef THREADPOOL_BASIC_HPP

#define THREADPOOL_BASIC_HPP

#include <cstdint>

#include <future>

#include <vector>

#include <queue>

#include <thread>

#include <mutex>

#include <condition_variable>

#include <functional>

class ThreadPool {

private:

    // storage for threads and tasks
    std::vector<std::thread> threads;
    std::queue<std::function<void(void)>> tasks;    // first in first out

    // primitives for signaling
    std::mutex mutex;
    std::condition_variable cv;
    
    // the state of the thread, pool
    bool stop_pool;      // indicates whether the pool is still running
    uint32_t active_threads;   // number of active threads
    const uint32_t capacity;   // max number of executable threads

    // custom task factory
    template <
        typename     Func,
        typename ... Args,
        typename Rtrn=typename std::result_of<Func(Args...)>::type>
    auto make_task(
        Func &&    func,
        Args && ...args) -> std::packaged_task<Rtrn(void)> {

        auto aux = std::bind(std::forward<Func>(func),std::forward<Args>(args)...);   // std::forward 完美转发

        return std::packaged_task<Rtrn(void)>(aux);
    }

    // will be executed before execution of a task
    void before_task_hook() {
        active_threads++;
    }

    // will be executed after execution of a task
    void after_task_hook() {
        active_threads--;
    }

public:
    ThreadPool(uint64_t capacity_) :
        stop_pool(false),     // pool is running
        active_threads(0),    // no work to be done
        capacity(capacity_) { // remember size

        // this function is executed by the threads
        auto wait_loop = [this] ( ) -> void {

            // wait forever
            while (true) {

                // this is a placeholder task
                std::function<void(void)> task;

                {   // lock this section: 1. for conditional wait, 2. lock the task queue
                    std::unique_lock<std::mutex> unique_lock(mutex);

                    // ensures that no spurious wake-up occurs, actions must be performed on wake-up if 
                    // (i) the thread pool has been stopped 
                    // or 
                    // (ii) there are still tasks to be processed
                    auto predicate = [this] ( ) -> bool {
                        return (stop_pool) || !(tasks.empty());
                    };

                    // wait to be waken up on aforementioned conditions
                    cv.wait(unique_lock, predicate);

                    // exit if thread pool stopped and no tasks to be performed
                    if (stop_pool && tasks.empty())
                        return;

                    // else extract task from queue
                    task = std::move(tasks.front());
                    tasks.pop();
                    before_task_hook();
                } // here we release the lock

                // execute the task in parallel
                task();

                {   // adjust the thread counter
                    std::lock_guard<std::mutex> lock_guard(mutex);
                    after_task_hook();
                } // here we release the lock
            }
        };

        // initially spawn capacity many threads
        for (uint64_t id = 0; id < capacity; id++)
            threads.emplace_back(wait_loop);
    }

    ~ThreadPool() {

        {   // acquire a scoped lock
            std::lock_guard<std::mutex> lock_guard(mutex);

            // alter the global state to stop
            stop_pool = true;
        } // here we release the lock

        // signal all threads
        cv.notify_all();

        // finally join all threads
        for (auto& thread : threads)
            thread.join();
    }

    template <
        typename     Func,
        typename ... Args,
        typename Pair=Func(Args...),
        typename Rtrn=typename std::result_of<Pair>::type>
    auto enqueue(
        Func &&     func,
        Args && ... args) -> std::future<Rtrn> {

        // create the task, get the future
        // and wrap task in a shared pointer
        auto task = make_task(func, args...);
        auto future = task.get_future();
        auto task_ptr = std::make_shared<decltype(task)>(std::move(task));

        {   // lock the scope
            std::lock_guard<std::mutex> lock_guard(mutex);

            // you cannot reuse pool after being stopped
            if(stop_pool)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            // wrap the task in a generic void
            // function void -> void
            auto payload = [task_ptr] ( ) -> void {
                // basically call task()
                task_ptr->operator()();
            };

            // append the task to the queue
            tasks.emplace(payload);
        }

        // tell one thread to wake-up
        cv.notify_one();

        return future;
    }
};

#endif

线程安全

一个函数被称为线程安全的(thread-safe),当且仅当被多个并发线程反复调用时,它会一直产生正确的结果。

能够定义出四个(不相交的)线程不安全函数类

  1. 不保护共享变量的函数

  2. 保持跨越多个调用的状态的函数

    • rand函数是线程不安全的,因为当前调用的结果依赖前次调用的中间结果。当调用srandrand设置了一个种子后,我们从一个单线程中反复地调用rand,能够预期得到一个可重复的随机数字序列。但是,如果多线程调用rand函数,这种假设就不再成立了。
    unsigned int next = 1;
       
    int rand(void)
    {	
        next = next * 1103515245 + 12345;
        return (unsigned int)(next/65536) % 32768;
    }
       
    void srand(unsigned int seed)
    {
        next = seed;
    }
    
  3. 返回指向静态变量的指针的函数

  4. 调用线程不安全函数的函数

大多数Unix都是线程安全的,只有一小部分是例外,下表是常见的例外

线程不安全函数 线程不安全 类unix线程安全版本
rand 2 rand_r
strtok 2 strtok_r
asctime 3 asctime_r
gethostbyaddr 3 gethostbyaddr_r
geyhostbyname 3 geyhostbyname_r
inet_ntoa 3  
localtime 3 localtime_r