Non-Profit, International

Spirit unsterblich.

C++ std::async

字数统计:2317 blog

std::async 是 C++11 开始支持多线程时加入的同步多线程构造函数,其弥补了 std::thread 没有返回值的问题,并在 std::thread 的基础上加入了更多的特性,使得多线程的使用更加灵活。

std::async

虽然任何函数的返回值都可以通过函数参数来进行传递,但是返回值明显是一种更清晰优雅的方式,并且 std::thread 并不能储存或者返回传入线程的函数的返回值,这就带来了一些不变:必须再构造一个函数将本来的返回值包装为函数参数传递。

由于无论是否接收返回值,函数都需要处理返回值,这会造成一定程度的性能损失,并且如果将任何无返回值的函数当作有返回值的函数进行包装,同样会造成性能的损失,所以标准库设计了 std::async 和 std::thread 来适应不同的情况。

此外,std::async 在调度策略上和 std::thread 稍有不同,这使得 std::async 比 std::thread 更灵活一些。

std:: async 大致有两种重载,其区别是第二种多了一个被叫做发射策略的参数:


// <future>
// cppreference.com
// 1
template<class Function, class... Args>// C++11起,C++17前
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async(Function&& f, Args&&... args);

template<class Function, class... Args>// C++17 起,C++20前
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(Function&& f, Args&&... args);

template<class Function, class... Args>// C++20 起
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(Function&& f, Args&&... args);

// 2
template<class Function, class... Args >// C++11起,C++17前
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async(std::launch policy, Function&& f, Args&&... args);

template<class Function, class... Args >// C++17 起,C++20前
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(std::launch policy, Function&& f, Args&&... args);

template<class Function, class... Args >// C++20起
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(std::launch policy, Function&& f, Args&&... args);

由于 C++17 添加了 std::invoke 代替了 std::result_of,所以 C++11 版本的 std::async 已经废弃

C++20 增加了 [[nodiscard]] 属性用于在编译时对未接收返回值这一类行为发出警告,由于 std::async 的返回值具有特殊的意义 1 ,所以需要接收返回值,不能仅调用。如果不接收返回值,可以用 std::thread 代替。

与 std::thread 类似 2 ,std::async 传入的函数的参数也同样会被去除引用, 如果需要传入引用,则应使用 std::ref

其中 std::launch policy 是 std::async 执行的策略,实际上默认是两个的枚举,不过实现并不重要:

  • std::launch::async 运行新线程,以异步执行任务
  • std::launch::deferred 调用方线程上首次请求其结果时执行任务

在继续讲解 std::async 之前,需要了解一下 std::future。

std::future

std::future 来自于 std::async、std::packaged_task 或 std::promise 的返回值,其储存了传入这三个异步对象的函数的返回值,并一定程度上控制着这三个异步函数的状态。

std::future 有如下的 5 个公开成员函数:

  • get 返回结果
  • valid 检查 std::future 是否拥有共享状态
  • wait 等待结果变得可用
  • wait_for 等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回 std::future_status
  • wait_until 等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回 std::future_status

wait 系列函数可以参考之前的文章:C++ std::condition_variable 和 std::this_thread,不过有一个区别就是 std::future 的 wait_for 和 wait_until 函数会返回 std::future_status。

如果一个函数被延迟启动(使用 std::launch::deferred 策略),那么只有在使用 wait 或者 get 后才能确保函数的结果被计算,使用 wait_for 和 wait_until 是不可靠的。

std::future_status

std::future_status 是一个枚举,有三个值:

  • deferred 共享状态持有的函数正在延迟运行,结果将仅在显式请求时计算
  • ready 共享状态就绪
  • timeout 共享状态在经过指定的等待时间内仍未就绪

回到 std::async

参考 C++ 如果异步执行是必需的,指定 std::launch::async 策略

如果你使用第一种重载函数,即不指定发射策略,则具体策略由 实现和系统 决定:

  • 没有办法 预知函数 f 是否会和线程 t 并发执行,因为 f 可能会被调度为推迟执行。
  • 没有办法 预知函数 f 是否运行在——与调用 get 或 wait 函数的线程不同的——线程。如果那个线程是 t,这句话的含义是没有办法预知 f 是否会运行在与 t 不同的线程。
  • 没有办法 预知函数 f 是否执行完全,因为没有办法保证 fut 会调用 get 或 wait。

