RxJs学习笔记

订阅发布+Iterator模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 订阅发布模式
function Observer() {
this.listeners = {};
}
Observer.prototype.subscribe = function (id, fn) {
Object.assign(this.listeners, { id: fn });
}
Observer.prototype.unsubscribe = function (id) {
delete this.listeners[id];
}
Observer.prototype.publish = function () {
var id = arguments[0];
this.listeners[id].apply(null. arguments.slice(1));
}
// iterator模式
function Iterator(arr, divisor) {
this.currentIndex = 0;
this.array = arr;
}
Iterator.prototype.next = function () {
if (this.currentIndex >= this.array.length) {
return { value: undefined, done: true };
}
return { value: this.array[this.currentIndex++], done: false };
}

一些基本概念

  • Observable 可观察对象
    表示一个可调用的未来值或者事件的集合
  • Observer 观察者
    一个回调函数的集合,它知道怎样去监听被Observable发送的值
  • Subscription 订阅
    表示一个可观察对象的执行,主要用于取消执行
  • Operators 操作符
    纯粹的函数,使得以函数编程的方式处理集合比如:map,filter,concat,flatMap
  • Subject 主题
    等同于一个事件驱动器,是将一个值或者事件广播到多个观察者的唯一途径
  • Schedulers 调度者
    用来控制并发,当计算发生的时候允许我们协调,比如setTimeout,requestAnimation等

常用API

  1. 创建数据流
  • 单值: of,empty,never
  • 多值: from
  • 定时: interval, timer
  • 从Promise创建: fromPromise
  • 自定义创建: create
  • 把callback bindCallback写法转换成链式写法,bindNodeCallback将一个NodeJs风格的回调函数API转化为一个能返回可观察对象的函数
    1
    2
    3
    4
    5
    6
    7
    let exists = Rx.Observable.bindCallback(fs.exists);
    exists('file.txt').subscribe(exist => console.log('exists?' + exists));
    import * as fs from 'fs';
    var readFileObservable = Rx.Observable.bindNodeCallback(fs.readFile);
    var result = readFileAsObservable('./roadNames.txt', 'utf-8');
    result.subscribe(x => console.log(x), e => console.error(e));
  1. 转换
  • 改变数据形态: map,mapTo,pluck
  • 过滤一些值:filter,skip,first,last,take,distinct,distance
  • 时间轴上的操作:delay, timeout,throttle,debounceTime,audit,bufferTime
  • 累加:reduce,scan
  • 异常处理: throw,catch,retry,finally
  • 条件执行: takeUntil,delayWhen,retryWhen,subscribeOn,ObserveOn
  • 转接:switch
  1. 组合
  • concat 保持原来的序列顺序连接两个数据流(顺序连接2个数据流)
  • merge 合并序列
  • race 预设条件为其中一个数据流完成
  • forkJoin 预设条件为所有数据流都完成
  • zip 取各来源数据流最后一个值合并为对象
  • combineLatest 取各来源数据流最后一个值合并为数组

Subject

Subject是允许值被多播到多个观察者的一种特殊的Observable,然而纯粹的可观察对象是单播的(每一个订阅的观察者拥有单独的可观察对象的执行)
每一个Subject都是一个Observable可观察对象,给定Subject后,你可以订阅它,提供的观察者将会按正常的开始接收值,从观察者角度来看,它不能判断一个可观察对象的执行时来自于单播Observable还是来自于Subject
同时每一个Subject都是一个Observer观察者对象,拥有next、error、complete方法

1
2
3
4
5
6
7
8
9
10
11
12
var subject = new Rx.Subject();
subject.subscribe(
next: v => console.log(v)
);
subject.subscribe(
next: v => console.log(v)
);
subject.next(1);
subject.next(2);
// 输出1 1 2 2
// 由于Subject也是一个观察者,所以可以当Observable.subscribe(observer)参数

一个多播的可观察对象通过具有多个订阅者的Subject对象传递通知。然而一个单纯的单播可观察对象仅仅给一个单一的观察者发送通知。

1
2
3
4
5
6
7
8
9
10
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
multicasted.subscribe({
next: v => console.log(v)
});
multicasted.subscribe({
next: v => console.log(v);
});
multicasted.connect();

Subscription

