I'm having trouble understanding the actual behavior in this scenario. Why does the task not execute when expected, but only later, after the SemaphoreSlim has been disposed? It throws the following exception:
System.ObjectDisposedException {"The semaphore has been disposed."}
I have a class library like this:
    public class ParallelProcessor
    {
        private Action[] actions;
        private int maxConcurrency;

        public ParallelProcessor(Action[] actionList, int maxConcurrency)
        {
            this.actions = actionList;
            this.maxConcurrency = maxConcurrency;
        }

        public void RunAllActions()
        {
            if (Utility.IsNullOrEmpty<Action>(actions))
                throw new Exception("No Action Found!");

            using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
            {
                foreach (Action action in actions)
                {
                    Task.Factory.StartNew(() =>
                    {
                        concurrencySemaphore.Wait();
                        try
                        {
                            action();
                        }
                        finally
                        {
                            concurrencySemaphore.Release();
                        }
                    });
                }
            }
        }
    }
And I'm using it like this:
    class Program
    {
        static void Main(string[] args)
        {
            int maxConcurrency = 3;
            Action[] actions = new Action[] { () => Console.WriteLine(1), () => Console.WriteLine(2), () => Console.WriteLine(3) }; //Array.Empty<Action>();
            ParallelProcessor processor = new ParallelProcessor(actions, maxConcurrency);
            processor.RunAllActions();
            Console.ReadLine();
        }
    }
Could anybody please shed some light on this? Thanks in advance.
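The problem is your using statement. This is how things are happening:

1. The semaphore is created.
2. The tasks are started and run in the background.
3. The using block ends and the semaphore is disposed.
4. The tasks try to use the semaphore but can't, because it has already been disposed.

Options:

- Remove the using statement (so you don't dispose of the semaphore, but that's unlikely to be a problem unless you're using this really heavily)
- Block (inside the using statement) until all the tasks have completed, e.g. by using Parallel.ForEach instead of calling Task.Factory.StartNew directly

Your semaphore is disposed at the end of the using block, but used by the still running Task created inside it.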
I would recommend moving the semaphore up to the class level:
    public class ParallelProcessor
    {
        private Action[] actions;
        private SemaphoreSlim concurrencySemaphore;

        public ParallelProcessor(Action[] actionList, int maxConcurrency)
        {
            this.actions = actionList;
            concurrencySemaphore = new SemaphoreSlim(maxConcurrency);
        }

        public void RunAllActions()
        {
            if (Utility.IsNullOrEmpty<Action>(actions))
                throw new Exception("No Action Found!");

            foreach (Action action in actions)
            {
                Task.Factory.StartNew(() =>
                {
                    concurrencySemaphore.Wait();
                    try
                    {
                        action();
                    }
                    finally
                    {
                        concurrencySemaphore.Release();
                    }
                });
            }
        }
    }
Or, as an alternative approach, where RunAllActions will block until all are done:
    public class ParallelProcessor
    {
        private Action[] actions;
        private int maxConcurrency;

        public ParallelProcessor(Action[] actionList, int maxConcurrency)
        {
            this.actions = actionList;
            this.maxConcurrency = maxConcurrency;
        }

        public void RunAllActions()
        {
            if (Utility.IsNullOrEmpty<Action>(actions))
                throw new Exception("No Action Found!");

            using (var concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
            {
                Task.WaitAll(actions.Select(a => Task.Run(() =>
                {
                    concurrencySemaphore.Wait();
                    try { a(); }
                    finally { concurrencySemaphore.Release(); }
                })).ToArray());
            }
        }
    }
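The first answer mentions Parallel.ForEach as one way to block until all the tasks have completed. A minimal sketch of that option follows, assuming it is acceptable for RunAllActions to block the caller. The class name ParallelForEachProcessor is hypothetical, and the Utility.IsNullOrEmpty helper from the question is replaced here with plain null and length checks. No semaphore is needed in this version, because ParallelOptions.MaxDegreeOfParallelism caps how many actions run at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the question's class, for illustration only.
public class ParallelForEachProcessor
{
    private readonly Action[] actions;
    private readonly int maxConcurrency;

    public ParallelForEachProcessor(Action[] actionList, int maxConcurrency)
    {
        this.actions = actionList ?? throw new ArgumentNullException(nameof(actionList));
        this.maxConcurrency = maxConcurrency;
    }

    // Blocks until every action has finished; at most maxConcurrency run concurrently.
    public void RunAllActions()
    {
        if (actions.Length == 0)
            throw new InvalidOperationException("No Action Found!");

        // MaxDegreeOfParallelism must be -1 (unlimited) or a positive number.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        // Parallel.ForEach returns only after all iterations have completed.
        Parallel.ForEach(actions, options, action => action());
    }
}
```

With this version, processor.RunAllActions() in Main returns only after all actions have run, so nothing is left executing against a disposed resource. Exceptions thrown by the actions surface to the caller wrapped in an AggregateException.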