Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Parallel.Invoke - Dynamically creating more 'threads'

I am educating myself on Parallel.Invoke, and parallel processing in general, for use in current project. I need a push in the right direction to understand how you can dynamically\intelligently allocate more parallel 'threads' as required.

As an example. Say you are parsing large log files. This involves reading from file, some sort of parsing of the returned lines and finally writing to a database.

So to me this is a typical problem that can benefit from parallel processing.

As a simple first pass the following code implements this.

Parallel.Invoke(
  ()=> readFileLinesToBuffer(),
  ()=> parseFileLinesFromBuffer(),
  ()=> updateResultsToDatabase()    
);

Behind the scenes

  1. readFileLinesToBuffer() reads each line and stores to a buffer.
  2. parseFileLinesFromBuffer comes along and consumes lines from buffer and then let's say it put them on another buffer so that updateResultsToDatabase() can come along and consume this buffer.

So the code shown assumes that each of the three steps uses the same amount of time\resources but lets say the parseFileLinesFromBuffer() is a long running process so instead of running just one of these methods you want to run two in parallel.

How can you have the code intelligently decide to do this based on any bottlenecks it might perceive?

Conceptually I can see how some approach of monitoring the buffer sizes might work, spawning a new 'thread' to consume the buffer at an increased rate for example...but I figure this type of issue has been considered in putting together the TPL library.

Some sample code would be great but I really just need a clue as to what concepts I should investigate next. It looks like maybe the System.Threading.Tasks.TaskScheduler holds the key?

like image 908
SleepyBoBos Avatar asked Oct 08 '22 16:10

SleepyBoBos


1 Answers

Have you tried the Reactive Extensions?

http://msdn.microsoft.com/en-us/data/gg577609.aspx

The Rx is a new tecnology from Microsoft, the focus as stated in the official site:

The Reactive Extensions (Rx)... ...is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.

You can download it as a Nuget package

https://nuget.org/packages/Rx-Main/1.0.11226

Since I am currently learning Rx I wanted to take this example and just write code for it, the code I ended up it is not actually executed in parallel, but it is completely asynchronous, and guarantees the source lines are executed in order.

Perhaps this is not the best implementation, but like I said I am learning Rx, (thread-safe should be a good improvement)

This is a DTO that I am using to return data from the background threads

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}

These are the basic methods doing the real work, I am simulating the time with a simple Thread.Sleep and I am returning the thread used to execute each method Thread.CurrentThread.ManagedThreadId. Note the timer of the ProcessLine it is 4 sec, it's the most time-consuming operation

private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

The following method I am using it just to update the UI

private void DisplayResults(MyItem myItem, Color color, string message)
{
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message, 
                myItem.Line ,
                myItem.CurrentThread.ToString(), 
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}

And finally this is the method that calls the Rx API

private void PlayWithRx()
{
    // we init the observavble with the lines read from the file
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =>
    {
        // for each line read, we update the UI
        this.DisplayResults(x, Color.Red, "Read");

        // for each line read, we subscribe the line to the ProcessLine method
        var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =>
            {
                // for each line processed, we update the UI
                this.DisplayResults(c, Color.Blue, "Processed");

                // for each line processed we subscribe to the final process the UpdateResultToDatabase method
                // finally, we update the UI when the line processed has been saved to the database
                var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}

This process runs totally in the background, this is the output generated:

enter image description here

like image 143
Jupaol Avatar answered Oct 10 '22 08:10

Jupaol