rxjava 2.0 head first

RxJava 2.0 简单介绍

一年一年有一年,RxJava也新增了2.0版本,那么为什么是新增版本而不说升级版本呢?

因为2.0版本和1.0版本两者并不兼容,2.0版本是基于Reactive-Streams规范重新设计而来;同时1.x版本和2.x版本两者会并行开发维护,但是1.x版本只维护到2018-03-31

下面我们简单介绍一下两者的不同。

0x00 依赖&包名不同

使用rxjava 1.x、2.x版本的依赖如下:

// rxjava 1.x
compile 'io.reactivex:rxjava:1.1.6'

// rxjava 2.x
compile "io.reactivex.rxjava2:rxjava:2.x.y"

包名修改如下:

// 1.x -> 2.x
rx.** -> io.reactivex.**

0x01 Observable与Flowable

Observable在2.0版本不支持backpressure,它会缓存全部的数据,一一发送给消费者,如果消费不及时,会产生OOM。于此对应,在2.x版本新增了Flowable,支持设置/自定义backpressure,同时在创建时必须制定backpressure。

 Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                for (int i = 0; i < 256; i++) {
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(System.out::println, Throwable::printStackTrace);


0x02 Single

当使用Single时,生产者调用onSuccess()通知订阅者,同时终止整个事件流,生产者只能发送一个success事件,订阅者也只能收到一个success事件,适用于网络请求等确定只有单个事件的事件流。对于1.x版本而言,则需要主动调用onComplete()来终止事件流。

注意: Single没有onComplete()方法;只能产生success、error两种事件。

  Single.create(s -> s.onSuccess("aaaa"))
                .subscribe(System.out::println, Throwable::printStackTrace);

0x03 Completable

当使用Completable时,生产者通过调用onComplete()终止事件流,订阅者会收到事件结束回调,适用于订阅者仅需要知道事件结束,而不需要执行结果的情形。

注意: Completable没有onSuccess()方法;只能产生complete、error两种事件。

Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                // do something;
                e.onComplete();
            }
        }).subscribe();

0x04 Maybe

MaybeSingleCompletable的组合体,相较于Single只能发送一次item,Completable只能通知事件结束,Maybe可以发送最多一个item,也就是可以发送一个item或者直接终止事件流。

Maybe调用onSuccess()结束事件流时,订阅者收到一次success事件;当Maybe调用onComplete()结束事件流时,订阅者只能收到事件结束事件。

  • onSuccess()收到一次事件:

    Maybe.create(new MaybeOnSubscribe<Object>() {
            @Override
            public void subscribe(MaybeEmitter<Object> e) throws Exception {
                e.onSuccess("aaa");
            }
        }).subscribe(System.out::println, Throwable::printStackTrace, () -> {
            System.out.println("onCompletable...");
        });
    
  • onComplete()收到结束事件:

    Maybe.create(new MaybeOnSubscribe<Object>() {
            @Override
            public void subscribe(MaybeEmitter<Object> e) throws Exception {
                e.onComplete();
            }
        }).subscribe(System.out::println, Throwable::printStackTrace, () -> {
            System.out.println("onCompletable...");
        });
    
    

注意: Maybe拥有onSuccess()和onComplete()方法;可以产生success、complete、error三种事件,其中success和complete是对立的。

0x05 Null

2.0x版本不支持传递null事件,会抛出NullPointerException终止整个事件流。

Single.create(new SingleOnSubscribe<Object>() {
            @Override
            public void subscribe(SingleEmitter<Object> e) throws Exception {
                e.onSuccess(null);
            }
        }).subscribe(System.out::println, Throwable::printStackTrace);

错误日志如下:

java.lang.NullPointerException: onSuccess called with null. Null values are generally not allowed in 2.x operators and sources.

0x06 取消订阅

1. 接口改变

2.x版本由于按照Reactive-Streams规范进行开发,而在Reactive-Streams中已经定义了org.reactivestreams.Subscription接口

package org.reactivestreams;

public interface Subscription {
    void request(long var1);

    void cancel();
}

,而1.x版本也定义了一个rx.Subscription接口

package rx;

public interface Subscription {
    void unsubscribe();

    boolean isUnsubscribed();
}

2. 简单取消订阅

可以看到两个类名一样,但是接口方法并不一样,含义也不相同,所以为了避免歧义,2.x版本中干掉了旧的Subscription,同时使用Disposable接口来替代旧的Subscription。具体代码如下:

// 1.x 调用unsubscribe()方法来取消订阅
final rx.Subscription subscription = rx.Observable.just(1, 2, 3).subscribe();
subscription.unsubscribe();

// 2.x 调用dispose()方法来取消订阅
final Disposable subscriber = Flowable.just(1, 2, 3).subscribe();
subscriber.dispose();

3. 使用Subscriber取消订阅

在1.x版本中,我们调用subscribe()后会返回一个rx.Subscription,我们可以使用它进行操作;在2.x版本中,我们调用subscribe()时,如果传入的是Subscriber,那就返回值是void,需要大家自己保存引用。


// 1.x 
rx.Subscription subscription = rx.Observable.just(1, 2, 3)
  .subscribe(new rx.Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });
subscription.unsubscribe();


// 2.x
ResourceSubscriber<Integer> resourceSubscriber = new ResourceSubscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable t) {
              // must dispose;
				  dispose();
            }

            @Override
            public void onComplete() {
              // must dispose;
				  dispose();
            }
        };
// 注意当传入subscriber进行订阅时,返回值是void,所以需要自己保存;
Flowable.just(1, 2, 3).subscribe(resourceSubscriber);
resourceSubscriber.dispose();

4. 批量取消订阅

1.x版本使用rx.CompositeSubscription批量取消订阅;2.x版本使用io.reactivex.disposables.CompositeDisposable批量取消订阅。

0x07 Subject & Processor

按照Reactive-Streams规范,Subject是一种行为,既是消费者,同时也是生成者,最终被定义为org.reactivestreams.Processor接口,故而,在1.x版本中的subject,在2.x版本中就变成了processor,并且支持backpressure。同时2.x版本中保留了1.x版本的subject,配合Observable使用,不过也不支持backpressure。如:

// 1.x
Subject<Object, Object> subject= new SerializedSubject<>(PublishSubject.<Object>create());
subject.onNext("aaa");
subject.onError("aaa");
subject.onComplete();


// 2.x
final FlowableProcessor<Object> objectFlowableProcessor =
            PublishProcessor.create().toSerialized();
objectFlowableProcessor.onNext("aa");
objectFlowableProcessor.onError(new Throwable());
objectFlowableProcessor.onComplete();

参考

  1. RxJava 2.0