C++ std::promise
由于std::async在理论和实现上并不能满足我们的需求,因为我们常常在异步操作后对异步操作的结果进行一个同步,而std::async并不能提供这点需求,甚至由于默认发射策略导致std::async很有可能退化为单线程顺序运行,所以在std::async的基础上提供了std::promise。
std::promise可以保存某一类型T的值,该值通常使用std::future储存(可能传递给另外一个线程),因此std::promise也提供了一种线程同步的手段 1 。
可以通过get_future来获取与该promise对象相关联的future对象,调用该函数之后,两个对象共享相同的共享状态(Shared State)。
- promise是异步Provider,它可以在某一时刻设置共享状态的值。
- future可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为ready,然后才能获取共享状态的值。
#include <iostream>
#include <future>
void print_int(std::future<int>& fut) {
int x = fut.get(); // 获取共享状态的值
std::cout << "value: " << x << std::endl; // 打印value: 10
}
int main()
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(print_int, std::ref(fut));
prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步
// 若不设置共享状态的值,那么t会被一直阻塞
// 也可选择阻塞一定时间后若仍未设置值,进行异常处理
t.join();
return 0;
}
future本质上是我们发起的一个并发操作,而promise本质上则是并发操作的回调。我们可以通过future对象等待该操作和获取操作的结果,而promise对象则负责写入返回值并通知我们 2 。
具体的实现中,future与promise会有指向同一个共享的状态对象Shared State的共享指针,当promise对象接受到返回值或者错误之后,通过条件变量通知另一端等待的future对象。future对象则可以通过Shared State对象中的状态,来判断接收到回调之后是继续处理业务还是处理错误。
std::promise有如下成员函数:
- get_future返回与promise关联的future
- set_value设置结果为指定值
- set_value_at_thread_exit设置结果为指定值,同时仅在线程退出时分发提醒
- set_exception设置结果为指示异常
- set_exception_at_thread_exit设置结果为指示异常,同时仅在线程退出时分发提醒
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // 提醒future已就绪
}
int main()
{
// 演示用promise<int> 在线程间传递结果。
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));
// future::get() 将等待直至该future拥有合法结果并取得它
// 无需在get() 前调用wait
//accumulate_future.wait(); // 等待结果
std::cout << "result=" << accumulate_future.get() << std::endl;
work_thread.join(); // wait for thread completion
}
由于std::future所引用的共享状态不与另一异步返回对象共享,所以std::promise也具有同样的性质,所以std::promise一般使用移动构造的方式来传递(原代码中使用了引用,但是这并不值得展示)。
C++并发编程实战 第二版4.2.3使用promises中提供了一个简化的使用场景:
void process_connections(connection_set& connections)
{
while (!done(connections))
{
for (connection_iterator
connection = connections.begin(), end = connections.end();
connection != end;
++connection)
{
if (connection->has_incoming_data())
{
data_packet data = connection->incoming();
std::promise<payload_type>& p =
connection->get_promise(data.id);
p.set_value(data.payload);
}
if (connection->has_outgoing_data())
{
outgoing_packet data =
connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}
假设connections是一个网络连接池,自旋锁不断通过循环检查是否整个连接池都已经结束连接,并每次遍历整个连接池。
每一个connection就是一个连接,如果等待传入的数据,那么就构造一个promise,然后设置promise的共享状态的值。
如果有传出数据(通常同时可能存在多个等待传出的数据),则从队列里得到顶部数据,然后构造一个promise发送数据。
显然的,promise总是积极的创造一个线程。
以下是向promise传递一个异常的示例 1 :
#include <iostream> // std::ios
#include <thread> // std::thread
#include <future> // std::promise, std::future
#include <exception> // std::exception, std::current_exception
void get_int(std::promise<int> prom) {
int x;
std::cout << "Please, enter an integer value: ";
std::cin.exceptions(std::ios::failbit); // throw on failbit
try {
std::cin >> x; // sets failbit if input is not int
prom.set_value(x);
}
catch (std::exception&) {
prom.set_exception(std::current_exception());
}
}
void print_int(std::future<int> fut) {
try {
int x = fut.get();
std::cout << "value: " << x << std::endl;
}
catch (std::exception& e) {
std::cout << "[exception caught: " << e.what() << "]" << std::endl;
}
}
int main()
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread th1(get_int, std::move(prom));
std::thread th2(print_int, std::move(fut));
th1.join();
th2.join();
return 0;
}
其中get_int负责接收一个整数的输入,如果输入不正确则设置一个异常,print_int用于捕获promise的输入,如果promise储存了一个异常,则输出异常信息。
下面是一个考虑更加周全的例子 3 :
#include <thread>
#include <iostream>
#include <future>
int main()
{
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t([&p] {
try {
// 可能抛出的代码
throw std::runtime_error("Example");
}
catch (...) {
try {
// 存储任何抛出的异常于promise
p.set_exception(std::current_exception());
}
catch (...) {} // set_exception也可能抛出异常
}
});
try {
std::cout << f.get();
}
catch (const std::exception& e) {
std::cout << "Exception from the thread: " << e.what() << std::endl;
}
t.join();
}
-
[std::promise<R>::set_exception](https://zh.cppreference.com/w/cpp/thread/promise/set_exception) ↩