订阅是一个表示一次性资源的对象,通常是一个可观察对象的执行。订阅对象有一个很重要的方法:unsubscribe,仅仅废弃掉可观察对象所持有的资源。
通过add添加取消订阅之后会一同取消订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// ...
subscription.unsubscribe();
// example2
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first' + x));
var childSubscription = observable2.subscribe(x => console.log('second' + x));
// example3
var observable3 = Rx.Observable

Scheduler调度者

  • 一个调度者是一个数据结构。它知道如何根据优先级或其它标准存储和排列任务
  • 一个调度者是一个执行上下文.它表示何处何时任务被执行(如immediately立即,or in another callback mechanism回调机制例如setTimeout、process.nextTick或animation frame)
  • 一个调度者具有虚拟的时钟。它通过调度器上的getter方法now()提供了时间的概念。在特定调度程序上调度的任务将紧紧遵守由该时钟表示的时间
    调度者使得你可以确定可观察对象在什么执行上下文中给观察者发送通知。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    var observable = Rx.Ovservable.create(observer => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    }).observeOn(Rx.Scheduler.async);
    console.log('just before subscribe');
    observable.subscribe({
    next: x => console.log('got value' + x),
    error: err => console.error('something wrong occurred:' + err),
    complete: () => console.log('done')
    });
    console.log('just after subscribe');
    observer是一个代理,具体讲会由Rx.Scheduler.async替代
    var proxyObserver = {
    next: val => {
    Rx.Scheduler.
    }
    }

对时间轴操作

1
2
3
4
5
6
7
8
9
const timeA$ = Rx.Observable.interval(10000);
const timeB$ = timeA$.filter(num => {
return (num % 2 != 0)
&& (num % 3 != 0)
&& (num % 5 != 0)
&& (num % 7 != 0)
});
const timeC$ = timeB$.debounceTime(3000);
const timeD$ = timeC$.delay(2000);

结果如下:
时间轴操作

producing values产生值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// typing "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'keypress');
// Pass on a new value
input.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h"
// Pass on a new value by plucking it
input.pluck('target', 'value')
.subscribe(value => console.log(value)); // "h"
// Pass the two previous values
input.pluck('target', 'value').pairwise()
.subscribe(value => console.log(value)); // ["h", "e"]
// Only pass unique values through
input.pluck('target', 'value').distinct()
.subscribe(value => console.log(value)); // "helo wrd"
// Do not pass repeating values through
input.pluck('target', 'value').distinctUntilChanged()
.subscribe(value => console.log(value)); // "helo world"

concat顺序组合发出

combineLatest组合多个Observable生成一个新的Observable

组合多个Observable产生一个新的Observable,其发射的值根据其每个输入Observable的最新值计算。(无论何时作为输入的Observable发出的一个值,它取得所有输入的最新值作为它的发射值)

1
2
3
4
5
6
var weight = Rx.Observable.of(1,3,5,7,9);
var height = Rx.Observable.of(2,4,6,8,10);
var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => [w,h]);
bmi.subscribe(x => console.log('BMI is' + x))
// [9,2] [9,4] [9,6] [9,8] [9,10]
// 取combineLatest最后一个值和另外一个进行组合

defer

以惰性的方式产生一个Observable,也就是说当订阅的时候才会产生
public static defer(observableFactory: function(): Observable | Promise): Observable
等待一个Observer订阅它然后生成一个Observable,通常有一个Observable工厂函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var clickOrInterval = Rx.Observable.defer(function () {
if (Math.random() > 0.5) {
return Rx.Observable.fromEvent(document, 'click');
} else {
return Rx.Observable.interval(1000);
}
});
clickOrInterval.subscribe(x => console.log(x));
/** Using a promise **/
var source = Rx.Observable.defer(() => Promise.resolve(42));
var subscription = source.subscribe(
x => console.log(`onNext:${x}`),
e => console.log(`onError:${e}`),
() => console.log('onCompleted')
)

empty

public static empty(scheduler: Scheduler): Observable
创建一个不发射任何值得Observable,它只会发射一个complete通知

1
2
3
4
var result = Rx.Observable.empty().startWith(7)
result.subscribe(x => console.log(x))
var interval = Rx.Observable.interval(1000)

switchMap和mergeMap区别

switchMap与mergeMap都是将分支流疏通到主干上,而不同的地方在于switchMap只会保留最后的流而取消抛弃之前的流

forkJoin

Rx.Observable.forkJoin(...args [resultSelector])
并行运行所有可观察序列并收集其最后的元素

