乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Java并發(fā)異步編程,原來十個接口的活現(xiàn)在只需要一個接口就搞定!

       西北望msm66g9f 2019-08-20

      技術(shù)文章第一時間送達!

      作者:錦成同學(xué)

      juejin.im/post/5d3c46d2f265da1b9163dbce

      什么?對你沒有聽錯,也沒有看錯 ..多線程并發(fā)執(zhí)行任務(wù),取結(jié)果歸集~~ 不再憂愁….

      引言

      先來看一些APP的獲取數(shù)據(jù),諸如此類,一個頁面獲取N多個,多達10個左右的一個用戶行為數(shù)據(jù),比如:點贊數(shù),發(fā)布文章數(shù),點贊數(shù),消息數(shù),關(guān)注數(shù),收藏數(shù),粉絲數(shù),卡券數(shù),紅包數(shù)……….. 真的是多~ 我們看些圖:

      平時要10+接口的去獲取數(shù)據(jù)(因為當(dāng)你10+個查詢寫一起,那估計到半分鐘才能響應(yīng)了),一個頁面上N多接口,真是累死前端的寶寶了,前端開啟多線程也累啊,我們做后端的要體量一下前端的寶寶們,畢竟有句話叫'程序員何苦為難程序員~'

      今天我們也可以一個接口將這些數(shù)據(jù)返回~ 還賊TM快,解決串行編程,阻塞編程帶來的苦惱~

      多線程并發(fā)執(zhí)行任務(wù),取結(jié)果歸集

      今天豬腳就是:Future、FutureTask、ExecutorService…

      • 用上FutureTask任務(wù)獲取結(jié)果老少皆宜,就是CPU有消耗。FutureTask也可以做閉鎖(實現(xiàn)了Future的語義,表示一種抽象的可計算的結(jié)果)。通過把Callable(相當(dāng)于一個可生成結(jié)果的Runnable)作為一個屬性,進而把它自己作為一個執(zhí)行器去繼承Runnable,FutureTask 實際上就是一個支持取消行為的異步任務(wù)執(zhí)行器。

      • Callable就是一個回調(diào)接口,可以泛型聲明返回類型,而Runnable是線程去執(zhí)行的方法.這個很簡單~大家想深入了解就進去看源碼好了~?因為真的很簡單~

      • FutureTask實現(xiàn)了Future,提供了start, cancel, query等功能,并且實現(xiàn)了Runnable接口,可以提交給線程執(zhí)行。

      • Java并發(fā)工具類的三板斧 狀態(tài),隊列,CAS

      狀態(tài)

       /**
           * The run state of this task, initially NEW.  The run state
           * transitions to a terminal state only in methods set,
           * setException, and cancel.  During completion, state may take on
           * transient values of COMPLETING (while outcome is being set) or
           * INTERRUPTING (only while interrupting the runner to satisfy a
           * cancel(true)). Transitions from these intermediate to final
           * states use cheaper ordered/lazy writes because values are unique
           * and cannot be further modified.
           *
           * Possible state transitions:        //可能發(fā)生的狀態(tài)過度過程
           * NEW -> COMPLETING -> NORMAL        // 創(chuàng)建-->完成-->正常
           * NEW -> COMPLETING -> EXCEPTIONAL   // 創(chuàng)建-->完成-->異常
           * NEW -> CANCELLED                   // 創(chuàng)建-->取消
           * NEW -> INTERRUPTING -> INTERRUPTED // 創(chuàng)建-->中斷中-->中斷結(jié)束
           */


          private volatile int state;                  // 執(zhí)行器狀態(tài)

          private static final int NEW = 0;            // 初始值        由構(gòu)造函數(shù)保證 
          private static final int COMPLETING = 1;     // 完成進行時    正在設(shè)置任務(wù)結(jié)果
          private static final int NORMAL = 2;         // 正常結(jié)束      任務(wù)正常執(zhí)行完畢
          private static final int EXCEPTIONAL = 3;    // 發(fā)生異常      任務(wù)執(zhí)行過程中發(fā)生異常
          private static final int CANCELLED = 4;      // 已經(jīng)取消      任務(wù)已經(jīng)取消
          private static final int INTERRUPTING = 5;   // 中斷進行時    正在中斷運行任務(wù)的線程
          private static final int INTERRUPTED = 6;    // 中斷結(jié)束      任務(wù)被中斷

          /** The underlying callable; nulled out after running */
          private Callable<V> callable;
          /** The result to return or exception to throw from get() */
          private Object outcome; // non-volatile, protected by state reads/writes
          /** The thread running the callable; CASed during run() */
          private volatile Thread runner;
          /** Treiber stack of waiting threads */
          private volatile WaitNode waiters;

      還不明白就看圖:

      public interface Future<T{
          /**
          *取消任務(wù)
          *@param mayInterruptIfRunning
          *是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務(wù),如果設(shè)置true,則表示可以取消正在執(zhí)行過程中的任務(wù)
          *如果任務(wù)正在執(zhí)行,則返回true
          *如果任務(wù)還沒有執(zhí)行,則無論mayInterruptIfRunning為true還是false,返回true
          *如果任務(wù)已經(jīng)完成,則無論mayInterruptIfRunning為true還是false,返回false
          */

          boolean cancel(boolean mayInterruptIfRunning);
          /**
          *任務(wù)是否被取消成功,如果在任務(wù)正常完成前被取消成功,則返回 true
          */

          boolean isCancelled();
          /**
          *任務(wù)是否完成
          */

          boolean isDone();
          /**
          *通過阻塞獲取執(zhí)行結(jié)果
          */

          get() throws InterruptedException, ExecutionException;
          /**
          *通過阻塞獲取執(zhí)行結(jié)果。如果在指定的時間內(nèi)沒有返回,則返回null
          */

          get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException
      ;
      }

      Future

      • cancle 可以停止任務(wù)的執(zhí)行 但不一定成功 看返回值true or false

      • get 阻塞獲取callable的任務(wù)結(jié)果,即get阻塞住調(diào)用線程,直至計算完成返回結(jié)果

      • isCancelled 是否取消成功

      • isDone 是否完成

      重點說明:

      Furture.get()獲取執(zhí)行結(jié)果的值,取決于執(zhí)行的狀態(tài),如果任務(wù)完成,會立即返回結(jié)果,否則一直阻塞直到任務(wù)進入完成狀態(tài),然后返回結(jié)果或者拋出異常。

      “運行完成”表示計算的所有可能結(jié)束的狀態(tài),包含正常結(jié)束,由于取消而結(jié)束和由于異常而結(jié)束。當(dāng)進入完成狀態(tài),他會停止在這個狀態(tài)上,只要state不處于 NEW 狀態(tài),就說明任務(wù)已經(jīng)執(zhí)行完畢。

      FutureTask負責(zé)將計算結(jié)果從執(zhí)行任務(wù)的線程傳遞到調(diào)用這個線程的線程,而且確保了,傳遞過程中結(jié)果的安全發(fā)布

      UNSAFE 無鎖編程技術(shù),確保了線程的安全性~ 為了保持無鎖編程CPU的消耗,所以用狀態(tài)標記,減少空轉(zhuǎn)的時候CPU的壓力

      • 任務(wù)本尊:callable

      • 任務(wù)的執(zhí)行者:runner

      • 任務(wù)的結(jié)果:outcome

      • 獲取任務(wù)的結(jié)果:state + outcome + waiters

      • 中斷或者取消任務(wù):state + runner + waiters

      run方法

      1、檢查state,非NEW,說明已經(jīng)啟動,直接返回;否則,設(shè)置runner為當(dāng)前線程,成功則繼續(xù),否則,返回。

      2、調(diào)用Callable.call()方法執(zhí)行任務(wù),成功則調(diào)用set(result)方法,失敗則調(diào)用setException(ex)方法,最終都會設(shè)置state,并調(diào)用finishCompletion()方法,喚醒阻塞在get()方法上的線程們。

      3、如注釋所示,如果省略ran變量,并把'set(result);' 語句移動到try代碼塊'ran = true;' 語句處,會怎樣呢?首先,從代碼邏輯上看,是沒有問題的,但是,考慮到'set(result);'方法萬一拋出異常甚至是錯誤了呢?set()方法最終會調(diào)用到用戶自定義的done()方法,所以,不可省略。

      4、如果state為INTERRUPTING, 則主動讓出CPU,自旋等待別的線程執(zhí)行完中斷流程。見handlePossibleCancellationInterrupt(int s) 方法。

      public void run({
              // UNSAFE.compareAndSwapObject, CAS保證Callable任務(wù)只被執(zhí)行一次 無鎖編程
              if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
                  return;
              try {
                  Callable<V> c = callable; // 拿到執(zhí)行任務(wù)
                  if (c != null && state == NEW) { // 任務(wù)不為空,并且執(zhí)行器狀態(tài)是初始值,才會執(zhí)行;如果取消就不執(zhí)行了
                      V result;
                      boolean ran; // 記錄是否執(zhí)行成功
                      try {
                          result = c.call(); // 執(zhí)行任務(wù)
                          ran = true// 成功
                      } catch (Throwable ex) {
                          result = null// 異常,清空結(jié)果
                          ran = false// 失敗
                          setException(ex); // 記錄異常
                      }
                      if (ran) // 問題:ran變量可以省略嗎,把set(result);移到try塊里面?
                          set(result); // 設(shè)置結(jié)果
                  }
              } finally {
                  runner = null// 直到set狀態(tài)前,runner一直都是非空的,為了防止并發(fā)調(diào)用run()方法。
                  int s = state;
                  if (s >= INTERRUPTING) // 有別的線程要中斷當(dāng)前線程,把CPU讓出去,自旋等一下
                      handlePossibleCancellationInterrupt(s);
              }
          }
            private void handlePossibleCancellationInterrupt(int s{
               if (s == INTERRUPTING) // 當(dāng)state為INTERRUPTING時
                   while (state == INTERRUPTING) // 表示有線程正在中斷當(dāng)前線程
                       Thread.yield(); // 讓出CPU,自旋等待中斷
           }

      再啰嗦下: run方法重點做了以下幾件事:

      • 將runner屬性設(shè)置成當(dāng)前正在執(zhí)行run方法的線程

      • 調(diào)用callable成員變量的call方法來執(zhí)行任務(wù)

      • 設(shè)置執(zhí)行結(jié)果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結(jié)果;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常,設(shè)置結(jié)果之前,先將state狀態(tài)設(shè)為中間態(tài)

      • 對outcome的賦值完成后,設(shè)置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL)

      • 喚醒Treiber棧中所有等待的線程

      • 善后清理(waiters, callable,runner設(shè)為null)

      • 檢查是否有遺漏的中斷,如果有,等待中斷狀態(tài)完成。

      怎么能少了get方法呢,一直阻塞獲取參見:awaitDone

          public V get() throws InterruptedException, ExecutionException {
              int s = state; // 執(zhí)行器狀態(tài)
               if (s <= COMPLETING) // 如果狀態(tài)小于等于COMPLETING,說明任務(wù)正在執(zhí)行,需要等待
                   s = awaitDone(false0L); // 等待
               return report(s); // 報告結(jié)果
           }

      順便偷偷看下get(long, TimeUnit),就是get的方法擴展,增加了超時時間,超時后我還沒拿到就生氣拋異?!?

      public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
              if (unit == null// 參數(shù)校驗
                  throw new NullPointerException();
              int s = state; // 執(zhí)行器狀態(tài)
              if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果狀態(tài)小于等于COMPLETING,說明任務(wù)正在執(zhí)行,需要等待;等待指定時間,state依然小于等于COMPLETING
                  throw new TimeoutException(); // 拋出超時異常
              return report(s); // 報告結(jié)果
          }

      那么再看awaitDone,要知道會寫死循環(huán)while(true)|for (;;)的都是高手~

      private int awaitDone(boolean timed, long nanos) throws InterruptedException {
              final long deadline = timed ? System.nanoTime() + nanos : 0L// 計算deadline
              WaitNode q = null// 等待結(jié)點
              boolean queued = false// 是否已經(jīng)入隊
              for (;;) {
                  if (Thread.interrupted()) { // 如果當(dāng)前線程已經(jīng)標記中斷,則直接移除此結(jié)點,并拋出中斷異常
                      removeWaiter(q);
                      throw new InterruptedException();
                  }

                  int s = state; // 執(zhí)行器狀態(tài)
                  if (s > COMPLETING) { // 如果狀態(tài)大于COMPLETING,說明任務(wù)已經(jīng)完成,或者已經(jīng)取消,直接返回
                      if (q != null)
                          q.thread = null// 復(fù)位線程屬性
                      return s; // 返回
                  } else if (s == COMPLETING) // 如果狀態(tài)等于COMPLETING,說明正在整理結(jié)果,自旋等待一會兒
                      Thread.yield();
                  else if (q == null// 初始,構(gòu)建結(jié)點
                      q = new WaitNode();
                  else if (!queued) // 還沒入隊,則CAS入隊
                      queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
                  else if (timed) { // 是否允許超時
                      nanos = deadline - System.nanoTime(); // 計算等待時間
                      if (nanos <= 0L) { // 超時
                          removeWaiter(q); // 移除結(jié)點
                          return state; // 返回結(jié)果
                      }
                      LockSupport.parkNanos(this, nanos); // 線程阻塞指定時間
                  } else
                      LockSupport.park(this); // 阻塞線程
              }
          }

      至此,線程安排任務(wù)和獲取我就不啰嗦了~~~~還要很多探索的,畢竟帶薪聊天比較緊張,我就不多贅述了~

      隊列

      接著我們來看隊列,在FutureTask中,隊列的實現(xiàn)是一個單向鏈表,它表示所有等待任務(wù)執(zhí)行完畢的線程的集合。我們知道,F(xiàn)utureTask實現(xiàn)了Future接口,可以獲取“Task”的執(zhí)行結(jié)果,那么如果獲取結(jié)果時,任務(wù)還沒有執(zhí)行完畢怎么辦呢?那么獲取結(jié)果的線程就會在一個等待隊列中掛起,直到任務(wù)執(zhí)行完畢被喚醒。這一點有點類似于AQS中的sync queue,在下文的分析中,大家可以自己對照它們的異同點。

      我們前面說過,在并發(fā)編程中使用隊列通常是將當(dāng)前線程包裝成某種類型的數(shù)據(jù)結(jié)構(gòu)扔到等待隊列中,我們先來看看隊列中的每一個節(jié)點是怎么個結(jié)構(gòu):

      static final class WaitNode {
          volatile Thread thread;
          volatile WaitNode next;
          WaitNode() { thread = Thread.currentThread(); }
      }

      可見,相比于AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節(jié)點的next屬性。

      值得一提的是,F(xiàn)utureTask中的這個單向鏈表是當(dāng)做棧來使用的,確切來說是當(dāng)做Treiber棧來使用的,不了解Treiber棧是個啥的可以簡單的把它當(dāng)做是一個線程安全的棧,它使用CAS來完成入棧出棧操作(想進一步了解的話可以看這篇文章)。

      為啥要使用一個線程安全的棧呢,因為同一時刻可能有多個線程都在獲取任務(wù)的執(zhí)行結(jié)果,如果任務(wù)還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現(xiàn)多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對于出棧的情況也是同理。

      由于FutureTask中的隊列本質(zhì)上是一個Treiber(驅(qū)動)棧,那么使用這個隊列就只需要一個指向棧頂節(jié)點的指針就行了,在FutureTask中,就是waiters屬性:

      /** Treiber stack of waiting threads */
      private volatile WaitNode waiters;

      事實上,它就是整個單向鏈表的頭節(jié)點。

      綜上,F(xiàn)utureTask中所使用的隊列的結(jié)構(gòu)如下:

      CAS操作

      CAS操作大多數(shù)是用來改變狀態(tài)的,在FutureTask中也不例外。我們一般在靜態(tài)代碼塊中初始化需要CAS操作的屬性的偏移量:

          // Unsafe mechanics
          private static final sun.misc.Unsafe UNSAFE;
          private static final long stateOffset;
          private static final long runnerOffset;
          private static final long waitersOffset;
          static {
              try {
                  UNSAFE = sun.misc.Unsafe.getUnsafe();
                  Class<?> k = FutureTask.class;
                  stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('state'));
                  runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('runner'));
                  waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('waiters'));
              } catch (Exception e) {
                  throw new Error(e);
              }
          }

      從這個靜態(tài)代碼塊中我們也可以看出,CAS操作主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性代表了任務(wù)的狀態(tài),waiters屬性代表了指向棧頂節(jié)點的指針,這兩個我們上面已經(jīng)分析過了。

      runner屬性代表了執(zhí)行FutureTask中的“Task”的線程。為什么需要一個屬性來記錄執(zhí)行任務(wù)的線程呢?這是為了中斷或者取消任務(wù)做準備的,只有知道了執(zhí)行任務(wù)的線程是誰,我們才能去中斷它。

      定義完屬性的偏移量之后,接下來就是CAS操作本身了。在FutureTask,CAS操作最終調(diào)用的還是Unsafe類的compareAndSwapXXX方法,關(guān)于Unsafe,由于帶薪碼文這里不再贅述。

      實戰(zhàn)演練

      一切沒有例子的講解都是耍流氓 >>> 蔥姜切沫~~加入生命的源泉….

      實戰(zhàn)項目以springboot為項目腳手架,github地址:

      https://github.com/leaJone/mybot

      1.MyFutureTask實現(xiàn)類

      內(nèi)部定義一個線程池進行任務(wù)的調(diào)度和線程的管理以及線程的復(fù)用,大家可以根據(jù)自己的實際項目情況進行配置

      其中線程調(diào)度示例:核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務(wù)回退到調(diào)用者

      說明 :

      默認使用核心線程(8)數(shù)執(zhí)行任務(wù),任務(wù)數(shù)量超過核心線程數(shù)就丟到隊列,隊列(10)滿了就再開啟新的線程,新的線程數(shù)最大為20,當(dāng)任務(wù)執(zhí)行完,新開啟的線程將存活30s,若沒有任務(wù)就消亡,線程池回到核心線程數(shù)量.

      import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
      import com.boot.lea.mybot.service.UserService;
      import com.google.common.util.concurrent.ThreadFactoryBuilder;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;

      import javax.annotation.Resource;
      import java.util.concurrent.*;


      /**
       * @author Lijing
       * @date 2019年7月29日
       */

      @Slf4j
      @Component
      public class MyFutureTask {


          @Resource
          UserService userService;

          /**
           * 核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務(wù)回退到調(diào)用者
           */

          private static ExecutorService executor = new ThreadPoolExecutor(820,
                  30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
                  new ThreadFactoryBuilder().setNameFormat('User_Async_FutureTask-%d').setDaemon(true).build(),
                  new ThreadPoolExecutor.CallerRunsPolicy());


          @SuppressWarnings('all')
          public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {

              System.out.println('MyFutureTask的線程:' + Thread.currentThread());


              long fansCount = 0, msgCount = 0, collectCount = 0,
                      followCount = 0, redBagCount = 0, couponCount = 0;

      //        fansCount = userService.countFansCountByUserId(userId);
      //        msgCount = userService.countMsgCountByUserId(userId);
      //        collectCount = userService.countCollectCountByUserId(userId);
      //        followCount = userService.countFollowCountByUserId(userId);
      //        redBagCount = userService.countRedBagCountByUserId(userId);
      //        couponCount = userService.countCouponCountByUserId(userId);

              try {

                  Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
                  Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
                  Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
                  Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
                  Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
                  Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));

                  //get阻塞
                  fansCount = fansCountFT.get();
                  msgCount = msgCountFT.get();
                  collectCount = collectCountFT.get();
                  followCount = followCountFT.get();
                  redBagCount = redBagCountFT.get();
                  couponCount = couponCountFT.get();

              } catch (InterruptedException | ExecutionException e) {
                  e.printStackTrace();
                  log.error('>>>>>>聚合查詢用戶聚合信息異常:' + e + '<<<<<<<<<');
              }
              UserBehaviorDataDTO userBehaviorData =
                      UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
                              .collectCount(collectCount).followCount(followCount)
                              .redBagCount(redBagCount).couponCount(couponCount).build();
              return userBehaviorData;
          }


      }

      2.service業(yè)務(wù)方法

      常規(guī)業(yè)務(wù)查詢方法,為了特效,以及看出實際的效果,我們每個方法做了延時

      import com.boot.lea.mybot.mapper.UserMapper;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;

      import java.util.concurrent.TimeUnit;

      @Service
      public class UserServiceImpl implements UserService {

          @Autowired
          UserMapper userMapper;

          @Override
          public long countFansCountByUserId(Long userId) {
              try {
                  Thread.sleep(10000);
                  System.out.println('獲取FansCount===睡眠:' + 10 + 's');
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println('UserService獲取FansCount的線程  ' + Thread.currentThread().getName());
              return 520;
          }

          @Override
          public long countMsgCountByUserId(Long userId) {
              System.out.println('UserService獲取MsgCount的線程  ' + Thread.currentThread().getName());
              try {
                  Thread.sleep(10000);
                  System.out.println('獲取MsgCount===睡眠:' + 10 + 's');
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return 618;
          }

          @Override
          public long countCollectCountByUserId(Long userId) {
              System.out.println('UserService獲取CollectCount的線程  ' + Thread.currentThread().getName());
              try {
                  Thread.sleep(10000);
                  System.out.println('獲取CollectCount==睡眠:' + 10 + 's');
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return 6664;
          }

          @Override
          public long countFollowCountByUserId(Long userId) {
              System.out.println('UserService獲取FollowCount的線程  ' + Thread.currentThread().getName());
              try {
                  Thread.sleep(10000);
                  System.out.println('獲取FollowCount===睡眠:' + 10's');
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return userMapper.countFollowCountByUserId(userId);
          }

          @Override
          public long countRedBagCountByUserId(Long userId) {
              System.out.println('UserService獲取RedBagCount的線程  ' + Thread.currentThread().getName());
              try {
                  TimeUnit.SECONDS.sleep(4);
                  System.out.println('獲取RedBagCount===睡眠:' + 4 + 's');
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return 99;
          }

          @Override
          public long countCouponCountByUserId(Long userId) {
              System.out.println('UserService獲取CouponCount的線程  ' + Thread.currentThread().getName());
              try {
                  TimeUnit.SECONDS.sleep(8);
                  System.out.println('獲取CouponCount===睡眠:' + 8's');
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return 66;
          }
      }

      3.controller調(diào)用

      /**
       * @author LiJing
       * @ClassName: UserController
       * @Description: 用戶控制器
       * @date 2019/7/29 15:16
       */

      @RestController
      @RequestMapping('user/')
      public class UserController {


          @Autowired
          private UserService userService;


          @Autowired
          private MyFutureTask myFutureTask;


          @GetMapping('/index')
          @ResponseBody
          public String index() {
              return '啟動用戶模塊成功~~~~~~~~';
          }

          //http://localhost:8080/api/user/get/data?userId=4

          @GetMapping('/get/data')
          @ResponseBody
          public UserBehaviorDataDTO getUserData(Long userId) {
              System.out.println('UserController的線程:' + Thread.currentThread());
              long begin = System.currentTimeMillis();
              UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
              long end = System.currentTimeMillis();
              System.out.println('===============總耗時:' + (end - begin) /1000.0000'秒');
              return userAggregatedResult;
          }


      }

      我們啟動項目:開啟調(diào)用 http://localhost:8080/api/user/get/data?userId=4

      當(dāng)我們線程池配置為:核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 的時候,我們測試的結(jié)果如下:

      結(jié)果:我們看到每個server method的執(zhí)行線程都是從線程池中發(fā)起的線程名:User_Async_FutureTask-%d, 總耗時從累計的52秒縮短到10秒,即取決于最耗時的方法查詢時間.

      那我們再將注釋代碼放開,進行串行查詢進行測試:

      結(jié)果:我們使用串行的方式進行查詢,結(jié)果匯總將達到52秒,那太可怕了~~

      總結(jié)

      使用FutureTask的時候,就是將任務(wù)runner以caller的方式進行回調(diào),阻塞獲取,最后我們將結(jié)果匯總,即完成了開啟多線程異步調(diào)用我們的業(yè)務(wù)方法.

                  Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
                      @Override
                      public Long call() throws Exception {
                          return userService.countFansCountByUserId(userId);
                      }
                  });

      這里使用的只是一個簡單的例子,具體項目可以定義具體的業(yè)務(wù)方法進行歸并處理,其實在JDK1.8以后,又有了ExecutorCompletionService,ForkJoinTask,CompletableFuture這些都可以實現(xiàn)上述的方法,我們后續(xù)會做一些這些方法使用的案例,期望大家的關(guān)注,文章中有不足之處,歡迎指正~

      小甜點

      所以:我們要用到親愛的Spring的異步編程,異步編程有很多種方式:比如常見的Future的sync,CompletableFuture.supplyAsync,@Async,哈哈 其實都離不開Thread.start()…,等等我說個笑話:

      老爸有倆孩子:小紅和小明。老爸想喝酒了,他讓小紅去買酒,小紅出去了。然后老爸突然想吸煙了,于是老爸讓小明去買煙。在面對對象的思想中,一般會把買東西,然后買回來這件事作為一個方法,如果按照順序結(jié)構(gòu)或者使用多線程同步的話,小明想去買煙就必須等小紅這個買東西的操作進行完。這樣無疑增加了時間的開銷(萬一老爸尿憋呢?)。異步就是為了解決這樣的問題。你可以分別給小紅小明下達指令,讓他們?nèi)ベI東西,然后你就可以自己做自己的事,等他們買回來的時候接收結(jié)果就可以了。

      package com.boot.lea.mybot.futrue;

      /**
       * @ClassName: TestFuture
       * @Description: 演示異步編程
       * @author LiJing
       * @date 2019/8/5 15:16
       */

      @SuppressWarnings('all')
      public class TestFuture {
          static ExecutorService executor = Executors.newFixedThreadPool(2);

          public static void main(String[] args) throws InterruptedException {
              //兩個線程的線程池
              //小紅買酒任務(wù),這里的future2代表的是小紅未來發(fā)生的操作,返回小紅買東西這個操作的結(jié)果
              CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                  System.out.println('爸:小紅你去買瓶酒!');
                  try {
                      System.out.println('小紅出去買酒了,女孩子跑的比較慢,估計5s后才會回來...');
                      Thread.sleep(5000);
                      return '我買回來了!';
                  } catch (InterruptedException e) {
                      System.err.println('小紅路上遭遇了不測');
                      return '來世再見!';
                  }
              }, executor);

              //小明買煙任務(wù),這里的future1代表的是小明未來買東西會發(fā)生的事,返回值是小明買東西的結(jié)果
              CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                  System.out.println('爸:小明你去買包煙!');
                  try {
                      System.out.println('小明出去買煙了,可能要3s后回來...');
                      Thread.sleep(3000);

                      throw new InterruptedException();
      //                return '我買回來了!';
                  } catch (InterruptedException e) {
                      System.out.println('小明路上遭遇了不測!');
                      return '這是我托人帶來的口信,我已經(jīng)不在了。';
                  }
              }, executor);

              //獲取小紅買酒結(jié)果,從小紅的操作中獲取結(jié)果,把結(jié)果打印
              future2.thenAccept((e) -> {
                  System.out.println('小紅說:' + e);
              });
              //獲取小明買煙的結(jié)果
              future1.thenAccept((e) -> {
                  System.out.println('小明說:' + e);
              });

              System.out.println('爸:等啊等 西湖美景三月天嘞......');
              System.out.println('爸: 我覺得無聊甚至去了趟廁所。');
              Thread.currentThread().join(9 * 1000);
              System.out.println('爸:終于給老子買來了......huo 酒');
              //關(guān)閉線程池
              executor.shutdown();
          }
      }

      運行結(jié)果:

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多