深入Rxjava原理-带你实现一个基本的rxjava框架

Rxjava 本质上是 观察者模式框架。被观察者(Observable)->传递数据-> 观察者(observer)在调用subscribe ()方法进行订阅时,会把observer 层层往上构造出新的observer。

例如:Observable.create().map().subscribe(observer) ,observer会先被new MapObserver(observer),用MapObserver接收事件源,再传递到原始的observer。

RxJava源码分析

我们先来分析一下RxJava的工程目录

从源码结构上看无非是观察者Observer/Subscriber、被观察者Observerable、以及与订阅/观察相关的功能类:


  • annotations是相关注解
  • exceptions
  • functions 主要是订阅相关的接口类,比如Action1、Func0等
  • internal是内部使用的util、operaters的综合,方便将订阅关系捋顺,比较重要
  • observerable包主要是专门为某种场景定制的Oberverable类
  • observer包特定的订阅者及集合
  • schedulers包有关异步逻辑的线程关系
  • subjects包订阅中间产生的对象
  • subscriptions包是订阅集合,网络调用常用到

上述最最最核心的就是Observerable类了,光代码就有上万行,不过先不要慌,内部是很有条理的,我们下面继续看。

Observerable类包含三类方法

  • a.产生Observerable被观察者的方法,主要的有create、just、list等
  • b.对Observerable进行中间变换的方法,我们比较熟悉的是map、flatMap、lift、merge、zip、startwith、takeab类方法的特征是都返回Observerable对象
  • c.订阅方法,返回subscriptiond订阅对象

Observerable类是所有异步处理的开始、进行和结束,是核心类,理解了这个类就理解了RxJava。

Observerable类的众多方法中总有一款适合用来处理你的异步逻辑,有兴趣的可以深入的研究一下这些方法。

实战一个简易rxjava

为学习rxjava的基本流程,写一个精简版的rxjava

Subscriber观察者

Observer 接口

public interface Observer {    void onCompleted();    void onError(Throwable t);    void onNext(T var1);}

SubScriber 简化:

public abstract class Subscriber implements Observer {    public void start() {    }}

Observable订阅源

Observable(订阅源)

在RxJava里面是一个大而杂的类,拥有很多工厂方法和各式各样的操作符。每个Observable里面有一个OnSubscribe对象,只有一个方法(void call(Subscriber<? super T> subscriber);),用来产生数据流,这是典型的命令模式。

public class Observable {    final OnSubscribe onSubscribe;    private Observable(OnSubscribe onSubscribe) {        this.onSubscribe = onSubscribe;    }    public static  Observable create(OnSubscribe onSubscribe) {        return new Observable(onSubscribe);    }    public void subscribe(Subscriber<? super T> subscriber) {        subscriber.start();        onSubscribe.call(subscriber);    }    public interface OnSubscribe {        void call(Subscriber<? super T> subscriber);    }}

这样一个大致的框架就出来了

测试

        Observable.create(new Observable.OnSubscribe() {            public void call(Subscriber<? super Integer> subscriber) {                for (int i = 0; i < 10; i++) {                    subscriber.onNext(i);                }                subscriber.onCompleted();            }        }).subscribe(new Subscriber() {                public void onCompleted() {                  System.out.println("complete");                }                public void onError(Throwable r) {                }                public void onNext(String string) {                  System.out.println(Thread.currentThread().getName());                  System.out.println(string);                }              });

下面实现map,起始map是就是对结果再包装一层Observe.

实现结果测试

Observable.create(new Observable.OnSubscribe() {      public void call(Subscriber<? super Integer> subscriber) {        for (int i = 0; i < 10; i++) {          subscriber.onNext(i);        }        subscriber.onCompleted();      }    })              .map(new Observable.Transformer() {                public String call(Integer from) {                  System.out.println("subsc1@ " + Thread.currentThread().getName());                  return "maping " + from;                }              })              .map(new Observable.Transformer() {                public String call(String from) {                  System.out.println("subsc2@ " + Thread.currentThread().getName());                  return "maping2 " + from;                }              })              .subscribe(new Subscriber() {                public void onCompleted() {                  System.out.println("complete");                }                public void onError(Throwable r) {                }                public void onNext(String string) {                  System.out.println(Thread.currentThread().getName());                  System.out.println(string);                }              });

至于线程切换,就是在指定的线程调用call 函数、或调用subscriber里的onNext()等函数

小结一下,文章到这里主要简单说明了它的原理,以及源码分析。再到一个简单的实战演练。很好的理解rxjava的使用原理,有关rxjava的学习还有许多要深入的知识点,为了更好的学习rxjava这块技术,为架构师铺路。我从大学师兄现任阿里的资深架构师手里拿到了许多Android开发的进阶资料,特此分享一下。

【私信:“手册”获取】《Android高级开发学习》

【私信:“手册” 获取】Android核心技术进阶手册

文末

总体来说RxJava主要作用帮你优雅的处理异步逻辑。RxJava是处理异步逻辑的利器,以往我们处理异步时,需要创建一个线程,传入callback或者listener,线程处理完任务后通过callback、listener、notify或者发送广播去通知UI线程和其他线程。使用RxJava可以在一个方法体内完成这所有逻辑。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章