
TPL Dataflow: How to throttle an entire pipeline?

I want to limit the number of items posted to a Dataflow pipeline. The allowed number of items depends on the production environment. These objects (images) consume a large amount of memory, so I would like to post a new one only when the last block of the pipeline has finished processing the previous one.

I tried using a SemaphoreSlim to throttle the producer, releasing it in the last block of the pipeline. It works, but if an exception is thrown anywhere in the pipeline, the program waits forever and the exception is never caught.

Here is a sample that resembles our code. How can I do this?

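using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package on .NET Framework

// Main belongs inside a class, e.g. Program.
static void Main(string[] args)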
{
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);

    var downloadString = new TransformBlock<string, string>(uri =>
    {
        Console.WriteLine("Downloading '{0}'...", uri);
        return new WebClient().DownloadString(uri);
    });

    var createWordList = new TransformBlock<string, string[]>(text =>
    {
        Console.WriteLine("Creating word list...");

        char[] tokens = text.ToArray();
        for (int i = 0; i < tokens.Length; i++)
        {
            if (!char.IsLetter(tokens[i]))
                tokens[i] = ' ';
        }
        text = new string(tokens);

        return text.Split(new char[] { ' ' },
           StringSplitOptions.RemoveEmptyEntries);
    });

    var filterWordList = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Filtering word list...");
        throw new InvalidOperationException("ouch !"); // explicit for test
        return words.Where(word => word.Length > 3).OrderBy(word => word)
           .Distinct().ToArray();
    });

    var findPalindromes = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Finding palindromes...");

        var palindromes = new ConcurrentQueue<string>();

        Parallel.ForEach(words, word =>
        {
            string reverse = new string(word.Reverse().ToArray());

            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
                palindromes.Enqueue(word);
            }
        });

        return palindromes.ToArray();
    });

    var printPalindrome = new ActionBlock<string[]>(palindromes =>
    {
        try
        {
            foreach (string palindrome in palindromes)
            {
                Console.WriteLine("Found palindrome {0}/{1}",
                   palindrome, new string(palindrome.Reverse().ToArray()));
            }
        }
        finally
        {
            semaphore.Release();
        }
    });

    downloadString.LinkTo(createWordList);
    createWordList.LinkTo(filterWordList);
    filterWordList.LinkTo(findPalindromes);
    findPalindromes.LinkTo(printPalindrome);


    downloadString.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)createWordList).Fault(t.Exception);
        else createWordList.Complete();
    });
    createWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)filterWordList).Fault(t.Exception);
        else filterWordList.Complete();
    });
    filterWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)findPalindromes).Fault(t.Exception); // entered when an exception is thrown
        else findPalindromes.Complete();
    });
    findPalindromes.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted)
            ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but never caught
        else printPalindrome.Complete();
    });

    try
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(i);

            downloadString.Post("http://www.google.com");
            semaphore.Wait(); // hangs here once an exception has been thrown
        }

        downloadString.Complete();

        printPalindrome.Completion.Wait();
    }
    catch (AggregateException agg)
    {
        Console.WriteLine("An error has occurred: " + agg);
    }
    Console.WriteLine("Done");
    Console.ReadKey();
}
Asked by n3bula, Mar 03 '15 at 17:03
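The four ContinueWith continuations at the end of the question's sample forward completion and faults from block to block by hand. The same wiring can be expressed with DataflowLinkOptions.PropagateCompletion on each LinkTo call. Below is a minimal, hypothetical sketch, assuming .NET 6 or later (top-level statements) with System.Threading.Tasks.Dataflow available; the blocks `first`, `middle` and `last` are stand-ins for the question's stages, and `middle` always throws, as filterWordList does:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical three-stage pipeline; the middle stage throws, like filterWordList.
var first = new TransformBlock<string, string>(s => s.ToUpperInvariant());
var middle = new TransformBlock<string, int>(s =>
{
    if (s.Length > 0) throw new InvalidOperationException("ouch !"); // explicit for test
    return s.Length;
});
var last = new ActionBlock<int>(n => Console.WriteLine(n));

// PropagateCompletion forwards Complete() and faults along each link,
// replacing the hand-written ContinueWith chain.
var propagate = new DataflowLinkOptions { PropagateCompletion = true };
first.LinkTo(middle, propagate);
middle.LinkTo(last, propagate);

first.Post("hello");
first.Complete();

string error = "none";
try
{
    await last.Completion; // throws because the fault reached the last block
}
catch (Exception)
{
    // Flatten() unwraps any AggregateException nesting added by propagation.
    error = last.Completion.Exception!.Flatten().InnerExceptions[0].Message;
}
Console.WriteLine($"Last block faulted with: {error}");
```

Propagation alone does not remove the hang in the question: the producer is still blocked in semaphore.Wait(), which nothing releases once the pipeline has faulted.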



2 Answers

Wait on the semaphore and on the last block's completion task together. That way, if the pipeline ends prematurely (because of an exception or cancellation), the wait ends instead of hanging, and the exception is rethrown when you wait on printPalindrome.Completion after the loop. Otherwise, you keep waiting on the semaphore until there is room to post more.

You can do that with Task.WhenAny and SemaphoreSlim.WaitAsync:

for (int i = 0; i < 10; i++)
{
    Console.WriteLine(i);
    downloadString.Post("http://www.google.com");

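    // Stop producing once the pipeline has completed (for example, because it faulted).
    if (printPalindrome.Completion.IsCompleted)
    {
        break;
    }

    // Resume when a slot frees up OR when the pipeline completes, whichever happens first.
    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();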
}

Note: blocking with Task.Wait is only appropriate here because this is Main. Normally this code should live in an async method that awaits the task returned by Task.WhenAny.

Answered by i3arnon, Nov 12 '22 at 13:11
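A self-contained sketch of the first answer's approach written with await, as its note recommends (async Main is available from C# 7.1; this sketch uses C# 9 top-level statements, so .NET 6 or later is assumed). The blocks `process` and `print` are hypothetical stand-ins for the question's pipeline, and unlike the answer's loop this one waits for a slot before each Post rather than after it:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// At most one item in flight at a time.
var semaphore = new SemaphoreSlim(1, 1);

// Hypothetical stages: "process" fails on item 2, "print" frees a slot per item.
var process = new TransformBlock<int, int>(n =>
{
    if (n == 2) throw new InvalidOperationException("ouch !"); // explicit for test
    return n * 10;
});
int printed = 0;
var print = new ActionBlock<int>(n =>
{
    try
    {
        Console.WriteLine($"Processed {n}");
        printed++;
    }
    finally
    {
        semaphore.Release(); // let the producer post the next item
    }
});
process.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

int posted = 0;
for (int i = 0; i < 10; i++)
{
    // Resume when a slot is free OR when the pipeline has finished (e.g. faulted).
    Task finished = await Task.WhenAny(semaphore.WaitAsync(), print.Completion);
    if (finished == print.Completion) break;

    process.Post(i);
    posted++;
}
process.Complete();

string error = "none";
try
{
    await print.Completion; // rethrows the fault instead of hanging
}
catch (Exception)
{
    error = print.Completion.Exception!.Flatten().InnerExceptions[0].Message;
}
Console.WriteLine($"Posted {posted}, printed {printed}, error: {error}");
```

With these stand-ins the loop posts items 0, 1 and 2, prints two results, and then stops because print.Completion finishes before another slot is freed; awaiting it surfaces the InvalidOperationException.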



This is how I throttled the pipeline so that the source block holds at most 10 items at any one time; you could change the limit to 1. Make sure you also bound the other blocks in the pipeline; otherwise the source block may hold 1 item while the next block buffers many more.

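var sourceBlock = new BufferBlock<string>(
    new ExecutionDataflowBlockOptions() { 
        SingleProducerConstrained = true, // only meaningful for execution blocks; BufferBlock ignores it
        BoundedCapacity = 10 });          // SendAsync waits while 10 items are buffered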

Then the producer does this:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken); // blocks while the buffer is full

If you're using async/await, just await the SendAsync call instead of calling Wait.

Answered by BDig, Nov 12 '22 at 13:11
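A sketch combining the second answer's bounded capacities with the first answer's completion check, under the same assumptions (.NET 6 or later, hypothetical `process` and `print` stages). Bounding every block makes SendAsync apply back-pressure to the producer. Because faults only propagate downstream, a faulted middle block leaves the source block full, and a bare SendAsync could then wait indefinitely; the loop therefore also watches the last block's Completion:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Bound every block, not only the source, so back-pressure reaches the producer.
var source = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };

// Hypothetical stages: "process" fails on item 5.
var process = new TransformBlock<int, int>(n =>
{
    if (n == 5) throw new InvalidOperationException("ouch !"); // explicit for test
    return n * 10;
}, bounded);
int printed = 0;
var print = new ActionBlock<int>(n =>
{
    Console.WriteLine($"Processed {n}");
    printed++;
}, bounded);

var propagate = new DataflowLinkOptions { PropagateCompletion = true };
source.LinkTo(process, propagate);
process.LinkTo(print, propagate);

int accepted = 0;
for (int i = 0; i < 100; i++)
{
    // SendAsync completes once the source has room. Faults do not flow upstream,
    // so also watch the last block; otherwise this could wait forever.
    Task<bool> send = source.SendAsync(i);
    Task finished = await Task.WhenAny(send, print.Completion);
    if (finished != send || !await send) break;
    accepted++;
}
source.Complete();

string error = "none";
try
{
    await print.Completion;
}
catch (Exception)
{
    error = print.Completion.Exception!.Flatten().InnerExceptions[0].Message;
}
Console.WriteLine($"Accepted {accepted}, printed {printed}, error: {error}");
```

Exactly how many items are accepted before the loop stops depends on timing, but the loop always ends and the exception is observed.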
