Subject

什么是 Subjects

Subjects 的行为看起来就像一个 Observable 和一个 Observer。它可以接受事件并且也可以被订阅。Subject 接受 .next 事件并且每当它接受一个事件,他就会发送这个事件给它的订阅者。

RxSwift 中有四种事件:

  • PublishSubject:以 empty 的形式创建并且只发给 subscriber 新元素。
  • BehaviorSubject:以初始值的方式创建,重播或发出最新的元素给它的 subscriber。
  • ReplaySubject:以一个缓存尺寸创建并且会确保元素以缓存尺寸进入缓存区,并且重播给它的 subscriber。
  • Variable:封装了一个 BehaviorSubject,保护当前值作为状态,仅仅给订阅者重播最新或者初始化的值。

下面以例子分析每种 Subject 的使用。

PublishSubject

1
2
3
4
5
6
7
let subject = PublishSubject<String>()

subject.onNext("Test")

let subscription1 = subject.subscribe(onNext: { (string) in
print(string)
}, onError: nil, onCompleted: nil, onDisposed: nil)

此时,控制台并不会输出任何值。在上面的代码中加上以下代码:

1
2
subject.on(.next("1"))
subject.on(.next("2"))

此时控制台输出:
1 2

由此可见,在 PublishSubject 被订阅之前,subject 发出的 .onNext("Test") 事件并没有被订阅。

把代码修改成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
let subject = PublishSubject<String>()

subject.onNext("Test")

let subscription1 = subject.subscribe(onNext: { (string) in
print(string)
}, onError: nil, onCompleted: nil, onDisposed: nil)

subject.on(.next("1"))
subject.on(.next("2"))

let subscription2 = subject.subscribe({ (event) in
print("Subscription 2: ", event.element ?? event)
})

subject.on(.next("3"))

subscription1.dispose()

subject.on(.next("4"))

运行后控制台会输出:

1
2
3
4
5
1
2
3
Subscription 2: 3
Subscription 2: 4

由此可知,当一个订阅被 dispose 之后,它就不会在处理 subject 发出的新值。

当一个 PublishSubject 收到 .completed 或者 .error 事件后,它将会给自己的新订阅者发送停止事件,并且不会再发送 .next 事件。然而,它会重新发送它的停止事件给未来的订阅者:

1
2
3
4
5
6
7
8
9
10
11
12
13
subject.onCompleted()

subject.onNext("5")

subscription2.dispose()

let disposeBag = DisposeBag()
let subscription3 = subject.subscribe({
print("subscription 3:", $0.element ?? $0)
})
.disposed(by: disposeBag)

subject.onNext("6")

控制台输出:

1
2
Subscription 2: completed
subscription 3: completed

在收到 .completed 事件后,控制台并没有打印 5 和 6,但是打印了 subscription 3: completed。在收到停止事件后,重新给自己未来的订阅者 subscription3 发送了完成事件。

实际上,每种类型的 subject 在停止时,都会重新发送它的停止事件给未来的订阅者。

BehaviorSubject

BehaviorSubject 的工作原理和 PublishSubject 比较相似,他们的区别在于,BehaviorSubject 会重播最新的 .next 事件给它的订阅者:

1
2
3
4
5
6
7
8
9
10
// 提供一个初始值
let subject = BehaviorSubject(value: "Init")

let disposeBag = DisposeBag()

subject.onNext("1")

subject.subscribe({ (event) in
print("Subscription1:", (event.element ?? event.error ?? event) ?? "")
}).disposed(by: disposeBag)

控制台会输出:

1
Subscription1: 1

subject 重播了最新的值 1 给它的订阅者。

在上面的代码后面添加以下代码:

1
2
3
4
5
6
7
8
9
10

enum MyError: Error {
case anError
case anotherError
}

subject.onError(MyError.anError)
subject.subscribe({ (event) in
print("Subscription2:", (event.element ?? event.error ?? event) ?? "")
}).disposed(by: disposeBag)

输出:

1
2
Subscription1: anError
Subscription2: anError

最新值分别发送给了 subscription 1 和 2,anError。

BehaviorSubject 在空状态占位上很有用,例如,在一个 user profile 页面中,你可以给控件绑定一个 behaviorSubject。当 app 获取新数据的时候,可以先用最新值来占位。或者,在一个搜索页面,你可以展示最新的五个搜索数据。

ReplaySubject

