C++11的future和async等关键字

1.async和future的概念

std::asyncstd::future 是 C++11 引入的标准库功能,用于实现异步编程,使得在多线程环境中更容易处理并行任务。它们可以帮助你在不同线程中执行函数,并且能够方便地获取函数的结果。

在之前使用线程的时候,我们没有办法很好的获取到线程所执行函数的返回值。甚至更多时候,我们使用线程执行的都是不关心返回值的函数。如果真的想要获取线程函数的返回值,可以将一个指针作为输出型参数放入线程所执行的函数中。主执行流执行t.join()等待线程执行结束,并获取到这个返回值。

但这样并不是非常方便。于是C++11就引入了如上两个关键字来帮助我们获取到线程所执行函数的返回值。适用于异步执行某些耗时的函数,提高程序运行的效率:

  • 异步执行耗时函数
  • 主执行流干其他事情
  • 通过std::future获取到返回值
  • 继续向后执行

基本的并行概念在多线程部分都已经讲过了,这里就不多bb,直接上代码吧!

2.使用

2.1 std::launch

在使用std::async之前,还需要认识一个枚举类型 launch,在使用std::async的函数传参的时候会用到(这里先说一下,std::async是用来帮我们创建线程的)

1
enum class launch; // std::launch

cplusplus网站上,有这个枚举类型的释义,这里面只有俩值

image-20230819111941932

说一下这俩值的区别

  • launch::async,立即创建一个线程来执行目标函数
  • launch::deferred,不立即创建线程,而是等待调用std::futureget()函数时才调用(这个get函数是用来获取返回值的)

好了知道这个就够了哈!

2.2 std::result_of

这里还出现了另外一个关键字,就顺带也说说是干嘛的(其实我自己也不知道,现学现卖)

1
2
3
4
5
6
7
#include <type_traits> // 头文件

template <class Fn>
struct result_of;

template <class Fn, class... ArgTypes>
struct result_of<Fn(ArgTypes...)>;

以下是 std::result_of 的基本用法和概念:

  1. 使用 std::result_of 获取函数调用的返回类型: 你可以通过将函数类型和参数类型传递给 std::result_of 来推导函数调用的返回类型。这使得你可以在编译时获取函数调用的结果类型,而不需要手动指定它。
  2. 用法示例: 假设有一个函数 int add(int a, int b),你可以使用 std::result_of 来获取该函数在给定参数下的返回类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <type_traits>

int add(int a, int b)
{
return a + b;
}

int main()
{
std::result_of<decltype(add) &(int, int)>::type result = add(3, 5);
std::cout << "Result: " << result << std::endl;

return 0;
}

运行结果如下

1
2
3
$ g++ test.cpp -o test -std=c++11
$ ./test
Result: 8

需要注意的是,使用decltype关键字来指定函数指针的时候,函数名和函数参数之间需要加上&,否则无法正确推导类型

1
2
decltype(add) &(int, int) // 正确
decltype(add) (int, int) // 错误

2.3 async

先来看看async函数的样本,第一个情况是不显示传入 std::launch,第二个函数重载是传入了std::launch作为启动策略

1
2
3
4
5
6
7
8
9
10
#include <future> // 头文件

// unspecified policy (1)
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async (Fn&& fn, Args&&... args);
// specific policy (2)
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async (launch policy, Fn&& fn, Args&&... args);

在cplusplus网站上,说到了第一种情况是由编译器自主决定到底是采用 std::launch::asyncstd::launch::deferred,这就需要根据平台和编译器实现以及调用逻辑的不同来具体分析了。所以不建议使用第一个,还是直接指定launch policy(翻译过来是启动策略)的会好一点。

所以只看第二个👇

1
2
3
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async (launch policy, Fn&& fn, Args&&... args);

这里采用了可变模板参数来接收多个函数参数,类似于可变参数列表。这里还使用了typename关键字来告知编译器result_of<Fn(Args...)>::type是一个参数类型,需要在模板实例化了之后再去获取确定的类型。而class Fn是一个函数指针的模板变量。

  • 第一个参数是std::launch,上文已经提到过两个不同选项的区别了
  • 第二个参数是函数,直接丢函数名就可以了
  • 第三个参数是这个函数的参数,也是直接丢参数就可以了

如下是一个简单的调用示例(并非完整示例)

1
2
3
4
5
6
7
#include <future> // 头文件
// 函数
int add(int a, int b) {
return a + b;
}
// 调用
std::async(std::launch::async, add, 3, 5);

