《深入浅出RxJS》阅读笔记

这本书看前两章的时候还蛮吸引我的,从操作符开始,可能这种琐碎的东西真的很难讲吧,没什么具体的内容,相比直接看官网来说,并没有什么更总结性或者深层次的见解,也没什么经验之谈的应用举例。看到后半部分的内容,觉得略显拖沓,大量的细致描述其实并不等于讲清楚了。最后的breakout游戏的例子还行。可以看着入门,留下一个大体的印象,但还是要看官网哦。不说什么了,我去看官网了。

第一章 函数响应式编程

  • RxJS使用了一种不同于传统的编程模式——函数响应式编程。

一个简单的RxJS例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 记录鼠标按下到放开的时间和1s的差距

const holdMeButton = document.querySelector('#hold-me');
const mouseDown$ = Rx.Observable.fromEvent(holdMeButton, 'mousedown');
const mouseUp$ = Rx.Observable.fromEvent(holdMeButton, 'mouseup');

const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(), (mouseUpEvent, mouseDownEvent) => {
return mouseUpEvent.timestamp- mouseDownEvent.timestamp;
});

holdTime$.subscribe(ms => {
document.querySelector('#hold-time').innerText = ms;
});

holdTime$.flatMap(ms => Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/' + ms))
.map(e => e.response)
.subscribe(res => {
document.querySelector('#rank').innerText = '你超过了' + res.rank + '% 的用户';
});

RxJS世界中有一种特殊的对象,称为“流”(stream),也会以“数据流”或者“Observable对象”称呼
这种对象实例。

代表“流”的变量标⽰符,都是以$符号结尾,这是RxJS编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)。

在RxJS的代码中,代码是一个个函数,每个函数只是对输入的参数做了响应,然后返回结果。

RxJS引入了两个重要的编程思想:

  • 函数式
  • 响应式

函数响应式编程

函数响应式编程

第二章 RxJS入门

RxJS的版本和运行环境

  • 如今的JavaScript打包工具,有一个功能叫做Tree-Shaking,指的是在打包过程中发现根本没有用上的函数,最终的打包文件也就不需要包含这些没被使用的函数代码。Tree-Shaking只对import语句导入产生作用,因为Tree-Shaking的工作方式是对代码进行静态分析。但是,Tree-Shaking对RxJS不起作用,因为RxJS中的这部分函数都是要“挂”到Observable这个类上去,在RxJS内部都已经被Observable这个类“引用”了,不会被树摇掉。如果不想使用所有的RxJS功能,最好是按需要去导入模块。
1
2
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';

实际项目中,如果每一个代码文件都写这么多import语句,那就实在太麻烦了。更好的方式使用一个代码文件专门导入RxJS相关功能,其他的代码文件在导入导入文件,这样就把RxJS导入的工作集中管理。

Observable和Observer

Observable——可被观察者,Observer——观察者,连接两者的桥梁就是Observable对象的函数subscribe。

Observable对象实现了下面两种设计模式:

  • 观察者模式

  • 迭代器模式

观察者模式

观察者模式要解决的问题,就是在一个持续产生事件的系统中,如何分割功能,让不同模块只需要处理一部分逻辑,这种分而治之的思想是基本的系统设计概念。

将逻辑分为发布者和观察者,发布者只管负责生产事件,观察者只管接收到事件之后就处理。

在RxJS的世界中,Observable对象就是一个发布者,通过Observable对象的subscribe函数,可以把这个发布者和某个观察者连接起来。

Observable

1
2
3
4
5
6
import { of } from 'rxjs';

// 发布者 发布字符串‘hello world’事件
const helloWorld$ = of('hello world');
// 观察者扮演console.log事件
helloWorld$.subscribe(console.log);

迭代器模式

迭代器的作用就是提供一个通用的接口,让使用者完全不用关心这个数据集合的具体实现方式。

