Subject
首先确定一个概念,Subject 即是 Observable 也是 Observer:
subject 在收到 $source 新消息时,会通知内部所有观察者(observerA 、observerB)
何时使用 Subject:
- 需要共享相同的 observable 执行。
- 当需要决定观察者迟来时该怎么做,是否使用 ReplaySubject、BehaviorSubject?
- 需要完全控制 next()、error() 和 completed() 方法。
Cold Observable 的问题
cold ovservable 是无法多播的,因为数据不同步: stackblitz
多播
而将 cold -> hot 后,就可以多播了: stackblitz
subject 不能重复使用
很多人會直接把這個特性拿來用在不知道如何建立 Observable 的狀況:
class MyButton extends React.Component {
constructor(props) {
super(props);
this.state = { count: 0 };
this.subject = new Rx.Subject();
this.subject
.mapTo(1)
.scan((origin, next) => origin + next)
.subscribe(x => {
this.setState({ count: x })
})
}
render() {
return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
}
}
因为 React API 的关系,如果我們想要把 React Event 转换成 observable 就可以用 Subject 幫我們做到這件事;但绝大多数的情況我們是可以透過 Observable.create
來做到這件事,像下面这样:
import { Observable } from 'rxjs'
const example = Observable.create(observer => {
const source = getSomeSource(); // 某個数据源
source.addListener('some', (some) => {
observer.next(some)
})
});
大概就會像上面這樣,如果沒有合適的 creation operators 我們還是可以利用 Observable.create 來建立 observable,除非真的因為框架限制才會直接用 Subject。
BehaviorSubject
很多時候希望 Subject 能代表當下的状态,也就是說如果今天有一個新的 subscribe,我們希望 Subject 能立即給出最新的值,而不是沒有值:
ReplaySubject
希望 Subject 代表事件,但又能在新 subscribe 时重新发送最后的几個元素
可能會有人以為 ReplaySubject(1) 是不是就等同於 BehaviorSubject,其實是不一樣的,BehaviorSubject 在建立時就會有起始值,比如 BehaviorSubject(0) 起始值就是 0,BehaviorSubject 是代表著狀態而 ReplaySubject 只是事件的重放而已
AsyncSubject
AsyncSubject 会在 subject 结束后送出最后一個值,其实这个行为跟 Promise 很像,都是等到事情结束后送出一個值,但实际上我們非常非常少用到 AsyncSubject,絕大部分的時候都是使用 BehaviorSubject 跟 ReplaySubject 或 Subject。
多播操作符
multicast
refCount: 建立自动 connect 的 Observable
publish
multicast
的简写:
var source = interval(1000).pipe(
publish(),
refCount()
)
// 等同于:
/* var source = interval(1000).pipe(
multicast(new Rx.Subject()),
refCount()
) */
变体:
publishBehavior
var source = interval(1000).pipe( publishBehavior(0), refCount() ) // 等同于: /* var source = interval(1000).pipe( multicast(new Rx.BehaviorSubject(0)), refCount() ) */
publishReplay
var source = interval(1000).pipe( publishReplay(1), refCount() ) // 等同于: /* var source = interval(1000).pipe( multicast(new Rx.ReplaySubject(1)), refCount() ) */
publishLast
var source = interval(1000).pipe( publishLast(), refCount() ) // 等同于: /* var source = interval(1000).pipe( multicast(new Rx.AsyncSubject(1)), refCount() ) */
share
publish
+ refCount
的简写:
var source - interval(1000).pipe(
share(),
)
/* var source = interval(1000).pipe(
publish(),
refCount()
) */
/* var source = interval(1000).pipe(
multicast(new Rx.Subject()),
refCount()
) */