Non-Profit, International

Spirit unsterblich.

C++协程 - 并发原语

字数统计:2425

本节内容主要介绍了实现线程池所用的并发原语:原子操作、互斥锁、条件变量以及信号量。

内存模型

任何编程语言(包括汇编这种低级编程语言)都必须定义内存模型以指导程序如何编写才不会在多线程中出现数据竞争,在2004年有两篇经典论文指出了高级编程语言中存在的这个问题:Hans Boehm的线程不能实现为库以及Scott Meyers和Andrei Alexandrescu的C++和双重检查锁定模式(DCLP)的风险;Russ Cox在2021年编写的文章硬件内存模型则总结性的概括了自1997年起硬件内存模型发展的辛路历程。

原子操作

C++提供了六个内存序标签去使用四种内存序来描述C++的内存模型,这些内存序的定义比较复杂,如果读者想要全面的学习,可以阅读std::memory_order编程语言内存模型以及《C++并发编程实战 第二版》等专业资料,本文所讲述的只是一种简化模型。

C++原子操作的结果,可以通过观测原子变量的值来定义。无论哪种内存序标签以及何种操作,都保证读取时一定完整的读取到“上一次”储存的值,而不会是两次写入叠加在一起产生的值。“上一次”指的就是观测时观测到的值被写入的那一次。

acquire的读,能够在它本身以及后续的非原子读取,和发出这次读取的值的release的写,以及release写之前的非原子写入之间建立同步关系。该关系被称作“先发生于(happens-before)”,如果A先发生于B,则B能观察到A及A之前的所有副作用。

生产出的数据,使用release写原子变量x的值为v,能在acquire读x值为v后使用。

而relaxed操作不建立“先发生于”关系,不能用于同步:relaxed的写只生产它字节本身的值,relaxed的读只能够消费该变量自己的值。

因此,C++11的所有锁的 unlock 操作都先发生于 lock,使得每次生产出数据而释放锁后,都能在获得锁后使用。

互斥锁

可以这样实现锁:


class mutex
{
    std::atomic<int> s_{};

public:
    void lock() noexcept
    {
        while (s_.exchange(1, std::memory_order::acquire))
                s_.wait(1, std::memory_order::relaxed);
    }

    bool try_lock() noexcept
    {
        return !s_.exchange(1, std::memory_order::acquire);
    }

    void unlock() noexcept
    {
        s_.store(0, std::memory_order::release);
        s_.notify_one();
    }
};

s_ 被值初始化为0。

lock 函数在每次循环时尝试用acquire交换1给 s_,如果 s_ 返回1,说明该锁已经被锁定。

随后,使用relaxed wait 等待 s_ 的值直到 s_ 的值被其他线程改为不是1(改为0)。

如果 exchange 返回0,则 s_ 被原子的设置为了1,并且先前值是0(没上锁)。

try_lock 内,仅尝试一次 exchange

最后,在 unlock 内,release store 0给 s_,并且发出通知。

由于同步关系是 store/exchange 建立的,因此 wait 只需要使用relaxed即可。

在其他使用 wait 的场景中,例如通过原子实现信号量时,wait 需要使用acquire。

通过添加C++20新增的的 waitnotify_one 函数调用,可以做到让锁不再自旋。这项技术是增加等待增强std::atomic_flag (P0514R0)中提出的,同时作者给出了性能数据和参考实现

Linus Torvalds指出,一个吞吐量高的锁通常具有更高的延迟,而延迟更低的锁可能花费了更多时间在自旋上,占用了更多的CPU,内核提供的锁是针对这两点的平衡。虽然Linus Torvalds同时声称不要自己实现锁,但实际上我们知道 waitnotify 使用futex实现,因此在Linux上这效果不错。在Windows上,则由 WaitOnAddressWakeOnAddressSingle 实现。读者可以自行阅读各大标准库相关源码来观测这点。

实际上 WaitOnAddress 也是实现Windows Vista开始存在的SRWLock的方式,甚至也是CriticalSection在Windows Vista开始的实现方式(但为了二进制兼容性,CriticalSection无法减小内存占用)。

