昨天我们看完了理论相关的
今天我们继续往下
PS:今天的内容依旧是理论上的,如果不感兴趣,只是想学怎么用,那就不用看了。
under the hood
):执行FutureS
和任务(Tasks
)我们接下来会了解到幕后是如何安排异步任务和执行Future S
的。
这些知识对我们理解async/await
是如何工作的来说是相当有用的。
Future
这个trait
是rust
中异步编程的核心。一个Future
是一异步运算,可以提供一个值(包括()
)。
一个简单的Future trait
大概长这样
你可以调用poll
这个方法,驱动这个future
继续执行直到完成。
当一个future
完成时,它会返回一个Poll::Ready(result)
类型的值。
而如果还没完成,它返回值的类型则是Poll::Pending
,并且安排wake()
函数在完成的时候被调用,wake
被调用之后通知执行器(executor
)驱动Future
再重新调用poll
方法。
看不懂?看不懂就对了,我也看不懂。。。
我们先mark
,后面看例子慢慢来。
不过从这里可以看出这个wake
方法是Future
和执行器的桥梁,没有它执行器不知道什么时候通知这个Future
去执行poll
,那么如果还想去完成,就得连续的通知Future
调用poll
方法。
不过这种方式也不是没有,比如前端主动请求某个状态(基于http
),只能通过定时轮询获取等。
另外一个例子就是socket
,它获取的数据有可能是无效的,当获取的值是有效的时候就会返回Poll::Ready(data)
,而如果值无效就会被blocked
堵塞直到有值,这个堵塞的过程包括了注册wake
方法直到这个方法被executor
也就是执行器调用。
来看大概的代码
当poll
时发现值是有效的,直接就Poll::Ready
返回,否则把wake
函数注册到socket
的set_readable_callback
里,然后返回状态Pending
,等到时有有效的值之后socket
就会执行我们的wake
方法,然后我们的wake
方法里面有调用poll
的逻辑,这样就完成poll
的触发了。
这个不是持续性的,因为socket
在自身拿到值之后才会触发wake
通知future
去再次调用poll
方法。
这个socket
就是我们的执行器(executor
)。
现在是不是有些清楚了~
这种Future S
的模型允许我们组合不同的异步操作,这中间没有任何的内存分配损耗。
所以一次执行多个异步特性可以用allocation-free
也就是免内存分配状态机(state machines
)来实现,比如下面这个例子
可以看到join
调用poll
方法,它的里面分别调用不同的future
的poll
方法
当所有future
都ready
之后状态才会变成ready
。
然后将它们的值放到自己身上。
这中间没有任何的内存分配。
和前端的Promise.all
方法有些类似。
我们再来看个例子
这回的例子是连续的两个异步,只有第一个future
完成了才能执行第二个异步。
如果第一次匹配到Pending
,直接return
终止当前poll
行为。
当futureA
完成之后才会执行第二个future
的poll
。
然后我们来看下真的Future trait
长啥样
真的和我们的demo Future
有以下几点不同:
self
用Pin
这个类型包裹,这玩意儿具体下一章会说,现在只需要知道是创建不可移动的(immovable
) futures
实例的。不可移动的对象(immovable objects
)可以在它的字段里存储指针。比如:struct MyFut { a: i32, ptr_to_a: *const i32 }
。总之Pinning
对于async/await
来说是不可或缺的。wake: fn()
也就是wake
函数参数被替换成&mut Context<'_>
。wake
只是一个函数参数(函数指针),它能做的事情相对于上下文来说是少很多的,比如存储数据等。看过一些前端打包工具比如webpack
之后你应该觉得这是有道理的,太多东西了,什么插件啊loader
啊之类的。最重要的一点就是这个编译器(compiler
)是(相对)唯一的,可以通过它联通上下文。waker
唤醒task
一般futures
都不会在第一次poll
的时候就已经是Ready
状态。
因此,waker
是相当重要的一环,它可以确保下一次future
调用poll
的时候已经是Ready
状态了。
每次轮询future
的时候,它都会作为这个task(下面直接叫任务了)
轮询的一部分。
task
是future
的最外层(top-level
)表现,它会被提交给执行器。
waker
提供了wake
方法,wake
方法会被用来通知执行器去唤醒关联的任务。
当wake
方法被调用之后,执行器就知道这个任务可以被唤醒了。
另外Waker
也实现了clone
这个方法,可以用来克隆和存储。
我们来尝试实现一个自己的Waker
。
定时器是一个常见的异步场景,会在固定且明确的时间之后唤醒任务。
我们来创建一个新项目
然后在lib.rs
中
看着有些懵是正常的。
我们一点一点来分析,先来看下TimerFuture
,它就是我们的定时器。
它有一个字段shared_state
,这个字段是多多线程多所有权可修改类型。Arc
和Mutex
相信大家应该还记得。
这个字段为什么要允许多线程多所有权可修改呢?
因为定时器实际上是开一个新的线程,然后将任务放到新的线程里,等待时间后再唤醒这个任务运行。
既然如此,那这个SharedState
应该就是管理定时器状态的玩意儿。
SharedState
有俩字段:
completed
:是个bool
类型用来代表这个定时器的状态,当为true
的时候就可以进入Ready
状态了。waker
:自然就是这一章的主角,到时会唤起这个定时器里面的任务。然后我们给这个TimerFuture
实现Future trait
。
关联类型Output
是()
。
poll
方法是必须实现的,它获取当前share_state
的状态。
如果completed
是true
,那么这个时候就不需要waker
了,直接返回Ready
状态。
如果还是false
,那么就clone
一份context.waker
到shared_state.waker
里,这样就能在别的线程唤醒这个任务了。
每一次poll
状态不是true
时都需要重新赋值一遍,为什么呢?因为这个waker
可能变成别的任务的waker
了。
然后给这个TimerFuture
实现一个new
关联函数用来创建timer
实例。
里面会开一个线程,这个线程会等待duration
之后才会执行walker.wake
。
那么现在这个timer
我们就实现了,我们来调用下。
不过现在它并不能跑任务,所以我们来拓展下。
首先加了个task
回调,然后join
等待这个线程。 注意我们的join
得在主线程里等,不然和同步没差别。
然后我们来调用下
可以看到3
比2
执行的早。
实际上这里有一个大问题,下面会说。
executor
)上面我留了一个大问题,你应该也发现了,当你运行时4
和5
都没有打印出来,也就是说整个异步流程压根没有执行。
其实我们这里的异步是用的OS threads
模型,当你去掉shared_state
的代码之后,你会发现依旧运行正常,完全不影响。
为什么会这样呢?
因为没有执行器来驱动这块代码。
相信这个执行器你已经思考很久是个啥玩意儿了。
Rust
中的Future S
是lazy
的,也就是惰性的,只有被激活时才会干活。
所以这里需要有一个执行器去调用poll
来给这一块异步代码一个冲力让它运行起来。
看了这张图,应该比较清晰了吧。不过这里有个点需要注意,这个wake
方法的调用会影响执行器触发poll
的频率,所以并不是每次都是第二次就ready
的。
简单的总结下执行器在这个流程中的作用:
poll
)wake
通知也就是被唤醒,再次调用poll
接下来我们来实现一个我们自己的执行器,这里需要借助futures
这个crate
,先在Cargo.toml
里引入。
这代码多看一眼就会爆炸,多学一点就会融化~
我们从上到下一点一点分析
Executor
这玩意儿不用多说,就是这代码的核心。
它有一个字段ready_queue
,是一个队列,它接收task
也就是任务。前面有说过task
是future
的最外层表现,这里可以简单理解为一个task
包裹一个future
。
我们上面分析的执行器的作用的两点还可以理解为:执行器只是接收task,然后运行task。
那么这个queue
在这里就很好解释了,另外也是能兼容同时存在多个异步的场景,单个执行器用于多个异步场景是很正常的。
至于这个Receiver
是个啥,它来自std::mpsc::Receivier
,上一次我们接触到std::mpsc
应该是多线程那一节,创建一个“通道”也就是std::mpsc::channel
,它返回一个元组:(发射器sender
和接收器receiver
)。所以这里的Receiver
自然就是通道的receiver
的接收器类型。
Spawner
就不用多说了,用于发送task
给Executor
。
Task
,它有俩字段,future
就不用多了,而task_sender
,它的类型和Spawner
的task_sender
一样,它是用来主动发送自己给执行器的,至于和Spawner
的重复了,我们等下会知道。
BoxFeture
来自futures
这个crate
,效果类似Box
。
new_executor_and_spawner
,看名字就知道是创建发射器和接收器。
sync_channel
,看名字就知道是同步的通道,之前学的时候我们知道可以同时存在多个发射器,但是只能存在一个接收器,所以如果有多个发射器同时发送数据,对于接收器来说一般是没办法确定接收顺序的。那么这个sync_cannel
自然就是用来确保接收器按顺序接收的。
然后给Spawner
实现了spawn
这个方法,它用来接收future
,然后创建task
再发送给executor
。
这里就可以分析出为什么Spawner
和Task
要分别存放一个task_sender
了,这个Spawner::spawn
就是用来给这个异步流程注入动力的其中一部分:将初始数据交由执行器,这样执行器才能去触发驱动。而Task
的task_sender
则是等自己被唤醒之后再重新发射一次task
给接收器,这样接收器就可以再次执行了。
然后是给Task
实现了ArcWake
这个trait
,这玩意儿看名字就知道是一个waker
。
也就是用来唤醒task
自己的,这里就不说这个ArcWake
是啥了。
接着就该轮到我们的主角Executor
了,我们给它实现了一个run
方法。
这个run
自然就是按顺序执行这些任务了,把future
从task
中拿出来执行poll
,如果还是is_pending
,那么就塞回去。
那么这就完成了,我们来调用下。
这没什么好说的。
我们来跑一下
其实还是有些乱的,所以咱来画张图。
这回应该是比较清晰了(可能你看了之后更乱了?)
IO
我们在最开头说过一个socket
的例子,那个例子中当数据不再是有效的之后我们的task
就会停止工作,等待下一次值有效之后才会再次唤醒它。
不过这个过程中有些方法来自于socket
自身,我们并不清楚它里面是怎么做的,比如set_readable_callback(wake)
这一步,之前我们是自己控制的,把waker
缓存到future
自身上。如果我们还有特殊操作涉及到waker
的,那么这就很麻烦了。
但如果让我们自己来写,我们又怎么知道什么时候数据会是有效的呢?
第一想法自然就是比较笨的做类似轮询的操作,开个新线程连续的试探。
但是这是极其不推荐的。开销蹭蹭蹭就上去了,性能刷刷刷就掉下来了。
那么还有啥办法呢?
有:IO 感知系统阻塞元件(。。这玩意儿太过专业术语IO-aware system blocking primitive
)
比如Linux
的epoll
[7]、FreeBSD
[8]/Mac OS
的kqueue
[9]、Windows
的 IOCP
[10]以及Fuchsia
[11]的port S
[12]。上面这些个都是可以接入到rust
中的,通过mio
[13]这个crate
这些元件都允许线程在多重异步IO
事件中堵塞,只有这些事件中有事件变成Ready
状态时才会返回一次。
那么这一点就正好填补了我们上面提到的那个问题
API
们大概长下面这样
其实socket
自己的方法就是用这种方式,可以看下set_readable_callback
方法的源码
将waker
注册到事件中,然后等待socket
有有效数据后就会去触发事件,这个过程是阻塞的。
这一节依旧是理论知识偏多,虽然可以跳过,但是还是建议先过一遍,因为它讲的是异步的执行流程和原理。
编辑于 2023-01-29 22:28・IP 属地广东