1
2
3
4
5
6
7
8
9
10
11
12
var source = Rx.Observer.forkJoin(
Rx.Observable.return(42),
Rx.Observable.range(0, 10),
Rx.Observable.fromArray([1,2,3]),
Promise.resolve(56)
);
var subscription = source.subscribe(
x => console.log('onNext' + x),
e => console.log('onError:' + e),
() => console.log('onCompleted')
);
// 最后示例range里的数据只有最后一个

from

public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>
将一个数组,类数组(字符串也可以),Promise,可迭代对象,类可观察对象,转化为Observable

1
2
3
4
5
6
7
8
9
10
11
// 无限迭代对象
function* generateDoubles(seed) {
var i = seed;
while (true) {
yield i;
i = 2 * i;
}
}
var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator);
result.subscribe(x => console.log(x))

fromEvent

Rx.Observable.fromEvent(element, eventName, [selector])
将一个元素上的事件转化为Observable
使用Jquery,zepto,Backbone.Marionette,AngularJs和Ember.js的库方法,并且如果不存在,则退回到本地绑定,如果您使用AMD,您需要将这些库作为RsJs的依赖关系包括在requirejs配置文件中,当决定使用哪个库时,RxJs将尝试检测它们的存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var input = $(input);
var spurce = Rx.Observable.fromEvent(input, 'click')
var subscription = source.subscribe(
x => console.log('Next: clicked!'),
err => console.error('Error:' + err),
() => console.log('Completed')
)
input.trigger('click');
// for nodejs
var EventEmitter = require('events').EventEmitter,
Rx = require('rx');
var em = new EventEmitter();
var source = Rx.Observable.fromEvent(
eventEmiiter,
'data',
function(args) {
return { foo: args[0], bar: args[1] };
}
)
var subscription = source.subscribe(
x => {
console.log('Next:', x.foo, x.bar)
},
err => {...},
() => {...}
)
em.emit('data', 'baz', 'quux')

fromEventPattern

Rx.Observable.fromEventPattern(addHandler, [removeHandler], [selector])
通过使用addHandler和removeHandler函数添加和删除处理程序,当输出Observable被订阅时,addHandler被调用,并且订阅被取消时调动removeHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
var clicks = Rx.Observable.fromEventPattern(
addClickHandler,
removeClickHandler
);
clicks.subscribe(x => console.log(x))
// another example
var input = $('#input');
var source = Rx.Observable.fromEventPattern(
function add (h) {
input.bind('click', h)
}
function remove(h) {
input.unbind('click', h)
}
)
var subscription = source.subscribe(
x => console.log('Next: clicked'),
err => console.error(err),
() => console.log('complete')
)
input.trigger('click');
source.unsubscribe()

fromPromise

public static fromPromsie(promise: Promise<T>, scheduler: Scheduler): Observable<T>
将一个Promise转为一个Observable
resove作为next发出,然后complete,失败输出error

1
2
var result = Rx.Observable.fromPromise(fetch('http://www.baidu.com'))
result.subscribe(x => console.log(x), e => console.error(e))

interval

public static interval(period: number, scheduler: Scheduler): Observable
返回一个周期性的、递增的方式发射值的Observable
interval返回一个Observable,它发出一个递增的无限整数序列,第一个参数为时间间隔,需要注意的是,第一发射不立即发送,而是在第一个周期过去之后发送,第二个参数,默认是用异步调度程序提供时间概念,但是可以将任何调度程序传递给它

1
2
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x))

merge

public static merge(observable:...Observable, concurrent: number, scheduler: Scheduler): Observable<T>
创建一个发射所有被合并的observable所发射的值

never

public static never():Observable
创建一个不发射任何值next,error,complete的Observable
作用:用于测试或与其他Observable组合。

1
2
3
4
5
// 下面代码会产生值,如果不加,不会产生任何值
var x = Rx.Observable.never().startWidth(2);
x.subscribe(
next: v => console.log(v) // 2
)

of

public static of(values:...T, scheduler: Scheduler): Observable
创建一个Observable,发射指定参数的值,一个接一个,最后发出complete

range

public static range(start:number, count: number, scheduler: Scheduler): Observable<T>
创建发射一个数字序列的observable
所以range(0,10) => 0~9
range(1,10) => 1~10

throw

