Non-Profit, International

Spirit unsterblich.

C++ std::promise

字数统计:1852 blog

由于 std::async 在理论和实现上并不能满足我们的需求,因为我们常常在异步操作后对异步操作的结果进行一个同步,而 std::async 并不能提供这点需求,甚至由于默认发射策略导致 std::async 很有可能退化为单线程顺序运行,所以在 std::async 的基础上提供了 std::promise。

std::promise 可以保存某一类型 T 的值,该值通常使用 std::future 储存(可能传递给另外一个线程),因此 std::promise 也提供了一种线程同步的手段 1

可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(Shared State)。

  • promise 是异步 Provider,它可以在某一时刻设置共享状态的值。
  • future 可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。

#include <iostream>
#include <future>

void print_int(std::future<int>& fut) {
    int x = fut.get(); // 获取共享状态的值
    std::cout << "value: " << x << std::endl; // 打印 value: 10
}

int main()
{
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread t(print_int, std::ref(fut));
    prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步
    // 若不设置共享状态的值,那么t会被一直阻塞
    // 也可选择阻塞一定时间后若仍未设置值,进行异常处理
    t.join();
    return 0;
}

future 本质上是我们发起的一个并发操作,而 promise 本质上则是并发操作的回调。我们可以通过 future 对象等待该操作和获取操作的结果,而 promise 对象则负责写入返回值并通知我们 2

具体的实现中,future 与 promise 会有指向同一个共享的状态对象 Shared State 的共享指针,当promise 对象接受到返回值或者错误之后,通过条件变量通知另一端等待的 future 对象。future 对象则可以通过 Shared State 对象中的状态,来判断接收到回调之后是继续处理业务还是处理错误。

std::promise 有如下成员函数 3

  • get_future 返回与 promise 关联的 future
  • set_value 设置结果为指定值
  • set_value_at_thread_exit 设置结果为指定值,同时仅在线程退出时分发提醒
  • set_exception 设置结果为指示异常
  • set_exception_at_thread_exit 设置结果为指示异常,同时仅在线程退出时分发提醒

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

void accumulate(std::vector<int>::iterator first,
    std::vector<int>::iterator last,
    std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    accumulate_promise.set_value(sum);  // 提醒 future 已就绪
}

int main()
{
    // 演示用 promise<int> 在线程间传递结果。
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise;
    std::future<int> accumulate_future = accumulate_promise.get_future();
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
        std::move(accumulate_promise));

    // future::get() 将等待直至该 future 拥有合法结果并取得它
    // 无需在 get() 前调用 wait
    //accumulate_future.wait();  // 等待结果
    std::cout << "result=" << accumulate_future.get() << std::endl;
    work_thread.join();  // wait for thread completion
}

由于 std::future 所引用的共享状态不与另一异步返回对象共享 4 ,所以 std::promise 也具有同样的性质,所以 std::promise 一般使用 移动构造 的方式来传递(原代码中使用了引用,但是这并不值得展示)。

C++ 并发编程实战 第二版 4.2.3 使用 promises 中提供了一个简化的使用场景:


void process_connections(connection_set& connections)
{
    while (!done(connections))
    {
        for (connection_iterator
            connection = connections.begin(), end = connections.end();
            connection != end;
            ++connection)
        {
            if (connection->has_incoming_data())
            {
                data_packet data = connection->incoming();
                std::promise<payload_type>& p =
                    connection->get_promise(data.id);
                p.set_value(data.payload);
            }
            if (connection->has_outgoing_data())
            {
                outgoing_packet data =
                    connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true);
            }
        }
    }
}

假设 connections 是一个网络连接池,自旋锁不断通过循环检查是否整个连接池都已经结束连接,并每次遍历整个连接池。

每一个 connection 就是一个连接,如果等待传入的数据,那么就构造一个 promise,然后设置 promise 的共享状态的值。

如果有传出数据(通常同时可能存在多个等待传出的数据),则从队列里得到顶部数据,然后构造一个 promise 发送数据。

显然的,promise 总是积极的创造一个线程。

以下是向 promise 传递一个异常的示例 1


#include <iostream>       // std::ios
#include <thread>         // std::thread
#include <future>         // std::promise, std::future
#include <exception>      // std::exception, std::current_exception

void get_int(std::promise<int> prom) {
    int x;
    std::cout << "Please, enter an integer value: ";
    std::cin.exceptions(std::ios::failbit);   // throw on failbit
    try {
        std::cin >> x;                         // sets failbit if input is not int
        prom.set_value(x);
    }
    catch (std::exception&) {
        prom.set_exception(std::current_exception());
    }
}

void print_int(std::future<int> fut) {
    try {
        int x = fut.get();
        std::cout << "value: " << x << std::endl;
    }
    catch (std::exception& e) {
        std::cout << "[exception caught: " << e.what() << "]" << std::endl;
    }
}

int main()
{
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread th1(get_int, std::move(prom));
    std::thread th2(print_int, std::move(fut));

    th1.join();
    th2.join();
    return 0;
}

其中 get_int 负责接收一个整数的输入,如果输入不正确则设置一个异常,print_int 用于捕获 promise 的输入,如果 promise 储存了一个异常,则输出异常信息。

下面是一个考虑更加周全的例子 5


#include <thread>
#include <iostream>
#include <future>

int main()
{
    std::promise<int> p;
    std::future<int> f = p.get_future();

    std::thread t([&p] {
        try {
            // 可能抛出的代码
            throw std::runtime_error("Example");
        }
        catch (...) {
            try {
                // 存储任何抛出的异常于 promise
                p.set_exception(std::current_exception());
            }
            catch (...) {} // set_exception 也可能抛出异常
        }
        });

    try {
        std::cout << f.get();
    }
    catch (const std::exception& e) {
        std::cout << "Exception from the thread: " << e.what() << std::endl;
    }
    t.join();
}

注:

若无特殊声明,本人原创文章以 CC BY-SA 3.0 许可协议 提供。