调用了这个函数后,CPP会帮我们创建一个线程来执行函数,并根据第一个启动参数的不同,决定啥时候创建这个线程。最终我们可以通过future获取到线程执行函数的返回值。

2.4 future

人如其名,这个类型是用来声明一个未来的变量的。因为std::async会帮我们创建一个线程来执行函数,此时该线程函数的返回值是未知的,这个未来变量就是提前的一个声明,当线程执行完毕函数并返回值的时候,这个变量的值才真正被初始化为我们真正需要的那个值。

1
2
3
4
5
template <class T>  future;
// specialization : T is a reference type (R&)
template <class R&> future<R&>;
// specialization : T is void
template <> future<void>;

其有如下几个成员函数

  • get:获取对应async所执行函数的返回值,如果函数没有执行完毕则阻塞等待
  • valid:bool,判断当前future类型到底有没有和一个async函数所对应
  • share:将future对象转成一个std::shared_future对象
  • wait:等待异步任务完成,但不获取结果
  • wait_for:等待异步任务完成,但有等待的时长(没等到就返回错误)
  • wait_until:等待异步任务完成,直到一个确定的时间(没等到就返回错误)

后面三个 wait 函数和 CPP 线程中的 wait 函数如出一辙。

2.5 share_future

share_future就好比share_ptr智能指针,其让future对象从单一所有权变成了多人可用。本来是一个只能坐一人的餐桌,现在变成了可以坐很多人的大桌子。

1
2
3
template <class T>  shared_future;
template <class R&> shared_future<R&>; // specialization : T is a reference type (R&)
template <> shared_future<void>; // specialization : T is void

成员函数和future完全一样(只不过么有share()函数)这里就不赘述了;

  • future是单人餐桌,一次只能有一个线程执行get函数;当get被执行后,这个future会失效
  • share_future是大桌子,所有人一起坐在这个桌子上等服务员上菜,互不干扰;

当一个函数的返回值需要在多个线程中被共享使用的时候,就可以用shared_future了。

2.5 测试

2.5.1 正常测试

如下代码是一个简单的使用示例,并且通过提供不同的std::launch启动策略,我们也能观察到不同的现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <future>
#include <type_traits> // result_of
#include <sys/unistd.h> //sleep

int add(int a, int b)
{
std::cout << "Add Thread " << std::this_thread::get_id() << " | Sleeping before add..." << std::endl;
sleep(4);
return a + b;
}

int main()
{
std::cout << "Main Thread " << std::this_thread::get_id() << " | Start" << std::endl;
//std::future<int> futureResult = std::async(std::launch::deferred, add, 3, 5); // deferred不会创建新线程
std::future<int> futureResult = std::async(std::launch::async, add, 3, 5); // 创建新线程
sleep(3);
std::cout << "Main Thread " << std::this_thread::get_id() << " | Waiting for result..." << std::endl;
int result = futureResult.get(); // 等待异步任务完成并获取结果
std::cout << "Main Thread " << std::this_thread::get_id() << " | Result: " << result << std::endl;
sleep(3);

return 0;
}

如果使用std::launch::async作为启动策略,可以看到,执行add函数的线程id和主线程的id是不同的,通过linux下的ps -aL命令也能观察到出现两个线程。可以用如下脚本来实时监控线程状态。

1
while :; do ps jax | head -1 && ps -aL |  grep -v grep;sleep 1; echo "########################"; done

std::launch::async下程序执行的输出结果

1
2
3
4
Main Thread 139869475694400 | Start
Add Thread 139869457655552 | Sleeping before add...
Main Thread 139869475694400 | Waiting for result...
Main Thread 139869475694400 | Result: 8

如果使用std::launch::deferred作为启动策略,则会发现这两个线程的id是完全相同的,这代表实际上其执行了并行的策略,程序就是在主执行流上面运行的,并没有新开一个线程来执行。这也符合deferred作为“延迟运行”的特性。

1
2
3
4
Main Thread 139789724776256 | Start
Main Thread 139789724776256 | Waiting for result...
Add Thread 139789724776256 | Sleeping before add...
Main Thread 139789724776256 | Result: 8

2.5.2 多线程get一个future

在如下代码中,我写了一个void future_get_func(std::future<int>& fu)的函数,尝试开一个线程来get,然后主执行流又get一次,看看会发生什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <future>
#include <functional>
#include <thread>
#include <type_traits> // result_of
#include <sys/unistd.h> // sleep

int add(int a, int b)
{
std::cout << "Add Thread " << std::this_thread::get_id() << " | Sleeping before add..." << std::endl;
sleep(4);
return a + b;
}

