Task Parallel Library - Custom Task Schedulers

I have a requirement to fire off web service requests to an online api and I thought that Parallel Extensions would be a good fit for my needs.

The web service in question is designed to be called repeatedly, but it charges you if you go over a certain number of calls per second. I obviously want to minimize my charges, so I was wondering if anyone has seen a TaskScheduler that can cope with the following requirements:

  1. Limit the number of tasks scheduled per timespan. If the number of requests exceeds this limit, I guess it would need to either throw away the task or block (to stop a backlog of tasks building up).
  2. Detect whether the same request is already queued in the scheduler but has not yet executed, and if so, return the first task instead of queuing a second one.

Do people feel that these are the sorts of responsibilities a task scheduler should be dealing with, or am I barking up the wrong tree? If you have alternatives, I am open to suggestions.

asked Mar 20 '12 21:03 by Fen


2 Answers

I agree with others that TPL Dataflow sounds like a good solution for this.

To limit the processing rate, you could create a TransformBlock that doesn't actually transform the data in any way. It just delays each item if it arrives too soon after the previous one:

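using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    // Time the previous item was released. No locking is needed because the
    // block's default MaxDegreeOfParallelism is 1.
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
        {
            // Wait out whatever remains of the delay since the previous item.
            var waitTime = lastItem + delay - DateTime.UtcNow;
            if (waitTime > TimeSpan.Zero)
                await Task.Delay(waitTime);

            lastItem = DateTime.UtcNow;

            return x;
        },
        // BoundedCapacity = 1 stops the block from buffering a backlog of items.
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}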

Then create a method that produces the data (for example integers starting from 0):

static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    while (await target.SendAsync(i))
        i++;
}

It's written asynchronously, so if the target block can't accept an item right now, SendAsync() waits until it can instead of blocking a thread.

Then write a consumer method:

static void Consumer(int i)
{
    Console.WriteLine(i);
}

And finally, link it all together and start it up:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);

Here, delayBlock will accept at most one item every 500 ms and the Consumer() method can run multiple times in parallel. To finish processing, call delayBlock.Complete(). Until then, the Task.WaitAll() call above does not return, because Producer() keeps sending items.

If you want to add some caching per your #2, you could create another TransformBlock, do the work there, and link it to the other blocks.
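One possible shape for that caching step, as a minimal sketch rather than a definitive implementation: a small helper that hands back the already-pending task when the same request comes in again. The class name RequestCoalescer, its GetAsync method, and the callService delegate are all hypothetical names introduced for illustration. The delegate passed to a TransformBlock could await GetAsync instead of calling the service directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not part of TPL Dataflow): coalesces identical
// requests that are still pending, so each key has one call in flight.
public sealed class RequestCoalescer<TKey, TResult>
{
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TResult>>> _pending =
        new ConcurrentDictionary<TKey, Lazy<Task<TResult>>>();
    private readonly Func<TKey, Task<TResult>> _callService;

    // callService is an assumed delegate that performs the real web request.
    public RequestCoalescer(Func<TKey, Task<TResult>> callService)
    {
        _callService = callService;
    }

    public Task<TResult> GetAsync(TKey key)
    {
        // Lazy ensures the service is called at most once per stored entry,
        // even if several threads race inside GetOrAdd.
        var entry = _pending.GetOrAdd(
            key, k => new Lazy<Task<TResult>>(() => CallAndRemove(k)));
        return entry.Value;
    }

    private async Task<TResult> CallAndRemove(TKey key)
    {
        try
        {
            return await _callService(key).ConfigureAwait(false);
        }
        finally
        {
            // Forget the entry once the call finishes, so a later request
            // for the same key triggers a fresh call.
            _pending.TryRemove(key, out _);
        }
    }
}
```

This only deduplicates requests that are still pending, which matches requirement #2 in the question. Keeping completed results around for longer would be a separate caching decision.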

answered Sep 29 '22 08:09 by svick


Honestly I would work at a higher level of abstraction and use the TPL Dataflow API for this. The only catch is you would need to write a custom block that will throttle the requests at the rate at which you need because, by default, blocks are "greedy" and will just process as fast as possible. The implementation would be something like this:

  1. Start with a BufferBlock<T> which is the logical block that you would post to.
  2. Link the BufferBlock<T> to a custom block which has the knowledge of requests/sec and throttling logic.
  3. Link the custom block from step 2 to your ActionBlock<T>.
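The three steps above might be wired up roughly as follows. This is a sketch under assumptions: the "custom block" is stood in for by a plain TransformBlock<T, T> that spaces items by a minimum interval, and ThrottledPipeline, Create, minInterval, and handler are hypothetical names, not part of the Dataflow API.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledPipeline
{
    // Hypothetical helper: builds buffer -> throttle -> action and returns
    // the block to post to plus the completion task of the final block.
    public static (ITargetBlock<T> Input, Task Completion) Create<T>(
        TimeSpan minInterval, Action<T> handler)
    {
        // Step 1: the logical entry point that callers post to.
        var buffer = new BufferBlock<T>();

        // Step 2: assumed throttling logic, releasing one item per interval.
        // The default MaxDegreeOfParallelism of 1 keeps 'last' race-free.
        var last = DateTime.MinValue;
        var throttle = new TransformBlock<T, T>(
            async item =>
            {
                var wait = last + minInterval - DateTime.UtcNow;
                if (wait > TimeSpan.Zero)
                    await Task.Delay(wait);
                last = DateTime.UtcNow;
                return item;
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Step 3: the block that does the actual work.
        var action = new ActionBlock<T>(handler);

        // Propagate completion so completing the buffer drains the pipeline.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(throttle, link);
        throttle.LinkTo(action, link);

        return (buffer, action.Completion);
    }
}
```

Callers would Post() or SendAsync() to the returned Input block, call Complete() on it when there is nothing more to send, and then wait on Completion. Because the BufferBlock<T> here is unbounded, a backlog can still build up in it. Giving it a BoundedCapacity would be one way to push back on producers instead.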

I don't have the time to write the custom block for #2 right this second, but I will check back later and try to fill in an implementation for you if you haven't already figured it out.

answered Sep 29 '22 06:09 by Drew Marsh