迭代器另一个容易理解的名字叫游标(cursor),就像是一个移动的指针一样,从集合一个元素移到另一个元素,完成对整个集合的遍历。

RxJS实现的是“推”式的迭代器实现。

在RxJS中,作为迭代器的使用者,并不需要主动去从Observable中“拉”数据,而是只要subscribe上Observable对象之后,自然就能够收到消息的推送。

创造Observable

1
2
3
4
5
6
7
8
9
rxjsNext() {
const next = observer => {
observer.next(1);
observer.next(2);
observer.next(3);
};
const next$ = new Observable(next);
next$.subscribe(item => console.log(item));
}
1
2
3
4
5
6
7
8
9
10
rxjsNext() {
const next = observer => {
observer.next(1);
observer.next(2);
observer.next(3);
};
const next$ = new Observable(next);
const theObserver = {next: item => console.log(item)};
next$.subscribe(theObserver);
}

跨时间的Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rxjsInterval() {
const onSubscribe = observer => {
let num = 1;
const handle = setInterval(() => {
observer.next(num++);
if (num > 3) {
clearInterval(handle);
}
}, 1000);
};
const next$ = new Observable(onSubscribe);
const theObserver = {next: item => console.log(item)};
next$.subscribe(theObserver);
}

Observable这样的时间特性使得异步操作十分容易,因为对于观察者Observable,只需要被动接受推送数据来处理,而不用关心数据何时产生。

永无止境的Observable

Observable对象中吐出来的数据可以是无穷的。如果我们不中断上一个例子中的程序,让他一直运行下去,这个程序也不会消耗更多的内存,这是因为Observable对象每次只吐出一个数据,然后这个数据就被Observer消化处理了,不会存在数据的堆积。

不过,并不能给予Observer一个终止信号,Observer依然时刻准备着接收Observable的推送数据,相关的资源也不会被释放。所以,还需要一个宣称Observable对象已经完结的方式。

Observable的完结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Observer的complete函数
rxjsComplete() {
const onSubscribe = observer => {
let num = 1;
const handle = setInterval(() => {
observer.next(num++);
if (num > 3) {
clearInterval(handle);
observer.complete();
}
}, 1000);
};
const next$ = new Observable(onSubscribe);
const theObserver = {
next: item => console.log(item),
complete: () => console.log('No more Data')
};
next$.subscribe(theObserver);
}

只有observerable主动调用complete,完结信号才会退给observer

Observable的出错处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rxjsError() {
const onSubscribe = observer => {
observer.next(1);
observer.error('Something Wrong');
observer.complete();
};
const next$ = new Observable(onSubscribe);
const theObserver = {
next: item => console.log(item),
error: err => console.log(err),
complete: () => console.log('No more data')
};
next$.subscribe(theObserver);
}

在RxJS中,一个Observable对象只有一种终结状态,要么是完结,要么是出错,一旦进入出错状态,这个Observable对象也就终结了,在不会调用对应的Observer的next函数或complete函数;同样,进入完结状态也不能再调用Observer的next和error。

observable-status

Observer的简单形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rxjsSimple() {
const onSubscribe = observer => {
observer.next(1);
observer.error('Something Wrong');
observer.complete();
};
const next$ = new Observable(onSubscribe);
// 可以直接接收,而不用去创造一个observer对象
next$.subscribe(
item => console.log(item),
err => console.log(err),
() => console.log('No More Data')
);
}

退订Observable

例如,Observer只需要监听一个Observable对象三秒钟时间,三秒钟之后就不关心这个Observable对象的任何事件了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
rxjsSimple() {
const onSubscribe = observer => {
let num = 1;
const handle = setInterval(() => {
console.log('in onSubscribe ', num);
observer.next(num++);
}, 1000);
// 有返回结果,是一个包含退订方法的对象
return {
unsubscribe: () => {
// clearInterval(handle);
}
}
};
const next$ = new Observable(onSubscribe);
const subscription = next$.subscribe(
item => console.log(item),
);

setTimeout(() => {
// unsubscribe调用之后,虽然不再接收到推送的数据,但是source$并没有终结,因为始终没有调用complete,如果继续订阅会接着往下输出
subscription.unsubscribe();
}, 3500);
}

