订阅发布+Iterator模式
|
|
一些基本概念
- Observable 可观察对象
表示一个可调用的未来值或者事件的集合 - Observer 观察者
一个回调函数的集合,它知道怎样去监听被Observable发送的值 - Subscription 订阅
表示一个可观察对象的执行,主要用于取消执行 - Operators 操作符
纯粹的函数,使得以函数编程的方式处理集合比如:map,filter,concat,flatMap - Subject 主题
等同于一个事件驱动器,是将一个值或者事件广播到多个观察者的唯一途径 - Schedulers 调度者
用来控制并发,当计算发生的时候允许我们协调,比如setTimeout,requestAnimation等
常用API
- 创建数据流
- 单值: of,empty,never
- 多值: from
- 定时: interval, timer
- 从Promise创建: fromPromise
- 自定义创建: create
- 把callback bindCallback写法转换成链式写法,bindNodeCallback将一个NodeJs风格的回调函数API转化为一个能返回可观察对象的函数1234567let exists = Rx.Observable.bindCallback(fs.exists);exists('file.txt').subscribe(exist => console.log('exists?' + exists));import * as fs from 'fs';var readFileObservable = Rx.Observable.bindNodeCallback(fs.readFile);var result = readFileAsObservable('./roadNames.txt', 'utf-8');result.subscribe(x => console.log(x), e => console.error(e));
- 转换
- 改变数据形态: map,mapTo,pluck
- 过滤一些值:filter,skip,first,last,take,distinct,distance
- 时间轴上的操作:delay, timeout,throttle,debounceTime,audit,bufferTime
- 累加:reduce,scan
- 异常处理: throw,catch,retry,finally
- 条件执行: takeUntil,delayWhen,retryWhen,subscribeOn,ObserveOn
- 转接:switch
- 组合
- concat 保持原来的序列顺序连接两个数据流(顺序连接2个数据流)
- merge 合并序列
- race 预设条件为其中一个数据流完成
- forkJoin 预设条件为所有数据流都完成
- zip 取各来源数据流最后一个值合并为对象
- combineLatest 取各来源数据流最后一个值合并为数组
Subject
Subject是允许值被多播到多个观察者的一种特殊的Observable,然而纯粹的可观察对象是单播的(每一个订阅的观察者拥有单独的可观察对象的执行)
每一个Subject都是一个Observable可观察对象,给定Subject后,你可以订阅它,提供的观察者将会按正常的开始接收值,从观察者角度来看,它不能判断一个可观察对象的执行时来自于单播Observable还是来自于Subject
同时每一个Subject都是一个Observer观察者对象,拥有next、error、complete方法
一个多播的可观察对象通过具有多个订阅者的Subject对象传递通知。然而一个单纯的单播可观察对象仅仅给一个单一的观察者发送通知。
Subscription
订阅是一个表示一次性资源的对象,通常是一个可观察对象的执行。订阅对象有一个很重要的方法:unsubscribe,仅仅废弃掉可观察对象所持有的资源。
通过add添加取消订阅之后会一同取消订阅。
Scheduler调度者
- 一个调度者是一个数据结构。它知道如何根据优先级或其它标准存储和排列任务
- 一个调度者是一个执行上下文.它表示何处何时任务被执行(如immediately立即,or in another callback mechanism回调机制例如setTimeout、process.nextTick或animation frame)
- 一个调度者具有虚拟的时钟。它通过调度器上的getter方法now()提供了时间的概念。在特定调度程序上调度的任务将紧紧遵守由该时钟表示的时间
调度者使得你可以确定可观察对象在什么执行上下文中给观察者发送通知。1234567891011121314151617181920var observable = Rx.Ovservable.create(observer => {observer.next(1);observer.next(2);observer.next(3);observer.complete();}).observeOn(Rx.Scheduler.async);console.log('just before subscribe');observable.subscribe({next: x => console.log('got value' + x),error: err => console.error('something wrong occurred:' + err),complete: () => console.log('done')});console.log('just after subscribe');observer是一个代理,具体讲会由Rx.Scheduler.async替代var proxyObserver = {next: val => {Rx.Scheduler.}}
对时间轴操作
|
|
结果如下:
producing values产生值
|
|
concat顺序组合发出
combineLatest组合多个Observable生成一个新的Observable
组合多个Observable产生一个新的Observable,其发射的值根据其每个输入Observable的最新值计算。(无论何时作为输入的Observable发出的一个值,它取得所有输入的最新值作为它的发射值)
defer
以惰性的方式产生一个Observable,也就是说当订阅的时候才会产生
public static defer(observableFactory: function(): Observable | Promise): Observable
等待一个Observer订阅它然后生成一个Observable,通常有一个Observable工厂函数。
empty
public static empty(scheduler: Scheduler): Observable
创建一个不发射任何值得Observable,它只会发射一个complete通知
switchMap和mergeMap区别
switchMap与mergeMap都是将分支流疏通到主干上,而不同的地方在于switchMap只会保留最后的流而取消抛弃之前的流
forkJoin
Rx.Observable.forkJoin(...args [resultSelector])
并行运行所有可观察序列并收集其最后的元素
from
public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>
将一个数组,类数组(字符串也可以),Promise,可迭代对象,类可观察对象,转化为Observable
fromEvent
Rx.Observable.fromEvent(element, eventName, [selector])
将一个元素上的事件转化为Observable
使用Jquery,zepto,Backbone.Marionette,AngularJs和Ember.js的库方法,并且如果不存在,则退回到本地绑定,如果您使用AMD,您需要将这些库作为RsJs的依赖关系包括在requirejs配置文件中,当决定使用哪个库时,RxJs将尝试检测它们的存在。
fromEventPattern
Rx.Observable.fromEventPattern(addHandler, [removeHandler], [selector])
通过使用addHandler和removeHandler函数添加和删除处理程序,当输出Observable被订阅时,addHandler被调用,并且订阅被取消时调动removeHandler
fromPromise
public static fromPromsie(promise: Promise<T>, scheduler: Scheduler): Observable<T>
将一个Promise转为一个Observable
resove作为next发出,然后complete,失败输出error
interval
public static interval(period: number, scheduler: Scheduler): Observable
返回一个周期性的、递增的方式发射值的Observable
interval返回一个Observable,它发出一个递增的无限整数序列,第一个参数为时间间隔,需要注意的是,第一发射不立即发送,而是在第一个周期过去之后发送,第二个参数,默认是用异步调度程序提供时间概念,但是可以将任何调度程序传递给它
merge
public static merge(observable:...Observable, concurrent: number, scheduler: Scheduler): Observable<T>
创建一个发射所有被合并的observable所发射的值
never
public static never():Observable
创建一个不发射任何值next,error,complete的Observable
作用:用于测试或与其他Observable组合。
of
public static of(values:...T, scheduler: Scheduler): Observable
创建一个Observable,发射指定参数的值,一个接一个,最后发出complete
range
public static range(start:number, count: number, scheduler: Scheduler): Observable<T>
创建发射一个数字序列的observable
所以range(0,10) => 0~9
range(1,10) => 1~10
throw
public static throw(error: any, scheduler: Scheduler): Observable
创建一个只发出error通知的Observable,用于与其他Observable合并,如mergeMap
timer
public static timer(initialDelay: number|Date, period: number, scheduler): Observable<T>
和interval差不多,只不过第一个参数可以指定初始延迟执行事件,第二个是后面延迟间隔
toAsync
Rx.Observable.toAsync(function: Function, [scheduler], [context]): Function
将函数转换为异步函数,生成的一步函数的每次调用都会导致调用指定调度程序上的原始同步函数
using
when
while
wrap
webSocket
zip
public static zip(observables: *): Observable<R>
Instance Operators
audit
public audit(durationSelector: function(value:T): Observable): Observable<T>
在某个持续时间段内忽略原始observable发射的值,该方法的参数为一个函数,该函数需返回一个决定持续时长的observable或promise。之后从原始observable发射最近的值,不断重复这个过程
audit很像auditTime,但是其持续时长由第二个observable所决定
auditTime
public auditTime(durationTime: number, [scheduler, Scheduler]): Observable
在某个时间段内,忽略原始observable发射的值,该段时间由设定的duration的值(单位为ms)来决定,每隔一个设定的时间段,将从原始的observable发射最近的值,不断重复这个过程。
buffer
public buffer(closingNotifier: Observable): Observable<T[]>
缓存原始observable发射的值,直到作为参数的另一个observable发射了值,之后返回一个由这些缓存值组成的数组.
bufferCount
public bufferCount(bufferSize: Number, startBufferEvery): Observable<T[]>
缓存原始observable发射的值,直到达到bufferSize给定的上限
var clicks = Rx.Observable.fromEvent(document, ‘click’);
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));
bufferTime
public bufferTime(bufferTimeSpan: number, [bufferCreationInterval: number], [maxBufferSize: number],[scheduler: Scheduler]): Observable<T[]>
params:
arg1: 必选,buffertTimeSpan发射值得时间间隔
arg2: 可选,设置打开缓存区和发射值的事件间隔
arg3: 可选,设置缓存区长度
scheduler: 可选
var clicks = Rx.Observable.fromEvent(document, ‘click’)
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));
bufferToggle
public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribleOrPromise): Observable<T[]>
以数组形式收集过去的值,在opening发射值时开始收集,并调动closingSelector函数获取一个Observable,以告知何时关闭缓存区
bufferWhen
public bufferWhen(closingSelector: function (): Observable): Observable<T[]>
catch
public catch(selector: function): Observable
参数为一个函数:它接收作为参数err,这是错误,并捕获。需要注意的是,如果是用了该操作符,observer的error方法不会被执行,因为错误已经被catch操作符捕获。
此外,不可以使用try…catch,因为代码块是同步执行的
combineAll
combineLatest
`public static combineLatest(observable1: Observable, observale2: Observable, project: function, scheduler: Scheduler)
组合多个Observable产生一个新的Observable,其发射值根据其每个输入Observable的最新值计算
throttleTime
debounceTime
takeUntil
…未完待续