Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spawn processes, but only 5 at a time

i'm working off a queue with filenames. Each file has to be processed by a external binary. This works fine, but it only processes one file at a time. Is it possible two spawn a number of processes parallel?

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

while (queue.Count > 0)
{
    string file = queue.Dequeue();

    Process p = new Process();    
    p.StartInfo.FileName = @"binary.exe";
    p.StartInfo.Arguments = file;
    p.StartInfo.UseShellExecute = false;
    p.StartInfo.CreateNoWindow = true;
    p.StartInfo.RedirectStandardOutput = true;
    p.Start();
    p.WaitForExit();
}

Update: I like the solution from Alex LE (Spawn processes, but only 5 at a time), but is it possible to wait for the child processes to exit as suggested by Ben Voigt?

Edit 1: i need to check for p.ExitCode == 0 to make some database updates.

like image 825
Makabra Avatar asked Dec 05 '10 19:12

Makabra


3 Answers

Here's what should have been possible, if the wait handle associated with the process was marked public instead of internal as it currently is (vote here to ask Microsoft to change that):

void BatchProcess()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    ...
    queue.Enqueue("10000.mp3");

    WaitHandle[] subprocesses = new WaitHandle[Math.Min(queue.Count, 5)];
    for( int i = 0; i < subprocesses.Length; i++ ) {
        subprocesses[i] = Spawn(queue.Dequeue());
    }

    while (queue.Count > 0) {
        int j = WaitHandle.WaitAny(subprocesses);
        subprocesses[j].Dispose();
        subprocesses[j] = Spawn(queue.Dequeue());
    }

    WaitHandle.WaitAll(subprocesses);
    foreach (var wh in subprocesses) {
        wh.Dispose();
    }
}

ProcessWaitHandle Spawn(string args)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        return p.WaitHandle;
    }
}

This would be the most efficient solution possible, because no synchronization objects are needed besides the Win32 processes themselves. There are no extra threads needed in the C# code and no asynchronous method invocations, therefore no locking or other synchronization is needed whatsoever.

like image 106
Ben Voigt Avatar answered Nov 01 '22 12:11

Ben Voigt


This works (this will be easier with C# 5.0 async await):

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

int runningProcesses = 0;
const int MaxRunningProcesses = 5;
object syncLock = new object();

Action<string> run = new Action<string>(delegate(string file) {
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.WaitForExit();
    }
});

Action runNext = null;
runNext = delegate() {
    lock (syncLock) {
        if (queue.Count > 0) {
            run.BeginInvoke(queue.Dequeue(), new AsyncCallback(delegate {
                runNext();
            }), null);
        }
    }
};

while (runningProcesses++ < MaxRunningProcesses) {
    runNext();
}
like image 42
Alex LE Avatar answered Nov 01 '22 11:11

Alex LE


Exctracting some parts of your code and adding a semaphore:

Semaphore semX = new Semaphore(5, int.MaxValue);

void f(name, args) {
    using (Process p = new Process())
    {
        p.StartInfo.FileName = name;
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.WaitForExit();
    }

    semX.Release();     // !!! This one is important
}

You then use

while (queue.Count > 0)
{
    string file = queue.Dequeue();
    semX.WaitOne();    // !!!
    (new Thread((ThreadStart) (() => f(file, "")))).Start();    // dirty unreadable code to start a routine async
}

for (int n = 5; n > 0; n--)        // Wait for the last 5 to finish
    semX.WaitOne();

semX.Dispose();                    // Dispose the semaphore
like image 32
bohdan_trotsenko Avatar answered Nov 01 '22 11:11

bohdan_trotsenko