RxJS - Observables, observers 和 operators 简介

RxJS 是响应式编程 (reactive programming) 强大的工具,今天咱们将深刻介绍 Observables 和 Observers 的内容,以及介绍如何建立本身的操做符 (operators)。javascript

若是你以前已经使用过 RxJS,并但愿了解 Observable 及 Operators (操做符) 的内部工做原理,那么这篇文章很是适合你。html

什么是 Observable

Observable 就是一个拥有如下特性的函数:java

  • 它接收一个 observer 对象做为参数,该对象中包含 nexterrorcomplete 方法node

  • 它返回一个函数,用于在销毁 Observable 时,执行清理操做react

在咱们实现的示例中,咱们将定义一个简单的 unsubscribe 函数来实现取消订阅的功能。然而在 RxJS 中,返回的是 Subcription 对象,该对象中包含一个 unsubscribe 方法。es6

一个 Observable 对象设置观察者 (observer),并将它与生产者关联起来。该生产者多是 DOM 元素产生的 clickinput 事件,也多是更复杂的事件,如 HTTP。typescript

为了更好地理解 Observable,咱们来自定义 Observable。首先,咱们先来看一个订阅的例子:编程

const node = document.querySelector('input[type=text]');

const input$ = Rx.Observable.fromEvent(node, 'input');

input$.subscribe({
  next: (event) => console.log(`You just typed ${event.target.value}!`),
  error: (err) => console.log(`Oops... ${err}`),
  complete: () => console.log(`Complete!`)
});

该示例中,Rx.Observable.formEvent() 方法接收一个 input 元素和事件名做为参数,而后返回一个 $input Observable 对象。接下来咱们使用 subscribe() 方法来定于该 Observable 对象。当触发 input 事件后,对应的值将会传递给 observer 对象。segmentfault

什么是 Observer

Observer (观察者) 很是简单,在上面的示例中,观察者是一个普通的对象,该对象会做为 subscribe() 方法的参数。此外 subscribe(next, error, complete) 也是一个有效的语法,但在本文中咱们将讨论对象字面量的形式。安全

当 Observable 对象产生新值的时候,咱们能够经过调用 next() 方法来通知对应的观察者。若出现异常,则会调用观察者的 error() 方法。当咱们订阅 Observable 对象后,只要有新的值,都会通知对应的观察者。但在如下两种状况下,新的值不会再通知对应的观察者:

  • 已调用 observer 对象的 complete() 方法

  • 消费者对数据再也不感兴趣,执行取消订阅操做

此外在执行最终的 subscribe() 订阅操做前,咱们传递的值能够通过一系列的链式处理操做。执行对应操做的东西叫操做符,每一个操做符执行完后会返回一个新的 Observable 对象,而后继续咱们的处理流程。

什么是 Operator

正如上面所说的,Observable 对象可以执行链式操做,具体以下所示:

const input$ = Rx.Observable.fromEvent(node, 'input')
  .map(event => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => {
    // use the `value`
});

上面代码的执行流程以下:

  • 假设用户在输入框中输入字符 a

  • Observable 对象响应对应的 input 事件,而后把值传递给 observer

  • map() 操做符返回一个新的 Observable 对象

  • filter() 操做符执行过滤操做,而后又返回一个新的 Observable 对象

  • 最后咱们经过调用 subscribe() 方法,来获取最终的值

简而言之,Operator 就是一个函数,它接收一个 Observable 对象,而后返回一个新的 Observable 对象。当咱们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。

自定义 Observable

Observable 构造函数

function Observable(subscribe) {
  this.subscribe = subscribe;
}

每一个 subscribe 回调函数被赋值给 this.subscribe 属性,该回调函数将会被咱们或其它 Observable 对象调用。

Observer 示例

在咱们深刻介绍前,咱们先来看一个简单的示例。以前咱们已经建立完 Observable 函数,如今咱们能够调用咱们的观察者 (observer),而后传递数值 1,而后订阅它:

const one$ = new Observable((observer) => {
  observer.next(1);
  observer.complete();
});

one$.subscribe({
  next: (value) => console.log(value) // 1
});

即咱们订阅咱们建立的 Observable 实例,而后经过 subscribe() 方法调用经过构造函数设置的回调函数。

Observable.fromEvent

下面就是咱们须要的基础结构,即在 Observable 对象上须要新增一个静态方法 fromEvent

Observable.fromEvent = (element, name) => { };

接下来咱们将参考 RxJS 为咱们提供的方法来实现自定义的 fromEvent() 方法:

