对Rxjava1.0的map方法的源码分析

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 17:15   1100   0

RxJava map转换类型方法分析:

楼主没做整理,改日再更

package com.yue.test;

import java.util.LinkedList;
import java.util.List;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorFailedException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OnSubscribeMap;
import rx.internal.operators.OnSubscribeMap.MapSubscriber;
import rx.plugins.RxJavaHooks;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;

/**
 * RxJava原理map 一对一的转换
 * 
 * @ClassName: ReTest2
 * @Description: TODO
 * @author shimingyue
 * @date 2016-7-14 上午8:45:48
 * 
 */
public class ReTest2 {

 public static void main(String[] args) {
  // 使用map转换
  /**
   * 创建一个观察者String
   */
  Observer<String> observer = new Observer<String>() {

   // 被观察者将会回调这些方法以达到通知观察者的效果
   @Override
   public void onNext(String arg0) {
    System.out.println(arg0);
   }

   @Override
   public void onError(Throwable arg0) {

   }

   @Override
   public void onCompleted() {

   }
  };
  /**
   * 创建一个被观察者Integer
   */
  Observable<Integer> observable = Observable
    .create(new OnSubscribe<Integer>() {

     @Override
     public void call(Subscriber<? super Integer> subscriber) {
      subscriber.onNext(651);
      System.out.println("转换前:" + 651);
     }
    });

  /**
   * map的实际应用解释,对于map 不用从源码进行分析,它很简单 就是源被观察者调用了map后,会创建一个新的被观察者,而这个新的被观察者,
   * 而这个新的被观察者含有一个特殊的回调实例OnSubscribeMap[它的基类是OnSubscribe]
   * 而这个OnSubscribe又含有一个源被观察者和我们传入的FuncX实例,所以我们注册订阅这个
   * 被观察者的时候,实际调用的是OnSubscribeMap里面的call方法,在看call的源码
   * 
   * 订阅的源码 public Subscription subscribe(Subscriber subscriber) {
   * subscriber.onStart(); 
   * onSubscribe.call(subscriber); 
   * return
   * subscriber; } 可以看到调用了被观察者的onSubscribe,这个东西在创建被观察者的时候已经被赋值啦
   * 
   * 可以看Observable的构造方法 , protected Observable(OnSubscribe<T> f) {
   * this.onSubscribe = f; }
   * 
   * 
   * 
   * 源码分析
   * 先注意一下:OnSubscribeMap里面的T是源被观察者的泛型类型
   * 
   * 使用map转换后返回的Observable实例也不例外 因为他也调用了这里、、 
   * Observable源码    :RxJavaHooks.onCreate(f)返回一个OnSubscribe,f使我们的OnSubscribe,
   * RxJavaHooks只不过进行了异常的判断,而使用map后,这个f就是OnSubscribeMap的实例了,
   * public static
   * <T> Observable<T> create(OnSubscribe<T> f) { return new
   * Observable<T>(RxJavaHooks.onCreate(f)); }
   * 
   * 所以在订阅的时候执行的是啥呢,是OnSubscribeMap的call方法,下面是OnSubscribeMap的部分源码
   * 
   * OnSubscribeMap源码
   * 
   * @Override public void call(final Subscriber<? super R> o) {
   *           MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o,
   *           transformer); o.add(parent);
   *           source.unsafeSubscribe(parent); }
   * 
   *           可以看出创建了一个MapSubscriber实例[基类是Subscriber],but,这尼玛,跟以前不一样啊,
   *           那个unsafeSubscribe不用管是解除新观察者的订阅的,主要看add方法,
   *            public final void add(Subscription s) {
                       subscriptions.add(s);
         }
   *    终极add方法
   *       public void add(final Subscription s) {
            if (s.isUnsubscribed()) {
               return;
            }
            if (!unsubscribed) {
               synchronized (this) {
                  if (!unsubscribed) {
                     List<Subscription> subs = subscriptions;
                     if (subs == null) {
                        subs = new LinkedList<Subscription>();
                        subscriptions = subs;
                     }
                     subs.add(s);
                     return;
                  }
              }
           }
           // call after leaving the synchronized block so we're not holding a lock while executing this
           s.unsubscribe();
        }
   *    subs.add(s); 到最后尽然发现他只是被添加进了subs 观察者列表中,懵逼了吧,call方法还有下一句
   *    source.unsafeSubscribe(parent); 
   *    source是我们的源观察者,parent是我们新声明的观察者MapSubscriber
   * 
   *    unsafeSubscribe的源码,可以看到,啥都执行啦,但就是没有执行onNext ,而关键的通知和转换都在OnSubscribeMap
   *    的onNext里,真J B蛋疼,所不分析啦,反正最后会执行到MapSubscriber的onNext方法
   *   public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD 
            }
            return Subscriptions.unsubscribed();
        }
    }
   * 我们看上面源码的RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);这一句,终于到了关键
   * 上面的this 和onSubscribe都是源观察者的可以看这一句source.unsafeSubscribe(parent); source是原来被保存的源观察者
   *  @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) {
        Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
    看onObservableStart的这个实例 又出来RxJavaPlugins
            onObservableStart = new Func2<Observable, OnSubscribe, OnSubscribe>() {
            @Override
            public OnSubscribe call(Observable t1, OnSubscribe t2) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
            }
        };
        
        继续看onSubscribeStart方法 可以看出经过一系列的处理判断异常抛出异常,安全检查,最后使用onSubscribeStart方法将我们的源
        被观察者的回调实例OnSubscribe返回啦,那我们就回头看吧
         @Deprecated
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }
      重新回头看这一句代码 RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
      执行了源被观察者的call方法,而onSubscribe是个继承与Action1.这并不是很重要
      关键是它绕了一圈执行了源被观察者的onSubscribe实例的call方法,并传入了一个MapSubscriber对象
      这个对象使我们上面分析过的,一个持有源观察者和FuncX的一个新的观察者,我们再回头看我们自己写的源被观察者
      里面的那个subscriber观察者就是MapSubscriber这个东西,它调用这个东西的onNext方法,
      
      
      Observable<Integer> observable = Observable
    .create(new OnSubscribe<Integer>() {

     @Override
     public void call(Subscriber<? super Integer> subscriber) {
     //这儿这个651的整型参数将会被送到MapSubscriber的onNext里去转换类型,具体大家看源码
      subscriber.onNext(651);
      System.out.println("转换前:" + 651);
     }
    });
   * 
   * 
   *           里面的call方法在调用 观察者的回调方法onNext进行通知
   *           OnSubscribeMap部分源码如下,看result
   *           (转换后的结果),actual(观察者),mapper(FuncX)关键字
   * 
   * 
   * 
   * 总结:RxJava的其他方法 殊途同归,可能会加一些功能或改变
   * 
   * 现在总结一下具体的流程:
   * 1.观察者、被观察者和转换都执行完毕
   * 2.观察者订阅被观察者,这个被观察者是被转换后的新观察者
   * 3.在订阅方法里会执行onSubscribe.call(subscriber);
   * 4.而onSubscribe实际上是新的被观察者的onSubscribe实例,也就是OnSubscribeMap
   * 5.然后就可以跟着上面的源码一步步来啦
   * 
   * 订阅的源码不分析啦,稍后会在博客上分析,这个很简单,但一切的动作的执行都是从subscribe开始的,转换动作也是
   * 
   * 
   * 
   */
  Observable<String> observable2 = observable
    .map(new Func1<Integer, String>() {

     @Override
     public String call(Integer arg0) {
      System.out.println("获取源被观察的值:" + arg0);
      return "转换后:" + 652;
     }
    });
  observable2.subscribe(observer);
 }
} 