WaitOnAddress 会发生虚假唤醒,因此需要在循环中调用relaxed wait 来检查 s_ 是否真的被修改为0,而不是仅等待一次。关于 WaitOnAddress 还可以阅读Raymond Chen的一系列文章

由于futex只支持 sizeof(int) 大小的数据被 waitnotify,因此上例中固定使用 std::atomic<int> 实现锁,而 WaitOnAddress 支持1、2、4、8字节,无此限制。

条件变量和信号量

一般来说,线程池使用条件变量来解决忙等问题,C++的条件变量提供了 waitwait_for 函数用于休眠线程,以及 notify_onenotify_all 函数用于唤醒线程。

但本文使用 std::counting_semaphore 代替条件变量,有三个原因:

  1. STL由于ABI兼容问题以及为了兼容Windows XP,当前的 std::mutex 实现并不好,这也影响了 std::condition_variable
  2. macOS的 std::mutex 在一些情况下表现奇差,参考P0514R0
  3. 条件变量的 wait 函数会无条件重新获得锁,在当前线程池设计中这是多余的
  4. 信号量也是一种强大的工具,它的概念比条件变量简单

信号量的 acquire 函数每次会尝试将计数减小1,除非计数是0。当计数是0的时候,函数阻塞并直到能减小1。

信号量的 release 函数会将计数增加n,默认是1。在本实现中中固定每次只增加1。

信号量的 release 函数强先发生于调用观察效果结果的try_acquire,因此可以保证用于安全同步。

但信号量比条件变量缺少 notify_all 函数,这是由于信号量的概念决定的:信号量用于单一消费者。不过多消费者在一些情况下可以转换为单消费者。在抢占式线程池中,所有线程共用一个条件变量,notify_all 可以一次性在线程池销毁时通知所有线程退出,但通过在每个线程池线程退出时使用 notify_one 唤醒下一个线程,也能做到同样的目的。对于派发式线程池,也就是本文所实现的,只需要对每个线程独有的信号量使用 noify_one 即可。

使用信号量可以实现没有 notify_all 函数的条件变量:


class condition_variable
{
    std::counting_semaphore<> s_{0z};

  public:
    condition_variable() noexcept = default;

    void notify_one() noexcept
    {
        s_.release();
    }

    void wait(std::unique_lock<mutex> &lock) noexcept
    {
        lock.unlock();
        s_.acquire();
        lock.lock();
    }

    template <class Rep, class Period>
    std::cv_status wait_for(std::unique_lock<std::mutex> &lock,
                            const std::chrono::duration<Rep, Period> &rel_time)
    {
        lock.unlock();
        auto st = s_.try_acquire_for(rel_time);
        lock.lock();

        return st ? std::cv_status::no_timeout : std::cv_status::timeout;
    }

    template <class Clock, class Duration>
    std::cv_status wait_until(std::unique_lock<std::mutex> &lock,
                            const std::chrono::time_point<Clock, Duration> &abs_time)
    {
        lock.unlock();
        auto st = s_.try_acquire_until(abs_time);
        lock.lock();

        return st ? std::cv_status::no_timeout : std::cv_status::timeout;
    }
};

标准库中的条件变量 wait 函数还包含有谓词的重载,由于它很好实现,因此在本例中不实现它们。

实际上(二元)信号量还可以用于反过来实现锁,留给读者思考。许多并发原语有一些类似之处并且在特定用途下可以互相替代,但信号量和条件变量仍然是不同的并发原语,例如信号量的 acquire 可以保证不发生虚假唤醒,而条件变量的 wait 则不保证这一点。设计不同并发原语的目的是指导用户如何正确使用它们,编写正确的并发代码需要正确的抽象。注意,操作系统可以更有效的实现条件变量,本例子仅用于展示并发原语的可互换性,并不对性能负责。实际生产时应该确保在Windows上使用SRWLock和Windows信号量。


若无特殊声明,本人原创文章以 CC BY-SA 4.0许可协议 提供。