public static throw(error: any, scheduler: Scheduler): Observable
创建一个只发出error通知的Observable,用于与其他Observable合并,如mergeMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var result = Rx.Observable.throw(new Error('oops')).startWith(7)
result.subscribe(x => console.log(x), e => console.error(e));
// 输出: 7 Error:oops!
// 结论startWith会next最前面
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x => {
return x === 13
? Rx.Observable.throw('Thirteens are bad')
: Rx.Observable.of('a', 'b', 'c')
})
result.subscribe(
x => console.log(x),
e => console.error(e),
() => {}
)
// 结论mergeMap可以在时间上作用,每隔1s输出abc,13次终止

timer

public static timer(initialDelay: number|Date, period: number, scheduler): Observable<T>
和interval差不多,只不过第一个参数可以指定初始延迟执行事件,第二个是后面延迟间隔

toAsync

Rx.Observable.toAsync(function: Function, [scheduler], [context]): Function
将函数转换为异步函数,生成的一步函数的每次调用都会导致调用指定调度程序上的原始同步函数

1
2
3
4
5
6
7
8
9
10
11
12
var func = Rx.Observable.toAsync((x, y) => {
return x + y;
});
// 注意该操作符返回的不是observable,而是一个异步函数,当异步函数被调用后
// (注意,调用并未立即执行),返回一个observable,该observable被订阅时,原// 函数才被执行,并返回值
// Execute function with 3 and 4
var source = func(3, 4);
var subscription = source.subscribe(
x => console.log('Next' + x), 7
err => console.log('Error' + err),
() => console.log('completed!')
)

using

when

while

wrap

webSocket

zip

public static zip(observables: *): Observable<R>

Instance Operators

audit

public audit(durationSelector: function(value:T): Observable): Observable<T>
在某个持续时间段内忽略原始observable发射的值,该方法的参数为一个函数,该函数需返回一个决定持续时长的observable或promise。之后从原始observable发射最近的值,不断重复这个过程
audit很像auditTime,但是其持续时长由第二个observable所决定

1
2
3
4
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
// 每秒只会有一次单击会被发射,发射时间点为每隔1秒

auditTime

public auditTime(durationTime: number, [scheduler, Scheduler]): Observable
在某个时间段内,忽略原始observable发射的值,该段时间由设定的duration的值(单位为ms)来决定,每隔一个设定的时间段,将从原始的observable发射最近的值,不断重复这个过程。

1
2
3
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));

buffer

public buffer(closingNotifier: Observable): Observable<T[]>
缓存原始observable发射的值,直到作为参数的另一个observable发射了值,之后返回一个由这些缓存值组成的数组.

1
2
3
4
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));

bufferCount

public bufferCount(bufferSize: Number, startBufferEvery): Observable<T[]>
缓存原始observable发射的值,直到达到bufferSize给定的上限
var clicks = Rx.Observable.fromEvent(document, ‘click’);
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));

bufferTime

public bufferTime(bufferTimeSpan: number, [bufferCreationInterval: number], [maxBufferSize: number],[scheduler: Scheduler]): Observable<T[]>
params: arg1: 必选,buffertTimeSpan发射值得时间间隔 arg2: 可选,设置打开缓存区和发射值的事件间隔 arg3: 可选,设置缓存区长度 scheduler: 可选
var clicks = Rx.Observable.fromEvent(document, ‘click’)
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));

bufferToggle

public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribleOrPromise): Observable<T[]>
以数组形式收集过去的值,在opening发射值时开始收集,并调动closingSelector函数获取一个Observable,以告知何时关闭缓存区

1
2
3
4
5
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty())
buffered.subscribe(x => console.log(x));

bufferWhen

public bufferWhen(closingSelector: function (): Observable): Observable<T[]>

1
2
3
4
var clicks = Rx.Observable.fromEvent(document, 'click')
var buffered = clicks.bufferWhen(() =>
Rx.Observable.interval(1000 + Math.random() * 4000));
buffered.subscribe(x => console.log(x));

catch

public catch(selector: function): Observable
参数为一个函数:它接收作为参数err,这是错误,并捕获。需要注意的是,如果是用了该操作符,observer的error方法不会被执行,因为错误已经被catch操作符捕获。
此外,不可以使用try…catch,因为代码块是同步执行的

combineAll

combineLatest

`public static combineLatest(observable1: Observable, observale2: Observable, project: function, scheduler: Scheduler)
组合多个Observable产生一个新的Observable,其发射值根据其每个输入Observable的最新值计算

throttleTime

debounceTime

takeUntil

…未完待续