Non-Profit, International

Spirit unsterblich.

C++ std::async

字数统计:2745

std::async是C++11开始支持多线程时加入的同步多线程构造函数,其弥补了std::thread没有返回值的问题,并在std::thread的基础上加入了更多的特性,使得多线程的使用更加灵活。

std::async

虽然任何函数的返回值都可以通过函数参数来进行传递,但是返回值明显是一种更清晰优雅的方式,并且std::thread并不能储存或者返回传入线程的函数的返回值,这就带来了一些不变:必须再构造一个函数将本来的返回值包装为函数参数传递。

由于无论是否接收返回值,函数都需要处理返回值,这会造成一定程度的性能损失,并且如果将任何无返回值的函数当作有返回值的函数进行包装,同样会造成性能的损失,所以标准库设计了std::async和std::thread来适应不同的情况。

此外,std::async在调度策略上和std::thread稍有不同,这使得std::async比std::thread更灵活一些。

std:: async大致有两种重载,其区别是第二种多了一个被叫做发射策略的参数:


// <future>
// cppreference.com
// 1
template<class Function, class... Args>// C++11起,C++17前
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async(Function&& f, Args&&... args);

template<class Function, class... Args>// C++17起,C++20前
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(Function&& f, Args&&... args);

template<class Function, class... Args>// C++20起
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(Function&& f, Args&&... args);

// 2
template<class Function, class... Args >// C++11起,C++17前
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async(std::launch policy, Function&& f, Args&&... args);

template<class Function, class... Args >// C++17起,C++20前
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(std::launch policy, Function&& f, Args&&... args);

template<class Function, class... Args >// C++20起
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>,
    std::decay_t<Args>...>>
    async(std::launch policy, Function&& f, Args&&... args);

由于C++17添加了std::invoke代替了std::result_of,所以C++11版本的std::async已经废弃

C++20增加了 [[nodiscard]] 属性用于在编译时对未接收返回值这一类行为发出警告,由于std::async的返回值具有特殊的意义 1,所以需要接收返回值,不能仅调用。如果不接收返回值,可以用std::thread代替。

与std::thread类似 2 ,std::async传入的函数的参数也同样会被去除引用,如果需要传入引用,则应使用std::ref

其中std::launch policy是std::async执行的策略,实际上默认是两个的枚举,不过实现并不重要:

  • std::launch::async 运行新线程,以异步执行任务
  • std::launch::deferred 调用方线程上首次请求其结果时执行任务

在继续讲解std::async之前,需要了解一下std::future。

std::future

std::future来自于std::async、std::packaged_task或std::promise的返回值,其储存了传入这三个异步对象的函数的返回值,并一定程度上控制着这三个异步函数的状态。

std::future有如下的5个公开成员函数:

  • get返回结果
  • valid检查std::future是否拥有共享状态
  • wait等待结果变得可用
  • wait_for等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回std::future_status
  • wait_until等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回std::future_status

wait系列函数可以参考之前的文章:C++ std::condition_variable和std::this_thread,不过有一个区别就是std::future的wait_for和wait_until函数会返回std::future_status。

如果一个函数被延迟启动(使用std::launch::deferred策略),那么只有在使用 wait或者 get后才能确保函数的结果被计算,使用wait_for和wait_until是不可靠的。

std::future_status

std::future_status是一个枚举,有三个值:

  • deferred共享状态持有的函数正在延迟运行,结果将仅在显式请求时计算
  • ready共享状态就绪
  • timeout 共享状态在经过指定的等待时间内仍未就绪

回到std::async

参考C++如果异步执行是必需的,指定std::launch::async策略

如果你使用第一种重载函数,即不指定发射策略,则具体策略由实现和系统决定:

  • 没有办法预知函数f是否会和线程t并发执行,因为f可能会被调度为推迟执行。
  • 没有办法预知函数f是否运行在——与调用get或wait函数的线程不同的——线程。如果那个线程是t,这句话的含义是没有办法预知f是否会运行在与t不同的线程。
  • 没有办法预知函数f是否执行完全,因为没有办法保证fut会调用get或wait。

值得注意的是,如果使用std::launch::deferred策略,那么就意味着std::async包装的函数的执行会被推迟到使用get获取返回值,或者使用wait等待,换言之该行为会阻塞住调用get或者wait的线程。

问题一

默认发射策略的调度灵活性经常会混淆使用thread_local变量,这意味着如果f写或读这种线程储存期变量,预知取到哪个线程的本地变量是不可能的:


auto fut = std::async(f);// f使用的线程本地存储变量可能是独立的线程的,
                         // 也可能是fut调用get或wait的线程的

问题二

默认发射策略也影响了基于wait循环中的超时情况,因为对一个推迟(策略为deferred)的任务调用wait_for或者wait_until会返回std::launch::deferred。这意味着下面的循环,看起来最终会停止,但是实际上可能会一直运行


using namespace std::literals;   // 对于C++14的持续时间后缀,也可使用C++11的std::chrono

void f()           // f睡眠1秒后返回
{
    std::this_thread::sleep_for(1s);
}

auto fut = std::async(f);        // (概念上)异步执行f

while(fut.wait_for(100ms) !=     // 循环直到f执行结束
      std::future_status::ready) // 但这可能永远不会发生
{
    ...
}

如果f与调用std::async的线程并发执行(即使用std::launch::async发射策略),这里就没有问题(假设f能结束执行,不会一直循环或者等待)。但如果f被推迟(deferred),fut.wait_for将总是返回std::future_status::deferred。那永远也不会等于std::future_status::ready,所以循环永远不会终止。

这种bug在开发或单元测试中很容易被忽略,因为它只会在机器负载很重时才会显现。在机器过载(over subscription)或线程池消耗完的状况下,任务很可能会被推迟(如果使用的是默认发射策略)。总之,如果不是过载或者线程耗尽,运行系统没有理由不调度任务并发执行。

解决方案

解决办法很简单:检查std::async返回的future,看它是否把任务推迟,然后呢,如果真的是那样,就避免进入基于超时的循环。不幸的是,没有办法直接询问future的任务是否被推迟。取而代之的是,你必须调用一个基于超时的函数——例如wait_for函数。在这种情况下,你不用等待任何事情,你只是要看看返回值是否为std::future_status::deferred,所以请相信这迂回的话语和用0来调用wait_for:


auto fut = std::async(f);

if (fut.wait_for(0) == std::future_status::deferred)  // 如果任务被推迟
{
    ...     // fut使用get或wait来同步调用f
} else {                                              // 任务没有被推迟
    while(fut.wait_for(100ms) != 
         std::future_status::ready) {                 // 不可能无限循环(假定f会结束)

      ...    // 任务没有被推迟也没有就绪,所以做一些并发的事情直到任务就绪
    }

    ...        // fut就绪
}

考虑多种因素的结论是,只有满足了下面的条件,以默认发射策略调用的std::async才能正常工作:

  • 任务不需要与调用get或wait的线程并发执行。
  • 修改哪个线程的thread_local变量都没关系。
  • 要么保证std::async返回的future会调用get或wait,要么你能接受任务可能永远都不执行。
  • 使用wait_for或wait_until的代码要考虑到任务推迟的可能性。
参考
  1. std::async返回的std::future是一个纯右值,那么此时如果不选择去使用左值去移动性的接收返回值,则此纯右值会在下一条语句执行前被析构,这将导致调用std::async的线程被该析构过程阻塞,造成事实上的同步执行,而不是异步,因此会造成严重的设计缺陷。

  2. 参考之前的文章std-reference_wrapper和std-ref


若无特殊声明,本人原创文章以 CC BY-SA 4.0许可协议 提供。