Using Parallel LINQ extensions to union two sequences, how can one yield the fastest results first?

Let's say I have two sequences returning integers 1 to 5.

The first returns 1, 2 and 3 very fast, but 4 and 5 take 200ms each.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

The second returns 1, 2 and 3 with a 200ms delay, but 4 and 5 are returned fast.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

Unioning these two sequences gives me just the numbers 1 to 5.

FastFirst().Union(SlowFirst());
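To make the problem concrete, here is a rough sketch that timestamps each element as a plain Union yields it. The class name SequentialUnionDemo and the Measure helper are made up for illustration; FastFirst and SlowFirst are copied from above. Enumerable.Union drains the first sequence completely before it touches the second, so which sequence comes first decides how long the first result takes.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class SequentialUnionDemo
{
    public static IEnumerable<int> FastFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i > 3) Thread.Sleep(200);
            yield return i;
        }
    }

    public static IEnumerable<int> SlowFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i < 4) Thread.Sleep(200);
            yield return i;
        }
    }

    // Hypothetical helper: pairs each yielded value (Key) with the elapsed
    // milliseconds (Value) at the moment the consumer received it.
    public static List<KeyValuePair<int, long>> Measure(IEnumerable<int> sequence)
    {
        var clock = Stopwatch.StartNew();
        var arrivals = new List<KeyValuePair<int, long>>();
        foreach (var value in sequence)
        {
            arrivals.Add(new KeyValuePair<int, long>(value, clock.ElapsedMilliseconds));
        }
        return arrivals;
    }
}
```

Calling Measure(SlowFirst().Union(FastFirst())) should show the first element arriving only after roughly 200 ms, even though FastFirst could have supplied it immediately, and the enumeration does not end until both sequences have been drained one after the other.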

I cannot guarantee which of the two methods has delays at what point, so no fixed order of execution solves this for me. Therefore, I would like to parallelise the union in order to minimise the (artificial) delay in my example.

A real-world scenario: I have a cache that returns some entities, and a datasource that returns all entities. I'd like to be able to return an iterator from a method that internally parallelises the request to both the cache and the datasource so that the cached results yield as fast as possible.

Note 1: I realise this is still wasting CPU cycles; I'm not asking how I can prevent the sequences from iterating over their slow elements, just how I can union them as fast as possible.

Update 1: I've adapted achitaka-san's great response to accept multiple producers, and to use ContinueWhenAll so that the BlockingCollection's CompleteAdding is called just once. I'm posting it here because the formatting would be lost in a comment. Any further feedback would be great!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

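    var taskList = new List<Task>();
    foreach (var result in producer)
    {
        // Copy the loop variable: before C# 5 every lambda captured the same
        // variable, so all tasks could end up iterating the last producer.
        var source = result;
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in source)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    // Runs once every producer task has finished, whether or not it faulted.
    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());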

    return resultsQueue.GetConsumingEnumerable();
}
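One thing worth spelling out: the merged stream still contains duplicates, so to get Union semantics it has to go through a uniqueness filter. Below is a minimal sketch, assuming LINQ's Distinct() is acceptable for that (it streams, yielding the first occurrence of each value as it arrives). The class name ParallelUnionDemo and the UnionAsync helper are hypothetical; SelectAsync is a condensed copy of the method above.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelUnionDemo
{
    public static IEnumerable<int> FastFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i > 3) Thread.Sleep(200);
            yield return i;
        }
    }

    public static IEnumerable<int> SlowFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i < 4) Thread.Sleep(200);
            yield return i;
        }
    }

    // Condensed copy of SelectAsync: one task per producer feeds a shared queue.
    public static IEnumerable<TResult> SelectAsync<TResult>(
        params IEnumerable<TResult>[] producers)
    {
        var resultsQueue = new BlockingCollection<TResult>();
        var tasks = new List<Task>();
        foreach (var producer in producers)
        {
            var source = producer; // local copy, safe to capture on any compiler version
            tasks.Add(Task.Factory.StartNew(() =>
            {
                foreach (var product in source) resultsQueue.Add(product);
            }));
        }

        // Close the queue once all producers are done so the consumer can finish.
        Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => resultsQueue.CompleteAdding());
        return resultsQueue.GetConsumingEnumerable();
    }

    // Hypothetical helper: union semantics on top of the merged stream.
    public static IEnumerable<TResult> UnionAsync<TResult>(
        params IEnumerable<TResult>[] producers)
    {
        return SelectAsync(producers).Distinct();
    }
}
```

Usage would be something like UnionAsync(FastFirst(), SlowFirst()). The result holds each of 1 to 5 exactly once, but the order depends on arrival time and thread scheduling, so callers should not rely on it.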
Alex Norcliffe asked Nov 09 '11 13:11


1 Answer

Take a look at this. The first method returns results in whatever order they arrive. The second filters out duplicates. If you chain them, I think you will get the result you want.

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
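        // Producers still running. Interlocked (System.Threading) keeps the count
        // thread-safe, so exactly one producer, the last to finish, closes the queue.
        var producersRemaining = 2;

        Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var product in producer1)
                {
                    resultsQueue.Add(product);
                }
            }
            finally
            {
                // Runs even if the producer throws, so the consumer is never left waiting.
                if (Interlocked.Decrement(ref producersRemaining) == 0) { resultsQueue.CompleteAdding(); }
            }
        });

        Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var product in producer2)
                {
                    resultsQueue.Add(product);
                }
            }
            finally
            {
                if (Interlocked.Decrement(ref producersRemaining) == 0) { resultsQueue.CompleteAdding(); }
            }
        });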

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        HashSet<TResult> knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            // HashSet.Add returns false for a value already seen, so one lookup is enough.
            if (!knownResults.Add(result)) {continue;}
            yield return result;
        }
    }
}
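Here is a rough sketch of the chaining applied to the cache/datasource scenario from the question. Everything named here is hypothetical: FromCache and FromDataSource are stand-ins that return string keys, and Merge and Unique are condensed variants of the two methods above (Merge uses ContinueWhenAll to close the queue instead of the done flags).

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CacheFirstDemo
{
    // Hypothetical cache: a few entity keys, available immediately.
    public static IEnumerable<string> FromCache()
    {
        yield return "customer:1";
        yield return "customer:2";
    }

    // Hypothetical data source: every key, each after a simulated delay.
    public static IEnumerable<string> FromDataSource()
    {
        foreach (var key in new[] { "customer:1", "customer:2", "customer:3" })
        {
            Thread.Sleep(100);
            yield return key;
        }
    }

    // Merges both sequences in arrival order through a bounded queue.
    public static IEnumerable<T> Merge<T>(
        IEnumerable<T> first, IEnumerable<T> second, int capacity)
    {
        var queue = new BlockingCollection<T>(capacity);
        var producers = new[] { first, second }
            .Select(source => Task.Factory.StartNew(() =>
            {
                foreach (var item in source) queue.Add(item);
            }))
            .ToArray(); // ToArray forces the tasks to start now

        Task.Factory.ContinueWhenAll(producers, _ => queue.CompleteAdding());
        return queue.GetConsumingEnumerable();
    }

    // Yields each value the first time it is seen and skips repeats.
    public static IEnumerable<T> Unique<T>(IEnumerable<T> source)
    {
        var seen = new HashSet<T>();
        foreach (var item in source)
        {
            if (seen.Add(item)) yield return item;
        }
    }

    // The chain: merge first, then filter duplicates.
    public static List<string> LoadAll()
    {
        return Unique(Merge(FromCache(), FromDataSource(), 16)).ToList();
    }
}
```

LoadAll() returns each key once. The cached keys will normally come through first because nothing delays them, but that ordering comes from timing and is not guaranteed. With real entities rather than strings, the uniqueness step relies on the type's equality (Equals/GetHashCode), so that would need to be defined sensibly.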
George Mamaladze answered Sep 30 '22 17:09