C++异步:structured concurrency实现解析

导语| 本篇我们将介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。

前篇《C++异步:libunifex中的concepts详解!》中我们介绍了libunifex作为框架部分的concept设计,本篇我们将在这个基础上,继续介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。

一、Structured Concurrency

我们以一个简短的示例代码开启本章的内容:

single_thread_context tcontext;int count = 0;schedule(tcontext.get_scheduler)  | then([&] { ++count; })  | sync_wait;

这段代码的表达方式前面我们也介绍过,主要使用了ranges类同的pipeline表达,我们可以简单将这种表达方式看成是C++的一种特殊LINQ实现,一个专有的DSL,当然,作为一个DSL来说,就execution的整体设计而言,它被赋予了一些专有的特性和意义:所以,如果从一个DSL的角度来看execution的结构化 concurrency,我们容易得出类似以下的观点,对于execution的pipeline表达:

C++异步:structured concurrency实现解析
  • DSL定义(BNF组成)-首先是范式的组成,如上图所示,业务使用结构化并发表达的时候,整个范式是由Concurrency Pipeline::= Sender Factory { '|' Sender Adapter } '|' Receiver组成的。

  • Compiler-通常情况我们可以将|操作以及connect加起来成是编译过程, 借由Compiler Time的特性支持, 我们可以通过connect产生runtime所需的OperationState。

  • Execute-这个阶段就很自然了,OperationState的start就是DSL本身执行的入口点,当然,执行结果最后是通过:set_value,set_error,set_done这几个receiver cpos来传递的。

本篇中我们将以这种思路结合一些Sender Factory,Send Adapter节点,以及这种结果处理节点的具体实现来展开.sync_wait。

二、Sender Factory

各种Sender Factory cpos用于产生各类sender,前面我们也介绍过, sender最大的特征就是会触发set_value,set_error,set_done这几个用于结束通知的receiver cpos。此处我们以just 为例,来看一下一个Sender Factory需要包含的实现内容,在后续文章中我们会再介绍另外一个schedule cpo的实现。

(一)just实现解析

首先just(values...)的语义, 就是生成一个sender, 该sender可以向后续的节点通过receiver cpos传递values..., 我们具体来看一下libunifex的just实现, 会比大家想的复杂一些, 这主要还是因为execution实现的整体思路就是在尝试定义一个DSL, 然后这个DSL本身是自恰的, 比如对于just来说, 必然会包含以下几个部分:

  • sender生产方法-just cpo本身。

  • just::sender的实现-具体的sender实现。

  • 相关的OperationState-节点可参与异步操作执行,则必然可以通过connect来产生其OperationState对象,最后对start作出响应。

我们来分别看一下这三部分的具体代码实现:

  • sender生产方法

constexpr auto just_cpo::operator(Values&&... values) const { return _just::sender{std::in_place, (Values&&)values...};}

just的入口定义比较简单,主要是根据输入的变参values...构造一个_just::sender{}对象并返回。这就是我们下一节要介绍的sender实现。

bind_back版的operator重载主要用于pipeline组织,代码大量雷同,本篇将统一忽略,方便源码的阅读,有兴趣的读者可以自行翻阅相关的实现。

  • just::sender的实现

_just::sender<> 其实真实类型是 _just::_sender<>::type,这个只是libunifex惯用的一种封装方式,具体的实现如下:

template class just::sender { private: std::tuple values_; public: template  using value_types = Variant>;
template using error_types = Variant;
static constexpr bool sends_done = false;
template explicit sender(std::in_place_t, Values2&&... values) : values_((Values2 &&) values...) {}
template friend auto tag_invoke(tag_t, This&& that, Receiver&& r) -> operation { return {static_cast(that).values_, static_cast(r)}; }};

这是一个很标准的sender实现,如我们在《C++异步:libunifex中的concepts详解!》中介绍的一样。首先是sender traits需要的类型定义部分,决定了sender可能触发的receiver cpos的参数和类型:

 template  using value_types = Variant>;
