>>返回《C# 并發(fā)編程》
1. 調(diào)度到線程池
Task task = Task.Run(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(2));
});
Task.Run 也能正常地返回結(jié)果,能使用異步 Lambda 表達(dá)式。下面代碼中 Task.Run 返回的 task 會(huì)在 2 秒后完成,并返回結(jié)果 13:
Task<int> task = Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(2));
return 13;
});
Task.Run 返回一個(gè) Task (或 Task<T> )對象,該對象可以被異步或響應(yīng)式代碼正常使用。
注意: 但不要在 ASP.NET 中使用 Task.Run ,除非你有絕對的把握。在 ASP.NET 中, 處理請求的代碼本來就是在 ASP.NET 線程池線程中運(yùn)行的,強(qiáng)行把它放到另一個(gè)線程池線程通常會(huì)適得其反。
但UI程序,使用Task.Run可以執(zhí)行耗時(shí)操作,有效的防止頁面卡住問題。
在進(jìn)行動(dòng)態(tài)并行開發(fā)時(shí), 一定要用 Task.Factory.StartNew 來代替 Task.Run
- 因?yàn)楦鶕?jù)默認(rèn)配置,
Task.Run 返回的 Task 對象適合被異步調(diào)用(即被異步代碼或響應(yīng)式代碼使用)。
Task.Run 也不支持動(dòng)態(tài)并行代碼中普遍使用的高級概念,例如 父/子任務(wù)。
2. 任務(wù)調(diào)度器
需要讓多個(gè)代碼段按照指定的方式運(yùn)行。例如
- 讓所有代碼段在 UI 線程中運(yùn)行
- 只允許特定數(shù)量的代碼段同時(shí)運(yùn)行。
2.1. Default 調(diào)度器
TaskScheduler.Default ,它的作用是讓任務(wù)在線程池中排隊(duì), Task.Run 、并行、數(shù)據(jù)流的代碼用的都是 TaskScheduler.Default 。
2.2. 捕獲當(dāng)前同步上下文 調(diào)度器
可以捕獲一個(gè)特定的上下文,用 TaskScheduler.FromCurrentSynchronizationContext 調(diào)度任務(wù),讓它回到該上下文:
TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
這條語句創(chuàng)建了一個(gè)捕獲當(dāng)前 同步上下文 的 TaskScheduler 對象,并將代碼調(diào)度到這個(gè)上下文中
SynchronizationContext 類表示一個(gè)通用的調(diào)度上下文。
- 大多數(shù) UI 框架有一個(gè)表示 UI 線程的 同步上下文
- ASP.NET 有一個(gè)表示 HTTP 請求的 同步上下文
建議:
在 UI 線程上執(zhí)行代碼時(shí),永遠(yuǎn)不要使用針對特定平臺(tái)的類型。\
- WPF、IOS、Android 都有
Dispatcher 類
- Windows 應(yīng)用商店平臺(tái)使用
CoreDispatcher
- WinForms 有
ISynchronizeInvoke 接口(即 Control.Invoke )
不要在新寫的代碼中使用這些類型,就當(dāng)它們不存在吧。使用這些類型會(huì)使代碼無謂地綁定在某個(gè)特定平臺(tái)上。
同步上下文 是通用的、基于上述類型的抽象類。
2.3. ConcurrentExclusiveSchedulerPair 調(diào)度器
它實(shí)際上是兩個(gè)互相關(guān)聯(lián)的調(diào)度器。 只要 ExclusiveScheduler 上沒有運(yùn)行任務(wù), ConcurrentScheduler 就可以讓多個(gè)任務(wù)同時(shí)執(zhí)行。只有當(dāng) ConcurrentScheduler 沒有執(zhí)行任務(wù)時(shí), ExclusiveScheduler 才可以執(zhí)行任務(wù),并且每次只允許運(yùn)行一個(gè)任務(wù):
public static void ConcurrentExclusiveSchedulerPairRun()
{
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
//由于并行被限流,所以ConcurrentScheduler 會(huì)兩個(gè)兩個(gè)輸出,然后執(zhí)行完這兩個(gè)開啟的8個(gè)串行任務(wù)
TaskScheduler concurrent = schedulerPair.ConcurrentScheduler;
TaskScheduler exclusive = schedulerPair.ExclusiveScheduler;
//Default 由于沒有限制,所以第一層會(huì)先輸出,全部隨機(jī)
// TaskScheduler concurrent = TaskScheduler.Default;
// TaskScheduler exclusive =TaskScheduler.Default;
var list = new List<List<int>>();
for (int i = 0; i < 4; i++)
{
var actionList = new List<int>();
list.Add(actionList);
for (int j = 0; j < 4; j++)
{
actionList.Add(i * 10 + j);
}
}
var tasks = list.Select(u => Task.Factory.StartNew(state =>
{
System.Console.WriteLine($"ConcurrentScheduler");
((List<int>)state).Select(i => Task.Factory.StartNew(state2 => System.Console.WriteLine($"ExclusiveScheduler:{state2}"), i, CancellationToken.None, TaskCreationOptions.None, exclusive)).ToArray();
}, u, CancellationToken.None, TaskCreationOptions.None, concurrent));
Task.WaitAll(tasks.ToArray());
}
輸出:
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:0
ExclusiveScheduler:1
ExclusiveScheduler:2
ExclusiveScheduler:3
ExclusiveScheduler:10
ExclusiveScheduler:11
ExclusiveScheduler:12
ExclusiveScheduler:13
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:20
ExclusiveScheduler:21
ExclusiveScheduler:22
ExclusiveScheduler:23
ExclusiveScheduler:30
ExclusiveScheduler:31
ExclusiveScheduler:32
ExclusiveScheduler:33
ConcurrentExclusiveSchedulerPair 的常見用法是
- 用
ExclusiveScheduler 來確保每次只運(yùn)行一個(gè)任務(wù)。
ExclusiveScheduler 執(zhí)行的代碼會(huì)在線程池中運(yùn)行,但是使用了同一個(gè) ExclusiveScheduler 對象的其他代碼不能同時(shí)運(yùn)行。
ConcurrentExclusiveSchedulerPair 的另一個(gè)用法是作為限流調(diào)度器。
- 創(chuàng)建的
ConcurrentExclusiveSchedulerPair 對象可以限制自身的并發(fā)數(shù)量。
- 這時(shí)通常不使用 ExclusiveScheduler
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
3. 調(diào)度并行代碼
public static void RotateMatricesRun()
{
List<List<Action<float>>> actionLists = new List<List<Action<float>>>();
for (int i = 0; i < 15; i++)
{
var actionList = new List<Action<float>>();
actionLists.Add(actionList);
for (int j = 0; j < 15; j++)
{
actionList.Add(new Action<float>(degrees =>
{
Thread.Sleep(200);
System.Console.WriteLine("degrees:" + degrees + " " + DateTime.Now.ToString("HHmmss.fff"));
}));
}
}
RotateMatrices(actionLists, 10);
//雖然兩個(gè)并行嵌套但是由于調(diào)度器的設(shè)置,導(dǎo)致任務(wù)是8個(gè)8個(gè)執(zhí)行的,結(jié)果是8個(gè)后200ms再8個(gè)
}
static void RotateMatrices(IEnumerable<IEnumerable<Action<float>>> collections, float degrees)
{
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
ParallelOptions options = new ParallelOptions
{
TaskScheduler = scheduler
};
Parallel.ForEach(collections, options,
matrices =>
{
Parallel.ForEach(matrices,
options,
matrix => matrix.Invoke(degrees)
);
System.Console.WriteLine($"============");
});
}
輸出:
degrees:10 190424.120
... 118個(gè) ...
degrees:10 190426.963
============
============
============
============
============
============
============
============
degrees:10 190427.167
... 6個(gè) ...
degrees:10 190427.167
... 5個(gè) ...
degrees:10 190428.589
... 6個(gè) ...
degrees:10 190428.589
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
============
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.994
... 6個(gè) ...
degrees:10 190428.994
============
degrees:10 190429.194
... 6個(gè) ...
degrees:10 190429.194
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.800
============
4. 用調(diào)度器實(shí)現(xiàn)數(shù)據(jù)流的同步
Stopwatch sw = Stopwatch.StartNew();
// 模擬 UI同步上下文
AsyncContext.Run(() =>
{
var options = new ExecutionDataflowBlockOptions
{
//使用次調(diào)度器,則代碼會(huì)放到創(chuàng)建線程的同步上下文上執(zhí)行(若是當(dāng)前同步上下文是UI Context 或 此例的AsyncContext)
//運(yùn)行和注釋下行運(yùn)行觀察Creator和Executor線程Id的變化
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
};
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
System.Console.WriteLine($"Creator ThreadId: {Thread.CurrentThread.ManagedThreadId}.");
var displayBlock = new ActionBlock<int>(result =>
{
// ListBox.Items.Add(result)
System.Console.WriteLine($"Executor ThreadId: {Thread.CurrentThread.ManagedThreadId} res:{result}.");
}, options);
multiplyBlock.LinkTo(displayBlock);
for (int i = 0; i < 5; i++)
{
multiplyBlock.Post(i);
System.Console.WriteLine($"Post {i}");
}
multiplyBlock.Completion.Wait(2000);
});
System.Console.WriteLine($"Cost {sw.ElapsedMilliseconds}ms.");
輸出:
Creator ThreadId: 1.
Post 0
Post 1
Post 2
Post 3
Post 4
Executor ThreadId: 1 res:0.
Executor ThreadId: 1 res:2.
Executor ThreadId: 1 res:4.
Executor ThreadId: 1 res:6.
Executor ThreadId: 1 res:8.
Cost 2062ms.
|