
      C# Task and TaskFactory: how to set the maximum number of parallel threads

       黃金屋1 2019-05-19

      1. Introduction to LimitedConcurrencyLevelTaskScheduler

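      Anyone who has used this TaskScheduler will recognize it: it is a task scheduler that Microsoft
      published as open source. Its code is short and easy to follow, but what I could not work out is
      how it actually limits the number of tasks running concurrently.
      Here is the code first, so that everyone can get familiar with it.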

      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Threading;
      using System.Threading.Tasks;

      public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
      {
          /// <summary>Whether the current thread is processing work items.</summary>
          [ThreadStatic]
          private static bool _currentThreadIsProcessingItems;
          /// <summary>The list of tasks to be executed.</summary>
          private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
          /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
          private readonly int _maxDegreeOfParallelism;
          /// <summary>The number of worker delegates currently queued or running.</summary>
          private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

          /// <summary>
          /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
          /// specified degree of parallelism.
          /// </summary>
          /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
          public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
          {
              if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
              _maxDegreeOfParallelism = maxDegreeOfParallelism;
          }

          /// <summary>
          /// Current executing count. Not referenced anywhere else in this class.
          /// </summary>
          public int CurrentCount { get; set; }

          /// <summary>Queues a task to the scheduler.</summary>
          /// <param name="task">The task to be queued.</param>
          protected sealed override void QueueTask(Task task)
          {
              // Add the task to the list of tasks to be processed. If there aren't enough
              // delegates currently queued or running to process tasks, schedule another.
              lock (_tasks)
              {
                  // Debug output: the queue length before this task is added.
                  Console.WriteLine("Task Count : {0} ", _tasks.Count);
                  _tasks.AddLast(task);
                  if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                  {
                      ++_delegatesQueuedOrRunning;
                      NotifyThreadPoolOfPendingWork();
                  }
              }
          }

          // The two fields below are not referenced anywhere else in this class.
          int executingCount = 0;
          private static object executeLock = new object();

          /// <summary>
          /// Informs the ThreadPool that there's work to be executed for this scheduler.
          /// </summary>
          private void NotifyThreadPoolOfPendingWork()
          {
              ThreadPool.UnsafeQueueUserWorkItem(_ =>
              {
                  // Note that the current thread is now processing work items.
                  // This is necessary to enable inlining of tasks into this thread.
                  _currentThreadIsProcessingItems = true;
                  try
                  {
                      // Process all available items in the queue.
                      while (true)
                      {
                          Task item;
                          lock (_tasks)
                          {
                              // When there are no more items to be processed,
                              // note that we're done processing, and get out.
                              if (_tasks.Count == 0)
                              {
                                  --_delegatesQueuedOrRunning;
                                  break;
                              }
                              // Get the next item from the queue
                              item = _tasks.First.Value;
                              _tasks.RemoveFirst();
                          }
                          // Execute the task we pulled out of the queue
                          base.TryExecuteTask(item);
                      }
                  }
                  // We're done processing items on the current thread
                  finally { _currentThreadIsProcessingItems = false; }
              }, null);
          }

          /// <summary>Attempts to execute the specified task on the current thread.</summary>
          /// <param name="task">The task to be executed.</param>
          /// <param name="taskWasPreviouslyQueued"></param>
          /// <returns>Whether the task could be executed on the current thread.</returns>
          protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
          {
              // If this thread isn't already processing a task, we don't support inlining
              if (!_currentThreadIsProcessingItems) return false;
              // If the task was previously queued, remove it from the queue
              if (taskWasPreviouslyQueued) TryDequeue(task);
              // Try to run the task.
              return base.TryExecuteTask(task);
          }

          /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
          /// <param name="task">The task to be removed.</param>
          /// <returns>Whether the task could be found and removed.</returns>
          protected sealed override bool TryDequeue(Task task)
          {
              lock (_tasks) return _tasks.Remove(task);
          }

          /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
          public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

          /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
          /// <returns>An enumerable of the tasks currently scheduled.</returns>
          protected sealed override IEnumerable<Task> GetScheduledTasks()
          {
              bool lockTaken = false;
              try
              {
                  Monitor.TryEnter(_tasks, ref lockTaken);
                  if (lockTaken) return _tasks.ToArray();
                  else throw new NotSupportedException();
              }
              finally
              {
                  if (lockTaken) Monitor.Exit(_tasks);
              }
          }
      }


      Basic usage

      The calling code is below.

      static void Main(string[] args)
      {
          // Schedule everything through a scheduler limited to 5 concurrent workers.
          TaskFactory fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(5));
          //TaskFactory fac = new TaskFactory();
          for (int i = 0; i < 1000; i++)
          {
              fac.StartNew(s => {
                  Thread.Sleep(1000);
                  Console.WriteLine("Current Index {0}, ThreadId {1}", s, Thread.CurrentThread.ManagedThreadId);
              }, i);
          }
          Console.ReadKey();
      }

      Calling it is simple.
      Stepping through in the debugger shows the call order.
      Once a TaskFactory has been created with a LimitedConcurrencyLevelTaskScheduler,
      each call to that TaskFactory's StartNew method enters the
      QueueTask method of LimitedConcurrencyLevelTaskScheduler.

      /// <summary>Queues a task to the scheduler.</summary>
      /// <param name="task">The task to be queued.</param>
      protected sealed override void QueueTask(Task task)
      {
          // Add the task to the list of tasks to be processed. If there aren't enough
          // delegates currently queued or running to process tasks, schedule another.
          lock (_tasks)
          {
              Console.WriteLine("Task Count : {0} ", _tasks.Count);
              _tasks.AddLast(task);
              if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
              {
                  ++_delegatesQueuedOrRunning;
                  NotifyThreadPoolOfPendingWork();
              }
          }
      }
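The call order described above, with StartNew leading into QueueTask, can also be checked without a debugger. The following is a minimal sketch under stated assumptions: TracingScheduler and QueueTaskCalls are hypothetical names invented for illustration, and the scheduler runs each task synchronously inside QueueTask only to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler used only to observe how often QueueTask is entered.
public sealed class TracingScheduler : TaskScheduler
{
    private int _queueTaskCalls;

    /// <summary>How many times QueueTask has been called so far.</summary>
    public int QueueTaskCalls { get { return Volatile.Read(ref _queueTaskCalls); } }

    protected override void QueueTask(Task task)
    {
        Interlocked.Increment(ref _queueTaskCalls);
        // Assumption for brevity: run the task right away on the calling thread.
        TryExecuteTask(task);
    }

    // Inlining is not supported in this sketch.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Nothing is ever left waiting, so there is nothing to report.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return new Task[0];
    }
}
```

Creating a TaskFactory over a TracingScheduler and calling StartNew a few times should raise QueueTaskCalls by one per call, which matches what the debugger shows for LimitedConcurrencyLevelTaskScheduler.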

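      QueueTask is simple: it appends the newly created Task to the task queue, then compares the current queued-or-running count (_delegatesQueuedOrRunning) with the configured maximum concurrency. If the count is below the maximum, it increments the count and calls NotifyThreadPoolOfPendingWork so that pending tasks start executing.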
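The check in QueueTask can be looked at in isolation. Below is a minimal single-threaded sketch, assuming a hypothetical WorkerGate class that only counts; it does not run any work, and its names (Enqueue, WorkerExited, WorkersStarted, Pending) are not part of the scheduler.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the QueueTask gate. It only counts; no threads are involved.
public sealed class WorkerGate
{
    private readonly Queue<int> _items = new Queue<int>();
    private readonly int _maxWorkers;
    private int _workersQueuedOrRunning;

    public WorkerGate(int maxWorkers)
    {
        _maxWorkers = maxWorkers;
    }

    /// <summary>Total number of workers this gate has allowed to start.</summary>
    public int WorkersStarted { get; private set; }

    /// <summary>Items enqueued and not yet taken by anyone.</summary>
    public int Pending { get { return _items.Count; } }

    // Mirrors QueueTask: always enqueue, but start a worker only below the limit.
    // Returns true when a new worker would be started.
    public bool Enqueue(int item)
    {
        _items.Enqueue(item);
        if (_workersQueuedOrRunning >= _maxWorkers) return false;
        ++_workersQueuedOrRunning;
        ++WorkersStarted;
        return true;
    }

    // Mirrors the worker loop finding the queue empty and leaving.
    public void WorkerExited()
    {
        --_workersQueuedOrRunning;
    }
}
```

With a limit of 5, enqueueing 1000 items through this model starts a worker for the first 5 calls only; the other 995 items are simply left in the queue until a worker picks them up.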
      My confusion is mainly about the NotifyThreadPoolOfPendingWork method.

      private void NotifyThreadPoolOfPendingWork()
      {
          ThreadPool.UnsafeQueueUserWorkItem(_ =>
          {
              // Note that the current thread is now processing work items.
              // This is necessary to enable inlining of tasks into this thread.
              _currentThreadIsProcessingItems = true;
              try
              {
                  // Process all available items in the queue.
                  while (true)
                  {
                      Task item;
                      lock (_tasks)
                      {
                          // When there are no more items to be processed,
                          // note that we're done processing, and get out.
                          if (_tasks.Count == 0)
                          {
                              --_delegatesQueuedOrRunning;
                              break;
                          }
                          // Get the next item from the queue
                          item = _tasks.First.Value;
                          _tasks.RemoveFirst();
                      }
                      // Execute the task we pulled out of the queue
                      base.TryExecuteTask(item);
                  }
              }
              // We're done processing items on the current thread
              finally { _currentThreadIsProcessingItems = false; }
          }, null);
      }

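      As I read it, this runs an endless loop that keeps taking Tasks out of _tasks and executing them
      until _tasks is empty, and then exits the loop. I see no concurrency limit here. The only check is the simple one made in QueueTask, and it seems to do very little,
      because once NotifyThreadPoolOfPendingWork has started, it keeps running until every Task has completed. So how is the concurrency actually limited?

      I have been puzzled by this for a long time. Perhaps I have misunderstood something; if anyone knows the answer, please explain.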
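One way to probe the question empirically is to count how many tasks are inside their bodies at the same moment. The sketch below uses a condensed copy of the scheduler, named LimitedScheduler here, with inlining switched off, plus a MeasurePeak helper; both names are hypothetical and do not appear in the original code. It tests one possible reading of the listing, offered as an assumption rather than a confirmed answer: each call to NotifyThreadPoolOfPendingWork queues a single worker loop, each loop runs its tasks one after another, and QueueTask starts a new loop only while fewer than the maximum are queued or running, so the number of loops caps the concurrency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Condensed, hypothetical copy of the scheduler above (no inlining support).
public sealed class LimitedScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
    private readonly int _max;
    private int _workers; // worker loops queued or running; protected by lock(_tasks)

    public LimitedScheduler(int max) { _max = max; }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            // Start another worker loop only while below the limit.
            if (_workers < _max) { ++_workers; StartWorker(); }
        }
    }

    private void StartWorker()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            while (true)
            {
                Task item;
                lock (_tasks)
                {
                    if (_tasks.Count == 0) { --_workers; break; }
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
                TryExecuteTask(item); // runs synchronously on this worker thread
            }
        }, null);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
    protected override IEnumerable<Task> GetScheduledTasks() { lock (_tasks) return _tasks.ToArray(); }
    public override int MaximumConcurrencyLevel { get { return _max; } }
}

public static class ConcurrencyProbe
{
    // Runs taskCount short tasks and returns the highest number seen running at once.
    public static int MeasurePeak(int max, int taskCount)
    {
        var factory = new TaskFactory(new LimitedScheduler(max));
        int running = 0, peak = 0;
        var tasks = new Task[taskCount];
        for (int i = 0; i < taskCount; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);
                int seen;
                // Raise the recorded peak if this task observed a higher count.
                while (now > (seen = Volatile.Read(ref peak)))
                    Interlocked.CompareExchange(ref peak, now, seen);
                Thread.Sleep(20);
                Interlocked.Decrement(ref running);
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }
}
```

If that reading holds, ConcurrencyProbe.MeasurePeak(5, 100) should report a peak no higher than 5; the exact value depends on thread-pool timing.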