总结:

  • 比较起flatmap转换的复杂度,map是比较简单的
  1. 首先,我们创建观察者和被观察者,并且他们的泛型参数类型是不同的,因此我们需要把我们定义的源被观察者转换成和我们定义的源观察者一样的泛型参数类型,
  2. 源被观察者调用map方法,并传入我们自己定义的一个Funcx
  3. map方法内部会利用我们的源被观察者和Funcx创建一个OnSubscribeMap【它继承于OnSubscribe】持有源被观察者和Funcx实例
  4. 然后又利用这个OnSubscribeMap监听实例创建出一个新的二手观察者,它持有OnSubscribeMap
  5. 创建和转换完毕后,观察者订阅这个新的被观察者,observable2.subscribe(observer)
  6. 订阅方法subscribe内部执行OnSubscribe.call,此时的OnSubscribe实例是新的被观察者的实例,也就是OnSubscribeMap,并向call方法传入了我们自己定义的源观察者
  7. OnSubscribeMap的call方法里会创建出一个新的二手观察者,MapSubscriber,它持有Funcx和源观察者
  8. 除了创建新的观察者还执行了 source.unsafeSubscribe(parent);方法 source:源被观察者,创建OnSubscribeMap传入,parent:刚才声明的MapSubscriber实例
  9. unsafeSubscribe方法内,RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);主要是这一句代码
    先看前半段RxJavaHooks.onObservableStart(this, onSubscribe).传入了一个源被观察者this,还有一个源被观察者的onSubscribe监听实例
    它其实就做了一下异常判断,没有特殊情况就返回源被观察者的onSubscribe实例,然后调用call方法,call方法内的参数subscriber
    我们刚才传入的MapSubscriber实例parent
  10. 好了,终于调用到源被观察者的call方法了,还传入MapSubscriber实例parent,
  11. 所以我们源被观察者的call方法里的subscriber是MapSubscriber实例,调用的也是它的onNext方法,
  12. MapSubscriberMapSubscriber的onNext方法内 result = mapper.call(t);mappermapper,我们创建OnSubscriberMap时传入的Funcx,t:onNext传入的参数
  13. 可以看到它对参数t使用funcx的call方法进行了转换,funcx是我们自己写的,返回什么由我们决定
  14. 转换后 actual.onNext(result);又执行了这一句actual:源观察者,ok结束

大总结

  • 当我们发现我们的观察者和被观察者的泛型参数不同是,我们就要调用被观察者的map方法对参数进行类型转换,特殊情况还要使用flatMap方法
  • 订阅新的被观察者后,代码转一圈会执行源被观察者的call回调,
  • 里面调用一个新的持有源被观察者和funcx的MapSubscriber观察者实例,调用oinNext方法并传入源被观察者的泛型参数
  • 在这个onNext方法里对传入参数使用funcx的call方法转换,可以看出执行了几次源被观察者的call方法里的MapSubscriber的onNext,就转换了几次
  • 然后再调用源观察者的onNext方法,并传入转换后的result






















分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP