Observable

什么是 Observable

Observables 是 Rx 的核心。

在 RxSwift 中,所有事情都是 sequence。Observable 也是一个 sequence,可被监听的 sequence。
Observable 最重要的功能就是异步操作,它在一定事件内会发出事件。

– 1 – 2 – 3 –>

从左向右的箭头代表了时间,数字代表了 sequence 元素。元素 1 发送后,接着 2 和 3 也被发出,这些事件就伴随着 Observable 的生命周期。

同样的,RACSignal 也是 RAC 的核心,如果用过 RAC,可以暂且把 Observables 当做 RACSignal 来学习。

Observable 生命周期

  • 当一个 Observable 发出一元素,就是说发出了一个 next event。
  • 当其结束分发事件后,就会发出一个 completed event,然后这个 Observable 就会被终止。
  • 如果分发事件的时候出现错误,就会发出一个 error event,同时它也会被终止。
  • 一旦一个 Observable 被终止,它也会停止分发事件。

创建 Observables

just - of - from

  • just 创建了一个只包含一个元素的 observable sequence。
  • of 创建的 sequence 类型由传入的元素类型所决定。
  • from 创建的 sequence,是从一个数组中获取的元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
let one = 1
let two = 2
let three = 3

let observable: Observable<Int> = Observable<Int>.just(one)
// An observable of Int with 3 elements
let observable2 = Observable.of(one, two, three)
// An observable of [Int] with a single element - Array
let observable3 = Observable.of([one, two, three])
// An observable of Int
let observable4 = Observable.from([one, two, three])

observable.subscribe(onNext: { (int) in
print("just: \(int)")
// just: 1
})

observable2.subscribe(onNext: { (int) in
print("of: \(int)")
// of: 1
// of: 2
// of: 3
})

observable3.subscribe(onNext: { (int) in
print("of array: \(int)")
// of array: [1, 2, 3]
})

observable4.subscribe(onNext: { (int) in
print("from: \(int)")
// from: 1
// from: 2
// from: 3
})

订阅 Observables

在 RxSwift 中,订阅,subscribe(),就像 KVO 中的 addObserver()。但是不同的是,只有一个 subscriber 订阅了 observable,它才会发出 event。当 observable 调用 subscribe 操作时,在闭包中,它为每个元素发送了 .next event,然后发送 .completed event,然后终止。当然,也可能发出 .error event。

在 subscribe 操作中使用 iflet 语法可以获取 element。

subscribe(onNext:) 处理 .next 事件,不会去理会其他事件。onNext 闭包接收 next 事件的元素作为参数,所以可以不必在 subscribe 操作中手动获取元素了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
let one = 1
let two = 2
let three = 3

let observable = Observable.of(one, two, three)
observable.subscribe { (event) in
print("subscrib event: \(event)")
if let element = event.element {
print("subscrib: \(element)")
}
}

observable.subscribe(onNext: { (element) in
print("subscribe next on: \(element)")
})

// 控制台输出:
/**
subscrib event: next(1)
subscrib: 1
subscrib event: next(2)
subscrib: 2
subscrib event: next(3)
subscrib: 3
subscrib event: completed
subscribe next on: 1
subscribe next on: 2
subscribe next on: 3
*/

empty

empty 操作创建了一个空的 observable sequence,不包含任何元素,它只发送一个 completed 事件。
如果一个 observable 不能被推断类型,则它必须指定类型:Observable<Type>

1
2
3
4
5
6
7
8
let observable = Observable<Void>.empty()
observable
.subscribe(onNext: { (element) in
print("empty: \(element)")
}) {
print("Completed")
// Completed
}

empty 一般在你想返回一个直接终止的或者没有值的 observable 时使用。

never

和 empty 相反,字面意思也可以看出,never 创建的 observable 不发出任何值。
并且这个 observable 也不会终止,它可以用来代表一个无限的时间间隔。

1
2
3
4
5
6
let observable = Observable<Any>.never()
observable
.subscribe( onNext: { element in print(element) },
onCompleted: { print("Completed") } )

// 控制台什么都不会打印

range

1
2
3
4
5
6
7
8
9
10
11
12
let observable = Observable.range(start: 1, count: 3)

var n = 0

observable.subscribe(onNext: { (i) in
n += i
print(n)
})

// 1 (0+1)
// 3 (1+2)
// 6 (3+3)

range 操作会自动发出 completed 事件并终止。

销毁和终止 Observable

订阅操作触发了一个 observable 去发出事件,当它发出 error 事件或 completed 事件后就会被终止。但是也可以手动通过取消订阅去终止。

dispose

通过 dispose() 操作可以手动取消一个 observable 的订阅。

1
2
3
4
5
6
7
8
9
10
11
let observable = Observable.of("a", "b", "c")
let subscription = observable.subscribe { (event) in
print(event)
}

subscription.dispose()

// next(a)
// next(b)
// next(c)
// completed

DisposeBag

手动管理每个订阅的销毁是非常蛋疼的,RxSwift 提供了一个 DisposeBag 类去管理每个订阅的取消(通过 disposed(by:) 操作实现)。
一个 disposeBag 持有 disposables,当 disposeBag 要被取消配置的时候,它会对每个 observable 调用 dispose() 操作。

1
2
3
4
5
6
7
let disposeBag = DisposeBag()

Observable.of("a", "b", "c")
.subscribe {
print($0)
}
.disposed(by: disposeBag)

