RxJS官网阅读小记

主要阅读OVERVIEW部分,加深一下理解,还有体验一下Operator Decision Tree,感觉说不定很厉害。操作符部分还是等用到了再仔细感悟吧。

RxJS官网:RxJS官网

OVERVIEW

Introduction

Think of RxJS as Lodash for events.

把RxJS看做是事件的数组。

核心思想:

  • Observable: 一个概念,表示一个包含未来值和事件的可调用集合。
  • Observer: 一个监听Observable对象提供值的回调函数的集合。
  • Subscription: 表示Observable的执行,主要用于取消Observable的执行。
  • Operators: 采用函数式编程的纯函数,使用像map、filter、concat、flatMap等这样的操作符来处理集合。
  • Subject: 相当于EventEmitter, 将一个值或者事件多播给多个Observer的唯一方式。
  • Schedulers: 用来控制并发性的集中调度器,允许我们调整计算的发生时间。e.g. setTimeout or requestAnimationFrame or others.

Purity

使用纯函数来生产值使得RxJS十分强大,不易出错。使用RxJS将状态分离。

Flow

RxJS有各种各样的操作符来帮助你控制事件如何流经observables。

Values

你可以改变经过你的observables的值。

Observable

SINGLE MULTIPLE
Pull Function Iterator
Push Promise Observable

Pull versus Push

拉取和推送是一个数据生产者和一个数据消费者之间通信的两种不同的协议。

拉取的话,生产者不知道什么时候将数据发送给消费者,消费者决定它从生产者那里接受数据的时机。

JavaScript中的函数就是一个数据生产者,调用函数的代码从函数的返回值中“拉取”到一个值。
generator函数和iterators(function*)就是多个数据的生产者,调用iterator.next()的代码就是消费者,从iterator中“拉取”多个值。

推送的话,生产者决定发送数据给消费者的时机,消费者不知道什么时候会接收到数据。

Promise(生产者)发送一个解析过的值给已经注册的回调函数(消费者),Promise决定把数据推送给callbacks的时机。

Observable生产多个值,推送给Observers(消费者)。

Observables as generalizations of functions

Observables 像是没有参数, 但可以泛化为多个值的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
function foo() {
console.log('Hello');
return 42;
}

const x = foo.call(); // same as foo()
console.log(x);
const y = foo.call(); // same as foo()
console.log(y);

import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
});

foo.subscribe(x => {
console.log(x);
});
foo.subscribe(y => {
console.log(y);
});

// result

"Hello"
42
"Hello"
42

上面的例子你可以看到他们的结果一致,为什么呢?因为functions和Observables都是惰性计算。你只有“调用/subscribe”了这个函数/obervable,console.log('Hello')才会执行。此外,call/subscribe是一个独立的操作。(两个函数调用触发两个独立的副作用)

订阅observable和调用function类似。他们的不同之处在于,observables传递值既可以同步的,也可以异步的(随时间变化传输多个值)。函数return只能返回一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
function foo() {
console.log('Hello');
return 42;
return 100; // dead code. will never happen
}

// 同步

import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100); // "return" another value
subscriber.next(200); // "return" yet another
});

console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');

"before"
"Hello"
42
100
200
"after"

// 异步

import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300); // happens asynchronously
}, 1000);
});

console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');

"before"
"Hello"
42
100
200
"after"
300

总结

  • func.call() 给我一个同步的值

  • observable.subscribe() 给我一堆值,随便你是同步的还是异步的。

Anatomy of an Observable

核心的Observable概念:

  • 创建Observables(new Observable()或者创建操作符)
  • 订阅Observables(订阅 Observable 像是调用函数, 并提供接收数据的回调函数。)
  • 执行Observables(next/complete/error)
  • 销毁Observables(subscription.unsubscribe())

Operators

What are operators

操作符就是函数。有两种操作符:

管道操作符使用语法observableInstance.pipe(operator())来输送Observables。(满足纯函数,不改变Observable实例,返回一个新的Observable)

创建操作符被独立调用来创建一个新的Observable。

Piping

1
2
3
4
5
6
7
8
9
obs.pipe(
op1(),
op2(),
op3(),
op3(),
)

// 即使是一个,也最好写成:
obs.pipe(op())

Creation Operators

1
2
3
import { interval } from 'rxjs';

const observable = interval(1000 /* number of milliseconds */);

Higher-order Observables

1
2
3
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
);

http.get()返回一个Observable,fileObservable就是Observables of Observables,也就是高阶Observable。

通常用展开(flatten)的方式来处理高阶Observable,转化为普通的Observable。

1
2
3
4
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
concatAll(),
);

Marble diagrams

弹珠图是来展示说明操作符如何运行的一个工具。

Categories of operators

略,列举了各种操作符,边用边看。

Subscription

Subscription就是一个可以销毁的对象,通常执行一个Observable。

1
2
3
4
5
6
7
8
import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

Subscription可以合并,所以调用一个unsubscribe()可以取消订阅多个Subscriptions。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { interval } from 'rxjs';

const observable1 = interval(400);
const observable2 = interval(300);

const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);

second: 0
first: 0
second: 1
first: 1
second: 2

