谈谈 Observable(上)
因为工作中的某个 ng2 的项目中使用到了 Observable(具体地说,是 ng2 http 模块中的请求方法的返回结果正是 Observable),所以做一些简单的学习,在此记录。
由于篇幅太大,本篇先讲述 Observable 的基本概念,(如果有时间的话)就继续写更复杂一些的内容
什么是 Observable
tl;dr
简单的说:
Observable 是一个可被订阅的对象,该对象将随着时间推移发送有限或无限个值供其订阅者消费。
在这个说法中 Observable 有两个特征:
- 它是可被订阅的 (Pub/Sub)
- 它的值是一个有限/无限的队列
以下将分别说明这两点。
对比 EventEmitter
提到订阅,我们自然会想起经典的 Pub/Sub 模式(也有叫观察者模式),在 JS 中的实现就是 EventEmitter
:
|
|
在上述的代码片段中,我们创建了一个 EventEmitter 对象,并向该对象注册了一个事件和对应的监听函数。在后续的程序代码中,我们可以在任何位置,让 EventEmitter 对象触发(emit
)一个事件,从而唤醒该事件关联的监听函数(可以是多个)。浏览器中的 DOM 元素也实现了类似 EventEmitter 的特性。
和 EventEmitter 一样,Observable 可以实现基本的发布订阅:
|
|
而在实际的需求中,我们可能要处理一个 队列的事件触发,且队列可能不是有限的(如 IM 消息,弹幕,用户在页面上的操作),我们需要从代码组织层面上提供更加方便的处理方式,这就引出了 Observable。
比如我们用 Observable 处理 WebSocket 数据:
|
|
在例子中我们新建并监听一个 WebSocket 连接,并将收到的信息进行处理。将监听逻辑封装成一个 Observable,让我们可以在后续使用 .map()
等操作符,来对收到的数据进行处理,并最后用 subscribe()
完成订阅。
Stage-1 提案
Observable 目前是 ECMAScript 的新提案 (Stage-1)。在 ECMAScript 的提案中,Observable 的定义如下:
The Observable type can be used to model push-based data sources such as DOM events, timer intervals, and sockets. In addition, observables are:
- Compositional: Observables can be composed with higher-order combinators
- Lazy: Observables do not start emitting data until an observer has subscribed.
简单翻译如下:
Observable 类型可以用于表示基于推送的数据源模型,例如 DOM 事件,计时器,或者 socket。此外,Observable 还具有(以下特征):
- 可组合的: Observable 可以使用高阶的连接符进行拼接组合
- 惰性: Observable 仅当一个 observer 订阅时才会开始发送数据。
文章的后续会解释这两个属性,在这里读者可以先跳过概念部分,或者大概有个印象就可以了。
当前对 Observable 比较流行的实现有 RxJS, Bacon.js, zen-observable 等。接下来本文将基于 RxJS 中的实现来介绍 Observable 的基本概念。
创建和订阅
有别于固定长度的数组,Observable 的值是随时间发送的一连串的值,像水流一样,所以也有说法称 Observable 是一个流(Stream),为了更好的直观理解所谓的 “流”,首先我们来了解 Observable 的一种表示方法:Marble Diagram。
Marble Diagram
Marble Diagram 由两个部分组成:timeline 和 item。timeline 表示时间轴,item 表示在时间轴上触发的元素(类型可以是任意的)。下图表示一个事件流先后触发(emit)了三个 item,最后成功结束,出现错误的符号为叉:
我们也可以用类似 ASCII 的绘画方式来表达 Marble Diagram:
|
|
接下来我们将了解 RxJS 中 Observable 相关的 API,其中将会用 Marble Diagram 的 ASCII 绘画来表示 Observable。如果有不太明白的地方,可以使用 RxViz 将代码实际运行一下,观察其 Marble Diagram 的具体形态。
创建和订阅
在上文中我们知道可以用 Observable
构造函数直接初始化一个 Observable 实例;RxJS 还提供了相同效果的静态方法 Observable.create
:
|
|
在创建了 Observable 之后,可以通过 subscribe
方法订阅该 Observable:
|
|
如上述代码所示,调用 subscribe
方法时我们传入了一个对象,该对象我们可以称为 观察者(Observer)。
观察者具有三个方法,每当 Observable 发生事件时便会呼叫观察者相对应的方法:
next
: 每当 Observable 发送新的值,触发该方法complete
: 当 Observable 不再获得新的值时,complete 方法就会被触发,该方法被触发后,next 方法将不会再起作用。error
: 每当 Observable 内发生错误时,error 方法被触发。
可以查看另一个 观察者的例子
和 EventEmitter 不同,Observable 在内部没有一个订阅者清单,订阅 Observable 的行为实际上是执行一个函数,这个函数接收一个 Observer 对象 并在函数体内触发 Observer 对象的方法(next, complete, error),也就是说,对于某个 Observable,其在构建时传入的回调函数,必须要在该 Observable 被订阅之后,才会调用执行。可以看一下这个例子:
|
|
执行这段代码我们会发现,Observable 构造方法的回调函数实际上被调用了两次,这是因为这个 Observable 有两个订阅者,且回调函数是在 subscribe 时才被触发的。 如果我们将代码片段中 subscribe
的语句注释掉,执行时不会有任何输出。
在 RxJS 中,subscribe
方法会返回一个类型为 Subscription
的对象,可以用对象的 unsubscribe
方法可以停止对 observable 对象的监听(订阅)。
Creation Operator
除了用 Observable.create
方法之外,RxJS 还提供了很多便捷的创建 Observable 的 API,我们统称为 creation operator,其中包括:
of
:of(1,2,3,4)
from
:from([1,2,3,4])
fromEvent
fromPromise
never
: 永远不会结束但什么都不做的empty
: 空的且立刻结束的throw
: 抛出错误interval
timer
利用这些 operator 我们可以快速实现一些常见的功能,如点击监听事件:
|
|
或者是数数:
|
|
Transform Operator
前面提到,Observable 可以发送有限个或无限个值,我们可以将一定时间内 Observable 发出的值看做是一个数组,那么对这些值我们可以应用数组的所有方法如 map()
, filter()
, pluck()
等。事实上 Observable 确实提供了一系列的操作符(operator),允许我们链式调用。
Observable 本质上就是表示随时间发展而不断发送的一系列的值(流),我们可以像对待数组一样去对 Observable 进行操作,这样的操作方式,我们称为 Transform Operator。Transform Operator 可以分为几类(我的理解):
- 处理单个流的:
- 简单的队列映射:
map
,pluck
,filter
,scan
,reduce
,take
,first
,distinctUntilChanged
… - 和时序有关的:
debounce
,debounceTime
,throttle
,throttleTime
- 简单的队列映射:
- 处理多个流之间关系的:
merge
,concat
,combineLatest
,zip
,withLatestFrom
- 降维的(源 observable 所释放的每个值又是一个 observable):
concatAll
,megeAll
,combineAll
,switch
- 映射+降维(源 observable 通过映射生成一个二维的 observable, 然后再降维):
concatMap
,mergeMap
,switchMap
- 其他:
catch
,every
,defaultEmpty
,sequenceEqual
,delay
等
分类方法还有一种是按照 RxJS Marbles 中的分类。有兴趣的读者也可以查看。
在介绍具体的 operator 之前,首先我们先看 operator 的运作方式。
operators 运作方式
和数组的 operators 相比,Observable 的 operators 有两个特点:
- 延迟运算:只有在 observable 被订阅时,operators 才开始对元素进行运算
渐进式取值:
每次的运算是一个元素运算到底,而不是将全部元素运算完再返回
数组的取值方式:
Observable 的取值方式:
为了理解方便,以下介绍 operator 时会使用 ASCII Marble Diagram。
处理单个流
map
|
|
take
|
|
distinctUntilChanged
如果要发送的元素和最后一次发送的元素相同,则过滤掉该元素
|
|
处理多个流之间的关系
merge
合并 observable,在时序上两个 observable 同时执行,但当两个 observable 同时触发元素时,被 merge 的 observable 所触发的元素在后面。
merge
的逻辑有点像 OR,就是当两个 observable 其中一个被触发时都可以被处理。
例子:
|
|
concat
concat
可以把多个 observable 合并成一个:
|
|
降维操作
和数组类似,Observable 也可能出现类似二维数组这样的 “高维” 情况,即 Observable 中所发出的每项元素又是一个单独 Observable。RxJS 提供了一系列的 API 允许我们将其转换为普通的 “一维 Observable”。
concatAll
比如对应数组中 concat
操作,RxJS 也有 concatAll
operator,会将所有的 Observable “拼接” 起来:
|
|
观察上面的 Marble Diagram,我们可以发现,concatAll
将 source 中的三个 Observable 按顺序拼接起来依次输出。
switch
swtich 总是会将返回的 Observable 的 “控制权” 交给原 Observable 中 最近返回 的一个 Observable。
尝试理解下这个例子:
再看看这个例子对应的 Marble Diagram:
映射+降维
为了更加方便操作,RxJS 还提供了一些复合 operator,可以同时完成映射(成一个 Observable)和降维的操作。
switchMap
|
|
其他 operator
catch
catch
可以处理 observable 处理过程中出现的异常,可以通过返回一个新的 observable 来发送新的值:
|
|
这段代码对应的 Marble Diagram 是这样的:
|
|
可以在 catch
的回调函数中,通过返回一个空的 observable(如:Rx.Observable.empty()
)来让原有的 observable 结束。
Practical Example
为了让读者更加直观理解 Observable 的具体使用,来几个例子:这里要鸣谢 “30天精通 RxJS” 教程的作者 @jerryhong,以下例子均出自他的教程。
- 拖拽方块
- 类似 Youku 的滚动视频窗口+可拖拽效果
- 简单加减
- AutoComplete: 使用了
debounceTime
,switchMap
,filter
,map
,fromEvent
Recap
在了解了 RxJS 和实践了几个例子之后,我们对 Observable 有一个基本的认识:
- 从编程思想上来说,Observable 可以说是
Reactive Programming
和Functional Programming
两种思想的结合,关于两种思想读者可以自行查阅。 - 从内容上看,Observable 实现了随时间变化的队列数据的发布订阅。
- 从使用场景上看,Observable 适合处理需要结合多个事件源(如:DOM事件)的复杂逻辑(应用适合的 operator),也适合处理弹幕,IM 聊天等无限数据流的需求。ng2 中大量的使用了 Observable 来管理其内部的 UI 状态。在下篇中我会提到这部分的内容
(挖了个坑,逃
Reference
- Understanding Marble Diagrams for Reactive Streams 注: 文章中使用的还是 Rx 4.x 的语法
- RxMarbles
- 30天精通 RxJS
- rxjs document
- https://rxjs-cn.github.io/learn-rxjs-operators/operators/transformation/switchmap.html
- Becoming more reactive with RxJS flatMap and switchMap
- https://angular.io/guide/observables
- https://medium.com/@luukgruijs/understanding-creating-and-subscribing-to-observables-in-angular-426dbf0b04a3
- https://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html
- Comprehensive Introduction to @ngrx/store
- @ngrx/store