Advertisement

Rxjava2升级注意事项及适配技巧

阅读量:
  • RxJava2变化
  • NPE适配技巧

RxJava2变化

RxJava2(统称为rx2)遵循Reactive-Streams specification规范进行改写后与RxJava1相比变化显著。

  1. 引用包名不同
    import时的包名由rx.*变成了io.reactive.*

  2. 必须拒绝null值
    在rx1版本中,Observable组件能够接收任何类型的值,并且允许传递null值本身并不会引发任何问题:例如使用Observable.just(null)这样的用法也不会导致异常或性能问题。然而,在rx2版本中对这种行为进行了严格限制:如果试图向系统发射null值将会导致NullPointerException的发生。因此,在使用rx2时我们必须增加对null值的检查逻辑以避免此类异常情况的发生。

在rx1版本中存在一个显著的问题即其对_backpressure_的支持不够完善这会导致操作符或订阅者无法及时处理相关消息进而触发_missingbackpressureexception异常然而大多数情况下我们并不需要深入考虑这一问题因此rx2版本引入了新的组件_flowable该组件能够更好地管理_backpressure功能而原来的Observable组件则不再支持_backpressure功能以适应不同的使用需求因此开发人员可以根据具体场景选择合适的组件来实现预期效果

与 Observable 和 Flowable 相对应的是 Observer 和 Suscriber 两种订阅者类型。具体来说, Observable 的订阅操作为 observable.subscribe(observable\ observer), 而 Flowable 的订阅操作为 flowable.subscribe(flowable\ subscriber). 因此, 在 Observable 的 create 方法中不再传递 flowable 参数, 而是直接通过 observable\ emitter 来实现订阅功能, 即:

复制代码
    Observable.create(new ObservableOnSubscribe<Integer>() {

     @Override
     public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    
    }
    })

基于reactive-streams框架,在不再使用原有的 Suscriber 接口后,并采用了基于 reactive-streams 的 Suscriber 实现。同时对应的 Observer 接口保持一致,在此过程中 rx2 的 Observer 和 Suscriber 系统有所区别:其中 onNextonErroronComplete 方法仍然存在;但是由于缺少了 rx1 中特有的 onStart 方法的存在性问题。然而通过使用 rx2 提供的新颖接口体系——新的 onSubscribe 方法则能够部分替代原来的(rx1中的) onStart. 如果仍希望保留使用(rx1中的) onStart, 可以参考(rx2中的) DefaultSubscriber, ResourceSubscriber 或 DisposableSubscriber 这些抽象类来实现所需的接口功能。

在Rx2框架中, 因rx2框架内新增了reactive-streams模块, 其中已存在一个名为Subscription的功能模块. 因此将rx1中的Subscription功能更名为Disposable. 不过, 为了使整个系统保持一致性, Observable.subscribe(Observer) 和 Flowable.subscribe(Subscriber) 这两种订阅方式不再返回原来的Disposable对象(因为rx1会返回一个Subscription), 而是提供了几个新的重载函数, 它们会分别返回对应的Disposable对象.

复制代码
    public final Disposable subscribe()

    
    public final Disposable subscribe(Consumer<? super T> onNext)
    
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
    
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
    //直接订阅Observer或Subscriber返回void
    public final void subscribe(Observer<? super T> observer)
    public final void subscribe(Subscriber<? super T> s)

rx2被修改为遵循Java8的命名规范,并对Action和Function进行了调整。具体来说:

(Action)rx1 rx2
Action0 Action
Action1 Consumer
Action2 BiConsumer
Action3-Action9 取消
ActionN Consumer<Object[]>
(Function)rx1 rx2
Func0 取消
Func1 Function
Func2 BiFunction
Func3 - Func9 Function3 - Function9
FuncN Function<Object[], R>
Func1<T, Boolean> Predicate

