乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      ExecutorService的十個(gè)使用技巧

       hh3755 2015-06-24
      Published: 26 Nov 2014 Category: Java

      ExecutorService這個(gè)接口從Java 5開始就已經(jīng)存在了。這得追溯到2004年了。這里小小地提醒一下,官方已經(jīng)不再支持Java 5, Java 6了,Java 7在半年后也將停止支持。我之所以會提起ExecutorService這么舊的一個(gè)接口是因?yàn)?,大多?shù)Java程序員并沒有搞清楚它的工作原理。關(guān)于它可以介紹的有很多,這里我只想分享它的一些較少為人所知的特性以及實(shí)踐技巧。本文主要是面向初級程序員的,并沒有過于高深的東西。

      1. 線程命名

      這點(diǎn)得反復(fù)強(qiáng)調(diào)。對正在運(yùn)行的JVM進(jìn)行線程轉(zhuǎn)儲(thread dump)或者調(diào)試時(shí),線程池默認(rèn)的命名機(jī)制是pool-N-thread-M,這里N是線程池的序號(每新創(chuàng)建一個(gè)線程池,這個(gè)N都會加一),而M是池里線程的序號。比方說,pool-2-thread-3指的是JVM生命周期中第二個(gè)線程池里的第三個(gè)線程。參考這里 Executors.defaultThreadFactory()。這樣的名字表述性不佳。由于JDK將命名機(jī)制都隱藏在ThreadFactory里面,這使得要正確地命名線程得稍微費(fèi)點(diǎn)工夫。所幸的是Guava提供了這么一個(gè)工具類:

      import com.google.common.util.concurrent.ThreadFactoryBuilder;
       
      final ThreadFactory threadFactory = new ThreadFactoryBuilder()
              .setNameFormat("Orders-%d")
              .setDaemon(true)
              .build();
      final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
      
      1. 根據(jù)上下文切換名字

      這是我從高效的jstack:如何對高速運(yùn)行的服務(wù)器進(jìn)行調(diào)試一文中學(xué)到的一個(gè)技巧。線程名可以隨時(shí)進(jìn)行修改,只要你想這么做的話。這是有一定的意義的,因?yàn)榫€程轉(zhuǎn)儲只能看到類名和方法名,而沒有參數(shù)及本地變量。通過調(diào)整線程名可以保留一些比較關(guān)鍵的上下文信息,這樣排查消息/記錄/查詢等變慢或者出現(xiàn)死鎖的問題時(shí)就容易多了。示例:

      private void process(String messageId) {
          executorService.submit(() -> {
              final Thread currentThread = Thread.currentThread();
              final String oldName = currentThread.getName();
              currentThread.setName("Processing-" + messageId);
              try {
                  //real logic here...
              } finally {
                  currentThread.setName(oldName);
              }
          });
      }
      

      在try-finally塊中當(dāng)前線程的名字是Processing-某個(gè)消息ID。這對跟蹤系統(tǒng)內(nèi)的消息流會比較有用。

      1. 顯式地安全地關(guān)閉線程

      客戶端線程和線程池之間會有一個(gè)任務(wù)隊(duì)列。當(dāng)程序要關(guān)閉時(shí),你需要注意兩件事情:入隊(duì)的這些任務(wù)的情況怎么樣了以及正在運(yùn)行的這個(gè)任務(wù)執(zhí)行得如何了。令人驚訝的是很多開發(fā)人員并沒能正確地或者有意識地去關(guān)閉線程池。正確的方法有兩種:一個(gè)是讓所有的入隊(duì)任務(wù)都執(zhí)行完畢(shutdown()),再就是舍棄這些任務(wù)(shutdownNow())——這完全取決于你。比如說如果我們提交了N多任務(wù)并且希望等它們都執(zhí)行完后才返回的話,那么就使用shutdown():

      private void sendAllEmails(List<String> emails) throws InterruptedException {
          emails.forEach(email ->
                  executorService.submit(() ->
                          sendEmail(email)));
          executorService.shutdown();
          final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
          log.debug("All e-mails were sent so far? {}", done);
      }
      

      本例中我們發(fā)送了許多電子郵件,每一封郵件都對應(yīng)著線程池中的一個(gè)任務(wù)。提交完這些任務(wù)后我們會關(guān)閉線程池,這樣就不會再有新的任務(wù)進(jìn)來了。然后我們會至少等待一分鐘,直到這些任務(wù)執(zhí)行完。如果1分鐘后還是有的任務(wù)沒執(zhí)行到的話,awaitTermination()便會返回false。但是剩下的任務(wù)還會繼續(xù)執(zhí)行。我知道有些趕時(shí)髦的人會這么寫:

      emails.parallelStream().forEach(this::sendEmail);
      

      他們覺得我那樣很老套,不過我個(gè)人比較喜歡能控制并發(fā)線程的數(shù)量。還有一個(gè)優(yōu)雅地關(guān)閉掉線程池的方法就是shutdownNow():

      final List<Runnable> rejected = executorService.shutdownNow();
      log.debug("Rejected tasks: {}", rejected.size());
      

      這么做的話隊(duì)列中的所有任務(wù)都會被舍棄并返回。已執(zhí)行的任務(wù)仍會繼續(xù)執(zhí)行。

      1. 謹(jǐn)慎地處理中斷

      Future的一個(gè)較少提及的特性便是cancelling。這里我就不重復(fù)多說了,可以看下我之前的一篇文章:InterruptedException及線程中斷。

      1. 監(jiān)控隊(duì)列長度,確保隊(duì)列有界

      不當(dāng)?shù)木€程池大小會使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊(duì)列會持續(xù)變大,消耗過多內(nèi)存。而過多的線程又會由于頻繁的上下文切換導(dǎo)致整個(gè)系統(tǒng)的速度變緩——殊途而同歸。隊(duì)列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負(fù)了它可以暫時(shí)拒絕掉新的請求:

      final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
      executorService = new ThreadPoolExecutor(n, n,
              0L, TimeUnit.MILLISECONDS,
              queue);
      

      上面的代碼等價(jià)于Executors.newFixedThreadPool(n),然而不同的是默認(rèn)的實(shí)現(xiàn)是一個(gè)無界的LinkedBlockingQueue。這里我們用的是一個(gè)固定100大小的ArrayBlockingQueue。也就是說如果已經(jīng)有100個(gè)任務(wù)在隊(duì)列中了(還有N個(gè)在執(zhí)行中),新的任務(wù)就會被拒絕掉,并拋出RejectedExecutionException異常。由于這里的隊(duì)列是在外部聲明的,我們還可以時(shí)不時(shí)地調(diào)用下它的size()方法來將隊(duì)列大小記錄在到日志/JMX/或者你所使用的監(jiān)控系統(tǒng)中。

      1. 別忘了異常處理

      下面這段代碼執(zhí)行的結(jié)果是什么?

      executorService.submit(() -> {
          System.out.println(1 / 0);
      });
      

      我被它坑過無數(shù)回了:它什么也不會輸出。沒有任何的java.lang.ArithmeticException: / by zero的征兆,啥也沒有。線程池會把這個(gè)異常吞掉,就像什么也沒發(fā)生過一樣。如果是你自己創(chuàng)建的java.lang.Thread還好,這樣UncaughtExceptionHandler還能起作用。不過如果是線程池的話你就得小心了。如果你提交的是Runnable對象的話(就像上面那個(gè)一樣,沒有返回值),你得將整個(gè)方法體用try-catch包起來,至少打印一下異常。如果你提交的是Callable的話,得確保你在用get()方法取值的時(shí)候重新拋出異常:

      final Future<Integer> division = executorService.submit(() -> 1 / 0);
      //below will throw ExecutionException caused by ArithmeticException
      division.get();
      

      有趣的是Spring框架的@Async為此還弄出了個(gè)BUG,參見:SPR-8995以及 SPR-12090。

      1. 監(jiān)控隊(duì)列中的等待時(shí)間

      監(jiān)控工作隊(duì)列的長度只是一個(gè)方面。然而排除故障時(shí)查看從提交任務(wù)到實(shí)際執(zhí)行之間的時(shí)間差就顯得非常重要了。這個(gè)時(shí)間差越接近0就越好(說明正好線程池中有空閑的線程),否則任務(wù)要入隊(duì)的話這個(gè)時(shí)間就會增加了。再進(jìn)一步說,如果線程池不是固定線程數(shù)的話,執(zhí)行新的任務(wù)還得新創(chuàng)建一個(gè)線程,這個(gè)同樣也會消耗一定的時(shí)間。為了能更好地監(jiān)控這項(xiàng)指標(biāo),可以對ExecutorService做一下封裝:

      public class WaitTimeMonitoringExecutorService implements ExecutorService {
       
          private final ExecutorService target;
       
          public WaitTimeMonitoringExecutorService(ExecutorService target) {
              this.target = target;
          }
       
          @Override
          public <T> Future<T> submit(Callable<T> task) {
              final long startTime = System.currentTimeMillis();
              return target.submit(() -> {
                          final long queueDuration = System.currentTimeMillis() - startTime;
                          log.debug("Task {} spent {}ms in queue", task, queueDuration);
                          return task.call();
                      }
              );
          }
       
          @Override
          public <T> Future<T> submit(Runnable task, T result) {
              return submit(() -> {
                  task.run();
                  return result;
              });
          }
       
          @Override
          public Future<?> submit(Runnable task) {
              return submit(new Callable<Void>() {
                  @Override
                  public Void call() throws Exception {
                      task.run();
                      return null;
                  }
              });
          }
       
          //...
       
      }
      

      這個(gè)實(shí)現(xiàn)并不完整,不過也能說明大概的意思了。當(dāng)我們將任務(wù)提交給線程池的時(shí)候,便立即開始記錄它的時(shí)間。一旦這個(gè)任務(wù)被取出并開始執(zhí)行時(shí)便停止計(jì)時(shí)。不要被代碼中的startTime和queueDuration這兩個(gè)變量搞混了。事實(shí)上它們是在兩個(gè)不同的線程中進(jìn)行求值的,通常都會差個(gè)毫秒級或者秒級:

      Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
      
      1. 保留客戶端的棧跟蹤信息

      近來響應(yīng)式編程受到了不少關(guān)注。 Reactive manifesto, reactive streams, RxJava(僅發(fā)布了1.0版本!),Clojure agents, scala.rx等等。它們都非常不錯,但棧跟蹤信息就完蛋了,它們幾乎是毫無價(jià)值的。假設(shè)提交到線程池中的一個(gè)任務(wù)出現(xiàn)了異常:

      java.lang.NullPointerException: null
          at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
          at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
          at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
          at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
      

      可以很容易發(fā)現(xiàn)NPE異常出現(xiàn)在MyTask的76行。但是我們并不知道是誰提交的這個(gè)任務(wù),因?yàn)闂P畔⒅荒芸吹絋hread以及ThreadPoolExecutor。技術(shù)上來講我們當(dāng)然是可以看下代碼,看看是何處創(chuàng)建的MyTask。不過如果沒有線程在這中間的話,我們馬上便能知道是誰提交的任務(wù)。那么如果我們可以保留客戶端代碼(提交任務(wù)的那段代碼)的棧信息呢?這個(gè)想法并非我首創(chuàng)的,Hazelcast就將異常從所有者節(jié)點(diǎn)傳播到了客戶端中。下面是一個(gè)非常簡單的將客戶端棧信息保留下來以便失敗時(shí)查看的例子:

      public class ExecutorServiceWithClientTrace implements ExecutorService {
       
          protected final ExecutorService target;
       
          public ExecutorServiceWithClientTrace(ExecutorService target) {
              this.target = target;
          }
       
          @Override
          public <T> Future<T> submit(Callable<T> task) {
              return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
          }
       
          private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
              return () -> {
                  try {
                      return task.call();
                  } catch (Exception e) {
                      log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
                      throw e;
                  }
              };
          }
       
          private Exception clientTrace() {
              return new Exception("Client stack trace");
          }
       
          @Override
          public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
              return tasks.stream().map(this::submit).collect(toList());
          }
       
          //...
       
      }
      

      這樣一旦失敗的話我們便可以取到完整的棧信息以及提交任務(wù)時(shí)所在的線程的名字。跟之前相比我們有了一些更有價(jià)值的信息:

      Exception java.lang.NullPointerException in task submitted from thrad main here:
      java.lang.Exception: Client stack trace
          at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
          at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
          at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
          at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
          at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
      
      1. 優(yōu)先使用CompletableFuture

      Java 8中引入了更為強(qiáng)大的CompletableFuture。有可能的話盡量使用下它。ExecutorService并沒有擴(kuò)展以支持這個(gè)增強(qiáng)型的接口,因此你得自己動手了。這么寫是不行的了:

      final Future<BigDecimal> future = 
          executorService.submit(this::calculate);
      

      你得這樣:

      final CompletableFuture<BigDecimal> future = 
          CompletableFuture.supplyAsync(this::calculate, executorService);
      

      CompletableFuture 繼承自Future,因此跟之前的用法一樣。但是使用你接口的人一定會感謝CompletableFuture所提供的這些額外的功能的。

      1. 同步隊(duì)列

      SynchronousQueue是一個(gè)非常有意思的BlockingQueue。它本身甚至都算不上是一個(gè)數(shù)據(jù)結(jié)構(gòu)。最好的解釋就是它是一個(gè)容量為0的隊(duì)列。這里引用下Java文檔中的一段話:

      每一個(gè)insert操作都需要等待另一個(gè)線程的一個(gè)對應(yīng)的remove操作,反之亦然。同步隊(duì)列內(nèi)部不會有任何空間,甚至連一個(gè)位置也沒有。你無法對同步隊(duì)列執(zhí)行peek操作,因?yàn)閮H當(dāng)你要移除一個(gè)元素的時(shí)候才存在這么個(gè)元素;如果沒有別的線程在嘗試移除一個(gè)元素你也無法往里面插入元素;你也無法對它進(jìn)行遍歷,因?yàn)樗裁炊紱]有。。。 同步隊(duì)列與CSP和Ada中所用到的集結(jié)管道(rendezvous channel)有異曲同工之妙。

      它和線程池有什么關(guān)系?你可以試試在ThreadPoolExecutor中用下SynchronousQueue:

      BlockingQueue<Runnable> queue = new SynchronousQueue<>();
      ExecutorService executorService = new ThreadPoolExecutor(n, n,
              0L, TimeUnit.MILLISECONDS,
              queue);
      

      我們創(chuàng)建了一個(gè)擁有兩個(gè)線程的線程池,以及一個(gè)SynchronousQueue。由于SynchronousQueue本質(zhì)上是一個(gè)容量為0的隊(duì)列,因此這個(gè)ExecutorService只有當(dāng)有空閑線程的時(shí)候才能接受新的任務(wù)。如果所有的線程都在忙,新的任務(wù)便會馬上被拒絕掉,不會進(jìn)行等待。這在要么立即執(zhí)行,要么馬上丟棄的后臺執(zhí)行的場景中會非常有用。

      終于講完了,希望你能找到一個(gè)自己感興趣的特性!

        本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多