可以看出,在调用unsubscribe函数调用之后,onSubscribe中依然在不断地调用next函数,但是Observer已经不会再做出任何响应了。

Hot Observable 和 Cold Observable

Cold Observable - 每一次subscribe都产生一个新的生产者,传输这个生产者产生数据(后一次订阅会从头开始)

1
2
3
const cold$ = new Observable(observer => {
const producer = new Producer();
});

Hot Observable - 每次订阅的时候,已经准备好了一个生产者。在Hot Observable中,Observable明显并不产生数据,只是数据的搬运工。(后一次订阅会接着上一次订阅的结果)

1
2
3
4
const producer = new Producer();
const hot$ = new Observable( observer => {

});

操作符简介

对于现实中复杂的问题,并不会创造一个数据流之后就直接通过subscribe接上一个Observer,往往需要对这个数据流做一系列处理,然后才交Observer。
在数据管道里流淌的数据就像是水,从上游 流向下游,对一个操作符来说,上有可能是一个数据源,也可能是其他操作符,下游可能是最终的观察者,也可能是另一个操作符,每一个操作符之间都是独立的。

process

operator

1
2
3
4
5
6
7
8
9
10
const onSubscribe = observer => {
observer.next(1);
observer.error('Something Wrong');
observer.complete();
};
// create创造一个新的Observable对象。
const source$ = Observable.create(onSubscribe);
// map是对其中每一个数据映射为一个新的值,残生一个新的Observable对象。
const mapped$ = source$.map(x => x*x);
mapped$.subscribe(console.log);

操作符就是用来产生全新Observable对象的函数(每一个操作符都是创造一个新的Observable对象,不会对上游的Observable对象做任何修改)。

弹珠图

弹珠图可以表示一个Observable对象所表示的数据流。
每个弹珠之间的间隔,代表的是吐出数据之间的时间间隔;
|符号代表数据流的完结,对应调用下游的complete函数;
×代表数据流中的异常,对应于调用下游的error函数

弹珠图工具

rxmarbles

为了描述操作符的功能,弹珠图中往往会出现多条时间轴。

第三章 操作符基础

  • 创建类操作符
  • 合并类操作符
  • 过滤类操作符
  • 转化类操作符
  • 异常处理类操作符
  • 多播类操作符(让一个数据流的数据可以提供给多个观察者)

为什么要有操作符

共同特征- 返回一个Observable对象。

对Observable对象能够链式调⽤filter和map,是因为:

  • filter和map都是Observable对象的成员函数。
  • filter和map的返回结果依然是Observable对象。
  • filter和map不会改变原本的Observable对象。

操作符的分类

RxJS v5版本自带60多个操作符。

功能分类

classify

静态和实例分类

不需要Observable实例就可以执行的函数,称为静态操作符;前提是要有一个创建好的Observable对象,称为实例操作符。

类似于:(和JS中的对象类比)
在Observable类上加一个静态方法
Observable.of
在Observable的prototype属性上加一个函数
Observable.prototype.map

如果要导入静态操作符:

1
import 'rxjs/add/observable/of';

如果要导入实例操作符:

1
import 'rxjs/add/operator/map';

如果要调用静态操作符:

1
const source$ = Observable.of(/*⼀些参数*/);

如果要调用实例操作符:

1
const result$ = source$.map(/*⼀些参数*/);

静态操作符只能出现在首位,实例操作符则可以出现在任何位置,有些功能既可以作为Observable对象的静态方法,也可以作为Observable对象的实例方法。

如何实现操作符

具体的应用项目中,很可能会用上一些可以重复使用的逻辑,这些逻辑可以封装在自定义的操作符中,这时候就需要知道如何定制一个新的操作符。

