-
Code's Tags
-
Your Codes
-
Reffers
-
Linked Codes
|
Code:
Short link for Twitter:
HTML:
HTML view:
Copy Source | Copy HTML- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Threading;
- using Wintellect.Threading.AsyncProgModel;
- using AsyncMethod = System.Func<Wintellect.Threading.AsyncProgModel.AsyncEnumerator,
- System.Collections.Generic.IEnumerator<System.Int32>>;
-
- namespace FunWithLambdas
- {
- public class DependencyManager
- {
- private Dictionary<int, List<int>> dependenciesFromTo;
- private ManualResetEvent doneEvent;
- private readonly Dictionary<int, OperationData> operations = new Dictionary<int, OperationData>();
- private int remainingCount;
- private readonly object stateLock = new object();
- public event EventHandler<OperationCompletedEventArgs> OperationCompleted;
-
- public void AddOperation(int id, AsyncMethod operation, params int[] dependencies)
- {
- if (operation == null)
- throw new ArgumentNullException("operation");
- if (dependencies == null)
- throw new ArgumentNullException("dependencies");
-
- var data = new OperationData
- {
- Context = ExecutionContext.Capture(),
- Id = id,
- Operation = operation,
- Dependencies = dependencies
- };
- operations.Add(id, data);
- }
-
- public void Execute()
- {
- VerifyThatAllOperationsHaveBeenRegistered();
- VerifyThereAreNoCycles();
- // Fill dependency data structures
- dependenciesFromTo = new Dictionary<int, List<int>>();
- foreach (var op in operations.Values)
- {
- op.NumRemainingDependencies = op.Dependencies.Length;
-
- foreach (var from in op.Dependencies)
- {
- List<int> toList;
- if (!dependenciesFromTo.TryGetValue(from, out toList))
- {
- toList = new List<int>();
- dependenciesFromTo.Add(from, toList);
- }
- toList.Add(op.Id);
- }
- }
-
- // Launch and wait
- remainingCount = operations.Count;
- using (doneEvent = new ManualResetEvent(false))
- {
- lock (stateLock)
- {
- foreach (var op in operations.Values)
- {
- if (op.NumRemainingDependencies == 0)
- QueueOperation(op);
- }
- }
- doneEvent.WaitOne();
- }
- }
-
-
- private List<int> CreateTopologicalSort()
- {
- // Build up the dependencies graph
- var dependenciesToFrom = new Dictionary<int, List<int>>();
- var dependenciesFromTo = new Dictionary<int, List<int>>();
- foreach (var op in operations.Values)
- {
- dependenciesToFrom.Add(op.Id, new List<int>(op.Dependencies));
- foreach (var depId in op.Dependencies)
- {
- List<int> ids;
- if (!dependenciesFromTo.TryGetValue(depId, out ids))
- {
- ids = new List<int>();
- dependenciesFromTo.Add(depId, ids);
- }
- ids.Add(op.Id);
- }
- }
-
- // Create the sorted list
- var overallPartialOrderingIds = new List<int>(dependenciesToFrom.Count);
- var thisIterationIds = new List<int>(dependenciesToFrom.Count);
- while (dependenciesToFrom.Count > 0)
- {
- thisIterationIds.Clear();
- foreach (var item in dependenciesToFrom)
- {
- // If an item has zero input operations, remove it.
- if (item.Value.Count == 0)
- {
- thisIterationIds.Add(item.Key);
-
- // Remove all outbound edges
- List<int> depIds;
- if (dependenciesFromTo.TryGetValue(item.Key, out depIds))
- {
- foreach (var depId in depIds)
- {
- dependenciesToFrom[depId].Remove(item.Key);
- }
- }
- }
- }
-
- // If nothing was found to remove, there's no valid sort.
- if (thisIterationIds.Count == 0) return null;
-
- // Remove the found items from the dictionary and
- // add them to the overall ordering
- foreach (var id in thisIterationIds) dependenciesToFrom.Remove(id);
- overallPartialOrderingIds.AddRange(thisIterationIds);
- }
-
- return overallPartialOrderingIds;
- }
-
- private void OnOperationCompleted(OperationData data)
- {
- var handler = OperationCompleted;
- if (handler != null)
- handler(this, new OperationCompletedEventArgs(
- data.Id, data.Start, data.End));
- }
-
- private void ProcessOperation(OperationData data)
- {
- //// Time and run the operation's delegate
- data.Start = DateTimeOffset.Now;
-
- AsyncEnumerator ae = new AsyncEnumerator();
- if (data.Context != null)
- {
- ExecutionContext.Run(data.Context.CreateCopy(),
- op => ae.Execute(
- ((OperationData)op).Operation(ae)),
- data);
- }
- else data.Operation(ae);
-
- data.End = DateTimeOffset.Now;
-
-
- // Raise the operation completed event
- OnOperationCompleted(data);
-
- // Signal to all that depend on this operation of its
- // completion, and potentially launch newly available
- lock (stateLock)
- {
- List<int> toList;
- if (dependenciesFromTo.TryGetValue(data.Id, out toList))
- {
- foreach (var targetId in toList)
- {
- OperationData targetData = operations[targetId];
- if (--targetData.NumRemainingDependencies == 0)
- QueueOperation(targetData);
- }
- }
- dependenciesFromTo.Remove(data.Id);
-
-
- if (--remainingCount == 0) doneEvent.Set();
- }
- }
-
- private void QueueOperation(OperationData data)
- {
- ThreadPool.UnsafeQueueUserWorkItem(state =>
- ProcessOperation((OperationData)state), data);
- }
-
- private void VerifyThatAllOperationsHaveBeenRegistered()
- {
- foreach (var op in operations.Values)
- foreach (var dependency in op.Dependencies)
- if (!operations.ContainsKey(dependency))
- {
- throw new InvalidOperationException(
- "Missing operation: " + dependency);
- }
- }
-
- private void VerifyThereAreNoCycles()
- {
- if (CreateTopologicalSort() == null)
- throw new InvalidOperationException("Cycle detected");
- }
-
-
-
-
-
- public class OperationCompletedEventArgs : EventArgs
- {
- internal OperationCompletedEventArgs(
- int id, DateTimeOffset start, DateTimeOffset end)
- {
- Id = id; Start = start; End = end;
- }
- public DateTimeOffset End { get; private set; }
- public int Id { get; private set; }
- public DateTimeOffset Start { get; private set; }
- }
-
- private class OperationData
- {
- internal ExecutionContext Context;
- internal int[] Dependencies;
- internal int Id;
- internal int NumRemainingDependencies;
- internal AsyncMethod Operation;
-
- internal DateTimeOffset Start, End;
- }
- }
-
- class Program
- {
- static IEnumerator<Int32> WasteTime(AsyncEnumerator ae)
- {
- Console.WriteLine("Wasting one second on thread " + Thread.CurrentThread.ManagedThreadId);
- Thread.Sleep(1000);
- yield break;
- }
-
- static void Main()
- {
- var dm = new DependencyManager();
- AsyncMethod op = WasteTime;
- dm.AddOperation(1, op);
- dm.AddOperation(2, op);
- dm.AddOperation(3, op);
- dm.AddOperation(4, op, 1);
- dm.AddOperation(5, op, 1, 2, 3);
- dm.AddOperation(6, op, 3, 4);
- dm.AddOperation(7, op, 5, 6);
- dm.AddOperation(8, op, 5);
-
- Stopwatch st = new Stopwatch();
- st.Start();
- dm.Execute();
- st.Stop();
- Console.WriteLine(st.Elapsed);
- Console.ReadKey();
- }
- }
- }
|