RxJava 2.0 简单介绍
一年一年有一年,RxJava也新增了2.0版本,那么为什么是新增版本而不说升级版本呢?
因为2.0版本和1.0版本两者并不兼容,2.0版本是基于Reactive-Streams规范重新设计而来;同时1.x版本和2.x版本两者会并行开发维护,但是1.x版本只维护到2018-03-31。
下面我们简单介绍一下两者的不同。
0x00 依赖&包名不同
使用rxjava 1.x、2.x版本的依赖如下:
// rxjava 1.x
compile 'io.reactivex:rxjava:1.1.6'
// rxjava 2.x
compile "io.reactivex.rxjava2:rxjava:2.x.y"
包名修改如下:
// 1.x -> 2.x
rx.** -> io.reactivex.**
0x01 Observable与Flowable
Observable
在2.0版本不支持backpressure,它会缓存全部的数据,一一发送给消费者,如果消费不及时,会产生OOM。于此对应,在2.x版本新增了Flowable
,支持设置/自定义backpressure,同时在创建时必须制定backpressure。
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
for (int i = 0; i < 256; i++) {
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribe(System.out::println, Throwable::printStackTrace);
0x02 Single
当使用Single
时,生产者调用onSuccess()
通知订阅者,同时终止整个事件流,生产者只能发送一个success事件,订阅者也只能收到一个success事件,适用于网络请求等确定只有单个事件的事件流。对于1.x版本而言,则需要主动调用onComplete()来终止事件流。
注意: Single没有onComplete()方法;只能产生success、error两种事件。
Single.create(s -> s.onSuccess("aaaa"))
.subscribe(System.out::println, Throwable::printStackTrace);
0x03 Completable
当使用Completable
时,生产者通过调用onComplete()
终止事件流,订阅者会收到事件结束回调,适用于订阅者仅需要知道事件结束,而不需要执行结果的情形。
注意: Completable没有onSuccess()方法;只能产生complete、error两种事件。
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
// do something;
e.onComplete();
}
}).subscribe();
0x04 Maybe
Maybe
是Single
和Completable
的组合体,相较于Single
只能发送一次item,Completable
只能通知事件结束,Maybe
可以发送最多一个item,也就是可以发送一个item或者直接终止事件流。
当Maybe
调用onSuccess()
结束事件流时,订阅者收到一次success事件;当Maybe
调用onComplete()
结束事件流时,订阅者只能收到事件结束事件。
onSuccess()收到一次事件:
Maybe.create(new MaybeOnSubscribe<Object>() { @Override public void subscribe(MaybeEmitter<Object> e) throws Exception { e.onSuccess("aaa"); } }).subscribe(System.out::println, Throwable::printStackTrace, () -> { System.out.println("onCompletable..."); });
onComplete()收到结束事件:
Maybe.create(new MaybeOnSubscribe<Object>() { @Override public void subscribe(MaybeEmitter<Object> e) throws Exception { e.onComplete(); } }).subscribe(System.out::println, Throwable::printStackTrace, () -> { System.out.println("onCompletable..."); });
注意: Maybe拥有onSuccess()和onComplete()方法;可以产生success、complete、error三种事件,其中success和complete是对立的。
0x05 Null
2.0x版本不支持传递null事件,会抛出NullPointerException
终止整个事件流。
Single.create(new SingleOnSubscribe<Object>() {
@Override
public void subscribe(SingleEmitter<Object> e) throws Exception {
e.onSuccess(null);
}
}).subscribe(System.out::println, Throwable::printStackTrace);
错误日志如下:
java.lang.NullPointerException: onSuccess called with null. Null values are generally not allowed in 2.x operators and sources.
0x06 取消订阅
1. 接口改变
2.x版本由于按照Reactive-Streams规范进行开发,而在Reactive-Streams中已经定义了org.reactivestreams.Subscription
接口
package org.reactivestreams;
public interface Subscription {
void request(long var1);
void cancel();
}
,而1.x版本也定义了一个rx.Subscription
接口
package rx;
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
2. 简单取消订阅
可以看到两个类名一样,但是接口方法并不一样,含义也不相同,所以为了避免歧义,2.x版本中干掉了旧的Subscription
,同时使用Disposable
接口来替代旧的Subscription
。具体代码如下:
// 1.x 调用unsubscribe()方法来取消订阅
final rx.Subscription subscription = rx.Observable.just(1, 2, 3).subscribe();
subscription.unsubscribe();
// 2.x 调用dispose()方法来取消订阅
final Disposable subscriber = Flowable.just(1, 2, 3).subscribe();
subscriber.dispose();
3. 使用Subscriber取消订阅
在1.x版本中,我们调用subscribe()
后会返回一个rx.Subscription
,我们可以使用它进行操作;在2.x版本中,我们调用subscribe()
时,如果传入的是Subscriber
,那就返回值是void
,需要大家自己保存引用。
// 1.x
rx.Subscription subscription = rx.Observable.just(1, 2, 3)
.subscribe(new rx.Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
subscription.unsubscribe();
// 2.x
ResourceSubscriber<Integer> resourceSubscriber = new ResourceSubscriber<Integer>() {
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
// must dispose;
dispose();
}
@Override
public void onComplete() {
// must dispose;
dispose();
}
};
// 注意当传入subscriber进行订阅时,返回值是void,所以需要自己保存;
Flowable.just(1, 2, 3).subscribe(resourceSubscriber);
resourceSubscriber.dispose();
4. 批量取消订阅
1.x版本使用rx.CompositeSubscription
批量取消订阅;2.x版本使用io.reactivex.disposables.CompositeDisposable
批量取消订阅。
0x07 Subject & Processor
按照Reactive-Streams规范,Subject
是一种行为,既是消费者,同时也是生成者,最终被定义为org.reactivestreams.Processor
接口,故而,在1.x版本中的subject
,在2.x版本中就变成了processor
,并且支持backpressure。同时2.x版本中保留了1.x版本的subject
,配合Observable
使用,不过也不支持backpressure。如:
// 1.x
Subject<Object, Object> subject= new SerializedSubject<>(PublishSubject.<Object>create());
subject.onNext("aaa");
subject.onError("aaa");
subject.onComplete();
// 2.x
final FlowableProcessor<Object> objectFlowableProcessor =
PublishProcessor.create().toSerialized();
objectFlowableProcessor.onNext("aa");
objectFlowableProcessor.onError(new Throwable());
objectFlowableProcessor.onComplete();