值得注意的是,如果使用 std::launch::deferred 策略,那么就意味着 std::async 包装的函数的执行会被推迟到使用 get 获取返回值,或者使用 wait 等待,换言之该行为会阻塞住调用 get 或者 wait 的线程。

问题一

默认发射策略的调度灵活性经常会混淆使用 thread_local 变量,这意味着如果 f 写或读这种线程储存期变量,预知取到哪个线程的本地变量是 不可能 的:


auto fut = std::async(f);// f使用的线程本地存储变量可能是独立的线程的,
                         // 也可能是 fut 调用 get 或 wait 的线程的

问题二

默认发射策略也影响了基于 wait 循环中的超时情况,因为对一个推迟(策略为 deferred)的任务调用 wait_for 或者 wait_until 会返回std::launch::deferred。这意味着下面的循环,看起来最终会停止,但是实际上可能会 一直运行


using namespace std::literals;   // 对于C++14的持续时间后缀,也可使用C++11的std::chrono

void f()           // f睡眠1秒后返回
{
    std::this_thread::sleep_for(1s);
}

auto fut = std::async(f);        // (概念上)异步执行f

while(fut.wait_for(100ms) !=     // 循环直到f执行结束
      std::future_status::ready) // 但这可能永远不会发生
{
    ...
}

如果 f 与调用 std::async 的线程并发执行(即使用 std::launch::async 发射策略),这里就没有问题(假设 f 能结束执行,不会一直循环或者等待)。但如果 f 被推迟(deferred),fut.wait_for 将总是返回 std::future_status::deferred。那永远也不会等于 std::future_status::ready,所以循环永远不会终止。

这种 bug 在开发或单元测试中很容易被忽略,因为它只会在机器负载很重时才会显现。在机器过载(over subscription)或线程池消耗完的状况下,任务很可能会被推迟(如果使用的是默认发射策略)。总之,如果不是过载或者线程耗尽,运行系统没有理由不调度任务并发执行。

解决方案

解决办法很简单:检查 std::async 返回的 future,看它是否把任务推迟,然后呢,如果真的是那样,就避免进入基于超时的循环。不幸的是,没有办法直接询问 future 的任务是否被推迟。取而代之的是,你必须调用一个基于超时的函数——例如 wait_for 函数。在这种情况下,你不用等待任何事情,你只是要看看返回值是否为 std::future_status::deferred,所以请相信这迂回的话语和用 0 来调用 wait_for:


auto fut = std::async(f);

if (fut.wait_for(0) == std::future_status::deferred)  // 如果任务被推迟
{
    ...     // fut使用get或wait来同步调用f
} else {                                              // 任务没有被推迟
    while(fut.wait_for(100ms) != 
         std::future_status::ready) {                 // 不可能无限循环(假定f会结束)

      ...    // 任务没有被推迟也没有就绪,所以做一些并发的事情直到任务就绪
    }

    ...        // fut就绪
}

考虑多种因素的结论是,只有满足了下面的条件,以默认发射策略调用的 std::async 才能正常工作:

  • 任务不需要与调用 get 或 wait 的线程并发执行。
  • 修改哪个线程的 thread_local 变量都没关系。
  • 要么保证 std::async 返回的 future 会调用 get 或 wait,要么你能接受任务可能永远都不执行。
  • 使用 wait_for 或 wait_until 的代码要考虑到任务推迟的可能性。
参考:
注:
  1. std::async 返回的 std::future 是一个纯右值,那么此时如果不选择去使用左值去移动性的接收返回值,则此纯右值会在下一条语句执行前被析构,这将导致调用 std::async 的线程被该析构过程阻塞,造成事实上的同步执行,而不是异步,因此会造成严重的设计缺陷。 

  2. 参考之前的文章 std-reference_wrapper 和 std-ref 


若无特殊声明,本人原创文章以 CC BY-SA 3.0 许可协议 提供。