Multiple async-await chaining inside Parallel.ForEach

I have a Parallel.ForEach loop that iterates over a collection. Inside the loop, I make multiple network I/O calls. I used Task.ContinueWith and nested the subsequent async-await calls. The order of processing doesn't matter, but the data from each async call should be processed in a synchronized way. That is, for each iteration, the data retrieved from the first async call should be passed to the second async call. After the second async call finishes, the data from both async calls should be processed together.

Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)

            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});

I tried the above without ContinueWith and await, but it did not work in a synchronized way. Now the above code runs to completion, but no records get processed.

Any help would be appreciated. Let me know if I should add more details.

asked Dec 13 '22 08:12 by Display name


2 Answers

As your methods involve I/O, they should be written to be truly asynchronous, rather than being run synchronously on the thread pool with Task.Run.

Then you could use Task.WhenAll in combination with Enumerable.Select:

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

This ensures that, for each item, the country, state and calculation steps run sequentially, while the items themselves are processed concurrently and asynchronously.
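For illustration, here is a minimal self-contained version of the Task.WhenAll + Select approach, written as a C# 9+ top-level program. The element types are simplified to strings, and GetCountryAsync, GetStateAsync and SomeCalculation are hypothetical stand-ins that use Task.Delay in place of real network I/O:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real I/O-bound calls; Task.Delay simulates latency.
async Task<string> GetCountryAsync(int id)
{
    await Task.Delay(50);
    return $"Country{id}";
}

async Task<string> GetStateAsync(string country)
{
    await Task.Delay(50);
    return $"{country}-State";
}

// Hypothetical calculation combining a country with its own state.
List<string> SomeCalculation(string country, string state) =>
    new List<string> { $"{country}/{state}" };

var someCollection = Enumerable.Range(1, 5);

// Select starts one async pipeline per item; inside a pipeline the awaits run in order.
var tasks = someCollection.Select(async id =>
{
    var country = await GetCountryAsync(id);
    var state = await GetStateAsync(country);   // depends on this item's country
    var calculation = SomeCalculation(country, state);
    return (country, state, calculation);
});

var countries = new List<string>();
var states = new List<string>();
var myCollection = new List<string>();

// This loop runs after every task has completed, one result at a time,
// so plain List<T> is safe here.
foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

Console.WriteLine(string.Join(", ", myCollection));
```

Task.WhenAll returns its results in the same order as the source sequence, so each country stays paired with its own state regardless of which request finishes first.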


Update as per comment

using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});

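The semaphore limits the number of concurrent async operations to 2, and the cancellation token cancels any tasks still waiting on the semaphore after 10 consecutive exceptions. Calls already in flight are not cancelled unless the token is also passed to GetCountryAsync and GetStateAsync.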

The Interlocked methods ensure that failures is updated in a thread-safe manner.
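A runnable sketch of just the throttling part, assuming a hypothetical FetchAsync that simulates I/O with Task.Delay. The failure counting and cancellation are left out for brevity; the counters exist only to observe how many items hold the semaphore at the same time:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// At most 2 items may be between WaitAsync and Release at any time.
using var semaphore = new SemaphoreSlim(2);

object gate = new object();  // protects the two counters below
int inFlight = 0;            // items currently holding the semaphore
int maxInFlight = 0;         // highest inFlight value observed

// Hypothetical I/O-bound call; Task.Delay stands in for a network request.
async Task<int> FetchAsync(int id)
{
    await Task.Delay(20);
    return id * 10;
}

var tasks = Enumerable.Range(1, 10).Select(async id =>
{
    await semaphore.WaitAsync();
    try
    {
        lock (gate)
        {
            inFlight++;
            maxInFlight = Math.Max(maxInFlight, inFlight);
        }
        return await FetchAsync(id);
    }
    finally
    {
        lock (gate) { inFlight--; }
        semaphore.Release();   // always release what WaitAsync acquired
    }
});

int[] results = await Task.WhenAll(tasks);
Console.WriteLine($"{results.Length} results, peak concurrency {maxInFlight}");
```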


Further Update

It may be even more efficient to use 2 semaphores, so that the results don't need to be iterated a second time.

Encapsulate all the list-adding into a single method:

void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}

Then you could allow 2 concurrent operations to make the HTTP requests, and only 1 at a time to perform the adds, which makes that step thread-safe:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        try
        {
            AddToLists(country, state, calculation);
        }
        finally
        {
            // release only here, where the semaphore is known to be held
            listAddSemaphore.Release();
        }

        Interlocked.Exchange(ref failures, 0);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
    }
}));
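Since AddToLists does no awaiting, a plain lock is a simpler alternative to the second SemaphoreSlim for guarding the lists (this swaps in the lock statement; it is not what the answer above uses). A sketch under that assumption, with hypothetical string-returning stand-ins for the I/O calls and the calculation omitted:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var httpSemaphore = new SemaphoreSlim(2);  // at most 2 items doing I/O at once
object listLock = new object();                  // guards the non-thread-safe lists

var countries = new List<string>();
var states = new List<string>();

// Hypothetical async I/O stand-ins.
async Task<string> GetCountryAsync(int id) { await Task.Delay(10); return $"C{id}"; }
async Task<string> GetStateAsync(string country) { await Task.Delay(10); return $"{country}-S"; }

// Synchronous, so a lock is enough: nothing is awaited while it is held.
void AddToLists(string country, string state)
{
    lock (listLock)
    {
        countries.Add(country);
        states.Add(state);
    }
}

await Task.WhenAll(Enumerable.Range(1, 100).Select(async id =>
{
    await httpSemaphore.WaitAsync();
    try
    {
        var country = await GetCountryAsync(id);
        var state = await GetStateAsync(country);
        AddToLists(country, state);
    }
    finally
    {
        httpSemaphore.Release();
    }
}));

Console.WriteLine($"{countries.Count} countries, {states.Count} states");
```

Because both adds happen inside the same lock, countries[i] and states[i] always belong to the same item, even though items complete in arbitrary order. If AddToLists ever needed to await, the SemaphoreSlim version would be required, since await is not allowed inside a lock block.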
answered Dec 15 '22 22:12 by Johnathan Barclay


I think you're over-complicating this. Inside the Parallel.ForEach you're already on the thread pool, so there is really no benefit in creating lots of additional tasks inside it. So how to do this really depends on whether GetState etc. are synchronous or asynchronous. If we assume they are synchronous, then something like:

Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = GetState(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both calls and pass it to the function below for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
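One way to address the "may need to synchronize" warnings in the synchronous version is to use thread-safe collections such as ConcurrentBag&lt;T&gt;. A sketch assuming hypothetical blocking GetCountry/GetState stand-ins, with Thread.Sleep simulating the I/O:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical synchronous (blocking) stand-ins for the real I/O calls.
string GetCountry(int id) { Thread.Sleep(10); return $"C{id}"; }
string GetState(string country) { Thread.Sleep(10); return $"{country}-S"; }

// ConcurrentBag<T> can be added to from many threads at once, unlike List<T>.
var countries = new ConcurrentBag<string>();
var states = new ConcurrentBag<string>();
var pairs = new ConcurrentBag<(string Country, string State)>();

var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };

Parallel.ForEach(Enumerable.Range(1, 50), parallelOptions, (id, _) =>
{
    var country = GetCountry(id);
    countries.Add(country);

    var state = GetState(country);   // uses this iteration's country
    states.Add(state);

    pairs.Add((country, state));     // keep each state with its own country
});

// With a synchronous body, Parallel.ForEach blocks until every iteration is done.
Console.WriteLine($"{countries.Count} countries, {pairs.Count} pairs");
```

ConcurrentBag&lt;T&gt; does not preserve insertion order, so storing the country and state together as a tuple is what keeps each state associated with its country.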

If they are async, it gets more awkward; it would be nice if we could do something like:

// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = await GetStateAsync(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

But the problem here is that none of the callbacks in Parallel.ForEach are "awaitable", so we have silently created an async void callback, which is very bad. Parallel.ForEach will think an iteration has "finished" as soon as its first incomplete await is reached, which means:

  1. we have no clue when all the work has actually finished
  2. you could be doing a lot more concurrently than you intended (MaxDegreeOfParallelism cannot be respected)
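A small demonstration of the problem, using only Task.Delay as a stand-in for the async I/O: the async lambda compiles to an async void Action&lt;int&gt;, so Parallel.ForEach returns long before the work completes.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// The async lambda is converted to Action<int>, i.e. an async void delegate.
// Parallel.ForEach cannot await it, so each call returns at its first await.
Parallel.ForEach(Enumerable.Range(1, 20), async id =>
{
    await Task.Delay(200);                 // stand-in for real async I/O
    Interlocked.Increment(ref completed);
});

// Typically 0: the loop has "finished", but the work has not.
int afterForEach = Volatile.Read(ref completed);
Console.WriteLine($"completed right after Parallel.ForEach: {afterForEach}");

// Crude wait, only to show that the work really does finish later.
await Task.Delay(1000);
int afterWait = Volatile.Read(ref completed);
Console.WriteLine($"completed after waiting: {afterWait}");
```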

There doesn't seem to be any good API to avoid this currently.
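For readers on .NET 6 or later, Parallel.ForEachAsync addresses exactly this: its body is a Func&lt;TSource, CancellationToken, ValueTask&gt;, so each body is actually awaited and MaxDegreeOfParallelism is respected. A sketch assuming .NET 6+ and hypothetical async stand-ins for the I/O calls:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async stand-ins for the real I/O calls.
async Task<string> GetCountryAsync(int id, CancellationToken ct)
{
    await Task.Delay(20, ct);
    return $"C{id}";
}

async Task<string> GetStateAsync(string country, CancellationToken ct)
{
    await Task.Delay(20, ct);
    return $"{country}-S";
}

var results = new ConcurrentBag<(string Country, string State)>();
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// The returned Task completes only after every body has finished,
// and at most MaxDegreeOfParallelism bodies run at the same time.
await Parallel.ForEachAsync(Enumerable.Range(1, 50), parallelOptions, async (id, ct) =>
{
    var country = await GetCountryAsync(id, ct);
    var state = await GetStateAsync(country, ct);   // chained on this item's country
    results.Add((country, state));
});

Console.WriteLine($"{results.Count} country/state pairs processed");
```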

answered Dec 15 '22 20:12 by Marc Gravell