I have a Parallel.ForEach loop that iterates over a collection. Inside the loop, I make multiple network I/O calls. I used Task.ContinueWith and nested the subsequent async/await calls. The order of processing doesn't matter, but the data from the async calls should be processed in a synchronized way. That is, for each iteration, the data retrieved from the first async call should be passed to the second async call. After the second async call finishes, the data from both async calls should be processed together.
Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));
    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);
        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));
        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);
            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)
            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});
I tried the above without using ContinueWith/await, but it was not working in a synchronized way. Now the code above runs to completion, but no records get processed.
Any help would be appreciated. Let me know if I should add more details.
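The symptom can be reproduced without any network calls. The sketch below assumes .NET 5 or later, because it uses the non-generic TaskCompletionSource. A hypothetical gate task stands in for a network call that has not completed yet.

Passing an async lambda to Parallel.ForEach compiles to an async void callback. As a result, the loop returns as soon as each callback reaches its first incomplete await:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int itemCount = 10;
int processed = 0;

// Stands in for a network call that has not completed yet (hypothetical).
var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
// Completed by the last callback, so this sketch can wait for the work itself.
var allDone = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// The async lambda is converted to Action<int>, i.e. an async void callback.
Parallel.ForEach(Enumerable.Range(0, itemCount), async item =>
{
    await gate.Task; // the callback returns to Parallel.ForEach here
    if (Interlocked.Increment(ref processed) == itemCount)
    {
        allDone.SetResult();
    }
});

// Parallel.ForEach has already returned, yet no item has been processed.
int processedWhenLoopReturned = Volatile.Read(ref processed);
Console.WriteLine($"Processed when Parallel.ForEach returned: {processedWhenLoopReturned}");

gate.SetResult();   // let the pending "I/O" complete
await allDone.Task; // Parallel.ForEach cannot do this waiting for us
Console.WriteLine($"Processed after the I/O completed: {processed}");
```

The first line prints 0. Parallel.ForEach has returned before any item was processed, which matches the "no records get processed" behavior described above.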
As your methods involve I/O, they should be written to be truly asynchronous, not just run synchronously on the thread pool using Task.Run.
Then you could use Task.WhenAll in combination with Enumerable.Select:
var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);
    return (country, state, calculation);
});
foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}
This would ensure that, for each item, the country -> state -> calculation steps occur sequentially, while the items themselves are processed concurrently and asynchronously.
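A self-contained version of the same pattern is sketched below. GetCountryAsync, GetStateAsync and SomeCalculation are hypothetical stand-ins that use Task.Delay in place of real network calls. Tuples and strings replace the question's Country and State types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

int[] itemIds = { 1, 2, 3, 4, 5 }; // hypothetical item IDs

var countries = new List<string>();
var states = new List<string>();
var myCollection = new List<string>();

var tasks = itemIds.Select(async id =>
{
    // Within one item the calls run in order: the state lookup needs the country.
    var country = await GetCountryAsync(id);
    var state = await GetStateAsync(country.CountryId);
    var calculation = SomeCalculation(country.Name, state);
    return (country: country.Name, state, calculation);
});

// Items run concurrently; the lists are filled afterwards by this single flow.
foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

Console.WriteLine(string.Join(", ", myCollection));

// Hypothetical async stand-ins; Task.Delay simulates latency without blocking a thread.
static async Task<(int CountryId, string Name)> GetCountryAsync(int itemId)
{
    await Task.Delay(50);
    return (itemId * 10, $"Country{itemId}");
}

static async Task<string> GetStateAsync(int countryId)
{
    await Task.Delay(50);
    return $"State{countryId}";
}

static IEnumerable<string> SomeCalculation(string country, string state) =>
    new[] { $"{country}/{state}" };
```

Task.WhenAll returns results in the same order as the input tasks. This means myCollection ends up ordered by item, even though the items completed concurrently. The plain List<T> instances are only touched after WhenAll completes, so they need no locking.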
Update as per comment
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();
int failures = 0;
var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);
        Interlocked.Exchange(ref failures, 0);
        return (country, state, calculation);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});
The semaphore ensures a maximum of 2 concurrent async operations. After 10 consecutive exceptions, the cancellation token cancels any tasks still waiting on the semaphore. The token is not passed to GetCountryAsync or GetStateAsync, so calls already in flight run to completion.
The Interlocked methods ensure that failures is accessed in a thread-safe manner.
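The throttling part can be checked in isolation. This sketch assumes a hypothetical workload of 10 items, with Task.Delay standing in for the HTTP calls. It records the highest number of operations that were inside the semaphore at once; the failure counting and cancellation are left out:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var semaphore = new SemaphoreSlim(2); // allow at most 2 operations in flight
var maxLock = new object();
int inFlight = 0;
int maxObserved = 0;

var tasks = Enumerable.Range(1, 10).Select(async item =>
{
    await semaphore.WaitAsync();
    int now = Interlocked.Increment(ref inFlight);
    try
    {
        lock (maxLock) { maxObserved = Math.Max(maxObserved, now); }
        await Task.Delay(20); // simulated network call
        return item * 2;
    }
    finally
    {
        Interlocked.Decrement(ref inFlight);
        semaphore.Release();
    }
}).ToList(); // materialize once so each task is started exactly once

int[] results = await Task.WhenAll(tasks);
Console.WriteLine($"Highest number of concurrent operations: {maxObserved}");
```

The printed maximum should never exceed 2. The ToList() call matters because Select is lazy: without it, any later enumeration of tasks would start the operations again.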
Further Update
It may be even more efficient to use two semaphores, which avoids iterating over the results a second time after Task.WhenAll.
Encapsulate all the list-adding into a single method:
void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}
Then you could allow two operations to make the HTTP requests concurrently, and only one at a time to perform the adds, which makes that step thread-safe:
using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();
int failures = 0;
await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);
        await listAddSemaphore.WaitAsync(cts.Token);
        try
        {
            AddToLists(country, state, calculation);
        }
        finally
        {
            // Release only after a successful WaitAsync, so the count never rises above 1.
            listAddSemaphore.Release();
        }
        Interlocked.Exchange(ref failures, 0);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
    }
}));
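Because AddToLists is synchronous, a plain lock statement is a simpler alternative to the second SemaphoreSlim. This is a swapped-in technique, not part of the answer above. The sketch uses hypothetical string-based stand-ins for the lookups:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var countries = new List<string>();
var states = new List<string>();
var myCollection = new List<string>();
var listLock = new object();                    // guards the three lists
using var httpSemaphore = new SemaphoreSlim(2); // at most 2 lookups in flight

await Task.WhenAll(Enumerable.Range(1, 20).Select(async id =>
{
    await httpSemaphore.WaitAsync();
    try
    {
        var country = await GetCountryAsync(id);
        var state = await GetStateAsync(country);
        var calculation = $"{country}/{state}"; // stands in for SomeCalculation
        // Nothing is awaited while the lock is held, so a plain lock is enough.
        lock (listLock)
        {
            countries.Add(country);
            states.Add(state);
            myCollection.Add(calculation);
        }
    }
    finally
    {
        httpSemaphore.Release();
    }
}));

Console.WriteLine($"{countries.Count} countries, {states.Count} states, {myCollection.Count} results");

// Hypothetical async stand-ins; Task.Delay simulates network latency.
static async Task<string> GetCountryAsync(int id)
{
    await Task.Delay(10);
    return $"Country{id}";
}

static async Task<string> GetStateAsync(string country)
{
    await Task.Delay(10);
    return $"StateOf{country}";
}
```

C# does not allow await inside a lock body. That is why SemaphoreSlim is the usual choice when the guarded section is itself asynchronous.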
I think you're over-complicating this. Inside the Parallel.ForEach, you're already on the thread pool, so there is really no benefit in creating lots of additional tasks inside it. So how to do this depends on whether GetState etc. are synchronous or asynchronous. If we assume synchronous, then something like:
Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);
    countries.Add(country); // warning: may need to synchronize
    var state = GetState(country.CountryID);
    states.Add(state); // warning: may need to synchronize
    // use data from both calls and pass it to the function below for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
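For the synchronous case, one way to resolve the "may need to synchronize" warnings is to replace the plain lists with thread-safe collections. This sketch swaps in ConcurrentBag<T> and uses hypothetical synchronous stand-ins for GetCountry, GetState and SomeCalculation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Thread-safe collections replace the plain lists, so no explicit locking is needed.
var countries = new ConcurrentBag<string>();
var states = new ConcurrentBag<string>();
var myCollection = new ConcurrentBag<string>();

var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };

Parallel.ForEach(Enumerable.Range(1, 50), parallelOptions, (id, _) =>
{
    var country = GetCountry(id);
    countries.Add(country);
    var state = GetState(country);
    states.Add(state);
    // Each iteration passes its own country/state pair to the calculation.
    foreach (var result in SomeCalculation(country, state))
    {
        myCollection.Add(result);
    }
});

Console.WriteLine($"{myCollection.Count} results");

// Hypothetical synchronous stand-ins for the blocking calls.
static string GetCountry(int id) => $"Country{id}";
static string GetState(string country) => $"StateOf{country}";
static string[] SomeCalculation(string country, string state) => new[] { $"{country}/{state}" };
```

ConcurrentBag<T> is unordered. That fits here only because the question says the processing order does not matter. Each country is still paired with its own state, because both are locals of the same iteration.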
If they are async, it gets more awkward; it would be nice if we could do something like:
// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);
    countries.Add(country); // warning: may need to synchronize
    var state = await GetStateAsync(country.CountryID);
    states.Add(state); // warning: may need to synchronize
    // use data from both the async calls and pass it to the function below for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
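but the problem here is that none of the callbacks in Parallel.ForEach are "awaitable". This means we have silently created an async void callback here, which is very bad. Parallel.ForEach will think each iteration has "finished" as soon as its first incomplete await happens. As a result, the loop can return before the work has actually been done, and any exception thrown after that await is not propagated to the caller of Parallel.ForEach.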
There doesn't seem to be any good API to avoid this currently.
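Newer runtimes do provide such an API. Since .NET 6, Parallel.ForEachAsync takes a body that returns a ValueTask, and the loop awaits each iteration. The minimal sketch below assumes .NET 6 or later and uses hypothetical Task.Delay-based stand-ins for the lookups. ConcurrentBag<T> is used so the adds need no extra locking:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var countries = new ConcurrentBag<string>();
var states = new ConcurrentBag<string>();
var myCollection = new ConcurrentBag<string>();
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// The body returns a ValueTask, so ForEachAsync awaits each iteration instead of
// treating the first incomplete await as "done".
await Parallel.ForEachAsync(Enumerable.Range(1, 20), parallelOptions, async (id, cancellationToken) =>
{
    var country = await GetCountryAsync(id, cancellationToken);
    countries.Add(country);
    var state = await GetStateAsync(country, cancellationToken);
    states.Add(state);
    myCollection.Add($"{country}/{state}"); // stands in for SomeCalculation
});

Console.WriteLine($"{myCollection.Count} results once ForEachAsync completed");

// Hypothetical async stand-ins; Task.Delay simulates network latency.
static async Task<string> GetCountryAsync(int id, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return $"Country{id}";
}

static async Task<string> GetStateAsync(string country, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return $"StateOf{country}";
}
```

Here MaxDegreeOfParallelism plays the role of the SemaphoreSlim throttle from the earlier answer.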