微软工作流,基于微软并行计算的工作流解决方案 (4) Cancellation, MaxDegreeOfParallelism 和CacheMetadata

上面的列子我们实现了基于System.Threading.Tasks下面的组件封装的异步并行活动AsyncParallelActivity。 为了完善这个例子我们还要实现一些并行工作流活动的基本功能
1) 设定MaxDegreeOfParallelism (并行数目控制)
2) 支持Cancellation (子任务取消)
3) 实现CacheMetaData (性能优化)
我们先解释一下为什么需要Asynchronous Programming Model (APM)模式来实现并行,简单理解主要原因是同步执行会在同一个线程里面执行(InProcess),可以说.net framework中的异步编程都是利用IAsyncResult来实现的。(异步编程和多线程的关系是什么呢?请大家思考)
1 MaxDegreeOfParallelism
System.Threading.Tasks.ParallelOptions类型封装了一些关于并行任务的基本配置信息,它主要包含有CancellationToken, MaxDegreeOfParallelism, TaskScheduler:
CancellationToken主要用于实现取消某个并发任务 (task cancellation),
MaxDegreeOfParallelism用于控制并发执行的最多任务,主要用于性能和线程资源控制
TaskScheduler用于控制子任务的执行次序,我们可以利用此类型做自己定制的Scheduler,参见
http://www.codeguru.com/csharp/article.php/c18931
首先我们来实现MaxDegreeOfParallelism,很简单,就在我们的AsyncParallelActivity类里面增加一个属性MaxDegreeOfParallelism,然后传递给Task Factory就可以了。根据MDSN的说法, “The exception that is thrown when this MaxDegreeOfParallelism is set to 0 or some value less than -1.”这个值不能设置成0,或者负数。我们默认把它设为0.
[RequiredArgument] [DefaultValue(0)] public InArgument MaxDegreeOfParallelism { get; set; }
我们要修改一下我们的AsyncParallelActivity的BeginExecute方法,让它能接收ParallelOptions参数。
同时,对于我们上面的例子,我们拖拽了2个AsyncParallelActivity实例,但是它们的区别仅仅是执行的方法不同,我们要修改Function参数让它能接收一组Action.
这样我们就抛弃了微软的WF中的Parallel Activity,利用我们的AsyncParallelActivity实现对并行任务的封装。
修改后的代码为:
[RequiredArgument] public InArgument[]> Functions { get; set; } [RequiredArgument] [DefaultValue(0)] public InArgument MaxDegreeOfParallelism { get; set; }
protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state) { var data = Data.Get(context); var functions = Functions.Get(context); if (data == null) throw new ArgumentNullException("Data"); if (functions == null) throw new ArgumentException("Functions"); ParallelOptions options = new ParallelOptions(); options.MaxDegreeOfParallelism = MaxDegreeOfParallelism.Get(context); // use Task to wrap the action System.Threading.Tasks.Task parallel = Task.Factory.StartNew((taskState) => { try { Parallel.ForEach>(functions,options, singleAction => singleAction(data)); } catch (OperationCanceledException) { //Exception Handler here, using Enterprise Library Exception Block... } }, state); //force to call the EndExecute parallel.ContinueWith((task) => { callback.Invoke(task); }); return parallel; }
我们修改一下OrderService代码,增加一些方法,用于测试MaxDegreeOfParallelism.
public class OrderService { public void DoPartReplication(Order order) { order.PartNumber = "102324"; Thread.Sleep(2000); System.Console.WriteLine("DoPartReplication done"); } public void DoVehicleReplication(Order order) { order.VehicleNumber = "V0123"; Thread.Sleep(2000); System.Console.WriteLine("DoVehicleReplication done"); } public void DoCustomerReplication(Order order) { order.Customer = "Customer123"; Thread.Sleep(2000); System.Console.WriteLine("DoCustomerReplication done"); } }
我们重新定义我们的主活动CloseOrderWorkflow,抛弃微软的WF中的Parallel Activity,完全利用我们自己定制的AsyncParallelActivity,见下图
基于微软并行计算的工作流解决方案 (4) Cancellation, MaxDegreeOfParallelism 和CacheMetadata微软工作流
基于微软并行计算的工作流解决方案 (4) Cancellation, MaxDegreeOfParallelism 和CacheMetadata微软工作流
其中的Functions设置为:
New Action(Of Order)() {AddressOf OrderService.DoPartReplication, AddressOf OrderService.DoVehicleReplication, AddressOf OrderService.DoCustomerReplication}
它绑定了三个业务方法。
测试1)设置MaxDegreeOfParallelism=4,执行结果为:
基于微软并行计算的工作流解决方案 (4) Cancellation, MaxDegreeOfParallelism 和CacheMetadata微软工作流
总共执行时间为2秒,三个业务方法都执行到了。
测试2)设置MaxDegreeOfParallelism=2,执行结果为:
基于微软并行计算的工作流解决方案 (4) Cancellation, MaxDegreeOfParallelism 和CacheMetadata微软工作流
执行时间为4秒,因为我们设置了最大并发为2,第三个业务方法只有等待了。测试结果证明MaxDegreeOfParallelism有效。
测试代码在这里。
WorkflowConsole_chapter_4.zip
2 Cancellation (待续)
Tags:  什么是并行计算 并行计算导论 并行计算 工作流解决方案 微软工作流

延伸阅读

最新评论

发表评论