C#并行編程中的Parallel.Invoke
一、基礎知識
并行編程:并行編程是指軟件開發的代碼,它能在同一時間執行多個計算任務,提高執行效率和性能一種編程方式,屬于多線程編程范疇。所以我們在設計過程中一般會將很多任務劃分成若干個互相獨立子任務,這些任務不考慮互相的依賴和順序。這樣我們就可以使用很好的使用并行編程。但是我們都知道多核處理器的并行設計使用共享內存,如果沒有考慮并發問題,就會有很多異常和達不到我們預期的效果。不過還好NET Framework4.0引入了Task Parallel Library(TPL)實現了基于任務設計而不用處理重復復雜的線程的并行開發框架。它支持數據并行,任務并行與流水線。核心主要是Task,但是一般簡單的并行我們可以利用Parallel提供的靜態類如下三個方法。
Parallel.Invoke 對給定任務實現并行開發
Parallel.For 對固定數目的任務提供循環迭代并行開發
parallel.Foreach 對固定數目的任務提供循環迭代并行開發
注意:所有的并行開發不是簡單的以為只要將For或者Foreach換成Parallel.For與Parallel.Foreach這樣簡單。
PS:從簡單的Invoke開始逐步深入探討并行開發的主要知識點,也對自己學習過程中的積累做個總結,其中參考了博客園中的其他優秀博文
滴答的雨 異步編程:輕量級線程同步基元對象
首先感謝您,在我學習并行開發過程中,您的博文對我幫助很大。
二、Parallel.Invoke在并行中的使用
首先我們來看看它的兩個重載方法:
public static void Invoke(params Action[] actions); public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);
Invoke主要接受params的委托actions,比如我們要同時執行三個任務,我們可以這樣利用
方式一
Parallel.Invoke(() => Task1(), () => Task2(), () => Task3());
方式二
Parallel.Invoke(Task1, Task2, Task3);
方式三
Parallel.Invoke(
() =>
{
Task1();
},
Task2,
delegate () { Task3(); console.write('do someting!');}); 這樣Invoke就簡單實現了Task1,Task2,Task3的并行開發。下面我們用實例來說明他們的執行規則。以及兩個重載方法的使用。
三 、Demo
1、 Demo 1:
public class ParallelInvoke
{
/// <summary>
/// Invoke方式一 action
/// </summary>
public void Client1()
{
Stopwatch stopWatch = new Stopwatch();
Console.WriteLine("主線程:{0}線程ID : {1};開始", "Client1", Thread.CurrentThread.ManagedThreadId);
stopWatch.Start();
Parallel.Invoke(() => Task1("task1"), () => Task2("task2"), () => Task3("task3"));
stopWatch.Stop();
Console.WriteLine("主線程:{0}線程ID : {1};結束,共用時{2}ms", "Client1", Thread.CurrentThread.ManagedThreadId, stopWatch.ElapsedMilliseconds);
}
private void Task1(string data)
{
Thread.Sleep(5000);
Console.WriteLine("任務名:{0}線程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
}
private void Task2(string data)
{
Console.WriteLine("任務名:{0}線程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
}
private void Task3(string data)
{
Console.WriteLine("任務名:{0}線程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
}
} 執行運行后結果:
我們看到Invoke 執行Task三個方法主要有以下幾個特點:
1、沒有固定的順序,每個Task可能是不同的線程去執行,也可能是相同的;
2、主線程必須等Invoke中的所有方法執行完成后返回才繼續向下執行;這樣對我們以后設計并行的時候,要考慮每個Task任務盡可能差不多,如果相差很大,比如一個時間非常長,其他都比較短,這樣一個線程可能會影響整個任務的性能。 這點非常重要
3、這個非常簡單就實現了并行,不用我們考慮線程問題。主要Framework已經為我們控制好線程池的問題。
ps:如果其中有一個異常怎么辦? 帶做這個問題修改了增加了一個Task4.
2、 Demo2
public class ParallelInvoke
{
/// <summary>
/// Invoke方式一 action
/// </summary>
public void Client1()
{
Stopwatch stopWatch = new Stopwatch();
Console.WriteLine("主線程:{0}線程ID : {1};開始", "Client1", Thread.CurrentThread.ManagedThreadId);
stopWatch.Start();
try
{
Parallel.Invoke(() => Task1("task1"), () => Task2("task2"), () => Task3("task3"), delegate () { throw new Exception("我這里發送了異常"); });
}
catch (AggregateException ae)
{
foreach (var ex in ae.InnerExceptions)
Console.WriteLine(ex.Message);
}
stopWatch.Stop();
Console.WriteLine("主線程:{0}線程ID : {1};結束,共用時{2}ms", "Client1", Thread.CurrentThread.ManagedThreadId, stopWatch.ElapsedMilliseconds);
}
} 主要看 delegate() { throw new Exception( " 我這里發送了異常 " );} 增加了這個委托Task3. 然后我們看結果:
這里我們發現即使有異常程序也會完成執行,而且不會影響其他Task的執行。
3、demo3 重載方法ParallelOptions 的使用。
理解 ParallelOptions 建議大家 異步編程:輕量級線程同步基元對象 講的非常詳細。
主要理解兩個參數:
CancellationToken 控制線程的取消
MaxDegreeOfParallelism 設置最大的線程數,有時候可能會跑遍所有的內核,為了提高其他應用程序的穩定性,就要限制參與的內核
下面從代碼上看效果如何?
public class ParallelInvoke
{
// 定義CancellationTokenSource 控制取消
readonly CancellationTokenSource _cts = new CancellationTokenSource();
/// <summary>
/// Invoke方式一 action
/// </summary>
public void Client1()
{
Console.WriteLine("主線程:{0}線程ID : {1};開始{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now);
var po = new ParallelOptions
{
CancellationToken = _cts.Token, // 控制線程取消
MaxDegreeOfParallelism = 3 // 設置最大的線程數3,仔細觀察線程ID變化
};
Parallel.Invoke(po, () => Task1("task1"), ()=>Task5(po), Task6);
Console.WriteLine("主線程:{0}線程ID : {1};結束{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now);
}
private void Task1(string data)
{
Thread.Sleep(5000);
Console.WriteLine("任務名:{0}線程ID : {1}", data, Thread.CurrentThread.ManagedThreadId);
}
// 打印數字
private void Task5(ParallelOptions po)
{
Console.WriteLine("進入Task5線程ID : {0}", Thread.CurrentThread.ManagedThreadId);
int i = 0;
while (i < 100)
{
// 判斷是否已經取消
if (po.CancellationToken.IsCancellationRequested)
{
Console.WriteLine("已經被取消。");
return;
}
Thread.Sleep(100);
Console.Write(i + " ");
Interlocked.Increment(ref i);
}
}
/// <summary>
/// 10秒后取消
/// </summary>
private void Task6()
{
Console.WriteLine("進入取消任務,Task6線程ID : {0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000 * 10);
_cts.Cancel();
Console.WriteLine("發起取消請求...........");
}
} 執行結果:
從程序結果我們看到以下特點:
1、程序在執行過程中線程數碼不超過3個。
2、CancellationTokenSource/CancellationToken控制任務的取消。
四、總結
Parallel.Invoke 的使用過程中我們要注意以下特點:
1、沒有特定的順序,Invoke中的方法全部執行完才返回,但是即使有異常在執行過程中也同樣會完成,他只是一個很簡單的并行處理方法,特點就是簡單,不需要我們考慮線程的問題。
2、如果在設計Invoke中有個需要很長時間,這樣會影響整個Invoke的效率和性能,這個我們在設計每個task時候必須去考慮的。
3、Invoke 參數是委托方法。
4、當然Invoke在每次調用都有開銷的,不一定并行一定比串行好,要根據實際情況,內核環境多次測試調優才可以。
5、異常處理比較復雜。