void future_get_func_shared(std::shared_future<int>& fu)
{
int result = fu.get(); // 等待异步任务完成并获取结果
std::cout << "Func Thread " << std::this_thread::get_id() << " | Result: " << result << std::endl;
sleep(2);
}

void future_get_func(std::future<int>& fu)
{
int result = fu.get(); // 等待异步任务完成并获取结果
std::cout << "Func Thread " << std::this_thread::get_id() << " | Result: " << result << std::endl;
sleep(2);
}


int main()
{
std::cout << "Main Thread " << std::this_thread::get_id() << " | Start" << std::endl;
//std::future<int> futureResult = std::async(std::launch::deferred, add, 3, 5); // 不会创建新线程
std::future<int> futureResult = std::async(std::launch::async, add, 3, 5); // 创建新线程
sleep(3);
std::cout << "Main Thread " << std::this_thread::get_id() << " | Waiting for result..." << std::endl;

// 尝试测试多线程get会发生什么

std::thread t1(future_get_func, std::ref(futureResult)); // 开个线程来get
t1.detach(); // 直接分离线程
sleep(1);

int result = futureResult.get(); // 等待异步任务完成并获取结果
std::cout << "Main Thread " << std::this_thread::get_id() << " | Result: " << result << std::endl;
sleep(3);

return 0;
}

需要注意的是,如下创建线程的传参必须要用std::ref包裹,来告知线程这是一个引用对象,否则编译会报错。因为 std::thread 要求参数可以在构造函数中被调用,而 std::future 并不能直接传递给 std::thread

1
2
std::thread t1(future_get_func, std::ref(futureResult)); //正确
std::thread t1(future_get_func, futureResult); //错误

编译通过后执行,会发现跑出来了一个std::future_error异常,代表我们在一个无效的future上调用了get函数。

1
2
3
4
5
6
7
Main Thread 139869577889600 | Start
Add Thread 139869559850752 | Sleeping before add...
Main Thread 139869577889600 | Waiting for result...
Func Thread 139869551458048 | Result: 8
terminate called after throwing an instance of 'std::future_error'
what(): std::future_error: No associated state
Aborted

记住了,std::future在调用了一次get之后将不再与对应的std::async关联,所以才会需要share_future的出现!

改成share_future再执行上面这套逻辑,就会发现成功跑起来了。

1
2
3
4
5
Main Thread 140369883957056 | Start
Add Thread 140369865918208 | Sleeping before add...
Main Thread 140369883957056 | Waiting for result...
Func Thread 140369857525504 | Result: 8
Main Thread 140369883957056 | Result: 8

2.5.3 异常处理

1
2
3
4
5
6
7
int add(int a, int b)
{
std::cout << "Add Thread " << std::this_thread::get_id() << " | Sleeping before add..." << std::endl;
sleep(4);
throw std::runtime_error("An error occurred");
return a + b;
}

如果async执行的函数中抛出了异常,那么这个异常将会被传回主执行流,可以在主执行流中被处理。而如果直接使用线程来执行这个函数,其异常不会被捕捉,而是会导致整个进程退出。

下图,使用async运行,成功打印出异常(捕获成功)

image-20230819141820459

下图,使用线程运行,进程退出

image-20230819141716295

2.6 launch::deferred的真正意义

如果std::launch::deferred是同步执行,这样写不是多此一举吗?为什么不直接在需要的地方调用函数,而是使用async?

NONONO 非也非也,和直接调用Add函数相比,用async+deferred策略这样的方式调用Add函数还是有些区别的:

  • 推迟执行:直接调用Add函数是立马执行,但是用async可以推迟到调用get的时候才执行
  • 延迟计算:有的时候我们并不是需要立马使用这个函数的返回值,所以就可以延迟一会再执行这个函数,先把函数的调用搞起来,后面只需要一个get就能获取到结果了(避免两个部位之间的代码较长,后续找不到我们需要的那两个变量的名字了)
  • 避免线程创建:并不是什么时候多线程都更好,有些时候创建一个线程的消耗还不如直接执行函数来的快(比如函数干的活很小的情况,比如一些O(1)的算法)

所以,这个关键字多少还是有点作用了。

2.7 future_error/errc/status

除了futureshare_future,还有如下几个类型

  1. std::future_error std::future_error 是一个异常类,用于表示与 std::future 相关的错误。当在使用 std::future 时出现错误,例如获取结果时异步任务抛出了异常,就会抛出 std::future_error 异常。它是一个标准异常类型,通常通过捕获异常对象来处理异步任务执行过程中的问题。
  2. std::future_errc std::future_errc 是一个枚举类型,用于表示 std::future_error 中的不同错误情况。这样的枚举类型是为了在处理异常时更加明确和方便。它包含了一系列可能的错误,如 broken_promise(promise 被破坏,即 promise 对象的 set_value 或 set_exception 被多次调用)和 future_already_retrieved(future 对象已经被获取过一次)等。
  3. std::future_status std::future_status 是一个枚举类型,用于表示 std::future 的状态。它描述了一个 std::future 对象的当前情况,指示异步任务是否已完成、是否有效等。std::future_status 包含三个值:ready(异步任务已完成,可以获取结果)、timeout(等待超时,即异步任务还未完成)、deferred(异步任务延迟执行)。

其中 wait_for 的返回值是 std::future_status,包含三种情况

valuedescription
future_status::readyThe shared state is ready: the producer has set a value or exception.
future_status::timeoutThe function waited for rel_time without the shared state becoming ready.
future_status::deferredThe shared state contains a deferred function.

第一个情况是future的目标函数已经执行完毕,可以去获取返回值了。

第二个情况是wait_for等待超时了。

第三个情况是future中包含的函数使用的策略是std::launch::deferred,此时wait_for函数是没有意义的,因为在这种情况下,函数只有在future.get()的时候才会被执行,wait_for会被直接返回。

3.为什么C++会出现futrue?

为啥要出一个future?直接用老办法不也可以这么玩吗?

这部分是GPT的回答

C++ 标准库引入 std::future 和相关的异步编程机制,是为了更好地支持并发编程和多线程环境。这些机制的出现有几个原因和动机:

  1. 并发性和性能提升: 在现代计算机体系结构中,多核处理器已经成为常态。为了充分利用这些多核资源,编写并发代码变得重要。std::future 提供了一种方式,可以在多个线程中同时执行任务,并且可以方便地获取任务的结果,从而允许程序在多核处理器上并行执行,提高性能。
  2. 任务分离: 在很多情况下,我们希望将一个大的任务拆分成多个子任务,在不同的线程中并行执行,然后合并子任务的结果。std::future 允许你在一个线程中等待另一个线程的任务完成,从而支持这种任务分离和并行执行的模式(其实最主要的还是获取线程所执行函数的返回值)。
  3. 避免阻塞: 在传统的同步编程中,如果某个操作需要等待,会导致线程阻塞。而异步编程机制允许线程继续执行其他操作,而不必等待一个潜在的耗时操作完成。std::future 允许你在一个线程中发起异步操作,并在需要的时候获取操作的结果,从而避免了不必要的阻塞。
  4. 异常处理: 在多线程环境中,处理异步任务的异常可能变得复杂,因为异步任务在不同的线程中执行。std::future 引入了异常传递机制,允许异步任务在执行过程中抛出异常,并将这些异常传递到等待结果的线程中。

总之,C++ 的 std::future 和相关的异步编程机制提供了一种更高级别、更方便的方式来处理多线程并发编程。这些机制使得开发者能够更容易地利用多核处理器的性能,并更灵活地设计并发代码,从而在面对并发和异步任务时能够更好地管理资源、提高效率和处理异常。

4.promise

4.1 概念

std::promise 是 C++ 标准库中用于在一个线程中产生结果,然后在另一个线程中获取结果的工具。它提供了一些成员函数来设置结果、处理异常以及获取关联的 std::future 对象等。

1
2
3
template <class T>  promise;
template <class R&> promise<R&>; // specialization : T is a reference type (R&)
template <> promise<void>; // specialization : T is void

下面是一些常用的 std::promise 成员函数及其用法:

  1. set_value 用于设置结果值。如果你已经通过 get_future() 获取了一个 std::future 对象,调用 set_value 将会使等待结果的线程被唤醒并获取结果。

    1
    2
    3
    4
    5
    std::promise<int> promiseObj;
    std::future<int> futureResult = promiseObj.get_future();

    // 在某个线程中设置结果值
    promiseObj.set_value(42);
  2. set_exception 用于设置异常,将在等待结果的线程中抛出。这允许你在产生结果的线程中处理异常情况。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    try {
    // 产生异常
    throw std::runtime_error("An error occurred");
    }
    catch (...)
    {
    // 将异常设置到 promise 对象中
    promiseObj.set_exception(std::current_exception());
    }
  3. get_future 返回与 std::promise 关联的 std::future 对象。通过这个 std::future,你可以在另一个线程中等待并获取结果。

    1
    2
    std::promise<int> promiseObj;
    std::future<int> futureResult = promiseObj.get_future();
  4. swap 交换两个 std::promise 对象的状态,包括关联的 std::future 对象和设置的结果。

    1
    2
    3
    4
    5
    std::promise<int> promise1;
    std::promise<int> promise2;

    // 交换两个 promise 对象的状态
    promise1.swap(promise2);
  5. valid 检查 std::promise 对象是否有效,即是否与一个 std::future 对象关联。

    1
    2
    3
    4
    5
    6
    7
    std::promise<int> promiseObj;

    if (promiseObj.valid()) {
    // promiseObj 有效
    } else {
    // promiseObj 无效
    }

