Wednesday, October 13, 2010

Using Parallel.Invoke with dynamic number of Actions

 

The new Task Parallel Library (new in NET 4.0) is an excellent and (relatively) crazy-easy way of improving application performance.   In this article, we show how to use one of the methods in the library (Parallel.Invoke) to simultaneously execute a method with runtime-determined number of threads.

Let us take the case of an Order fullfilment piece that takes orders and submits them for processing.

Step 1: Retrieve the list of approved Orders.

    1         class Order

    2         {

    3             public int OrderID;

    4             public Order(int orderID)

    5             {

    6                 this.OrderID = orderID;

    7             }

    8         }

    9 

   10        static private List<Order> GetApprovedOrders()

   11         {

   12             List<Order> orders;

   13 

   14             orders = new List<Order>();

   15 

   16             for (int i = 0; i < 100; i++)

   17             {

   18                 orders.Add(new Order(i));

   19             }

   20 

   21             return orders;

   22         }

 

Step 2: Divide up the list depending on the number of concurrent tasks.

    1         static private List<List<Order>> DivideUpWork(List<Order> orders, int nThreads)

    2         {

    3             List<List<Order>> list;

    4 

    5             list = new List<List<Order>>();

    6 

    7             for (int i = 0; i < nThreads; i++)

    8             {

    9                 List<Order> sublist = orders.Where((c, j) => j % nThreads == i).ToList<Order>();

   10                 list.Add(sublist);

   11             }

   12 

   13             return list;

   14         }

Step 3: Define the “work” method that will take a list of orders and submit them for processing.   In this sample case, all the “work” is to just the display the OrderID.

    1         static private void ProcessOrders(int threadNumber, List<Order> orders)

    2         {

    3             for (int i = 0; i < orders.Count; i++)

    4             {

    5                 Console.WriteLine(String.Format("{0} of {1}: {2}", threadNumber, orders.Count - 1, orders[i].OrderID));

    6             }

    7         }

Step 4: Now, tie them all and call Parallel.Invoke().   The key here is to “parameterize” each invocation of ProcessOrder().    Since the System.Action delegate does not allow for any parameters, we workaround this in Line 16 using closure.   The assignment in Line 15 ensures that each call to ProcessOrder will be unique. 

    1         static void Main(string[] args)

    2         {

    3             int nThreads;

    4             Action[] actions;

    5             List<Order> orders;

    6             List<List<Order>> ordersByThread;

    7 

    8             nThreads       = 8;

    9             orders         = GetApprovedOrders();

   10             ordersByThread = DivideUpWork(orders, nThreads);

   11 

   12             actions = new Action[nThreads];

   13             for (int i = 0; i < nThreads; i++)

   14             {

   15                 int local_i = i;

   16                 actions[local_i] = () => { ProcessOrders(local_i, ordersByThread[local_i]); };

   17             }

   18 

   19             Parallel.Invoke(actions);

   20         }

Full program is here.

Technorati Tags: ,