template using error_types = Variant;
static constexpr bool sends_done = false;

其次是通过tag_invoke定义的connect实现:

 template friend auto tag_invoke(tag_t, This&& that, Receiver&& r) -> operation { return {static_cast(that).values_, static_cast(r)}; }

此处返回的operation<>也是我们下一节要介绍的just专用的OperationState实现。

  • 相关的OperationState

template struct just::operation::type { std::tuple values_; Receiver receiver_;
void start & noexcept { try { std::apply( [&](Values&&... values) { execution::set_value((Receiver &&) receiver_, (Values &&) values...); }, std::move(values_)); } catch (...) { execution::set_error((Receiver &&) receiver_, std::current_exception); } }};

抛开绕来绕去的alias name来说, 这个OperationState的实现很简单, 存储了传入的values...和connect时关联的Receiver, 并且在start时向存储的Receiver调用set_value传递存储下来的values...

(二)本章小结

对于一个sender factory类型的cpo来说,我们始终可以将其实现简单的分成以下几部分:

  • sender生产方法-如just。

  • sender的实现-具体的sender实现。

  • 相关的OperationState-节点可参与异步操作执行,则必然可以通过connect来产生其OperationState对象,最后对start作出响应。因为用于产生一个sender,这类节点一般都出现在structured concurrency描述的最左侧,负责作为后续节点的数据来源,如最开始的示例代码中那样。

三、Sender Adapter

首先我们知道Sender Adapter是作为中间节点存在的:

Concurrency Pipeline ::= Sender Factory { '|' Sender Adapter } '|' Receiver

我们先来看一下Sender Adapter语义层面的特征:

  • Sender Adapter是Sender的包装器,接收前置Sender对象后形成新的Sender对象。

  • 新的Sender对象有自己的异步类型定义,同样也通过receiver cpos向后续节点传递异步操作结果。

Sender Adapter其实就像一个filter,它对原始的异步处理结果进行加工,产生新的结果,大致的工作情况如下图所示:

C++异步:structured concurrency实现解析

如上图所示,对于一个Sender Adapter定义,至少会包含两个对象:

Internal Receiver-用于接收Previous Sender发送的结果,处理自己的逻辑后再将结果发往后续节点。

Internal Sender-SenderAdapter(Sender,args...)形成一个新的Sender,连接到后续节点。当然,还会有一个用于作为入口的cpo。

我们具体以比较常用的then的实现来具体看一下libunifex中一个典型的Sender Adapter是如何实现的:

(一)then cpo

then节点的作用是从上一个节点中获取异步返回值后,用该返回值作为输入值调用传入then节点的函数后,将调用结果作为异步操作的结果返回后续节点,简单的图示如下:

C++异步:structured concurrency实现解析

对应的代码实现为:

templateauto then_cpo::operator(Sender&& predecessor, Func&& func) const -> _result_t { return execution::tag_invoke(_fn{}, (Sender&&)predecessor, (Func&&)func);}
templateauto then_cpo::operator(Sender&& predecessor, Func&& func) const -> _result_t { return _then::sender{(Sender &&) predecessor, (Func &&) func};}};

then调用的处理区分了传入的Func是否可tag_invoke的判断,我们直接看最通常的情况,传入的是普通函数:

templateauto then_cpo::operator(Sender&& predecessor, Func&& func) const -> _result_t { return _then::sender{(Sender &&) predecessor, (Func &&) func};}

最后返回的是一个_then命名空间下定义的_then::sender<>对象,并且这个对象将前置的Sender对象和传入的func作为构造这个对象的参数。我们来看一下这个sender的具体实现:

(二)then的Internal Sender实现