同时,在rx2中,所有的Action和Function都抛出了异常,因此我们并不得不去捕获这些异常了。
7. 一些常用方法的变更
from
rx1中,对于数组,列表和Future类型的数据源都采用from进行获取,而rx2对其进行了细 分,分别拆分为fromArray , fromIterablefromFuture , 并添加 了一个fromPublisher , 至于 fromCallable 则保持不变。
其他
带有2-9个参数的startWith统一改为startWithArray
toList和toMap等的返回值变成Single类型,并添加toObservable方便转化
toBlocking().y变成blockingY(),例如toBlocking().firstOrDefault(defaultValue) 变成 blockingFirst(defaultValue)

这通常包括rx2中常见的几种变更,并非全部在此列举,请参见官方文档RxJava2的变化以获取详细信息

NPE适配技巧

因为rx2不再支持接收null值,所以在构建流量表之前必须首先检查该原始数据是否为null.然而,在升级过程中,默认情况下不会涉及此问题.这时添加这样的检查步骤无疑是一项耗时费力且令人厌烦的任务.因此我们可以开发并使用一个名为NullCheckUtil的工具类来处理这些原始数据,从而大大简化了每次构建流量表的过程.主要分为以下两种情况:当目标流量表的数据源是null时;以及当目标流量表的数据源不是null时.

源为单一实体
在缓存中获取对象时,在尚未保存的情况下返回值必然是null,在保存成功后才会得到非null结果;如果保存失败则返回null或未成功存储值。因此,在获取到确定值之前必须确保不会生成新的rx流。对于认为已经存在的对象(即无需再创建新对象),我们通常会采用Observable.just()来启动一个rx流;因此我们可以对该对象封装成一个外壳(即采用Java8中的Optional对其进行封装)。需要注意的是:Java8以下版本的Optional要求Android的API版本需达到24以上才能使用;若不想依赖Java8特性,则可以直接编写简单的封装类。

复制代码
    Observable.just(Optional.ofNullable(cache))

        .subscribe(new Consumer<Optional<Object>>() {
                @Override
                public void accept(Optional<Object> optional) throws Exception {
                    // 通过optional.isPresent()判断是否为null,再通过optional.get() 获取cache
                }
            });

当源数据类型为列表形式时

复制代码
    public interface Function<T, R> {
    R apply(T var);
    }
    
    // ①
    public static <T> List<T> elvisList(List<T> t) {
    if (t == null) return Collections.emptyList();
    return t;
    }
    
    // ②
    public static <T, R> List<R> elvisList(T t, Function<T, List<R>> ifNotNull) {
    List<R> list = t == null ? new ArrayList<R>() : ifNotNull.apply(t);
    if (list == null) list = Collections.emptyList();
    return list;
    }
    
    // ③
    public static <T, R> Observable<R> observableElvisList(T t, Function<T, List<R>> ifNotNull) {
    List<R> list = t == null ? new ArrayList<R>() : ifNotNull.apply(t);
    if (list == null) list = Collections.emptyList();
    return Observable.fromIterable(list);
    }

这个思路本身非常简单,它就是检查是否为null,如果是的话就需要生成一个空列表.其中的方法①最为直接,不需要多言.而方法②则适用于数据源嵌入于某个对象中的情况,这时就需要通过外部提供的Function接口来获取该列表.至于方法③,它其实就是建立在上述两种方式的基础上,直接返回一个Observable.对于rx2来说,我们可以直接应用这种方法作为其处理的基础来源,这样一来就无需过多考虑细节.

复制代码
    NullCheckUtil.observableElvisList(apiResponseData, data -> data.list).subscribe();

当采用了lambda的方式进行表达时,则显得更加简洁明了。对于这样的场景下,强烈推荐采用该方法。这样一来就无需担心数据源是否为null的问题了,在这种情况下即可轻松地构建相应的rx流。无疑这种做法大大提升了代码的简洁性和可维护性,并且对于那些需要执行通用操作(如map、filter、toList、compose等)的情况而言,则能够进一步优化代码结构。

全部评论 (0)

还没有任何评论哟~