I have this function which checks for proxy servers and currently it checks only a number of threads and waits for all to finish until the next set is starting. Is it possible to start a new thread as soon as one is finished from the maximum allowed?
for (int i = 0; i < listProxies.Count(); i+=nThreadsNum)
{
for (nCurrentThread = 0; nCurrentThread < nThreadsNum; nCurrentThread++)
{
if (nCurrentThread < nThreadsNum)
{
string strProxyIP = listProxies[i + nCurrentThread].sIPAddress;
int nPort = listProxies[i + nCurrentThread].nPort;
tasks.Add(Task.Factory.StartNew<ProxyAddress>(() => CheckProxyServer(strProxyIP, nPort, nCurrentThread)));
}
}
Task.WaitAll(tasks.ToArray());
foreach (var tsk in tasks)
{
ProxyAddress result = tsk.Result;
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
}
tasks.Clear();
}
This seems much more simple:
int numberProcessed = 0;
Parallel.ForEach(listProxies,
new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
(p)=> {
var result = CheckProxyServer(p.sIPAddress, s.nPort, Thread.CurrentThread.ManagedThreadId);
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
Interlocked.Increment(numberProcessed);
});
With slots:
var obj = new Object();
var slots = new List<int>();
Parallel.ForEach(listProxies,
new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
(p)=> {
int threadId = Thread.CurrentThread.ManagedThreadId;
int slot = slots.IndexOf(threadId);
if (slot == -1)
{
lock(obj)
{
slots.Add(threadId);
}
slot = slots.IndexOf(threadId);
}
var result = CheckProxyServer(p.sIPAddress, s.nPort, slot);
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
});
I took a few shortcuts there to guarantee thread safety. You don't have to do the normal check-lock-check dance because there will never be two threads attempting to add the same threadid to the list, so the second check will always fail and isn't needed. Secondly, for the same reason, I don't believe you need to ever lock around the outer IndexOf either. That makes this a very highly efficient concurrent routine that rarely locks (it should only lock nThreadsNum times) no matter how many items are in the enumerable.
Another solution is to use a SemaphoreSlim
or the Producer-Consumer Pattern using a BlockinCollection<T>
. Both solution support cancellation.
SemaphoreSlim
private async Task CheckProxyServerAsync(IEnumerable<object> proxies)
{
var tasks = new List<Task>();
int currentThreadNumber = 0;
int maxNumberOfThreads = 8;
using (semaphore = new SemaphoreSlim(maxNumberOfThreads, maxNumberOfThreads))
{
foreach (var proxy in proxies)
{
// Asynchronously wait until thread is available if thread limit reached
await semaphore.WaitAsync();
string proxyIP = proxy.IPAddress;
int port = proxy.Port;
tasks.Add(Task.Run(() => CheckProxyServer(proxyIP, port, Interlocked.Increment(ref currentThreadNumber)))
.ContinueWith(
(task) =>
{
ProxyAddress result = task.Result;
// Method call must be thread-safe!
UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
Interlocked.Decrement(ref currentThreadNumber);
// Allow to start next thread if thread limit was reached
semaphore.Release();
},
TaskContinuationOptions.OnlyOnRanToCompletion));
}
// Asynchronously wait until all tasks are completed
// to prevent premature disposal of semaphore
await Task.WhenAll(tasks);
}
}
Producer-Consumer Pattern
// Uses a fixed number of same threads
private async Task CheckProxyServerAsync(IEnumerable<ProxyInfo> proxies)
{
var pipe = new BlockingCollection<ProxyInfo>();
int maxNumberOfThreads = 8;
var tasks = new List<Task>();
// Create all threads (count == maxNumberOfThreads)
for (int currentThreadNumber = 0; currentThreadNumber < maxNumberOfThreads; currentThreadNumber++)
{
tasks.Add(
Task.Run(() => ConsumeProxyInfo(pipe, currentThreadNumber)));
}
proxies.ToList().ForEach(pipe.Add);
pipe.CompleteAdding();
await Task.WhenAll(tasks);
}
private void ConsumeProxyInfo(BlockingCollection<ProxyInfo> proxiesPipe, int currentThreadNumber)
{
while (!proxiesPipe.IsCompleted)
{
if (proxiesPipe.TryTake(out ProxyInfo proxy))
{
int port = proxy.Port;
string proxyIP = proxy.IPAddress;
ProxyAddress result = CheckProxyServer(proxyIP, port, currentThreadNumber);
// Method call must be thread-safe!
UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With