RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
ReactiveX 结合了 观察者模式、迭代器模式 和 使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。
在 RxJS 中用来解决异步事件管理的的基本概念是:
- Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
- Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
- Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
- Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
- Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。
注册事件监听器的常规写法。
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
使用 RxJS 的话,创建一个 observable 来代替。
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscribe(() => console.log('Clicked!'));
使得 RxJS 强大的正是它使用纯函数来产生值的能力。这意味着你的代码更不容易出错。
通常你会创建一个非纯函数,在这个函数之外也使用了共享变量的代码,这将使得你的应用状态一团糟。
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));
使用 RxJS 的话,你会将应用状态隔离出来。
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
scan 操作符的工作原理与数组的 reduce 类似。它需要一个暴露给回调函数当参数的初始值。每次回调函数运行后的返回值会作为下次回调函数运行时的参数。
RxJS 提供了一整套操作符来帮助你控制事件如何流经 observables 。
下面的代码展示的是如何控制一秒钟内最多点击一次,先来看使用普通的 JavaScript:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {
console.log(`Clicked ${++count} times`);
lastClick = Date.now();
}
});
使用 RxJS:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
其他流程控制操作符有 filter、delay、debounceTime、take、takeUntil、distinct、distinctUntilChanged 等等。
对于流经 observables 的值,你可以对其进行转换。
下面的代码展示的是如何累加每次点击的鼠标 x 坐标,先来看使用普通的 JavaScript:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count)
lastClick = Date.now();
}
});
使用 RxJS:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.map(event => event.clientX)
.scan((count, clientX) => count + clientX, 0)
.subscribe(count => console.log(count));
其他产生值的操作符有 pluck、pairwise、 sample 等等。
Observables 是多个值的惰性推送集合。它填补了下面表格中的空白:
单个值 | 多个值 | |
---|---|---|
拉取 | Function | Iterator |
推送 | Promise | Observable |
示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
要调用 Observable 并看到这些值,我们需要订阅 Observable:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
控制台执行的结果:
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
拉取 和 推送 是两种不同的协议,用来描述数据 生产者 (Producer) 如何与数据消费者 (Consumer) 进行通信的。
什么是拉取? - 在拉取体系中,由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。
每个 JavaScript 函数都是拉取体系。函数是数据的生产者,调用该函数的代码通过从函数调用中“取出”一个 单个 返回值来对该函数进行消费。
ES2015 引入了 generator
函数和 iterators
(function*)
,这是另外一种类型的拉取体系。调用 iterator.next()
的代码是消费者,它会从 iterator(生产者) 那“取出” 多个 值。
生产者 | 消费者 | |
---|---|---|
拉取 | 被动的: 当被请求时产生数据。 | 主动的: 决定何时请求数据。 |
推送 | 主动的: 按自己的节奏产生数据。 | 被动的: 对收到的数据做出反应。 |
什么是推送? - 在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。
在当今的 JavaScript 世界中,Promises
是最常见的推送体系类型。Promise(生产者) 将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由 Promise 来决定何时把值“推送”给回调函数。
RxJS 引入了 Observables,一个新的 JavaScript 推送体系。Observable 是多个值的生产者,并将值“推送”给观察者(消费者)。
Function 是惰性的评估运算,调用时会同步地返回一个单一值。 Generator 是惰性的评估运算,调用时会同步地返回零到(有可能的)无限多个值。 Promise 是最终可能(或可能不)返回单个值的运算。 Observable 是惰性的评估运算,它可以从它被调用的时刻起同步或异步地返回零到(有可能的)无限多个值。