操作符函数的实现

每个操作符都是一个函数,不管实现什么功能,都必须考虑下面这些功能要点:

  • 返回一个全新的Observable对象。
  • 对上游和下游的订阅及退订处理。
  • 处理异常情况
  • 及时释放资源

例:
// map操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
function map(project) {
// 返回一个全新的Observable对象
return new Observable(observer => {
// 对上游的订阅
const sub = this.subscribe({
// 下游的订阅处理
next: value => {
// 处理异常
try {
observer.next(project(value))
} catch (err) {
observer.error(err);
}
},
error: err => observer.error(err),
complete: {} => observer.complete(),
});
return {
// 退订处理(map并不占用什么资源,有的操作符要在退订的时候,及时释放资源)
unsubscrib: () => {
sub.unsubscribe();
}
}
});
}

关联Observable

  • 最简单的方法(不推荐,避免污染全局Observable)
    Observable.prototype.map = map;
    注意:这里不能使用箭头函数,箭头函数中的this直接绑定于定义函数环境下的this,而不是执行时指定的this。

  • 如果只想给指定的Observable对象使用

使用绑定操作符::,可以链式调用
const result$ = source$::map(x => x*2) :: map(x => x+1);

  • 使用lift

list是Observable的实力函数,它会返回一个新的Observable对象,通过传递给lift的函数参数可以赋予个这个新的Observable对象特殊功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 如果使用lift,那么map的实现代码如下
function map(project) {
return this.lift(function(source$) {
return source$.subscribe({
next: value => {
try {
this.next(project(value));
} catch (err) {
this.error(err);
}
},
error: err => this.error(err),
complete: () => this.complete(),
});
});
}

虽然RxJS v5的操作符都架构在lift上,应用层开发者并不经常使用lift,这个lift更多的是给RxJS库开发者使用。

改进的操作符定义

如果函数中需要使用this,那就多了一个改变函数行为的因素,也就算不上真正的纯函数了。

lettable/pipeable操作符

最理想的方式是使用RxJS v5.5引入的pipeable操作符,这种方式不仅让代码更加简洁,而且可以让Tree Shanking发挥作用。

1
2
3
4
5
6
7
8
9
// 通过pipe串接了filter和map两个lettable操作符
import {of} from 'rxjs/observable/of';
import {map, filter} from 'rxjs/operators';
const source$ = of(1, 2, 3);
const result$ = source$.pipe(
filter(x => x % 2 === 0),
map(x => x * 2)
);
result$.subscribe(console.log);

以前导入map的代码如下:

1
import 'rxjs/add/operator/map';

现在导入pipeable操作符map的代码就是如下这样:

1
2
import {map} from 'rxjs/operators/map';
import {map} from 'rxjs/operators';

有的传统的操作符名称和pipeable操作符名称不同

管道操作符

管道操作符是一个竖杠和大于号的组合,也就是|>,用来连接两个值,前一个值没有任何要求,后一个值必
须是函数,管道操作符的作用就是把前面的值作为参数来调用后面的函数。
管道操作符还可以对以一个值连续进行多次函数操作。

1
2
666 |> foo |> bar
bar(foo(666))

可以用下面这种形式来使用pipeable操作符

1
2
3
const result$ = source$
|> filter (x => x % 2 === 0)
|> map(x => x * 2)

第4章 创建数据流

本章介绍RxJS中用于创造Observable对象的操作符,这些操作符是RxJS中数据流的源头。

4.1 创建类操作符

创建类操作符

4.2 创建同步数据流

创建同步数据流

4.3 创建异步数据的Observable对象

创建异步数据的Observable对象

第5章 合并数据流

合并数据流

5.1 合作类操作符

合作类操作符

5.2 高阶Observable

高阶Observable

第6章 辅助类操作符

辅助类操作符

6.1 数学类操作符

数学类操作符

6.2 条件布尔类操作符

条件布尔类操作符