DisposeBag 类也是最常用的,创建并订阅一个 observable,然后直接在后面添加 disposeBy 操作。

为什么要 dispose 操作?

当你忘记给一个订阅操作添加 disposeBag,或者当一个订阅结束手动调用 dispose() 操作,或者在某个时间点引起了 observable 的终止,这些行为都可能造成内存泄露,而 disposeBag 可以去自动管理 observables 的终止操作。

创建 Observable

create 操作:static func create(_ subscribe: @escaping (AnyObserver<Int>) -> Disposable) -> Observable<Int>

通过 create 操作可以创建一个 observable,create 操作包含了一个 subscribe 的闭包参数,这个闭包提供了 observer,用来实现 observable 的订阅。AnyObserver 是一个泛型类型,给一个 observable sequence 添加值,这个值被发送给订阅者。

Observer 可以发出 next、completed、error 事件。

至此可以看出 RxSwift 的 Observable.create 操作和 RAC 的 [RACSignal create] 操作基本上是一个意思。ReactiveCocoa 也是借用了 Rx 的思想在 OC 上实现了响应式框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

enum MyError: Error {
case anError
}

let disposeBag = DisposeBag()

Observable<String>.create { (observer) -> Disposable in
observer.on(.next("1"))
observer.on(.error(MyError.anError))
observer.on(.completed)
observer.onNext("?")

// Disposables.create() 操作返回了一个空的 disposable
return Disposables.create()
}
.subscribe(
onNext: { print($0) },
onError: { print($0) },
onCompleted: { print("Completed") },
onDisposed: { print("Disposed") })
.disposed(by: disposeBag)

// 1
// anError
// Disposed
// observer 发出的 error 操作终止了订阅行为,completed 操作和 next("?") 操作并没有被发出

在上面的这段代码中,如果你注释掉 error、completed 和 disposedBy 操作会发生什么?
造成内存泄漏,控制台会打印:1 和 ?,并没有任何 error 或 completed 或 disposed 信息被打印。

创建 observable 工厂

deferred

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
let disposeBag = DisposeBag()

var flip = false

let factory: Observable<Int> = Observable.deferred {

flip = !flip

if flip {
return Observable.of(1, 2, 3)
} else {
return Observable.of(4, 5, 6)
}
}

for _ in 0...3 {
factory
.subscribe(onNext: {
print($0, terminator:"")
})
.disposed(by: disposeBag)
print()
}

// 123
// 456
// 123
// 456

Singles

Singles 会发送两个事件:

  • .success(value) 事件是 .next.completed 的组合
  • .error

Single 非常适合处理一次性操作,要么成功,要么失败,例如:

  • 下载数据
  • 从磁盘加载数据

Single 使用示范:
将一个文件拖入工程中,例如我拖入了一个 Repo 的 README.md 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func exampleOfSingle() {

print("--- Example of: single --- ")

let disposeBag = DisposeBag()
loadText(from: "README", type: "md")
.subscribe {
// single in
//
// print(single)
// switch single {
// case.success(let string):
// print(string)
// case.error(let error):
// print(error)
switch $0 {
case .success(let string):
print(string)
case .error(let error):
print(error)
}
}
.disposed(by: disposeBag)
}

func loadText(from name: String, type: String = "txt") -> Single<String> {
return Single.create { single in
let disposable = Disposables.create()

guard let path = Bundle.main.path(forResource: name, ofType: type) else {
single(.error(FileReadError.fileNotFound))
return disposable
}

guard let data = FileManager.default.contents(atPath: path) else {
single(.error(FileReadError.unreadable))
return disposable
}

guard let contents = String(data: data, encoding: .utf8) else {
single(.error(FileReadError.encodingFailed))
return disposable
}

single(.success(contents))

return disposable
}
}

enum FileReadError: Error {
case fileNotFound, unreadable, encodingFailed
}

以上代码调用 exampleOfSingle() 后,控制台会打印 README.md 的内容:

1
2
3
# LearnRxSwift

#### 项目介绍

Completable

Completable 仅仅会发送一个 completed 事件或者一个 error 事件,它不会发送任何 value。

当你仅仅考虑一个操作是否成功或者失败的时候,可以使用 Completable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
let bag = DisposeBag()

let completable = Completable.create { (event) -> Disposable in

// 模拟操作
let flag = Int.random(in: 0..<2)

var success: Bool {
return flag == 1 ? true : false
}

guard success else {
event(.error(FileReadError.fileNotFound))
return Disposables.create()
}

event(.completed)
return Disposables.create()
}

completable.subscribe(onCompleted: {
print("Completed")
}) { (error) in
print("An error has occurred")
}.disposed(by: bag)

// An error has occurred

Maybe

Maybe 是 Single 和 Completable 的混搭。它可以发送 success、completed 或 error 事件,但只能发送一个事件。

如果你需要去事件一个可以发送成功或者失败,并且当成功时,可以选择性的返回一个值的操作,可以使用 Maybe 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let maybe = Maybe<Int>.create { (event) -> Disposable in
event(.success(1))
event(.completed)
event(.error(FileReadError.encodingFailed))
return Disposables.create()
}

let bag = DisposeBag()

maybe.subscribe(onSuccess: {
print($0)
}, onError: { (errpr) in
print("An error has occurred")
}) {
print("Completed")
}.disposed(by: bag)

// 1