Aggregating WCF Service Results Asynchronously

30 September 2009 by Stuart Cam

I had a requirement recently to call multiple WCF endpoints and aggregate the results into a single list for further processing. The logical flow was similar to the following:

[Diagram: logical flow]

Each WCF endpoint takes a noticeable length of time to return its results, so instead of executing the service calls in series I decided to run each call on its own thread and return the aggregated list once all of the calls had completed.

The major pain point in any multithreaded code is managing access to shared resources. If every thread inserts its results into one temporary list, those inserts have to be coordinated. Unfortunately, List<T> is not thread safe for modification, so the inserts must go through some kind of synchronisation mechanism. Luckily, C# gives us the lock (object) { } statement for exactly this.
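To make the locking idea concrete before the helper classes, here is a minimal sketch. The names LockedListSketch and CollectSquares are invented for illustration and are not part of the helper classes that follow. Each thread does its work outside the lock and only takes the lock for the List<T>.Add call.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only: not part of the helper classes in this post.
public static class LockedListSketch
{
    // Starts one thread per input value; each thread adds value * value to a shared list.
    public static List<int> CollectSquares(IEnumerable<int> values)
    {
        var results = new List<int>();   // shared between all threads
        var mutex = new object();        // private object used only for locking
        var threads = new List<Thread>();

        foreach (var value in values)
        {
            var captured = value;        // per-iteration copy for the closure
            var thread = new Thread(() =>
            {
                var square = captured * captured;   // the "work" happens outside the lock
                lock (mutex)
                {
                    results.Add(square);            // only one thread mutates the list at a time
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        // Block until every thread has finished before handing the list back.
        foreach (var thread in threads)
        {
            thread.Join();
        }
        return results;
    }
}
```

Calling LockedListSketch.CollectSquares(new[] { 1, 2, 3, 4 }) returns the four squares, in whatever order the threads happened to finish.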

Since there are several places in the application where we'll need to do this type of aggregation I wrote some helper classes:

public abstract class MultiThreadedHelperWithBlock<TInput, TOutput>
{
    private readonly IList<Func<TInput, TOutput>> _functions = new List<Func<TInput, TOutput>>();
    private readonly object _mutex = new object();

    protected MultiThreadedHelperWithBlock<TInput, TOutput> ASyncMethod(Func<TInput, TOutput> function)
    {
        _functions.Add(function);
        return this;
    }

    protected void RunAsyncWithLockAndBlock(TInput input, Action<TOutput> action)
    {
        if(_functions.Count == 0)
            throw new InvalidOperationException("Need to define at least one function to call");

        if(_functions.Count == 1)
        {
            action(_functions[0].Invoke(input));
            return;
        }

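        // One wait handle per function; each is signalled once that function's result has been processed
        var events = new ManualResetEvent[_functions.Count];
        for (var i = 0; i < _functions.Count; i++)
        {
            // Copy the loop variable so each closure captures its own index
            var index = i;
            var func = _functions[index];
            events[index] = new ManualResetEvent(false);
            var bw = new BackgroundWorker
            {
                WorkerReportsProgress = false,
                WorkerSupportsCancellation = false,
            };
            // Run the function in the background and hand its return value to RunWorkerCompleted
            bw.DoWork += (sender, args) => args.Result = func.Invoke((TInput)args.Argument);
            bw.RunWorkerCompleted += (sender, args) =>
            {
                var result = (TOutput)args.Result;
                // Only one completed worker at a time may run the caller's action
                lock (_mutex)
                {
                    action(result);
                }
                events[index].Set();
            };
            bw.RunWorkerAsync(input);
        }
        // Block the calling thread until every worker has signalled completion
        WaitHandle.WaitAll(events);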
    }
}

public class MultiThreadedListCombiner<TInput, TOutput> : MultiThreadedHelperWithBlock<TInput, IEnumerable<TOutput>>
{
    public new MultiThreadedListCombiner<TInput, TOutput> ASyncMethod(Func<TInput, IEnumerable<TOutput>> function)
    {
        return base.ASyncMethod(function) as MultiThreadedListCombiner<TInput, TOutput>;
    }

    public IEnumerable<TOutput> GetResults(TInput input)
    {
        var contents = new List<TOutput>();
        RunAsyncWithLockAndBlock(input, list =>
        {
            foreach (var item in list)
            {
                contents.Add(item);
            }
        });
        return contents;
    }
}

public class MultiThreadedServiceCall<TInterface, TInput, TOutput> : MultiThreadedHelperWithBlock<TInput, TOutput>
{
    private Func<TInterface, TInput, TOutput> _functionToCall;

    public MultiThreadedServiceCall<TInterface, TInput, TOutput> AttachService(TInterface service)
    {
        return ASyncMethod(request => _functionToCall.Invoke(service, request)) as MultiThreadedServiceCall<TInterface, TInput, TOutput>;
    }

    public MultiThreadedServiceCall<TInterface, TInput, TOutput> CallMethod(Func<TInterface, TInput, TOutput> functionToCall)
    {
        if (_functionToCall != null)
            throw new InvalidOperationException("CallMethod() has already been defined");
        _functionToCall = functionToCall;
        return this;
    }

    public IEnumerable<TOutput> GetResults(TInput input)
    {
        if (_functionToCall == null) throw new InvalidOperationException("Need to define CallMethod()");
        var contents = new List<TOutput>();
        RunAsyncWithLockAndBlock(input, contents.Add);
        return contents;
    }
}

OK, so the above code looks a little daunting, with generics, closures and lambda expressions everywhere, but essentially all of the hard work (such as thread spawning and synchronisation) is handled by MultiThreadedHelperWithBlock<>. The other two classes just inherit from it to perform different tasks: MultiThreadedListCombiner<> merges several result lists into one, and MultiThreadedServiceCall<> calls the same method on several services.

The Enterprise Integration Patterns website has an interesting case study on the Loan Broker Ecosystem. Notice how the Loan broker calls multiple recipients and aggregates the results? I thought it'd be interesting to demonstrate how the above code can be used for this type of task.

Firstly we need a service interface contract, some data transfer objects and some service implementations:

// Service Contract
public interface ILoanService
{
    LoanResponse GetResponse(LoanRequest request);
}

// Data Transfer Objects
public struct LoanRequest
{
    public double Amount;
    public double Term;
}

public struct LoanResponse
{
    public string Lender;
    public double MoneyRepayment;
}

// Service Implementations
public class MoneyBagsLender : ILoanService
{
    public LoanResponse GetResponse(LoanRequest request)
    {
        Thread.Sleep(3500);
        Console.WriteLine("MoneyBags result returned");
        var repayment = (request.Amount / request.Term) * 1.05;
        return new LoanResponse { Lender = "MoneyBags", MoneyRepayment = repayment };
    }
}

public class RegularLender : ILoanService
{
    public LoanResponse GetResponse(LoanRequest request)
    {
        Thread.Sleep(5000);
        Console.WriteLine("RegularLender result returned");
        var repayment = (request.Amount / request.Term) * 1.2;
        return new LoanResponse { Lender = "Regular", MoneyRepayment = repayment };
    }
}

public class ShrewdLender : ILoanService
{
    public LoanResponse GetResponse(LoanRequest request)
    {
        Thread.Sleep(2000);
        Console.WriteLine("ShrewdLender result returned");
        var repayment = (request.Amount / request.Term) * 1.5;
        return new LoanResponse { Lender = "Shrewd", MoneyRepayment = repayment };
    }
}

The service implementations (ShrewdLender, RegularLender and MoneyBagsLender) simulate "busy work" by sleeping the calling thread for a fixed amount of time.

Let's use the MultiThreadedServiceCall<> class to call these three services and return the list of LoanResponses:

// Create the Loan Request
var loanRequest = new LoanRequest { Amount = 10000, Term = 30 };

// Return an IEnumerable<LoanResponse>
var loanResponses = new MultiThreadedServiceCall<ILoanService, LoanRequest, LoanResponse>()
    .CallMethod((service, request) => service.GetResponse(request))
    .AttachService(new MoneyBagsLender())
    .AttachService(new ShrewdLender())
    .AttachService(new RegularLender())
    .GetResults(loanRequest)
    .OrderBy(response => response.MoneyRepayment); // LINQ's OrderBy sorts ascending

// Display the results
foreach (var response in loanResponses)
{
    Console.WriteLine("Monthly repayment with lender '{0}' = ${1}", response.Lender, response.MoneyRepayment);
}

This gives us the following output. The results take some time to be returned, but as soon as all of the calls have completed the IEnumerable becomes available for further processing:

[Screenshot: console output]

Some Notes:

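  • No error checking! An exception thrown by a service call is not handled: it resurfaces when the completion handler reads args.Result, before that call's wait handle has been signalled.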
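  • There may be some performance implications in using the BackgroundWorker class. It already runs DoWork on a thread-pool thread, but it adds event plumbing and raises RunWorkerCompleted through the captured SynchronizationContext, so ideally we'd queue the calls on the ThreadPool directly.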
  • WaitHandle.WaitAll accepts at most 64 handles per call (ManualResetEvent derives from WaitHandle), so this approach tops out at 64 services, although if you need to call more than 64 services you've probably got severe architectural issues.
  • .CallMethod((service, request) => service.GetResponse(request)) instructs the class to call the function GetResponse(LoanRequest request) on each service.
  • The input and output types are strongly typed by the generic type arguments.
  • .GetResults(loanRequest) returns an IEnumerable<LoanResponse>, so we are able to chain LINQ operators onto it.
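The notes above mention the ThreadPool, the 64-handle limit and the missing error handling. As a rough sketch of one way to address all three (this is not the code from the download, and ThreadPoolAggregatorSketch and Run are names made up for this example), the work can be queued with ThreadPool.QueueUserWorkItem, with a single ManualResetEvent that is set when a shared counter reaches zero. Exceptions are collected into a list rather than left unhandled, which is an assumption about how a caller might want failures reported.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical alternative to the BackgroundWorker-based helper: a sketch, not a drop-in replacement.
public static class ThreadPoolAggregatorSketch
{
    // Runs every function on the thread pool and blocks until all of them have finished.
    // Successful results are returned; exceptions are handed back through 'errors'.
    public static List<TOutput> Run<TInput, TOutput>(
        IList<Func<TInput, TOutput>> functions, TInput input, out List<Exception> errors)
    {
        var results = new List<TOutput>();
        var failures = new List<Exception>();
        var mutex = new object();

        if (functions.Count == 0)
        {
            errors = failures;
            return results;
        }

        var pending = functions.Count;   // number of calls still running
        using (var allDone = new ManualResetEvent(false))
        {
            foreach (var function in functions)
            {
                var func = function;     // per-iteration copy for the closure
                ThreadPool.QueueUserWorkItem(state =>
                {
                    try
                    {
                        var result = func(input);
                        lock (mutex) { results.Add(result); }
                    }
                    catch (Exception ex)
                    {
                        lock (mutex) { failures.Add(ex); }
                    }
                    finally
                    {
                        // The last call to finish releases the waiting thread,
                        // so only one wait handle is needed however many calls there are.
                        if (Interlocked.Decrement(ref pending) == 0)
                            allDone.Set();
                    }
                });
            }
            allDone.WaitOne();
        }

        errors = failures;
        return results;
    }
}
```

Because only one wait handle is involved, the number of functions is not capped at 64, and a failing call still counts down, so the caller is never left waiting forever.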

The three type arguments to MultiThreadedServiceCall<> specify:

  1. The interface that each service implements.
  2. The input argument type for the specified method in .CallMethod(...).
  3. The return type of the specified method in .CallMethod(...).

Download the Program.cs (6.37 kb)

Categories: .NET | C Sharp