第7章 过滤数据流

过滤数据流

7.1 过滤类操作符的模式

过滤类操作符的模式

7.2 回压控制

回压控制

7.3 其他过滤方式

其他过滤方式

第8章 转化数据流

概念: 让数据管道中的数据发生变化

8.1 转化类操作符

转化类操作符

8.2 映射数据

映射数据

8.3 缓存窗口:无损回压控制

缓存窗口:无损回压控制

8.4 高阶的map

高阶的map

8.5 数据分组

数据分组

8.6 累计数据

累计数据

第9章 异常错误处理

9.1 异常处理不可避免

9.2 异常处理的难度

异常处理的难度

9.3 RxJS的异常处理

RxJS的异常处理

9.4 重试的本质

重试的本质

第10章 多播

让一个数据流的内容被多个Observer订阅。

数据流的多播

数据流的多播

Hot和Cold数据流差异

Hot和Cold数据流差异

Subject

Subject

支持多播的操作符

支持多播的操作符

高级多播功能

高级多播功能

第11章 掌握时间的Scheduler

Scheduler可以作为创造类和合并类操作符的函数使用。此外,RxJS还提供了observeOn和subscribeOn两个操作符,用于在数据管道任何位置插入给定Scheduler。

Scheduler的调度作用

在RxJS提供的很多操作符中都带有Scheduler类型的参数,在之前的章节中,为了简化问题,我们有意不提这些参数,不过,现在是时候来研究一下Scheduler参数的作用了。

因为scheduler不经常使用,所以scheduler总是一个可选参数,如果一个操作符有scheduler参数,那么这个参数也肯定是最后一个参数。

1
2
function range(start, count, scheduler) {
}

如果使用操作符的时候不传递scheduler参数,那么RxJS就会使用默认的Scheduler实现。

Scheduler的官方定义:

  • Scheduler是一种数据结构;(可以根据优先级或者其他某种条件来安排任务执行队列)
  • Scheduler是一个执行环境;(可以指定一个任务何时何地执行)
  • Scheduler拥有一个虚拟时钟(virtual clock)。

Scheduler可以改变Observable对象的数据产产方式。

RxJS提供的Scheduler

在RxJS中,提供了下列Scheduler实例。

  • undefined/null,也就是不指定Scheduler,代表同步执行的Scheduler。
  • asap,尽快执行的Scheduler。
  • async,利用setInterval实现的Scheduler,用于基于时间吐出数据的场景。
  • queue,利用队列实现的Scheduler,用于迭代一个大的集合的场景。
  • animationFrame,用于动画场景的Scheduler。

RxJS默认选择Scheduler的原则是:尽量减少并发运行。

Scheduler的工作原理

JavaScript只有一个线程。

JavaScript的解析和运行环境称为“JavaScript引擎”,JavaScript引擎有诸多实现,Chrome浏览器和Node.js使用的是v8。

“调⽤栈”——当调用一个函数的时候,就在调用栈上创建这个函数运行的空间,参数的传递、局部变量的创建都是通过调用栈完成;当一个函数执行完毕的时候,对应调用栈上这个函数的本次运行空间就被清除。

“事件循环”——可以看作一个死循环,重复的工作就是从“事件队列”中拿到需要处理的事件任务,然后把这个任务交给调用栈去执行,当这个任务处理结束之后,再从“事件队列”中拿下一个任务塞给调用栈……

当调用栈正在执行一个任务的时候,事件循环也只能等着,只有当前一个任务完成之后,才能塞给调用栈下一个任务。这也就是setTimeout不可能百分之百准确的原因。

“事件队列”中的任务可以细分为Micro Task和Macro Task。

如果把“事件队列”看作是等待执行而排队的话,那实际上也不只是排一条队(Macro Task)。