Subscriptions也可以取消一个child Subscription通过remove(otherSubscription)方法。

Subject

Subject是一种特殊的Observable,允许把值多播给多个Observers。一般的Observable是单播的,Subject是多播。

Subject就像是EventEmitter一样,存着一张有许多监听者的注册表。

每个Subject都是一个Observable。你可以提供一个观察者并使用subscribe方法。

每个Subject都是一个Observer对象,有next(v),error(e),complete()方法。调用next(theValue)给这个Subject一个新的值,它会多播到所有被注册的监听这个Subject的Observers。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

因为Subject是一个Observer,所以你可以提供一个Subject对象作为任意Observable的subscribe方法的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

const observable = from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

通过上面这个方法,我们基本上只需通过Subject就可以把一个单播的Observable转化为多播。

Multicasted Observables

多播Observable在底层是通过使用Subject使得多个观察者可以看见同一个Observable执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast返回一个看上去像普通的Observable,但是实际上被订阅的时候像是一个subject。multicast返回一个ConnectableObservable(有connect()方法的Observable对象)。这个connect()方法决定了何时开始这个被共享的Observable对象的执行。也可以退订这个被共享的Observable对象的执行。

Reference counting

用connect()手动处理Supscription太麻烦。通常我们希望第一个Observer来的时候自动订阅,最后一个Observer退订的时候自动取消。

比如,举个例子,要实现如下顺序:

  1. 第一个Observer订阅了这个多播的观察者对象
  2. connect()
  3. 发送0到Observer1
  4. 第二个Observer订阅了这个多播的观察者对象
  5. 发送1到Observer1
  6. 发送1到Observer2
  7. 第一个Observer退订
  8. 发送2到第二个Observer
  9. 第二个Observer退订
  10. 退订Connection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 用connect方法如下

import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

setTimeout(() => {
subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

refCount使多播Observable在第一个订阅者来到时自动开始执行,并且当最后一个订阅者离开时自动停止执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 用refCount的例子

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);

// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCount()只存在于ConnectableObservable,并返回一个Observable,而不是另一个ConnectableObservable。

BehaviorSubject

是一种Subject对象的变型,会存储上一个发送给消费者的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// 注意这里接受到了2,这个是上一个发送值
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

ReplaySubject可以记录多个Observable执行过的值,然后重现给新的订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// 这里replay
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了一个缓冲区大小,也可以制定一个毫秒单位的窗口时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject

执行结束后才会把最后一个值发送给observers。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

和last() 操作符类似。

Scheduler

调度程序控制着一个subscription何时开始以及何时发送通知。由三部分组成:

  1. 是一种数据结构。它按照优先度或者其他标准,知道如何存储和队列任务。
  2. 是一种执行上下文。它表示何时何地任务执行(立刻,或者在另一个回调机制中执行,例如setTimeout or process.nextTick, or the animation frame)。
  3. 是一个(虚拟)时钟。通过scheduler中的一个getter方法now(),它提供了一种时间概念。在特定的scheduler上,调度的任务将只遵循该时钟表示的时间。

Scheduler Types

SCHEDULER PURPOSE
null 不传任何scheduler,通知将以同步方式递归传递。
queueScheduler 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
asapScheduler 微任务的队列调度,它使用可用的最快速的传输机制。用于异步转换。
asyncScheduler 使用 setInterval 的调度。用于基于时间的操作符。
animationFrameScheduler 计划任务将在下一个浏览器内容重绘之前发生。可用于创建流畅的浏览器动画。

使用调度程序

如果您不提供调度程序,RxJS将使用最小并发原则选择默认调度程序。这意味着选择引入满足运算符需求的最少并发性的调度程序。例如,对于返回具有有限和少量消息的observable的运算符,RxJS不使用Scheduler,即null或undefined。对于返回潜在大量或无限数量消息的运算符,使用queueScheduler。对于使用计时器的操作员,使用async。

要指定特定的调度程序,您可以使用那些采用调度程序的运算符方法,例如from([10, 20, 30], asyncScheduler)。

以下静态创建运算符采用Scheduler参数:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

使用 subscribeOn 来调度 subscribe() 调用在什么样的上下文中执行。 默认情况下,Observable 的 subscribe() 调用会立即同步地执行。然而,你可能会延迟或安排在给定的调度器上执行实际的 subscription ,使用实例操作符 subscribeOn(scheduler),其中 scheduler 是你提供的参数。

使用 observeOn 来调度发送通知的的上下文。 正如我们在上面的示例中所看到的,实例操作符 observeOn(scheduler) 在源 Observable 和目标观察者之间引入了一个中介观察者,中介负责调度,它使用给定的 scheduler 来调用目标观察者。

实例操作符可能会接收调度器作为参数。

测试

这份指引是关于如何在使用testScheduler.run(callback)时,用宝珠图进行测试。

Operator Decision Tree

主要功能是针对你的情况,选择对应的适合的操作符,极大程度上辅助操作符的选择,减小操作符多带来的问题。

看完官网之后,觉得官网描述的十分清晰,基本只要厘清这些基本概念,发现可以把rxjs当做一个简单又强大的工具即可,并不算十分复杂。

关键可能还是要多写。