乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      RxJava observeOn()與subscribeOn()的關(guān)系

       檸檬冰啡咖 2018-01-18



      observeOn和subscribeOn都是對observable的一種操作,區(qū)別就是subscribeOn改變了observable本身產(chǎn)生事件的schedule以及發(fā)出事件后相關(guān)處理事件的程序所在的scheduler,而obseveron僅僅是改變了對發(fā)出事件后相關(guān)處理事件的程序所在的scheduler。

      或許你會問,這有多大的區(qū)別嗎?的確是有的,比如說產(chǎn)生observable事件是一件費(fèi)時可能會卡主線程的操作(比如說獲取網(wǎng)絡(luò)數(shù)據(jù)),那么subscribeOn就是你的選擇,這樣可以避免卡住主線程。

      兩者最主要的差別是影響的范圍不同,observeOn is more limited,但是卻是可以多次調(diào)用,多次改變不同的接受者所在的scheduler,在調(diào)用這個函數(shù)之后的observable造成影響。而subscribeOn則是一次性的,無論在什么地方調(diào)用,總是從改變最原始的observable開始影響整個observable的處理。

      subscribeOn()和observeOn()的區(qū)別

      • subscribeOn()主要改變的是訂閱的線程,即call()執(zhí)行的線程;
      • observeOn()主要改變的是發(fā)送的線程,即onNext()執(zhí)行的線程。

      subscribeOn

      我們先看一個例子。

             Observable
                      .create(new Observable.OnSubscribe<String>() {
                          @Override
                          public void call(Subscriber<? super String> subscriber) {
                              subscriber.onNext("a");
                              subscriber.onNext("b");
      
                              subscriber.onCompleted();
                          }
                      })
                      .subscribeOn(Schedulers.io())
                      .subscribe(new Observer<String>() {
                          @Override
                          public void onCompleted() {
      
                          }
      
                          @Override
                          public void onError(Throwable e) {
      
                          }
      
                          @Override
                          public void onNext(String integer) {
                              System.out.println(integer);
                          }
                      });
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

      運(yùn)行如下:

      a
      b
      • 1
      • 2

      我們看一下subscribeOn()中,都干了什么

      public final Observable<T> subscribeOn(Scheduler scheduler) {
              if (this instanceof ScalarSynchronousObservable) {
                  return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
              }
              return create(new OperatorSubscribeOn<T>(this, scheduler));
          }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

      很明顯,會走if之外的方法。

      在這里我們可以看到,我們又創(chuàng)建了一個Observable對象,但創(chuàng)建時傳入的參數(shù)為OperatorSubscribeOn(this,scheduler),我們看一下此對象以及其對應(yīng)的構(gòu)造方法

      OperatorSubscribeOn代碼:

      public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
      
          final Scheduler scheduler;
          final Observable<T> source;
      
          public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
              this.scheduler = scheduler;
              this.source = source;
          }
      
          @Override
          public void call(final Subscriber<? super T> subscriber) {
              final Worker inner = scheduler.createWorker();
              subscriber.add(inner);
      
              inner.schedule(new Action0() {
                  @Override
                  public void call() {
                      final Thread t = Thread.currentThread();
      
                      Subscriber<T> s = new Subscriber<T>(subscriber) {
                          @Override
                          public void onNext(T t) {
                              subscriber.onNext(t);
                          }
      
                          @Override
                          public void onError(Throwable e) {
                              try {
                                  subscriber.onError(e);
                              } finally {
                                  inner.unsubscribe();
                              }
                          }
      
                          @Override
                          public void onCompleted() {
                              try {
                                  subscriber.onCompleted();
                              } finally {
                                  inner.unsubscribe();
                              }
                          }
      
                          @Override
                          public void setProducer(final Producer p) {
                              subscriber.setProducer(new Producer() {
                                  @Override
                                  public void request(final long n) {
                                      if (t == Thread.currentThread()) {
                                          p.request(n);
                                      } else {
                                          inner.schedule(new Action0() {
                                              @Override
                                              public void call() {
                                                  p.request(n);
                                              }
                                          });
                                      }
                                  }
                              });
                          }
                      };
      
                      source.unsafeSubscribe(s);
                  }
              });
          }
      }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69

      可以看到,OperatorSubscribeOn實(shí)現(xiàn)Onsubscribe,并且由其構(gòu)造方法可知,他保存了上一個Observable對象,并保存了Scheduler對象。

      這里做個總結(jié)。

      把Observable.create()創(chuàng)建的稱之為Observable_1,OnSubscribe_1。把subscribeOn()創(chuàng)建的稱之為Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。

      那么,前兩步就是創(chuàng)建了兩個的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的應(yīng)用,即source。

      調(diào)用Observable_2.subscribe()方法會調(diào)用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。

      下面分析下call()方法。

      • inner.schedule()改變了線程,此時Action的call()在我們指定的線程中運(yùn)行。
      • Subscriber被包裝了一層。
      • source.unsafeSubscribe(s);,注意source是Observable_1對象。

      unsafeSubscribe方法代碼:

      public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
              try {
                  // new Subscriber so onStart it
                  subscriber.onStart();
                  // allow the hook to intercept and/or decorate
                  hook.onSubscribeStart(this, onSubscribe).call(subscriber);
                  return hook.onSubscribeReturn(subscriber);
              } catch (Throwable e) {
                  // special handling for certain Throwable/Error/Exception types
                  Exceptions.throwIfFatal(e);
                  // if an unhandled error occurs executing the onSubscribe we will propagate it
                  try {
                      subscriber.onError(hook.onSubscribeError(e));
                  } catch (Throwable e2) {
                      Exceptions.throwIfFatal(e2);
                      // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                      // so we are unable to propagate the error correctly and will just throw
                      RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                      // TODO could the hook be the cause of the error in the on error handling.
                      hook.onSubscribeError(r);
                      // TODO why aren't we throwing the hook's return value.
                      throw r;
                  }
                  return Subscriptions.unsubscribed();
              }
          }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26

      代碼很多,關(guān)鍵代碼:

      hook.onSubscribeStart(this, onSubscribe).call(subscriber);

      該方法即調(diào)用了OnSubscribe_1.call()方法。注意,此時的call()方法在我們指定的線程中運(yùn)行。那么就起到了改變線程的作用。

      對于以上線程,我們可以總結(jié),其有如下流程:

      • Observable.create() : 創(chuàng)建了Observable_1和OnSubscribe_1;
      • subscribeOn(): 創(chuàng)建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時OperatorSubscribeOn保存了Observable_1的引用。
      • observable_2.subscribe(Observer):
        • 調(diào)用OperatorSubscribeOn的call()。call()改變了線程的運(yùn)行,并且調(diào)用了Observable_1.unsafeSubscribe(s);
        • Observable_1.unsafeSubscribe(s);,該方法的實(shí)現(xiàn)中調(diào)用了OnSubscribe_1的call()。
          從這個可以了解,無論我們的subscribeOn()放在哪里,他改變的是subscribe()的過程,而不是onNext()的過程。

      那么如果有多個subscribeOn(),那么線程會怎樣執(zhí)行呢。如果按照我們的邏輯,有以下程序

      Observable.just("ss") 
                      .subscribeOn(Schedulers.io())   // ----1---
                      .subscribeOn(Schedulers.newThread()) //----2----
                      .subscribe(new Action1<String>() {
                          @Override
                          public void call(String s) {
      
                          }
                      });
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

      那么,我們根據(jù)之前的源碼分析其執(zhí)行邏輯。

      • Observable.just(“ss”),創(chuàng)建Observable_1,OnSubscribe_1
      • Observable_1.subscribeOn(Schedulers.io()):創(chuàng)建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
      • observable_2.subscribeOn(Schedulers.newThread()):創(chuàng)建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
      • Observable_3.subscribe():
        • 調(diào)用OperatorSubscribeOn_3.call(),改變線程為Schedulers.newThread()。
        • 調(diào)用OperatorSubscribeOn_2.call(),改變線程為Schedulers.io()。
        • 調(diào)用OnSubscribe_1.call(),此時call()運(yùn)行在Schedulers.io()。
          根據(jù)以上邏輯分析,會按照1的線程進(jìn)行執(zhí)行。

      subscribeOn如何工作,關(guān)鍵代碼其實(shí)就是一行代碼:

      source.unsafeSubscribe(s);
      • 1

      注意它所在的位置,是在worker的call里面,說白了,就是把source.subscribe這一行調(diào)用放在指定的線程里,那么總結(jié)起來的結(jié)論就是:

      subscribeOn的調(diào)用,改變了調(diào)用前序列所運(yùn)行的線程。


      observeOn

      看一下observeOn()源碼:

      public final Observable<T> observeOn(Scheduler scheduler) {
              return observeOn(scheduler, RxRingBuffer.SIZE);
          }
      
      public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
              return observeOn(scheduler, false, bufferSize);
          }
      
      public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
              if (this instanceof ScalarSynchronousObservable) {
                  return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
              }
              return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
          }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

      這里引出了新的操作符lift

      public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
              return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
          }
      • 1
      • 2
      • 3

      這里不再介紹了,詳見:http://blog.csdn.net/jdsjlzx/article/details/51686152

      在lift()中,有如下關(guān)鍵代碼:

      Subscriber<? super T> st = hook.onLift(operator).call(o);
      • 1

      OperatorObserveOn.call()核心代碼:

       @Override
          public Subscriber<? super T> call(Subscriber<? super T> child) {
              if (scheduler instanceof ImmediateScheduler) {
                  // avoid overhead, execute directly
                  return child;
              } else if (scheduler instanceof TrampolineScheduler) {
                  // avoid overhead, execute directly
                  return child;
              } else {
                  ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
                  parent.init();
                  return parent;
              }
          }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

      我們看到其返回了ObserveOnSubscriber< T>,注意:此時只調(diào)用了call()方法,但call()方法中并沒有改變線程的操作,此時為subscribe()過程。

      我們直奔重點(diǎn),因?yàn)椋覀兞私獾狡涓淖兊氖莖nNext()過程,那么我們肯定要看一下ObserveOnSubscriber.onNext()找找在哪改變線程

      @Override
      public void onNext(final T t) {
                  if (isUnsubscribed() || finished) {
                      return;
                  }
                  if (!queue.offer(on.next(t))) {
                      onError(new MissingBackpressureException());
                      return;
                  }
                  schedule();
              }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

      這里做了兩件事,首先把結(jié)果緩存到一個隊(duì)列里,然后調(diào)用schedule啟動傳入的worker

      我們這里需要注意下:

      在調(diào)用observeOn前的序列,把結(jié)果傳入到onNext就是它的工作,它并不關(guān)心后續(xù)的流程,所以工作就到這里就結(jié)束了,剩下的交給ObserveOnSubscriber繼續(xù)。

      onNext方法最后調(diào)用了schedule(),從方法名可以看到,其肯定是改變線程用的,并且該方法經(jīng)過一番循環(huán)之后,調(diào)用了該類的call()方法。

      protected void schedule() {
                  if (counter.getAndIncrement() == 0) {
                      recursiveScheduler.schedule(this);
                  }
              }
      • 1
      • 2
      • 3
      • 4
      • 5

      recursiveScheduler 就是之前我們傳入的Scheduler,我們一般會在observeOn傳入AndroidScheluders.mainThread()。

      scheduler中調(diào)用的call()方法

              // only execute this from schedule()
              @Override
              public void call() {
                  long missed = 1L;
                  long currentEmission = emitted;
      
                  // these are accessed in a tight loop around atomics so
                  // loading them into local variables avoids the mandatory re-reading
                  // of the constant fields
                  final Queue<Object> q = this.queue;
                  final Subscriber<? super T> localChild = this.child;
                  final NotificationLite<T> localOn = this.on;
      
                  // requested and counter are not included to avoid JIT issues with register spilling
                  // and their access is is amortized because they are part of the outer loop which runs
                  // less frequently (usually after each bufferSize elements)
      
                  for (;;) {
                      long requestAmount = requested.get();
      
                      while (requestAmount != currentEmission) {
                          boolean done = finished;
                          Object v = q.poll();
                          boolean empty = v == null;
      
                          if (checkTerminated(done, empty, localChild, q)) {
                              return;
                          }
      
                          if (empty) {
                              break;
                          }
      
                          localChild.onNext(localOn.getValue(v));
      
                          currentEmission++;
                          if (currentEmission == limit) {
                              requestAmount = BackpressureUtils.produced(requested, currentEmission);
                              request(currentEmission);
                              currentEmission = 0L;
                          }
                      }
      
                      if (requestAmount == currentEmission) {
                          if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                              return;
                          }
                      }
      
                      emitted = currentEmission;
                      missed = counter.addAndGet(-missed);
                      if (missed == 0L) {
                          break;
                      }
                  }
              }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56

      call()中有l(wèi)ocalChild.onNext(localOn.getValue(v));調(diào)用。

      在Scheduler啟動后, 我們在Observable.subscribe(a)傳入的a就是這里的child, 我們看到,在call中終于調(diào)用了它的onNext方法,把真正的結(jié)果傳了出去,但是在這里,我們是工作在observeOn的線程上的。

      總結(jié)起來的結(jié)論就是:

      1. observeOn 對調(diào)用之前的序列默不關(guān)心,也不會要求之前的序列運(yùn)行在指定的線程上
      2. observeOn 對之前的序列產(chǎn)生的結(jié)果先緩存起來,然后再在指定的線程上,推送給最終的subscriber

      observeOn改變的是onNext()調(diào)用

      subcribeOn和observeOn 對比分析

      Observable
      .map                    // 操作1
      .flatMap                // 操作2
      .subscribeOn(io)
      .map                    //操作3
      .flatMap                //操作4
      .observeOn(main)
      .map                    //操作5
      .flatMap                //操作6
      .subscribeOn(io)        //!!特別注意
      .subscribe(handleData)
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

      有如上邏輯,則我們對其運(yùn)行進(jìn)行分析。

      首先,我們需要先明白其內(nèi)部執(zhí)行的邏輯。

      在調(diào)用subscribe之后,邏輯開始運(yùn)行。分別調(diào)用每一步OnSubscribe.call(),注意:自下往上。當(dāng)運(yùn)行到最上,即Observable.create()后,我們在其中調(diào)用了subscriber.onNext(),于是程序開始自上往下執(zhí)行每一個對象的subscriber.onNext()方法。最終,直到subscribe()中的回調(diào)。

      其次,從上面對subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。

      那么對于以上的邏輯,我們可以得出如下結(jié)論:

      • 操作1,2,3,4在io線程中,因?yàn)樵谌绻麤]有observeOn()影響,他們的回調(diào)操作默認(rèn)在訂閱的線程中。而我們的訂閱線程在subscribeOn(io)發(fā)生了改變。注意他們執(zhí)行的先后順序。
      • 操作5,6在main線程中運(yùn)行。因?yàn)閛bserveOn()改變了onNext().
      • 特別注意那一個邏輯沒起到作用

      再簡單點(diǎn)總結(jié)就是

      1. subscribeOn的調(diào)用切換之前的線程。
      2. observeOn的調(diào)用切換之后的線程。
      3. observeOn之后,不可再調(diào)用subscribeOn 切換線程

      復(fù)雜情況

      我們經(jīng)常多次使用subscribeOn切換線程,那么以后是否可以組合observeOn和subscribeOn達(dá)到自由切換的目的呢?

      組合是可以的,但是他們的執(zhí)行順序是有條件的,如果仔細(xì)分析的話,可以知道observeOn調(diào)用之后,再調(diào)用subscribeOn是無效的,原因是什么?

      因?yàn)閟ubscribeOn改變的是subscribe這句調(diào)用所在的線程,大多數(shù)情況,產(chǎn)生內(nèi)容和消費(fèi)內(nèi)容是在同一線程的,所以改變了產(chǎn)生內(nèi)容所在的線程,就改變了消費(fèi)內(nèi)容所在的線程。

      經(jīng)過上面的闡述,我們知道,observeOn的工作原理是把消費(fèi)結(jié)果先緩存,再切換到新線程上讓原始消費(fèi)者消費(fèi),它和生產(chǎn)者是沒有一點(diǎn)關(guān)系的,就算subscribeOn調(diào)用了,也只是改變observeOn這個消費(fèi)者所在的線程,和OperatorObserveOn中存儲的原始消費(fèi)者一點(diǎn)關(guān)系都沒有,它還是由observeOn控制。

      @扔物線 大神給的總結(jié):

      1. 下面提到的“操作”包括產(chǎn)生事件、用操作符操作事件以及最終的通過 subscriber 消費(fèi)事件;
      2. 只有第一subscribeOn() 起作用(所以多個 subscribeOn() 無意義);
      3. 這個 subscribeOn() 控制從流程開始的第一個操作,直到遇到第一個 observeOn();
      4. observeOn() 可以使用多次,每個 observeOn() 將導(dǎo)致一次線程切換(),這次切換開始于這次 observeOn() 的下一個操作;
      5. 不論是 subscribeOn() 還是 observeOn(),每次線程切換如果不受到下一個 observeOn() 的干預(yù),線程將不再改變,不會自動切換到其他線程。

      參考文章:
      https://segmentfault.com/a/1190000004856071
      http://blog.csdn.net/lisdye2/article/details/51113837


        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多