template struct then::sender::type { Predecessor pred_; Func func_; private:
template using result = /*unspecified*/;public:
template using value_types = /*unspecified*/;
template using error_types = /*unspecified*/;
static constexpr bool sends_done = sender_traits::sends_done;
template using receiver_t = receiver_t;

template friend auto tag_invoke(tag_t, Sender&& s, Receiver&& r) -> connect_result_t, receiver_t>> { return execution::connect( static_cast(s).pred_, receiver_t>{ static_cast(s).func_, static_cast(r)}); }};

跟我们前面看到的just内的sender实现一样,包含了基本的sender types定义,以及sender相关的connecttag_invoke定义:

 template friend auto tag_invoke(tag_t, Sender&& s, Receiver&& r) -> connect_result_t, receiver_t>> { return execution::connect( static_cast(s).pred_, receiver_t>{ static_cast(s).func_, static_cast(r)}); }

我们可以看到,对then的sender进行connect的时候,真正发生connect的是我们之前在then(Previous Sender,Func)调用时缓存下来的上一节点,以及新构建出的receiver_t<>对象,这个对象也是Func真正被执行的地方,同时这个对象也保存了后续的Reciver节点,方便向后续节点传递异步执行结果。

(三)then的Internal Receiver实现

template struct then::receiver_t::type { Func func_; Receiver receiver_;
template void set_value(Values&&... values) && noexcept { using result_type = std::invoke_result_t; if constexpr (std::is_void_v) { if constexpr (noexcept(std::invoke( (Func &&) func_, (Values &&) values...))) { std::invoke((Func &&) func_, (Values &&) values...); execution::set_value((Receiver &&) receiver_); } else { try { std::invoke((Func &&) func_, (Values &&) values...); execution::set_value((Receiver &&) receiver_); } catch (...) { execution::set_error((Receiver &&) receiver_, std::current_exception); } } } else { if constexpr (noexcept(std::invoke( (Func &&) func_, (Values &&) values...))) { execution::set_value( (Receiver &&) receiver_, std::invoke((Func &&) func_, (Values &&) values...)); } else { try { execution::set_value( (Receiver &&) receiver_, std::invoke((Func &&) func_, (Values &&) values...)); } catch (...) { execution::set_error((Receiver &&) receiver_, std::current_exception); } } } }
template void set_error(Error&& error) && noexcept { execution::set_error((Receiver &&) receiver_, (Error &&) error); }
void set_done && noexcept { execution::set_done((Receiver &&) receiver_); }};

到receiver的实现这里就很自然了,通过set_value接受前面的Sender传递过来的结果,将结果作为输入参数调用Func后,再通过set_value向后续节点传递Func的返回值。

(四)本章小结

对于一个Sender Adapater类型的cpo来说,主要需要完成以下几件事情:

  • 入口cpo(如then)-完成对前置Sender的接收和需要的参数的接收处理,创建一个专用的Internal Sender对象并返回。

  • Internal Sender-存储前置Sender和需要的参数,并实现tag_invoke(tag_t)用于构建InternalReceiver,并将实际的connect操作重定向到保存下来的前置Sender和新创建的InternalReceiver上。

  • InternalReceiver-获取前置Sender的异步结果,并在处理自身逻辑后,将最终的结果返回给后续节点。整体上来说可以将这看成一种wrapper机制, set_value是拦截点,在拦截点上插入自身逻辑,最后依然还是通过set_value返回下一步需要的异步执行结果。

四、sync_wait_r与sync_wait

libunifex的实现并没有提供一个类似default receiver的节点,但提供了工具节点sync_wait_r和sync_wait,当然,除了通过这种方式来处理返回结果,你也可以自行实现一个自己的Receiver来接收异步返回值。本章我们主要介绍sync_wait_r和sync_wait的实现,通过这两者,我们也能更深入理解libunifex常规状态下是如何发起一个异步操作执行并接收其返回结果的。

(一)cpo入口

sync_wait:

templateauto sync_wait_cpo::operator(Sender&& sender) const -> std::optional { using Result = /*...*/; return _sync_wait::_impl((Sender&&) sender);}

sync_wait_r:

