RxJS 是响应式编程 (reactive programming) 强大的工具,今天咱们将深刻介绍 Observables 和 Observers 的内容,以及介绍如何建立本身的操做符 (operators)。javascript
若是你以前已经使用过 RxJS,并但愿了解 Observable 及 Operators (操做符) 的内部工做原理,那么这篇文章很是适合你。html
Observable 就是一个拥有如下特性的函数:java
它接收一个 observer
对象做为参数,该对象中包含 next
、error
和 complete
方法node
它返回一个函数,用于在销毁 Observable 时,执行清理操做react
在咱们实现的示例中,咱们将定义一个简单的 unsubscribe
函数来实现取消订阅的功能。然而在 RxJS 中,返回的是 Subcription
对象,该对象中包含一个 unsubscribe
方法。es6
一个 Observable 对象设置观察者 (observer),并将它与生产者关联起来。该生产者多是 DOM 元素产生的 click
或 input
事件,也多是更复杂的事件,如 HTTP。typescript
为了更好地理解 Observable,咱们来自定义 Observable。首先,咱们先来看一个订阅的例子:编程
const node = document.querySelector('input[type=text]'); const input$ = Rx.Observable.fromEvent(node, 'input'); input$.subscribe({ next: (event) => console.log(`You just typed ${event.target.value}!`), error: (err) => console.log(`Oops... ${err}`), complete: () => console.log(`Complete!`) });
该示例中,Rx.Observable.formEvent()
方法接收一个 input
元素和事件名做为参数,而后返回一个 $input
Observable 对象。接下来咱们使用 subscribe()
方法来定于该 Observable 对象。当触发 input
事件后,对应的值将会传递给 observer 对象。segmentfault
Observer (观察者) 很是简单,在上面的示例中,观察者是一个普通的对象,该对象会做为 subscribe()
方法的参数。此外 subscribe(next, error, complete)
也是一个有效的语法,但在本文中咱们将讨论对象字面量的形式。安全
当 Observable 对象产生新值的时候,咱们能够经过调用 next()
方法来通知对应的观察者。若出现异常,则会调用观察者的 error()
方法。当咱们订阅 Observable 对象后,只要有新的值,都会通知对应的观察者。但在如下两种状况下,新的值不会再通知对应的观察者:
已调用 observer 对象的 complete()
方法
消费者对数据再也不感兴趣,执行取消订阅操做
此外在执行最终的 subscribe()
订阅操做前,咱们传递的值能够通过一系列的链式处理操做。执行对应操做的东西叫操做符,每一个操做符执行完后会返回一个新的 Observable 对象,而后继续咱们的处理流程。
正如上面所说的,Observable 对象可以执行链式操做,具体以下所示:
const input$ = Rx.Observable.fromEvent(node, 'input') .map(event => event.target.value) .filter(value => value.length >= 2) .subscribe(value => { // use the `value` });
上面代码的执行流程以下:
假设用户在输入框中输入字符 a
Observable 对象响应对应的 input
事件,而后把值传递给 observer
map()
操做符返回一个新的 Observable 对象
filter()
操做符执行过滤操做,而后又返回一个新的 Observable 对象
最后咱们经过调用 subscribe()
方法,来获取最终的值
简而言之,Operator 就是一个函数,它接收一个 Observable 对象,而后返回一个新的 Observable 对象。当咱们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。
function Observable(subscribe) { this.subscribe = subscribe; }
每一个 subscribe
回调函数被赋值给 this.subscribe
属性,该回调函数将会被咱们或其它 Observable 对象调用。
在咱们深刻介绍前,咱们先来看一个简单的示例。以前咱们已经建立完 Observable
函数,如今咱们能够调用咱们的观察者 (observer),而后传递数值 1,而后订阅它:
const one$ = new Observable((observer) => { observer.next(1); observer.complete(); }); one$.subscribe({ next: (value) => console.log(value) // 1 });
即咱们订阅咱们建立的 Observable 实例,而后经过 subscribe()
方法调用经过构造函数设置的回调函数。
下面就是咱们须要的基础结构,即在 Observable 对象上须要新增一个静态方法 fromEvent
:
Observable.fromEvent = (element, name) => { };
接下来咱们将参考 RxJS 为咱们提供的方法来实现自定义的 fromEvent()
方法:
const node = document.querySelector('input'); const input$ = Observable.fromEvent(node, 'input');
按照上面的使用方式,咱们的 fromEvent()
方法须要接收两个参数,同时须要返回一个新的 Observable 对象,具体以下:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { }); };
接下来咱们来实现事件监听功能:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => {}, false); }); };
那么咱们的 observer
参数来自哪里? 其实 observer
对象就是包含 next
、error
和 complete
方法的对象字面量。
须要注意的是,咱们的 observer 参数不会被传递,直到
subscribe()
方法被调用。这意味着addEventListener()
方法不会被调用,除非你订阅该 Observable 对象。
当咱们调用 subscribe()
方法,以前设置的 this.subscribe
回调函数会被调用,对应的参数是咱们定义的 observer 对象字面量,接下来将使用新的值,做为 next()
方法的参数,调用该方法。
很好,那接下来咱们要作什么?以前版本咱们只是设置了监听,但没有调用 observer 对象的 next()
方法,接下来让咱们来修复这个问题:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => { observer.next(event); }, false); }); };
如你所知,当销毁 Observables 对象时,须要调用一个函数用来执行清理操做。针对目前的场景,在销毁时咱们须要移除事件监听:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); };
咱们没有调用 complete()
方法,由于该 Observable 对象处理的 DOM 相关的事件,在时间维度上它们多是无终止的。
如今让咱们来验证一下最终实现的功能:
const node = document.querySelector('input'); const p = document.querySelector('p'); function Observable(subscribe) { this.subscribe = subscribe; } Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); }; const input$ = Observable.fromEvent(node, 'input'); const unsubscribe = input$.subscribe({ next: (event) => { p.innerHTML = event.target.value; } }); // automatically unsub after 5s setTimeout(unsubscribe, 5000);
建立咱们本身的操做符应该会更容易一些,如今咱们了解 Observable
和 Observable
背后的概念。咱们将在 Observable 的原型对象上添加一个方法:
Observable.prototype.map = function (mapFn) { };
该方法的功能与 JavaScript 中的 Array.prototype.map
方法相似:
const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value);
因此咱们须要应用回调函数并调用它,这用于获取咱们所须要的数据。在咱们这样作以前,咱们须要流中的最新值。这里是巧妙的部分,在 map()
操做符中,咱们须要访问 Observable
实例。由于 map
方法在原型上,咱们能够经过如下方式访问 Observable 实例:
Observable.prototype.map = function (mapFn) { const input = this; };
接下来咱们在返回的 Observable 对象中执行 input
对象的订阅操做:
Observable.prototype.map = function(mapFn) { const input = this; return new Observable((observer) => { return input.subscribe(); }); };
咱们返回了
input.subscribe()
方法执行的结果,由于当咱们执行取消订阅操做时,将会依次调用每一个 Observable 对象取消订阅的方法。
最后咱们来完善一下 map
操做符的内部代码:
Observable.prototype.map = function (mapFn) { const input = this; return new Observable((observer) => { return input.subscribe({ next: (value) => observer.next(mapFn(value)), error: (err) => observer.error(err), complete: () => observer.complete() }); }); };
如今咱们已经能够执行链式操做了:
const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value); input$.subscribe({ next: (value) => { p.innerHTML = value; } });
Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。
MagicQ | 单值 | 多值 |
---|---|---|
拉取(Pull) | 函数 | 遍历器 |
推送(Push) | Promise | Observable |
Promise
返回单个值
不可取消的
Observable
随着时间的推移发出多个值
能够取消的
支持 map、filter、reduce 等操做符
延迟执行,当订阅的时候才会开始执行
上面的示例中,咱们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。如下是一些比较重要的原则:
传入的 Observer
对象能够不实现全部规定的方法 (next、error、complete 方法)
在 complete
或者 error
触发以后再调用 next
方法是没用的
调用 unsubscribe
方法后,任何方法都不能再被调用了
complete
和 error
触发后,unsubscribe
也会自动调用
当 next
、complete
和error
出现异常时,unsubscribe
也会自动调用以保证资源不会浪费
next
、complete
和error
是可选的。按需处理便可,没必要所有处理
为了完成上述目标,咱们得把传入的匿名 Observer
对象封装在一个 SafeObserver
里以提供上述保障。
若想进一步了解详细信息,请参考 Observable详解 文章中 "自定义 Observable" 章节的内容。