0%

浅析 Dart 异步机制

关于异步,大家并不陌生,每种语言都有自己的实现。关于 Dart 的异步,大家也许听到过 Dart 是单线程语言,但是为什么单线程能实现异步?单线程如何利用 CPU 多核的能力?之前没有过类似经验的同学可能会有很多问号。也许本文能解释其中一部分问题,让刚接触 Dart 的同学有一个整体的概念,对于后续 Dart 的异步编程会有不少帮助。

本文主要包括两部分,第一部分是关于 FutureStream 这类异步 API 的简单介绍,对 Dart 异步代码有一个初步印象;第二部分是关于 Dart 异步机制的整体介绍,从全局的层面了解 Dart 的异步实现,先有了大的概念,不管是后续的 Coding 还是更细致的原理研究,都会有一定的帮助。

异步 API

说到异步,对于 Android 端的同学来讲,最熟悉的莫过于网络请求了。因此,作为 Android 出身的我,就从一个网络请求说起,开始这一次 Dart 异步之旅。

1
2
3
4
5
6
7
HttpClient()                                                            // 创建 HttpClient
.getUrl(Uri.parse('<https://jsonplaceholder.typicode.com/posts/1’>)) // 返回 Future<HttpClientRequest>
.then((HttpClientRequest request) => request.close()) // 返回 Future<HttpClientResponse>
.then((HttpClientResponse response) =>
response.transform(utf8.decoder).listen(print)) // 返回 Future<void>
.catchError((e) => print(e))
.whenComplete(() => print(‘Complete'));

以上就是利用 Dart SDK 中的 API 实现的网络请求,看不懂没关系,现在只要知道 HttpClientRequest 可以用来设置网络请求的配置比如请求头之类的,而 HttpClientResponse 则是请求结束返回的响应就行了。这两个很好理解,再注意到注释里标明每一次链式调用的返回类型都是 Future<T> 。先知道这么多就够了,很明显,Future 在这里起着至关重要的作用,当然,在整个 Dart 的异步体系中,Future 也是主角之一。所以,让我们先来看看 Future 到底是什么。

Future

Future 是什么

从 API 文档的解释来说,Future 是一个代表延迟计算的对象,或者说,Future 包裹着在未来某一时刻可用的值或错误。

先看 Future 的一个构造器:Future(FutureOr<T> computation())FutureOr<T> computation() 就是一个函数,Future 包裹的,就是这个函数的执行结果,如果 computation 没有返回值,那这个 Future 就是 Future<void>computation 有返回值 T,那这个 Future 就是 Future<T>;如果 computation 在执行过程抛出异常了,那这个 Future 就包裹了一个错误。除此之外,最重要的一点是,这些结果都是延迟的,是在未来某一时刻才可用的。下面以一个更通俗的例子来解释 Future

比如小明为他女朋友准备了一个礼物,并且做了精美的包装,从外表看不出来里面装的是什么礼物,这时可以把这个包装盒看成是一个 Future,他女朋友撒娇要小明给她拆礼物,在小明拆开之前,那她拥有的就是一个盒子,而不是真正的礼物,当小明把包装盒拆开,他女朋友就得到了一个礼物,但是如果小明往盒子里装错了,放了一封别人写给他的情书,那这时候就引发了一个 Error

由此,可以引出 Future 的 3 种状态:

  • Uncompleted:计算未完成。即盒子未打开时,小明的女朋友拥有的是一个礼物盒而不是一份礼物
  • Completed with a value:计算完成并返回一个期望值。即打开的盒子里装着小明准备的礼物,他女朋友得到了一份礼物
  • Completed with an error:计算完成并返回一个错误。即打开的盒子里装的不是礼物而是别人写给小明的情书,他女朋友得到了一份愤怒

Future 的使用

现在我们已经知道了 Future 以及它的 3 种状态了,那么这三种状态如何体现到代码里呢。

1
2
3
4
5
6
7
8
9
10
11
Future<Gift> giftBox = Future((){
// 准备礼物的过程,如果准备的过程出错了,则抛出一个异常 PrepareException
Gift gift = prepareGift();
return gift;
});

// 如果在拿到礼物之后出现了小问题,则抛出一个异常 GiftException
giftBox.then((gift) => handleGift(gift))
.catchError((e) => handlePrepareException(), test: (e) => e is PrepareException)
.catchError((e) => handleGiftException(), test: (e) => e is GiftException)
.whenComplete(() => replay());

可以看到,通过 Future.then() 函数可以拿到 Future 中计算完成的值,在 then 中可以对礼物进行处理,拍照发朋友圈或者十动然鱼都是 ok 的,处理结束后,返回值又被包装成一个 Future 作为 then() 的返回值。如果在准备礼物的过程中出错了,可以通过 catchError 进行兜底,当然任何一个步骤出现了错误都会直接转到兜底环节,就好像在拆开礼物之前小明发现自己准备的礼物在一个角落里,这时他可以直接进行兜底操作而不再去打开礼物盒,即不会执行 then() 里的方法。如果拿到礼物后女朋友并不是很满意,那小明可能还需要做一些补救措施,即通过第二个 catchError 进行兜底。可以看到,可以有多个 catcheError 针对不同类型的异常进行捕获处理,一旦发生异常,则直接跳到 catcheError 进行异常处理操作。当然了,完成了送礼物及后续一系列的操作后,小明应该有一个复盘总结的环节了,这时 whenComplete() 就发生作用了,不管中途有没有发生异常,都会有这样一个复盘总结的操作。

看到这里,应该有一种 try-catch 的熟悉感了,then() 就好比 try 这个操作,而 catcheError 就像是 catchwhenComplete() 就像是 finally。当然了,既然 then() 的返回值也是一个 Future,那就可以不停的往后面链接更多操作了,这样可以避免嵌套地狱的出现,也能比较清晰的看到代码执行的先后顺序。这时再回过头去看网络请求的例子就很好理解了。既然 Future 的 API 跟 try-catch 模式类似,也可以通过 try-catch 来实现同样的逻辑:

1
2
3
4
5
6
7
8
9
10
Future<void> openGift() async {
try {
Gift gift = await giftbox;
handleGift(gift);
} catch(e) {
handleError();
} finally {
replay();
}
}

稍有不同的是,在 try-catch 中出现了两个关键字:asyncawait,这两个关键字也是 Dart 异步的一部分,可以将异步代码写成同步的样子。

await 用来获取 Future 的结果,因为 Future 的是异步的,而且不知道在哪个时刻会结束,所以顾名思义,就是需要等它结束之后再去执行后面的代码。需要注意的是,如果 awaitFuture 一直不结束,那么后面的代码则不会执行。

async 会将它标注的函数返回值包装成一个 Future,当执行 async 标注的函数时,在遇到 await 之前,会将该函数当成同步函数执行,如果 遇到 await,则将 await 及后面的代码包装到一个 Future 中并结束当前函数,当 await 拿到对应的值再执行 await 后面的代码

在理解了 awaitasync 之后,就可以把异步代码当成同步来写了,但是由于 await 有等不到结果的可能,因此需要谨慎。

以上就是 Future 相关的一些基础知识了,接下来看看 Dart 异步的另一个重要 API – Stream

Stream

Stream 提供一个数据的异步序列,或者说一个异步的 Iterable,甚至可以理解为一个观察者模式。当注册了监听后,只要有新的数据就会执行回调,比如说点击事件、大段数据的一系列数据块或者从文件读取的字节流。再比如,小明每个节日都为他女朋友准备礼物,这样到了某个节日,小明就把礼物交给他女朋友,这样一个一个礼物的送就相当于 Stream 的一次次数据发送。

1
2
3
4
5
6
Stream<Gift> gifts = ...;
StreamSubscription giftSubscription = gifts.listen(
(gift) { // 收到礼物 },
onError: (e) { //某个礼物送错了 },
cancelOnError: false,
);

就像以上代码,通过 Stream.listen() 方法监听数据。女朋友接收礼物就相当于生成了一个订阅 StreamSubscription,通过这个 StreamSubscription 可以控制暂停接收(pause())、恢复接收(resume())或者取消接收(cancel())礼物。同样的,除了使用 Stream 的 API 接收数据外,也可以写成同步的形式:

1
2
3
4
5
void receiveGift(Stream<Gift> gifts) async {
await for (Gift gift in gifts) {
// 收到礼物
}
}

Future 不同的是,Stream 使用 await for 等待数据的到来,这也体现了 Stream 可以理解为一个异步 Iterable,当 Stream 数据发送完毕时,循环才会结束,才会执行循环下面的代码,因此也需要注意死循环的问题。

Stream 的创建

以上了解到了 Stream 的简单使用,那作为小明,该如何准备这个 GiftStream 呢?有以下两种方式:

  1. 通过 StreamController 创建

    1
    2
    3
    4
    5
    StreamController<Gift> giftController = StreamController();
    Stream<Gift> giftStream = controller.stream;

    // 时间到了,送出去一个礼物
    giftController.add(Gift());
  2. 通过 async*yieldyield* 关键字创建

    1
    2
    3
    4
    generateGift() async* {
    // 时间到了,送出去一个礼物
    yield Gift();
    }
    1
    2
    3
    4
    generateGift() async* {
    // 时间到了,堆积了几个礼物一次性送出去
    yield* giftStream;
    }

    yieldyield* 的区别就是,yield 发送单个数据,而 yield* 发送多个数据。

Stream 的种类

Stream 有两种类型,一种是单一订阅型,另一种是广播型的。顾名思义,单一订阅型的表示这个 Stream 只能被监听一次,即只能调用一次 listen() 方法生成一个 StreamSubscription,比如从文件读取字节流。而广播型的则是可以被多次监听,可以产多多个 StreamSubscription

以上便是 Stream 的入门知识。了解完 Dart 异步 API 的基础知识后,我们再来看看 Dart 异步机制背后的逻辑。

异步机制

Dart 是单线程语言,这句话大家可能见到过很多次,也可能第一次见到,如果之前没有过类似异步机制语言的经验,小朋友的脑子里可能会充满了问号。单线程如何异步?单线程如何利用现代 CPU 的多核能力?这一部分,就来解决这些问题。

异步概览

首先认识两个概念:

  • 阻塞式调用:调用后等待结果,当前线程被挂起,得到结果之前不会继续执行。就好比定了个外卖,在外卖送到之前不做任何事了,就死死的等着外卖小哥的电话
  • 非阻塞式调用:调用后不用等待结果,当前线程继续执行其他任务,等该调用执行完毕再处理结果。就好比定完外卖继续努力工作,外卖小哥电话到了再去拿外卖

现在应该不难理解为什么单线程能实现异步操作了,基于非阻塞式调用就行呗,在等待耗时操作结果的时候继续执行其他任务,当耗时操作结束了再处理它的结果。那么现在问题来了,怎么样才知道耗时操作结束了呢?你不知道,也不需要知道,这些结果都会添加到一个队列中,单线程不停的去队列中取出事件进行处理。这个机制就是 事件循环机制,也就是 Event Loop。 Event loop 就是单线程实现异步的关键。

event-loop

如上图所示,每当有事件发生,比如点击了一个区域、设置了一个 Timer,这个事件就被添加到 Event queue 中,这个 Event queue 就是专门维护所有事件的一个队列,而 Event loop 则不停的从 Event queue 中获取事件并处理这个事件,直到 Event queue 中没有事件。

那么这种机制在 Dart 中是如何运作的呢?在 Dart 中,有一个 Isolate 的概念,IsolateDart 代码的运行环境,每个 Isolate 在同一时刻只能有一个工作线程与其绑定,可以先简单理解为一个 Isolate 就是一个线程。

Dart 程序的 main() 函数所在的 Isolate 称为 mainIsolate,可以在 Isolate 中开辟出一个新的 Isolate,但是各个 Isolate 之间代码是孤立的,彼此不共享内存,所以也可以理解为,一个 Isolate 就是一个隔离的内存块,其他 Isolate 不能访问当前 Isolate 的内存。

在每个 Isolate 中都有一套 Event loop 机制。每当 Isolate 启动的时候,先执行它的入口函数 – 在 mainIsolate 中即为 main(),当入口函数执行完毕则开始从 Event queue 中一个一个地拿取事件进行处理。所以 Dart 程序的运行顺序显而易见,先执行 main() 函数,再执行 Event queue 中的事件。

在理解了上面的主流程之后,再稍微往细一点看,Dart 的 Event loop 机制其实维护了两个队列,一个是 microtask queue,一个是 event queue

  • event queue:*event queue 中包含所有的外部事件,比如 I/O、绘制事件、点击事件、Timer、Isolate 间的消息处理等
  • microtask queue:*在 Event loop 开始自动的从 event queue 拿取事件之前做一些额外的操作。

event queue 中的事件可以来自 Dart 代码和系统的其他部分(C 层),而 microtask queue 只能从 Dart 代码中添加事件。

dart-event-queues-order

这是,Dart 程序的运行顺序可以整理成上图的模式。当 Dart 程序或者说 Isolate 启动时,先执行入口函数,期间会有各种各样的事件或者任务加入到对应的 queue 中,当入口函数执行完毕时,Event loop 开始工作,先从 microtask queue 中拿取事件进行处理,当 microtask queue 中的任务处理完了,即 microtask queue 空了,这时开始从 event queue 中拿取第一个事件进行处理,处理完第一个 event 之后再检查一遍 microtask queue,如此往复,这就是 Event loop 的工作过程:先执行 microtask 中所有任务,再执行 event queue 中第一个事件,之后再重复这两个步骤,直到 event queue 为空了,这时意味着程序的任务完成了,处于可以随时退出的状态。

需要注意的是,当执行 microtask queue 中的任务时,event queue 时处于挂起状态的,在处理这两个队列的任务或者事件时,也可以往其中插入新的任务或事件。鉴于 microtask queue 会阻塞 event queue,所以不要再 microtask queue 中执行耗时任务,这会阻塞 event queue 导致绘制、渲染事件被暂停,造成卡顿。

OK,现在对于 Event loop 运行的过程有个整体的了解,那么我们如何把任务添加到这两个队列中呢?可以通过以下特性将任务添加到对应的队列:

  1. FutureTimer 中的代码将作为一个事件添加到 event queue 的队尾
  2. StreamscheduleMicrotask() 中的代码将作为一个任务添加到 microtask queue 的队尾

通过以上的内容,结合 Future 可以通过 then() 来控制每个 Future 的执行顺序,我们便可以自由的控制每一行代码的执行顺序了。

Isolate 的使用

现在我们已经解决了 Dart 单线程异步的实现问题,也知道了,在 Dart 中,Isolate 就是这个单线程模型的运行环境。那么就还剩下一个问题,Dart 如何利用 CPU 的多核能力了。很显然,就是开辟不同的 Isolate 去处理那些耗时任务嘛。但是前面也提到了,Isolate 之间是不共享内存的,那耗时任务处理完了把结果通知给 mainIsolate 就需要一套通信机制来实现了,Dart 采用的是 Port 的机制,通过 ReceivePortSendPort 实现 Isolate 间的通信。

每个 Isolate 都可以创建出 ReceivePort,从 ReceivePort 可以通过 getter 方法 – sendPort 来获取对应这个 ReceivePortSendPortReceivePortSendPort 是一套东西,就相当于连接两个 Isolate 的管道,ReceivePort 是当前 Isolate 用来接收消息的一端,对应的 SendPort 是管道的另一端,通过 SendPort 发送的消息可以在另一端的 ReceivePort 接收到。如下图,一个 ReceivePort 可以有多个 SendPort,在开辟 Isolate 的时候,当前 Isolate 将自己 ReceivePort 对应的一个 SendPort 交给新开辟出来的 Isolate 就相当于把自己通信管道的发送端给了新的 Isolate,这样就可以愉快的接收新 Isolate 发送过来的消息了。

isolate-communicate

来看看具体代码实现吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void main(){
ReceivePort receivePort = ReceivePort();
// 通过 Isolate.spawn() 开辟一个新的 Isolate
// 第一个参数是带有单个参数的顶级函数或者静态函数,这个函数就是新 Isolate 的入口函数
// 第二个参数是新 Isolate 入口函数的参数值,由于想要实现消息回传,因此传入 mainIsolate 的一个 sendPort
Isolate.spawn(entryPoint, receivePort.sendPort);
receivePort.listen((message) {
// 可以处理新 Isolate 发送过来的消息
});
}

void entryPoint(dynamic message) {
// 新开辟的 Isolate 会接收到入口函数带过来的一条消息
SendPort sendPort = message;
sendPort.send(messageFromNewIsolate);
}

以上便是 Isolate 的基础知识了,了解完这些,想在 Dart 程序利用多线程就很简单了,根据需要开辟新的 Isolate 就行了。

总结

至此,便完成了文章开头提到的几个问题,简单了解了 Dart API 的一些基础知识,也从全局对 Dart 的异步机制有了一定的了解,如此,不管是更好的实现异步编程还是更深入的研究 Dart 异步的实现原理,都能有一个相对明确的道路了。

希望对你有所帮助,欢迎一起交流~

Thanks To