const node = document.querySelector('input');
const input$ = Observable.fromEvent(node, 'input');

按照上面的使用方式,咱们的 fromEvent() 方法须要接收两个参数,同时须要返回一个新的 Observable 对象,具体以下:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    
  });
};

接下来咱们来实现事件监听功能:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {}, false);
  });
};

那么咱们的 observer 参数来自哪里? 其实 observer 对象就是包含 nexterrorcomplete 方法的对象字面量。

须要注意的是,咱们的 observer 参数不会被传递,直到 subscribe() 方法被调用。这意味着 addEventListener() 方法不会被调用,除非你订阅该 Observable 对象。

当咱们调用 subscribe() 方法,以前设置的 this.subscribe 回调函数会被调用,对应的参数是咱们定义的 observer 对象字面量,接下来将使用新的值,做为 next() 方法的参数,调用该方法。

很好,那接下来咱们要作什么?以前版本咱们只是设置了监听,但没有调用 observer 对象的 next() 方法,接下来让咱们来修复这个问题:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {
      observer.next(event);
    }, false);
  });
};

如你所知,当销毁 Observables 对象时,须要调用一个函数用来执行清理操做。针对目前的场景,在销毁时咱们须要移除事件监听:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};

咱们没有调用 complete() 方法,由于该 Observable 对象处理的 DOM 相关的事件,在时间维度上它们多是无终止的。

如今让咱们来验证一下最终实现的功能:

const node = document.querySelector('input');
const p = document.querySelector('p');

function Observable(subscribe) {
  this.subscribe = subscribe;
}

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};

const input$ = Observable.fromEvent(node, 'input');

const unsubscribe = input$.subscribe({
  next: (event) => {
    p.innerHTML = event.target.value;
  }
});

// automatically unsub after 5s
setTimeout(unsubscribe, 5000);

自定义操做符

建立咱们本身的操做符应该会更容易一些,如今咱们了解 ObservableObservable 背后的概念。咱们将在 Observable 的原型对象上添加一个方法:

Observable.prototype.map = function (mapFn) { };

该方法的功能与 JavaScript 中的 Array.prototype.map 方法相似:

const input$ = Observable.fromEvent(node, 'input')
        .map(event => event.target.value);

因此咱们须要应用回调函数并调用它,这用于获取咱们所须要的数据。在咱们这样作以前,咱们须要流中的最新值。这里是巧妙的部分,在 map() 操做符中,咱们须要访问 Observable 实例。由于 map 方法在原型上,咱们能够经过如下方式访问 Observable 实例:

Observable.prototype.map = function (mapFn) {
  const input = this;
};

接下来咱们在返回的 Observable 对象中执行 input 对象的订阅操做:

Observable.prototype.map = function(mapFn) {
  const input = this;
  return new Observable((observer) => {
    return input.subscribe();
  });
};

咱们返回了 input.subscribe() 方法执行的结果,由于当咱们执行取消订阅操做时,将会依次调用每一个 Observable 对象取消订阅的方法。

最后咱们来完善一下 map 操做符的内部代码:

Observable.prototype.map = function (mapFn) {
  const input = this;
  return new Observable((observer) => {
    return input.subscribe({
      next: (value) => observer.next(mapFn(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
};

如今咱们已经能够执行链式操做了:

const input$ = Observable.fromEvent(node, 'input')
  .map(event => event.target.value);

input$.subscribe({
  next: (value) => {
    p.innerHTML = value;
  }
});

我有话说

Observable 与 Promise 有什么区别?

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

MagicQ 单值 多值
拉取(Pull) 函数 遍历器
推送(Push) Promise Observable
  • Promise

    • 返回单个值

    • 不可取消的

  • Observable

    • 随着时间的推移发出多个值

    • 能够取消的

    • 支持 map、filter、reduce 等操做符

    • 延迟执行,当订阅的时候才会开始执行

什么是 SafeObserver ?

上面的示例中,咱们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。如下是一些比较重要的原则:

  • 传入的 Observer 对象能够不实现全部规定的方法 (next、error、complete 方法)

  • complete 或者 error 触发以后再调用 next 方法是没用的

  • 调用 unsubscribe 方法后,任何方法都不能再被调用了

  • completeerror 触发后,unsubscribe 也会自动调用

  • nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费

  • nextcompleteerror是可选的。按需处理便可,没必要所有处理

为了完成上述目标,咱们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。

若想进一步了解详细信息,请参考 Observable详解 文章中 "自定义 Observable" 章节的内容。

参考资源