这本书看前两章的时候还蛮吸引我的,从操作符开始,可能这种琐碎的东西真的很难讲吧,没什么具体的内容,相比直接看官网来说,并没有什么更总结性或者深层次的见解,也没什么经验之谈的应用举例。看到后半部分的内容,觉得略显拖沓,大量的细致描述其实并不等于讲清楚了。最后的breakout游戏的例子还行。可以看着入门,留下一个大体的印象,但还是要看官网哦。不说什么了,我去看官网了。
1 | // 记录鼠标按下到放开的时间和1s的差距 |
RxJS世界中有一种特殊的对象,称为“流”(stream),也会以“数据流”或者“Observable对象”称呼
这种对象实例。
代表“流”的变量标⽰符,都是以$符号结尾,这是RxJS编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)。
在RxJS的代码中,代码是一个个函数,每个函数只是对输入的参数做了响应,然后返回结果。
RxJS引入了两个重要的编程思想:
1 | import {Observable} from 'rxjs/Observable'; |
实际项目中,如果每一个代码文件都写这么多import语句,那就实在太麻烦了。更好的方式使用一个代码文件专门导入RxJS相关功能,其他的代码文件在导入导入文件,这样就把RxJS导入的工作集中管理。
Observable——可被观察者,Observer——观察者,连接两者的桥梁就是Observable对象的函数subscribe。
Observable对象实现了下面两种设计模式:
观察者模式
迭代器模式
观察者模式要解决的问题,就是在一个持续产生事件的系统中,如何分割功能,让不同模块只需要处理一部分逻辑,这种分而治之的思想是基本的系统设计概念。
将逻辑分为发布者和观察者,发布者只管负责生产事件,观察者只管接收到事件之后就处理。
在RxJS的世界中,Observable对象就是一个发布者,通过Observable对象的subscribe函数,可以把这个发布者和某个观察者连接起来。
1 | import { of } from 'rxjs'; |
迭代器的作用就是提供一个通用的接口,让使用者完全不用关心这个数据集合的具体实现方式。
迭代器另一个容易理解的名字叫游标(cursor),就像是一个移动的指针一样,从集合一个元素移到另一个元素,完成对整个集合的遍历。
RxJS实现的是“推”式的迭代器实现。
在RxJS中,作为迭代器的使用者,并不需要主动去从Observable中“拉”数据,而是只要subscribe上Observable对象之后,自然就能够收到消息的推送。
1 | rxjsNext() { |
1 | rxjsNext() { |
1 | rxjsInterval() { |
Observable这样的时间特性使得异步操作十分容易,因为对于观察者Observable,只需要被动接受推送数据来处理,而不用关心数据何时产生。
Observable对象中吐出来的数据可以是无穷的。如果我们不中断上一个例子中的程序,让他一直运行下去,这个程序也不会消耗更多的内存,这是因为Observable对象每次只吐出一个数据,然后这个数据就被Observer消化处理了,不会存在数据的堆积。
不过,并不能给予Observer一个终止信号,Observer依然时刻准备着接收Observable的推送数据,相关的资源也不会被释放。所以,还需要一个宣称Observable对象已经完结的方式。
1 | // Observer的complete函数 |
只有observerable主动调用complete,完结信号才会退给observer
1 | rxjsError() { |
在RxJS中,一个Observable对象只有一种终结状态,要么是完结,要么是出错,一旦进入出错状态,这个Observable对象也就终结了,在不会调用对应的Observer的next函数或complete函数;同样,进入完结状态也不能再调用Observer的next和error。
1 | rxjsSimple() { |
例如,Observer只需要监听一个Observable对象三秒钟时间,三秒钟之后就不关心这个Observable对象的任何事件了。
1 | rxjsSimple() { |
可以看出,在调用unsubscribe函数调用之后,onSubscribe中依然在不断地调用next函数,但是Observer已经不会再做出任何响应了。
Cold Observable - 每一次subscribe都产生一个新的生产者,传输这个生产者产生数据(后一次订阅会从头开始)
1 | const cold$ = new Observable(observer => { |
Hot Observable - 每次订阅的时候,已经准备好了一个生产者。在Hot Observable中,Observable明显并不产生数据,只是数据的搬运工。(后一次订阅会接着上一次订阅的结果)
1 | const producer = new Producer(); |
对于现实中复杂的问题,并不会创造一个数据流之后就直接通过subscribe接上一个Observer,往往需要对这个数据流做一系列处理,然后才交Observer。
在数据管道里流淌的数据就像是水,从上游 流向下游,对一个操作符来说,上有可能是一个数据源,也可能是其他操作符,下游可能是最终的观察者,也可能是另一个操作符,每一个操作符之间都是独立的。
1 | const onSubscribe = observer => { |
操作符就是用来产生全新Observable对象的函数(每一个操作符都是创造一个新的Observable对象,不会对上游的Observable对象做任何修改)。
弹珠图可以表示一个Observable对象所表示的数据流。
每个弹珠之间的间隔,代表的是吐出数据之间的时间间隔;
|符号代表数据流的完结,对应调用下游的complete函数;
×代表数据流中的异常,对应于调用下游的error函数
…
为了描述操作符的功能,弹珠图中往往会出现多条时间轴。
共同特征- 返回一个Observable对象。
对Observable对象能够链式调⽤filter和map,是因为:
RxJS v5版本自带60多个操作符。
不需要Observable实例就可以执行的函数,称为静态操作符;前提是要有一个创建好的Observable对象,称为实例操作符。
类似于:(和JS中的对象类比)
在Observable类上加一个静态方法
Observable.of
在Observable的prototype属性上加一个函数
Observable.prototype.map
如果要导入静态操作符:1
import 'rxjs/add/observable/of';
如果要导入实例操作符:1
import 'rxjs/add/operator/map';
如果要调用静态操作符:1
const source$ = Observable.of(/*⼀些参数*/);
如果要调用实例操作符:1
const result$ = source$.map(/*⼀些参数*/);
静态操作符只能出现在首位,实例操作符则可以出现在任何位置,有些功能既可以作为Observable对象的静态方法,也可以作为Observable对象的实例方法。
具体的应用项目中,很可能会用上一些可以重复使用的逻辑,这些逻辑可以封装在自定义的操作符中,这时候就需要知道如何定制一个新的操作符。
每个操作符都是一个函数,不管实现什么功能,都必须考虑下面这些功能要点:
例:
// map操作符1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25function map(project) {
// 返回一个全新的Observable对象
return new Observable(observer => {
// 对上游的订阅
const sub = this.subscribe({
// 下游的订阅处理
next: value => {
// 处理异常
try {
observer.next(project(value))
} catch (err) {
observer.error(err);
}
},
error: err => observer.error(err),
complete: {} => observer.complete(),
});
return {
// 退订处理(map并不占用什么资源,有的操作符要在退订的时候,及时释放资源)
unsubscrib: () => {
sub.unsubscribe();
}
}
});
}
最简单的方法(不推荐,避免污染全局Observable)
Observable.prototype.map = map;
注意:这里不能使用箭头函数,箭头函数中的this直接绑定于定义函数环境下的this,而不是执行时指定的this。
如果只想给指定的Observable对象使用
使用绑定操作符::,可以链式调用
const result$ = source$::map(x => x*2) :: map(x => x+1);
list是Observable的实力函数,它会返回一个新的Observable对象,通过传递给lift的函数参数可以赋予个这个新的Observable对象特殊功能。
1 | // 如果使用lift,那么map的实现代码如下 |
虽然RxJS v5的操作符都架构在lift上,应用层开发者并不经常使用lift,这个lift更多的是给RxJS库开发者使用。
如果函数中需要使用this,那就多了一个改变函数行为的因素,也就算不上真正的纯函数了。
最理想的方式是使用RxJS v5.5引入的pipeable操作符,这种方式不仅让代码更加简洁,而且可以让Tree Shanking发挥作用。
1 | // 通过pipe串接了filter和map两个lettable操作符 |
以前导入map的代码如下:1
import 'rxjs/add/operator/map';
现在导入pipeable操作符map的代码就是如下这样:1
2import {map} from 'rxjs/operators/map';
import {map} from 'rxjs/operators';
有的传统的操作符名称和pipeable操作符名称不同
管道操作符是一个竖杠和大于号的组合,也就是|>,用来连接两个值,前一个值没有任何要求,后一个值必
须是函数,管道操作符的作用就是把前面的值作为参数来调用后面的函数。
管道操作符还可以对以一个值连续进行多次函数操作。1
2666 |> foo |> bar
bar(foo(666))
可以用下面这种形式来使用pipeable操作符
1 | const result$ = source$ |
本章介绍RxJS中用于创造Observable对象的操作符,这些操作符是RxJS中数据流的源头。
概念: 让数据管道中的数据发生变化
让一个数据流的内容被多个Observer订阅。
Scheduler可以作为创造类和合并类操作符的函数使用。此外,RxJS还提供了observeOn和subscribeOn两个操作符,用于在数据管道任何位置插入给定Scheduler。
在RxJS提供的很多操作符中都带有Scheduler类型的参数,在之前的章节中,为了简化问题,我们有意不提这些参数,不过,现在是时候来研究一下Scheduler参数的作用了。
因为scheduler不经常使用,所以scheduler总是一个可选参数,如果一个操作符有scheduler参数,那么这个参数也肯定是最后一个参数。
1 | function range(start, count, scheduler) { |
如果使用操作符的时候不传递scheduler参数,那么RxJS就会使用默认的Scheduler实现。
Scheduler的官方定义:
Scheduler可以改变Observable对象的数据产产方式。
在RxJS中,提供了下列Scheduler实例。
RxJS默认选择Scheduler的原则是:尽量减少并发运行。
JavaScript只有一个线程。
JavaScript的解析和运行环境称为“JavaScript引擎”,JavaScript引擎有诸多实现,Chrome浏览器和Node.js使用的是v8。
“调⽤栈”——当调用一个函数的时候,就在调用栈上创建这个函数运行的空间,参数的传递、局部变量的创建都是通过调用栈完成;当一个函数执行完毕的时候,对应调用栈上这个函数的本次运行空间就被清除。
“事件循环”——可以看作一个死循环,重复的工作就是从“事件队列”中拿到需要处理的事件任务,然后把这个任务交给调用栈去执行,当这个任务处理结束之后,再从“事件队列”中拿下一个任务塞给调用栈……
当调用栈正在执行一个任务的时候,事件循环也只能等着,只有当前一个任务完成之后,才能塞给调用栈下一个任务。这也就是setTimeout不可能百分之百准确的原因。
“事件队列”中的任务可以细分为Micro Task和Macro Task。
如果把“事件队列”看作是等待执行而排队的话,那实际上也不只是排一条队(Macro Task)。
Micro Task只有一个队列,而且这个队列简直就是VIP快速通道。当调用栈处理完一个任务,准备迎接下一个任务的时候,“事件循环”总是会优先看一看Micro Task的队列,只要还有Micro Task存在,就直接把Micro Task交给调用栈,其他Macro Task队列的任务都只能等下次机会。
利用调用栈实现的Scheduler方式
1 | // 默认的Scheduler |
在RxJS中,每一个Scheduler类都继承形式如下的接口IScheduler:
1 | interface IScheduler { |
是asap会尽量使用Micro Task,而async利用的是Macro Task。queue这个Scheduler,如果调用它的schedule函数式参数delay是0,那它就用同步的方式执行,如果delay参数大于0,那queue的表现其实就和async一模一样。
1 | import {asap} from 'rxjs/scheduler/asap'; |
RxJS代码多会涉及异步操作,所以用不上传统的Debugger。
最有效的调试工具,依然是仔细考虑过而且位置恰当的打印语句。
利用do来插入调试代码
1 | source$.do( |
改进的日志调试方法,甚至可以更近一步分为debug,info,warn,error
1 | Observable.prototype.debug = function(fn) { |
画出数据流依赖图
单元测试
1 | it('should sum up string value', () => { |
操纵时间的TestScheduler,弹珠测试
1 | it('should work with map operator', () => { |
如何把RxJS的数据流和React的状态管理关联起来。利用Subject对象作为连接RxJS和React的纽带。
1 | this.counter = new Subject(); |
Redux维持一个全局的Store,这个Store存储的就是应用的状态,因为这个Store是全局可见的,所以,当一个组件A修改Store上的状态的时候,与之相对的组件B依然可以读取到这个变化,这样就实现了组件之间的通信。
为了完成这样一个功能,一个使用Redux的系统需要如下这些元素:
Store——一个对象,提供一个getState函数,可以获得当前Store上存储的状态
action——修改状态,store.dispatch(action);
reducer——为了能够处理action,创建Store的时候提供对应的reducer函数,
1 | function reducer (state, action) => { |
1 | action$.scan(reducer, initialState).subscribe(renderView); |