Micro Task只有一个队列,而且这个队列简直就是VIP快速通道。当调用栈处理完一个任务,准备迎接下一个任务的时候,“事件循环”总是会优先看一看Micro Task的队列,只要还有Micro Task存在,就直接把Micro Task交给调用栈,其他Macro Task队列的任务都只能等下次机会。

利用调用栈实现的Scheduler方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 默认的Scheduler
Observable.range = function(start, count) {
return Observable.create((observer) => {
let index = start;
let end = start + count;
do {
if (index >= end) {
observer.complete();
break;
}
observer.next(index++);
} while (true);
});
};

在RxJS中,每一个Scheduler类都继承形式如下的接口IScheduler:

1
2
3
4
5
6
interface IScheduler {
// 当前时间
now();
// 交给Scheduler一个工作
schedule(work, delay, state);
}

是asap会尽量使用Micro Task,而async利用的是Macro Task。queue这个Scheduler,如果调用它的schedule函数式参数delay是0,那它就用同步的方式执行,如果delay参数大于0,那queue的表现其实就和async一模一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import {asap} from 'rxjs/scheduler/asap';
import {async} from 'rxjs/scheduler/async';
import {queue} from 'rxjs/scheduler/queue';
console.log('before schedule');
async.schedule(() => console.log('async'));
asap.schedule(() => console.log('asap'));
queue.schedule(() => console.log('queue'));
console.log('after schedule');

// before schedule
// queue
// after schedule
// async
// asap

支持Scheduler的操作符

支持Scheduler的操作符

第12章 RxJS的调试和测试

调试方法

RxJS代码多会涉及异步操作,所以用不上传统的Debugger。

最有效的调试工具,依然是仔细考虑过而且位置恰当的打印语句。

利用do来插入调试代码

1
2
3
source$.do(
value => console('source$ data = ', value)
).subscribe(observer);

改进的日志调试方法,甚至可以更近一步分为debug,info,warn,error

1
2
3
4
5
6
7
Observable.prototype.debug = function(fn) {
if (global.debug) {
return this.do(fn);
} else {
return this;
}
};

画出数据流依赖图

单元测试

1
2
3
4
5
6
7
it('should sum up string value', () => {
const source$ = Rx.Observable.of('1', '2', '3');
const result$ = source$.pipe(sum);
result$.subscribe(
value => assert.equal(6, value)
);
});

操纵时间的TestScheduler,弹珠测试

1
2
3
4
5
6
7
8
9
it('should work with map operator', () => {
const source = '-a-b|';
const expected = '-a-b|';
const source$ = scheduler.createColdObservable(source, {a: 1, b: 3});
scheduler.expectObservable(source$.map(x => x * 2)).toBe(expected, {
a: 2, b: 6
});
scheduler.flush();
});

第13章 用RxJS驱动React

如何把RxJS的数据流和React的状态管理关联起来。利用Subject对象作为连接RxJS和React的纽带。

1
2
3
4
this.counter = new Subject();
const observer = value => this.setState({count: value});
this.counter.scan((result, inc) => result + inc, 0)
.subscribe(observer);

第14章 Redux和RxJS结合

Redux简介

Redux维持一个全局的Store,这个Store存储的就是应用的状态,因为这个Store是全局可见的,所以,当一个组件A修改Store上的状态的时候,与之相对的组件B依然可以读取到这个变化,这样就实现了组件之间的通信。

为了完成这样一个功能,一个使用Redux的系统需要如下这些元素:

  • Store——一个对象,提供一个getState函数,可以获得当前Store上存储的状态

  • action——修改状态,store.dispatch(action);

  • reducer——为了能够处理action,创建Store的时候提供对应的reducer函数,

1
2
3
function reducer (state, action) => {
//返回⼀个新的state
}
  • view——视图部分根据Store上的状态来渲染用户界面

用RxJS实现Redux

1
action$.scan(reducer, initialState).subscribe(renderView);

Redux和RxJS比较

Redux-status

RxJS-status

Redux-Observable:Redux和RxJS的结合

第15章 RxJS游戏开发