0%

RxJava2.x 整理

RxJava属于响应式编程,是一种基于异步数据流的编程方式。

关于RxJava

RxJava属于响应式编程,是一种基于异步数据流的编程方式。
响应式编程的一个关键概念是事件。因为事件是唯一合适的方式将我们的现实世界映射到软件中,对此我们做出响应。

RxJava到底是什么 —- 异步。
RxJava好在哪里 —- 简洁。

响应式的RxJava通过链式的逻辑简洁能够呈现出最简洁但不失优雅的异步代码,更重要的是随着逻辑越来越复杂,它依然可以保持简洁。

观察者模式

Observable:被观察者,它决定什么时候触发事件以及触发怎么样的事件
Observer:观察者,它决定事件触发的时候将有什么样的行为

当发生事件时候,Observable 通过 subscribe() 订阅 Observer,这就是观察者模式

对比一个实例更容易理解


按照观察者模式来理解(Button -> 被观察者,OnClikcListener -> 观察者,setOnClickListener -> 订阅,onClick -> 事件)

因此RxJava四个基本概念分别为:Observable(可观察者,即被观察者)、 Observer(观察者)、 subscribe (订阅)、事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知Observer

基本实现

相比于1.x,2.x增加新特性并且依赖了于四个基本接口

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Publisher发送事件,Subscriber接受和处理事件。在 RxJava 2.x 中,Observable 用于订阅 Observer,不再支持背压(1.x 中可以使用背压策略),而 Flowable 用于订阅 Subscriber , 是支持背压的。

背压:背压是指在异步场景中,被观察者发送速度远快于观察者处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略

因此2.x最大的改动就是将 Observable 拆分为 Observable 和 Flowable。

同时出现了两种观察者模式。

创建Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observer<Integer> observer = new Observer<Integer>() {
Disposable mDisposable;

@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}

@Override
public void onNext(Integer integer) {
if (integer == 2) {
mDisposable.dispose();
}
Log.d(TAG, "onNext : value : " + integer + "\n");
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};

onSubscribe()回调方法传递参数为DisposableDisposable相当于 RxJava 1.x 中的 Subscription, 用于解除订阅。

同时需要注意的是,onComplete()onError()方法是互斥的,在队列中调用其中一个,就不应该再调用另一个。

创建Observable

1
2
3
4
5
6
7
8
9
10
11
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable emit 1\n");
e.onNext(1);
Log.d(TAG, "Observable emit 2\n");
e.onNext(2);
Log.d(TAG, "Observable emit 3\n");
e.onComplete();
}
});

创建Observable时,回调ObservableEmitter,被调用两次onNext()和一次onComplete并直接throws Exception。

除了create()方法之外,RxJava 2.x 还提供一些方法来快捷创建队列。

下列方法最终效果和使用create()方法一样。

just()

1
Observable observable = Observable.just(1,2,3);

fromArray()

1
2
int[] nums = {1,2,3};
Observable observable = Observable.fromArray(nums);

订阅Subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable emit 1\n");
e.onNext(1);
Log.d(TAG, "Observable emit 2\n");
e.onNext(2);
Log.d(TAG, "Observable emit 3\n");
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
Disposable mDisposable;

@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}

@Override
public void onNext(Integer integer) {
if (integer == 2) {
mDisposable.dispose();
}

Log.d(TAG, "onNext : integer : " + integer + "\n");
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}

除了 subscribe(Observer)subscribe(Subscriber)subscribe()还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。

Consumer是一个接口,只有一个单参数无返回值的accept()方法,由于onNext(T obj)onError(Throwable e)也是单参数无返回值的,因此 Consumer可以将 onNext(obj)onError(e)打包起来传入 subscribe() 以实现不完整定义的回调。

BiConsumer则是接受两个参数。

类比RxJava 1.x

Consumer -> Action1

BiConsumer -> Action2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    Observable.just(1, 2, 3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept : integer :" + integer + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, throwable.getMessage());
}
});

}

变换的原理lift()

lift()核心代码

1
2
3
4
5
6
7
8
9
10
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}

好玩的是 lift() 生成新的 Observable 并返回,而且创建新 Observable 所用参数 OnSubscribe 的回调方法 call() 的实现和Observable.subscribe() 其实是一样的。在lift()创建了一个 Observable 后,加上之前的原始 Observable,其实就会有两个 Observable。

类比过来 lift() 有些像一种代理机制,拦截事件和实现事件序列的变换。

这句话概括的简洁明了

在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

线程控制

既然是异步,关键在于线程的切换,RxJava使用 subscribeOn() 和 observeOn() 来轻松进行线程控制。

RxJava内置线程供选择

  • Schedulers.io() io操作的线程, 通常用于网络,读写文件等io密集型的操作;
  • Schedulers.computation() CPU密集型计算的线程,即不会被 I/O 等操作限制性能的操作;
  • Schedulers.newThread() 创建新线程;
  • AndroidSchedulers.mainThread() 代表Android的主线程;

subscribeOn():指定 subscribe() 所发生的线程,即Observable.onSubscribe被激活时所处的线程,是事件的产生线程。

observeOn():指定 subscriber 所运行的线程,是事件的消费线程,我更愿意理解为事件的回调和呈现线程。

后台线程取数据,主线程显示

subscribeOn()observeOn()的内部实现也是用的 lift(),也就是说线程控制的原理主要也是lift ()。

不同的是 subscribeOn() 的线程切换发生在内建的 OnSubscribe 中,在通知上一级 OnSubscribe 时,事件还没有由上一级的 OnSubscribe 发送到Subscriber,所以 subscribeOn()可以从一开始最开端就影响后续的切换。

但是 observeOn() 不一样,它的线程切换发生在内建的
Subscriber 发送事件给下一级的 Subscriber时,所以observeOn()是控制后面的线程,observeOn() 指定的是它之后的操作所在的线程。

这样也就很好理解如果有多次切换线程的需求, observeOn()可以多次调用,但是只有第一个 subscribeOn() 起作用,因为通知过程中线程就被第一个subscribeOn()截断了。

操作符

Mytraining for RxJava2

留坑

doOnNext()doOnSubscribeOn()分析


RxJava 1.x VS RxJava 2.x

RxJava 1.x RxJava 2.x
About backpressure About backpressure
when create Observable, callback Observable.onSubscribe() when create Observable, callback ObservableEmitter<> and directly throws Exception
when create Observer, callback onNext(),onError(),onCompleted(), use Subscription when create Observer, callback onNext(),onError(),onComplete(),onSubscribe(), use Disposable
To simplify subscribe, Fun1, Fun2, Action1,Action2 To simplify subscribe, Function, BiFunction, Consumer, BiConsumer

参考:

RxJava-Doc

给 Android 开发者的 RxJava 详解

这可能是最好的RxJava 2.x 教程