还有下面这俩个成员函数,看名字也能猜出来它是干嘛的,就不多说了

这些成员函数允许你在一个线程中产生结果或异常,并在另一个线程中等待和处理这些结果或异常。它们为多线程编程提供了一种可靠的方式来传递数据和控制流。请注意,在使用 std::promise 时,你需要仔细处理异常和线程同步,以确保正确的结果传递。

4.2 示例

然后下头是一个基本的使用示例;你可以理解为promise就是一个用来承担线程所执行函数的参数和异常的一个变量,我们可以通过set_value并在主执行流中使用future.get来获取到这个值,也可以设置异常,并在主执行流中处理这个异常;

不过promisefuture一样,是一次性的,设置value和异常都只能设置一次,设置完毕后就不能再设置了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <future>

void worker(std::promise<int>& p)
{
p.set_value(42); // 设置值
}

int main()
{
std::promise<int> promiseObj;
std::future<int> futureResult = promiseObj.get_future();

std::thread t(worker, std::ref(promiseObj)); // 通过线程执行
t.join();

int result = futureResult.get(); // 主线程中获取值
std::cout << "Result: " << result << std::endl;

return 0;
}

5.packaged_task

std::packaged_task 是 C++ 标准库中的一个类模板,用于将一个可调用对象(函数、函数对象或可调用成员函数)封装成一个可以异步执行的任务,并且可以通过 std::future 获取任务的返回值。它在多线程编程中起到了连接异步任务和线程间通信的桥梁作用。

std::packaged_task 的主要作用有以下几个方面:

  1. 封装任务: std::packaged_task 允许你将一个可调用对象封装成一个任务,这个任务可以在另一个线程中异步执行。你可以将函数、函数对象或可调用成员函数封装为一个 std::packaged_task 实例。
  2. 异步执行: 通过将 std::packaged_task 实例传递给一个 std::thread 或其他支持异步执行的机制,你可以在新的线程中执行封装的任务,而不需要显式创建线程函数。
  3. 获取返回值: std::packaged_task 可以与 std::future 一起使用,以获取异步任务的返回值。你可以通过 packaged_taskget_future 方法获取一个与任务关联的 std::future 对象,然后在适当的时候使用 std::futureget 方法来获取任务的返回值。
  4. 线程池: std::packaged_task 结合线程池的使用,可以更灵活地控制任务的执行方式。线程池可以预先创建一组线程,然后将封装好的任务分配给这些线程执行,避免了频繁创建和销毁线程的开销。

5.1 示例

下面是一个简单示例,演示了如何使用 std::packaged_task 来异步执行一个函数并获取其返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <future>

int add(int a, int b)
{
return a + b;
}

int main()
{
std::packaged_task<int(int, int)> task(add);
std::future<int> future = task.get_future();

std::thread worker(std::move(task), 3, 5);
worker.join(); // 通过线程异步执行这个task

int result = future.get(); // 等待返回值

std::cout << "Result: " << result << std::endl;

return 0;
}

在这个例子中,std::packaged_task 封装了一个函数 add,然后通过 std::thread 异步执行,最后通过 std::future 获取异步任务的返回值。

5.2 make_ready_at_thread_exit

packaged_task的成员函数中,主要还是这个需要单独说明;先看如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <thread>
#include <future>

int main()
{
std::packaged_task<int()> task([](){ return 42; });
std::future<int> future = task.get_future();

std::thread worker([&task]() {
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(2));

// 设置任务结果值
task.make_ready_at_thread_exit();
});

worker.detach(); // 分离线程,不等待其结束

// 等待任务结果
int result = future.get();

std::cout << "Result: " << result << std::endl;

return 0;
}

我们让packaged_task执行的是一个return 42的函数,而线程里面还会进行其他处理。

make_ready_at_thread_exit()的作用,就是确认当前的worker线程已经干完自己的活了,可以执行packaged_task封装的函数了!

相当于是一个确认packaged_task中封装的任务到底在什么时候执行的一个函数。调用这个函数的时候,就会开始执行其包装的异步函数,并返回结果给future.get()的执行流

The end

收工