当使用一个 ReplaySubject,它锁创建的缓存是被内存所持有的。如果你为摸个类型的 ReplySubject 设置一个很大的缓存尺寸,每个实例都会占据很大的内存。另一个需要注意的点是,创建一个数组的 ReplaySubject,每次发出的 element 是一个数组,因此,缓存区会缓存许多数组,这会引起内存压力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
let subject = ReplaySubject<String>.create(bufferSize: 2)
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")

let disposeBag = DisposeBag()

subject.subscribe({
print("Subscription1:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

subject.subscribe({
print("Subscription2:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

控制台会输出:

1
2
3
4
Subscription1: 2
Subscription1: 3
Subscription2: 2
Subscription2: 3

1 没有被发出,而 2、3 被发出了。因为上面代码创建的缓存区大小是 2,2 和 3 挤掉了 1。

继续添加下面的代码:

1
2
3
4
subject.onNext("4")
subject.subscribe({
print("Subscription3:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

输出:

1
2
3
4
Subscription1: 4
Subscription2: 4
Subscription3: 3
Subscription3: 4

因为输出的内容是按缓存区区分的,所以后面添加的第三个订阅者输出了 3 和 4,最新的两个事件。而订阅者 1 和订阅者 2 在上一个 buffer 中已经有了值,所以发出最新值 4。

下面来测试下 Error 情况,在第三个订阅之前加上:

1
2
3
4
5
6
subject.onNext("4")
// 新添加
subject.onError(MyError.anError)
subject.subscribe({
print("Subscription3:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

输出:

1
2
3
4
5
6
7
Subscription1: 4
Subscription2: 4
Subscription1: anError
Subscription2: anError
Subscription3: 3
Subscription3: 4
Subscription3: anError

这是因为上文提到过:每种类型的 subject 在停止时,都会重新发送它的停止事件给未来的订阅者。

在 onError 事件后添加:

1
2
3
4
5
subject.onError(MyError.anError)
subject.dispose()
subject.subscribe({
print("Subscription3:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

输出:

1
2
3
4
5
Subscription1: 4
Subscription2: 4
Subscription1: anError
Subscription2: anError
Subscription3: Object `RxSwift.(unknown context at 0x128dcdbc8).ReplayMany<Swift.String>` was already disposed.

在第三个订阅前手动调用 dipose,第三个订阅只会收到一个错误事件,说,subject 已经被销毁了。

通常情况下,不需要手动调用 dispose(),如果你给一个 subscription 添加了一个 disposeBag,那么当这个 subject 的持有者被销毁的时候,它也会一并销毁。

Variables

上文提到过,Variables 封装了一个 BehaviorSubject 并且存储了当前值。你可以通过 value property 去获取当前值,和其他 subjects 与通常的 observable 不一样的是,你也可以用那个 value property 去给一个 Variable 设置新的元素,换句话说,不用调用 onNext 方法。Variables 特殊的地方在于,它确保了不会发出一个 error 事件,尽管你可以去监听 .error 事件,但你不能手动添加一个 .error 事件给 Variable。Variable 同样会在它要被释放的时候发送 complete 事件,所以不需要手动添加 .completed 事件。

创建一个 Variable:

1
2
3
4
5
6
7
8
let variable = Variable("Init")
variable.value = "Re-init"

let disposeBag = DisposeBag()

variable.asObservable().subscribe({
print("Subscription:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

输出:

1
Subscription: Re-init

由控制台输出可知,订阅获得了最新值。这点和 BehaviorSubject 没什么区别,但是写法上,Variable 是需要先变成 Observable。

将刚才 Subscription 改为 Subscription1,继续测试新值:

1
2
3
4
5
6
variable.value = "1"
variable.asObservable().subscribe({
print("Subscription2:", ($0.element ?? $0.error ?? $0) ?? "")
}).disposed(by: disposeBag)

variable.value = "2"

输出:

1
2
3
4
Subscription1: 1
Subscription2: 1
Subscription1: 2
Subscription2: 2

之前的 subscription1 在 variable 设置新值 1 的时候收到了新值,当 subscription2 被订阅的时候也收到了同样的值,因为在 variable.value = "1" 时,1 就是最新值。然后 value 被更新为 2,同样的,两次订阅都收到最新值。

Variable 是非常有用的,主要有以下两点:

  1. 和其他的 subject 一样,无论什么时候,一个 next 事件被发出,它都会做出响应。
  2. 它可以提供“一次性”的需求,例如,当你仅仅需要检查当前值而不用去订阅获取更新。

以上就是四种 Subjects 的主要用法。后续会更新 Observable 和 Operator 的实战内容。