template decltype(auto) sync_wait_r_cpo::operator(Sender&& sender) const { using Result2 = non_void_t>>; return _sync_wait::_impl((Sender&&) sender);}

两者代码高度相似

  • 输入参数都是Sender。

  • 利用_sync_wait::_impl<>来完成具体的实现。

两者的差异:

  • sync_wait_r允许业务侧指定返回值的类型,不支持pipeline操作,一般直接以sync_wait_r(Sender)的方式来使用。

  • sync_wait 直接使用传入的Sender来推导返回值类型,可以作为pipeline的终结节点使用,如just(1)|sync_wait。

我们接下来看一看sync_wait和sync_wait_r都引用的_sync_wait::_impl的实现:

(二)sync_wait::_impl的实现

auto _impl(Sender&& sender) { manual_event_loop ctx; // Store state for the operation on the stack. auto operation = connect( (Sender&&)sender, _sync_wait::receiver_t{promise, ctx});
start(operation);
ctx.run;
// ... (retsult handling here)}

整体实现比较简洁,我们主要关注几点:

  • _impl最终的返回值类型为std::optional

  • 整个函数的实现完成了前面的们提到的connect产生OperationState,再执行start的过程。

  • connect时与传入的Sender进行连接的Receiver是自定义的_sync_wait::_receiver::type类型。

  • ctx.run等待最终执行的完成(相关详细分析可参考后续文章)。

  • 根据promise.state_记录的类型对返回值进行处理(正确返回值还是抛异常)。

剩下的就只有_sync_wait::receiver_t<>的实现了,我们接着来看一下这部分的实现:

(三)_sync_wait::receiver_t<>的实现

template struct sync_wait::receiver_t { promise& promise_; manual_event_loop& ctx_;
template void set_value(Values&&... values) && noexcept { try { execution::activate_union_member(promise_.value_, (Values&&)values...); promise_.state_ = promise::state::value; } catch (...) { execution::activate_union_member(promise_.exception_, std::current_exception); promise_.state_ = promise::state::error; } signal_complete; }
void set_error(std::exception_ptr err) && noexcept { execution::activate_union_member(promise_.exception_, std::move(err)); promise_.state_ = promise::state::error; signal_complete; }
void set_error(std::error_code ec) && noexcept { std::move(*this).set_error(make_exception_ptr(std::system_error{ec, "sync_wait"})); }
template void set_error(Error&& e) && noexcept { std::move(*this).set_error(make_exception_ptr((Error&&)e)); }
void set_done && noexcept { promise_.state_ = promise::state::done; signal_complete; } private: void signal_complete noexcept { ctx_.stop; }};

这就是一个很标准的receiver实现,利用set_value,set_error,set_done的重载来完成对前置Sender执行结果的获取,通过前面的代码我们容易知道,如果是无异常的状态,则正常的通过std::optional<>来返回执行结果,否则抛出异常。另外,代码中的signal_complete用于通知_impl函数中的 ctx.run返回,最终向用户返回异步操作的结果。

五、总结

本篇我们从libunifex的structured concurrency设计开始,简述了整套execution整套DSL的组织和执行的逻辑,并结合具体的:

  • Sender Factory实现举例-just。

  • Sender Adapter实现举例-then。

  • 终结节点-sync_wait和sync_wait_r加深大家对execution各类节点实现的理解。

structured concurrency的设计是整个库的核心,理解了它,也能方便我们理解一些基础节点的实现,也为自己定制更多业务化的节点提供良好的基础。这也是为什么execution库也被当成一个库作者向的特性的原因,与其说它是一个异步库,不如说它在尝试定义一套从DSL到执行态都比较完备的c++异步专用语言。当然,后者的学习成本比学习一个库明显会高出比较多。

参考资料:

1.libunifex源码库

作者简介

沈芳

腾讯后台开发工程师

IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。

本文由高可用架构转载。技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。

活动预告

C++异步:structured concurrency实现解析
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章