RxJava属于响应式编程,是一种基于异步数据流的编程方式。
关于RxJava
RxJava属于响应式编程,是一种基于异步数据流的编程方式。
响应式编程的一个关键概念是事件。因为事件是唯一合适的方式将我们的现实世界映射到软件中,对此我们做出响应。
RxJava到底是什么 —- 异步。
RxJava好在哪里 —- 简洁。
响应式的RxJava通过链式的逻辑简洁能够呈现出最简洁但不失优雅的异步代码,更重要的是随着逻辑越来越复杂,它依然可以保持简洁。
观察者模式
Observable:被观察者,它决定什么时候触发事件以及触发怎么样的事件
Observer:观察者,它决定事件触发的时候将有什么样的行为
当发生事件时候,Observable 通过 subscribe() 订阅 Observer,这就是观察者模式
对比一个实例更容易理解
按照观察者模式来理解(Button -> 被观察者,OnClikcListener -> 观察者,setOnClickListener -> 订阅,onClick -> 事件)
因此RxJava四个基本概念分别为:Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和 Observer
通过 subscribe() 方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知Observer
。
基本实现
相比于1.x,2.x增加新特性并且依赖了于四个基本接口
- Publisher
- Subscriber
- Subscription
- Processor
Publisher发送事件,Subscriber接受和处理事件。在 RxJava 2.x 中,Observable 用于订阅 Observer,不再支持背压(1.x 中可以使用背压策略),而 Flowable 用于订阅 Subscriber , 是支持背压的。
背压:背压是指在异步场景中,被观察者发送速度远快于观察者处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
因此2.x最大的改动就是将 Observable 拆分为 Observable 和 Flowable。
同时出现了两种观察者模式。
创建Observer
1 | Observer<Integer> observer = new Observer<Integer>() { |
onSubscribe()
回调方法传递参数为Disposable
,Disposable
相当于 RxJava 1.x 中的 Subscription
, 用于解除订阅。
同时需要注意的是,onComplete()
和onError()
方法是互斥的,在队列中调用其中一个,就不应该再调用另一个。
创建Observable
1 | Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() { |
创建Observable时,回调ObservableEmitter
,被调用两次onNext()
和一次onComplete
并直接throws Exception。
除了create()
方法之外,RxJava 2.x 还提供一些方法来快捷创建队列。
下列方法最终效果和使用create()方法一样。
just()1
Observable observable = Observable.just(1,2,3);
fromArray()1
2int[] nums = {1,2,3};
Observable observable = Observable.fromArray(nums);
订阅Subscribe
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
除了 subscribe(Observer)
和subscribe(Subscriber)
,subscribe()
还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。
Consumer
是一个接口,只有一个单参数无返回值的accept()方法,由于onNext(T obj)
和 onError(Throwable e)
也是单参数无返回值的,因此 Consumer
可以将 onNext(obj)
和 onError(e)
打包起来传入 subscribe() 以实现不完整定义的回调。
BiConsumer
则是接受两个参数。
类比RxJava 1.x
Consumer
-> Action1
BiConsumer
-> Action2
1 | Observable.just(1, 2, 3) |
变换的原理lift()
lift()核心代码1
2
3
4
5
6
7
8
9
10public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
好玩的是 lift() 生成新的 Observable 并返回,而且创建新 Observable 所用参数 OnSubscribe 的回调方法 call() 的实现和Observable.subscribe() 其实是一样的。在lift()创建了一个 Observable 后,加上之前的原始 Observable,其实就会有两个 Observable。
类比过来 lift() 有些像一种代理机制,拦截事件和实现事件序列的变换。
这句话概括的简洁明了
在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
线程控制
既然是异步,关键在于线程的切换,RxJava使用 subscribeOn() 和 observeOn() 来轻松进行线程控制。
RxJava内置线程供选择
- Schedulers.io() io操作的线程, 通常用于网络,读写文件等io密集型的操作;
- Schedulers.computation() CPU密集型计算的线程,即不会被 I/O 等操作限制性能的操作;
- Schedulers.newThread() 创建新线程;
- AndroidSchedulers.mainThread() 代表Android的主线程;
subscribeOn():指定 subscribe() 所发生的线程,即Observable.onSubscribe被激活时所处的线程,是事件的产生线程。
observeOn():指定 subscriber 所运行的线程,是事件的消费线程,我更愿意理解为事件的回调和呈现线程。
后台线程取数据,主线程显示
subscribeOn()
和 observeOn()
的内部实现也是用的 lift()
,也就是说线程控制的原理主要也是lift ()。
不同的是 subscribeOn() 的线程切换发生在内建的 OnSubscribe 中,在通知上一级 OnSubscribe 时,事件还没有由上一级的 OnSubscribe 发送到Subscriber,所以 subscribeOn()
可以从一开始最开端就影响后续的切换。
但是 observeOn()
不一样,它的线程切换发生在内建的
Subscriber 发送事件给下一级的 Subscriber时,所以observeOn()
是控制后面的线程,observeOn() 指定的是它之后的操作所在的线程。
这样也就很好理解如果有多次切换线程的需求, observeOn()
可以多次调用,但是只有第一个 subscribeOn()
起作用,因为通知过程中线程就被第一个subscribeOn()
截断了。
操作符
留坑
doOnNext()
,doOnSubscribeOn()
分析
RxJava 1.x VS RxJava 2.x
RxJava 1.x | RxJava 2.x |
---|---|
About backpressure ![]() |
About backpressure ![]() |
when create Observable, callback Observable.onSubscribe() |
when create Observable, callback ObservableEmitter<> and directly throws Exception |
when create Observer, callback onNext(),onError(),onCompleted() , use Subscription |
when create Observer, callback onNext(),onError(),onComplete(),onSubscribe() , use Disposable |
To simplify subscribe, Fun1, Fun2, Action1,Action2 |
To simplify subscribe, Function, BiFunction, Consumer, BiConsumer |