Parallel.ForEach using Thread.Sleep equivalent

So here's the situation: I need to make a call to a web site that starts a search. This search continues for an unknown amount of time, and the only way I know if the search has finished is by periodically querying the website to see if there's a "Download Data" link somewhere on it (it uses some strange ajax call on a javascript timer to check the backend and update the page, I think).

So here's the trick: I have hundreds of items I need to search for, one at a time. So I have some code that looks a little bit like this:

var items = getItems();
Parallel.ForEach(items, item =>
{
   startSearch(item);
   var finished = isSearchFinished(item);
   while(finished == false)
   {
      finished = isSearchFinished(item); //<--- How do I delay this action 30 Secs?
   }
   downloadData(item);
});

Now, obviously this isn't the real code, because there could be things that cause isSearchFinished to always be false.

Obvious infinite loop danger aside, how would I correctly keep isSearchFinished() from calling over and over and over, but instead call every, say, 30 seconds or 1 minute?

I know Thread.Sleep() isn't the right solution, and I think the solution might be accomplished by using Threading.Timer() but I'm not very familiar with it, and there are so many threading options that I'm just not sure which to use.

Asked by Ryan, Jun 28 '14 at 00:06


2 Answers

It's quite easy to implement with tasks and async/await, as noted by @KevinS in the comments:

async Task<ItemData> ProcessItemAsync(Item item)
{
    while (true)
    {
        if (await isSearchFinishedAsync(item))
            break;
        await Task.Delay(30 * 1000);
    }
    return await downloadDataAsync(item);
}

// ...

var items = getItems();
var tasks = items.Select(i => ProcessItemAsync(i)).ToArray();
await Task.WhenAll(tasks);
var data = tasks.Select(t => t.Result);

This way, you don't block ThreadPool threads in vain for what is mostly a bunch of I/O-bound network operations. If you're not familiar with async/await, the async-await tag wiki might be a good place to start.
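To address the infinite-loop risk the question sets aside, one option is to cap the total polling time. The following is only a sketch, not part of the original answer: PollUntilAsync and its parameters are hypothetical names, and the check delegate stands in for a call such as isSearchFinishedAsync(item).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Polling
{
    // Hypothetical helper: calls `check` repeatedly, waiting `interval`
    // between attempts, until it returns true or `timeout` has elapsed.
    // Returns true if the check succeeded, false if the time ran out.
    public static async Task<bool> PollUntilAsync(
        Func<Task<bool>> check, TimeSpan interval, TimeSpan timeout)
    {
        // The token source cancels itself once `timeout` has passed.
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                while (!await check())
                {
                    // Asynchronous wait: no thread is blocked here.
                    // Throws TaskCanceledException once the timeout hits.
                    await Task.Delay(interval, cts.Token);
                }
                return true;
            }
            catch (OperationCanceledException)
            {
                return false;
            }
        }
    }
}
```

Under these assumptions, ProcessItemAsync could call Polling.PollUntilAsync(() => isSearchFinishedAsync(item), TimeSpan.FromSeconds(30), someOverallLimit) and skip the download when it returns false.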

I assume you can convert your synchronous methods isSearchFinished and downloadData to asynchronous versions that return a Task<>, using something like HttpClient for non-blocking HTTP requests. If you can't, you can still wrap them with Task.Run, as await Task.Run(() => isSearchFinished(item)) and await Task.Run(() => downloadData(item)). Normally this is not recommended, but as you have hundreds of items, it would still give you a much better level of concurrency than Parallel.ForEach in this case, because you won't be blocking pool threads for 30s, thanks to the asynchronous Task.Delay.
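As a rough illustration of both options, here is a minimal sketch. Everything in it is an assumption for the sake of the example: the SearchPolling class, the statusUrl parameter, and the idea that the search is finished once the page text contains "Download Data" (the question only says such a link appears somewhere on the page).

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class SearchPolling
{
    // A single HttpClient shared by all requests.
    private static readonly HttpClient Http = new HttpClient();

    // Assumed completion test: the page mentions "Download Data".
    public static bool PageShowsDownloadLink(string html)
    {
        return html.IndexOf("Download Data", StringComparison.OrdinalIgnoreCase) >= 0;
    }

    // Hypothetical asynchronous replacement for isSearchFinished(item).
    // statusUrl is an assumed per-item URL of the search results page.
    public static async Task<bool> IsSearchFinishedAsync(
        Uri statusUrl, CancellationToken ct = default(CancellationToken))
    {
        using (var response = await Http.GetAsync(statusUrl, ct))
        {
            response.EnsureSuccessStatusCode();
            string html = await response.Content.ReadAsStringAsync();
            return PageShowsDownloadLink(html);
        }
    }

    // Fallback when only a synchronous check exists: run it on a pool
    // thread so the caller can still await it.
    public static Task<bool> IsSearchFinishedViaTaskRun(Func<bool> syncCheck)
    {
        return Task.Run(syncCheck);
    }
}
```

The Task.Run variant still occupies a pool thread for the duration of each check, but not during the 30-second wait between checks.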

Answered by noseratio, Sep 21 '22 at 05:09


You can also write a generic function using TaskCompletionSource and System.Threading.Timer that returns a Task which completes once a specified retry function succeeds.

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval)
{
    return RetryAsync(retryFunc, retryInterval, CancellationToken.None);
}

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();

    cancellationToken.Register(() => tcs.TrySetCanceled());

    var timer = new Timer((state) =>
    {
        var taskCompletionSource = (TaskCompletionSource<object>) state;

        try
        {                   
            if (retryFunc())
            {
                taskCompletionSource.TrySetResult(null);
            }
        }
        catch (Exception ex)
        {
            taskCompletionSource.TrySetException(ex);
        }
    }, tcs, TimeSpan.FromMilliseconds(0), retryInterval);

    // Once the task is complete, dispose of the timer so it doesn't keep firing. The continuation also
    // captures the timer in a closure, so it is not garbage collected prematurely.
    tcs.Task.ContinueWith(t => timer.Dispose(),
                          CancellationToken.None,
                          TaskContinuationOptions.ExecuteSynchronously,
                          TaskScheduler.Default);

    return tcs.Task;
}

You can then use RetryAsync like this:

var searchTasks = new List<Task>();

searchTasks.AddRange(items.Select(
        downloadItem => RetryAsync(() => isSearchFinished(downloadItem), TimeSpan.FromSeconds(2)) // retry interval
        .ContinueWith(t => downloadData(downloadItem), 
                      CancellationToken.None, 
                      TaskContinuationOptions.OnlyOnRanToCompletion, 
                      TaskScheduler.Default)));

await Task.WhenAll(searchTasks.ToArray());

The ContinueWith part specifies what happens once the retry task has completed successfully. In this case it runs your downloadData method on a thread pool thread, because we specified TaskScheduler.Default. The continuation only executes if the task ran to completion, i.e. it was not canceled and no exception was thrown.
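One consequence of OnlyOnRanToCompletion worth keeping in mind: when the antecedent task faults or is canceled, the continuation task is itself marked as canceled. Awaiting Task.WhenAll over such continuations would therefore raise a TaskCanceledException instead of the original error. The sketch below simulates a failed polling task to show this; ContinuationDemo and its method name are made up for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Returns true if a continuation restricted to OnlyOnRanToCompletion
    // ends up canceled (and never runs) when its antecedent faults.
    public static async Task<bool> ContinuationIsCanceledOnFaultAsync()
    {
        var tcs = new TaskCompletionSource<object>();
        bool downloadRan = false;

        // Stands in for the downloadData continuation.
        Task continuation = tcs.Task.ContinueWith(
            t => { downloadRan = true; },
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Default);

        // Simulate the retry function throwing.
        tcs.SetException(new InvalidOperationException("simulated polling failure"));

        try
        {
            await continuation;
        }
        catch (OperationCanceledException)
        {
            // The continuation was canceled because its condition was not met.
        }

        // Read the antecedent's exception so it counts as observed.
        var ignored = tcs.Task.Exception;

        return continuation.IsCanceled && !downloadRan;
    }
}
```

If the original exception matters, one option is to keep a reference to the task returned by RetryAsync and inspect its Exception property.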

Answered by NeddySpaghetti, Sep 19 '22 at 05:09