1. LimitedConcurrencyLevelTaskScheduler 介紹這個TaskScheduler用過的應該都知道,微軟開源的一個任務調(diào)度器,它的代碼很簡單, 也很好懂,但是我沒有明白的是他是如何實現(xiàn)限制并發(fā)數(shù)的 首先貼下它的代碼,大家先熟悉一下。 public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler /// <summary>Whether the current thread is processing work items.</summary> private static bool _currentThreadIsProcessingItems; /// <summary>The list of tasks to be executed.</summary> private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) /// <summary>The maximum concurrency level allowed by this scheduler.</summary> private readonly int _maxDegreeOfParallelism; /// <summary>Whether the scheduler is currently processing work items.</summary> private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the /// specified degree of parallelism. /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param> public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); _maxDegreeOfParallelism = maxDegreeOfParallelism; /// current executing number; public int CurrentCount { get; set; } /// <summary>Queues a task to the scheduler.</summary> /// <param name="task">The task to be queued.</param> protected sealed override void QueueTask(Task task) // Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. Console.WriteLine("Task Count : {0} ", _tasks.Count); if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) ++_delegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork(); private static object executeLock = new object(); /// Informs the ThreadPool that there's work to be executed for this scheduler. private void NotifyThreadPoolOfPendingWork() ThreadPool.UnsafeQueueUserWorkItem(_ => // Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true; // Process all available items in the queue. // When there are no more items to be processed, // note that we're done processing, and get out. --_delegatesQueuedOrRunning; // Get the next item from the queue item = _tasks.First.Value; // Execute the task we pulled out of the queue base.TryExecuteTask(item); // We're done processing items on the current thread finally { _currentThreadIsProcessingItems = false; } /// <summary>Attempts to execute the specified task on the current thread.</summary> /// <param name="task">The task to be executed.</param> /// <param name="taskWasPreviouslyQueued"></param> /// <returns>Whether the task could be executed on the current thread.</returns> protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) // If this thread isn't already processing a task, we don't support inlining if (!_currentThreadIsProcessingItems) return false; // If the task was previously queued, remove it from the queue if (taskWasPreviouslyQueued) TryDequeue(task); return base.TryExecuteTask(task); /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> /// <param name="task">The task to be removed.</param> /// <returns>Whether the task could be found and removed.</returns> protected sealed override bool TryDequeue(Task task) lock (_tasks) return _tasks.Remove(task); /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> /// <returns>An enumerable of the tasks currently scheduled.</returns> protected sealed override IEnumerable<Task> GetScheduledTasks() Monitor.TryEnter(_tasks, ref lockTaken); if (lockTaken) return _tasks.ToArray(); else throw new NotSupportedException(); if (lockTaken) Monitor.Exit(_tasks);
回到頂部 簡單使用下面是調(diào)用代碼。 static void Main(string[] args) TaskFactory fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(5)); //TaskFactory fac = new TaskFactory(); for (int i = 0; i < 1000; i++) Console.WriteLine("Current Index {0}, ThreadId {1}",s,Thread.CurrentThread.ManagedThreadId);
調(diào)用很簡單 根據(jù)調(diào)試調(diào)用順序可以知道。 使用 LimitedConcurrencyLevelTaskScheduler 創(chuàng)建好TaskFactory 后, 調(diào)用該TaskFacotry.StartNew 方法后。會進入 LimitedConcurrencyLevelTaskScheduler 的 QueueTask 方法。 /// <summary>Queues a task to the scheduler.</summary> /// <param name="task">The task to be queued.</param> protected sealed override void QueueTask(Task task) // Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. Console.WriteLine("Task Count : {0} ", _tasks.Count); if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) ++_delegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork();
代碼很簡單,把剛創(chuàng)建的Task 添加到任務隊列中去,然后判斷當前正在執(zhí)行的任務數(shù)量與設置的允許最大并發(fā)數(shù)進行比較, 如果小于該值,則開始通知正在掛起的任務開始執(zhí)行。 我的疑問主要在 NotifyThreadPoolOfPendingWork 這個方法上。 private void NotifyThreadPoolOfPendingWork() ThreadPool.UnsafeQueueUserWorkItem(_ => // Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true; // Process all available items in the queue. // When there are no more items to be processed, // note that we're done processing, and get out. --_delegatesQueuedOrRunning; // Get the next item from the queue item = _tasks.First.Value; // Execute the task we pulled out of the queue base.TryExecuteTask(item); // We're done processing items on the current thread finally { _currentThreadIsProcessingItems = false; }
從代碼中看到的意思是一直跑一個死循環(huán), 不斷從_tasks 中取出Task執(zhí)行, 直到_task為空為止,然后退出循環(huán)。從這里并沒有看到限制并發(fā)數(shù)的限制,只有在QueueTask中調(diào)用的時候有個簡單的限制,然而好像并沒有什么卵用, 因為只要 NotifyThreadPoolOfPendingWork 方法啟動了, 就會一直跑,直到所有的Task執(zhí)行完成。那他的并發(fā)數(shù)是如何限制的呢? 一直很迷惑,是不是我哪里理解錯了, 還請知道的大神解惑一下。
|