
Exception using Rx and Await to accomplish reading file line by line async

I am learning to use Rx and tried this sample, but I cannot fix the exception that is thrown at the highlighted while statement: `while (!f.EndOfStream)`.

I want to read a huge file line by line, and for every line of data I want to do some processing on a different thread (so I used ObserveOn). I want the whole thing to be async, which is why I use ReadLineAsync: it returns a Task that I can convert to an observable and subscribe to.

I guess the task thread I create first gets in between the Rx threads, but even if I observe and subscribe on the current thread, I still cannot stop the exception. I wonder how to accomplish this neatly and asynchronously with Rx.

Could the whole thing be done even more simply?

    static void Main(string[] args)
    {
        RxWrapper.ReadFileWithRxAsync();
        Console.WriteLine("this should be called even before the file read begins");
        Console.ReadLine();
    }

    public static async Task ReadFileWithRxAsync()
    {
        Task t = Task.Run(() => ReadFileWithRx());
        await t;
    }


    public static void ReadFileWithRx()
    {
        string file = @"C:\FileWithLongListOfNames.txt";
        using (StreamReader f = File.OpenText(file))
        {
            string line = string.Empty;
            bool continueRead = true;

            while (!f.EndOfStream)   // <-- exception thrown here
            {
                f.ReadLineAsync()
                       .ToObservable()
                       .ObserveOn(Scheduler.Default)
                       .Subscribe(t =>
                           {
                               Console.WriteLine("custom code to manipulate every line data");
                           });
            }

        }
    }
asked Apr 14 '14 by Everything Matters

2 Answers

The exception is an InvalidOperationException - I'm not intimately familiar with the internals of FileStream, but according to the exception message this is being thrown because there is an in-flight asynchronous operation on the stream. The implication is that you must wait for any ReadLineAsync() calls to finish before checking EndOfStream.
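The direct fix, then, is to await each read before the loop re-checks `EndOfStream`. A minimal sketch (the method and class names are mine, not from the question):

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class LineReader
{
    // Await each ReadLineAsync before the loop re-checks EndOfStream,
    // so there is never an in-flight read when the property is touched.
    public static async Task ReadFileLineByLineAsync(string file)
    {
        using (StreamReader f = File.OpenText(file))
        {
            while (!f.EndOfStream)
            {
                string line = await f.ReadLineAsync();
                Console.WriteLine(line); // custom per-line processing goes here
            }
        }
    }
}
```

Note that this serializes reading and processing, which gives up the parallelism you were after - which brings us to the bigger issue.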

Matthew Finlay has provided a neat re-working of your code to solve this immediate problem. However, I think it has problems of its own - and that there is a bigger issue that needs to be examined. Let's look at the fundamental elements of the problem:

  • You have a very large file.
  • You want to process it asynchronously.

This suggests that you don't want the whole file in memory, you want to be informed when the processing is done, and presumably you want to process the file as fast as possible.

Both solutions are using a thread to process each line (the ObserveOn is passing each line to a thread from the thread pool). This is actually not an efficient approach.

Looking at both solutions, there are two possibilities:

  • A. It takes more time on average to read a file line than it does to process it.
  • B. It takes less time on average to read a file line than it does to process it.

A. File read of a line slower than processing a line

In the case of A, the system will spend most of its time idle while it waits for file IO to complete. In this scenario, Matthew's solution won't result in memory filling up - but it's worth seeing if using ReadLines directly in a tight loop produces better results due to less thread contention. (ObserveOn pushing the line to another thread will only buy you something if ReadLines isn't getting lines in advance of calling MoveNext - which I suspect it does - but test and see!)
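In other words, for the IO-bound case a plain synchronous loop is a perfectly reasonable baseline to measure against. A sketch (`ProcessLine` is a hypothetical per-line handler):

```csharp
using System;
using System.IO;

static class Baseline
{
    // Tight-loop baseline for the IO-bound case: no Rx, no extra threads.
    // File.ReadLines streams the file lazily, one line at a time.
    public static void ProcessFile(string file)
    {
        foreach (var line in File.ReadLines(file))
        {
            ProcessLine(line); // hypothetical per-line handler
        }
    }

    static void ProcessLine(string line) => Console.WriteLine(line);
}
```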

B. File read of a line faster than processing a line

In the case of B (which I assume is more likely given what you have tried), all those lines will start to queue up in memory and, for a big enough file, you will end up with most of it in memory.

You should note that unless your handler is firing off asynchronous code to process a line, then all lines will be processed serially because Rx guarantees OnNext() handler invocations won't overlap.

The ReadLines() method is great because it returns an IEnumerable<string> and it's your enumeration of this that drives reading the file. However, when you call ToObservable() on this, it will enumerate as fast as possible to generate the observable events - there is no feedback (known as "backpressure" in reactive programs) in Rx to slow down this process.

The problem is not the ToObservable itself - it's the ObserveOn. ObserveOn doesn't block the OnNext() handler it is invoked from while waiting for its subscribers to finish with the event - it queues events against the target scheduler as fast as possible.

If you remove the ObserveOn, then - as long as your OnNext handler is synchronous - you'll see each line is read and processed one at a time because the ToObservable() is processing the enumeration on the same thread as the handler.
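Concretely, that one-line-at-a-time behaviour looks like this (a sketch, assuming the System.Reactive package is referenced):

```csharp
using System;
using System.IO;
using System.Reactive.Linq; // System.Reactive package

// With no ObserveOn, ToObservable() pushes each line to the subscriber
// on the enumerating thread, so read-then-process happens strictly
// one line at a time - the enumeration cannot outrun the handler.
File.ReadLines(@"C:\FileWithLongListOfNames.txt")
    .ToObservable()
    .Subscribe(line => Console.WriteLine(line));
```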

If this isn't what you want, and you attempt to mitigate this in pursuit of parallel processing by firing off an async job in the subscriber - e.g. Task.Run(() => /* process line */) or similar - then things won't go as well as you hope.

Because it takes longer to process a line than read a line, you will create more and more tasks that aren't keeping pace with the incoming lines. The thread count will gradually increase and you will be starving the thread pool.

In this case, Rx isn't a great fit really.

What you probably want is a small number of worker threads (probably one per processor core) that each fetch one line at a time to work on, limiting the number of lines of the file held in memory.

A simple approach could be this, which limits the number of lines in memory to a fixed number of workers. It's a pull-based solution, which is a much better design in this scenario:

private Task ProcessFile(string filePath, int numberOfWorkers)
{
    var lines = File.ReadLines(filePath);       

    var parallelOptions = new ParallelOptions {
        MaxDegreeOfParallelism = numberOfWorkers
    };  

    return Task.Run(() => 
        Parallel.ForEach(lines, parallelOptions, ProcessFileLine));
}

private void ProcessFileLine(string line)
{
    /* Your processing logic here */
    Console.WriteLine(line);
}

And use it like this:

static void Main()
{       
    var processFile = ProcessFile(
        @"C:\Users\james.world\Downloads\example.txt", 8);

    Console.WriteLine("Processing file...");        
    processFile.Wait();
    Console.WriteLine("Done");
}

Final Notes

There are ways of dealing with backpressure in Rx (search around SO for some discussions), but it's not something that Rx handles well, and I think the resulting solutions are less readable than the alternative above. There are also many other approaches you can look at (actor-based approaches such as TPL Dataflow, or LMAX Disruptor-style ring buffers for high-performance lock-free designs), but the core idea of pulling work from queues will be prevalent.
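For illustration, here is a hedged sketch of the TPL Dataflow route mentioned above: an `ActionBlock` with a bounded capacity gives you backpressure for free, because `SendAsync` waits whenever the block's input queue is full (the capacity of 100 is an arbitrary choice for the example):

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow package

static async Task ProcessFileWithDataflow(string filePath)
{
    var block = new ActionBlock<string>(
        line => Console.WriteLine(line), // per-line processing goes here
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 100 // at most ~100 lines buffered in memory
        });

    foreach (var line in File.ReadLines(filePath))
        await block.SendAsync(line); // waits here when the queue is full

    block.Complete();        // signal: no more lines
    await block.Completion;  // wait until all queued lines are processed
}
```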

Even in this analysis, I am conveniently glossing over what you are doing to process the file, and tacitly assuming that the processing of each line is compute bound and truly independent. If there is work to merge the results and/or IO activity to store the output then all bets are off - you will need to examine the efficiency of this side of things carefully too.

In most cases where performing work in parallel as an optimization is under consideration, there are usually so many variables in play that it is best to measure the results of each approach to determine what is best. And measuring is a fine art - be sure to measure realistic scenarios, take averages of many runs of each test and properly reset the environment between runs (e.g. to eliminate caching effects) in order to reduce measurement error.

answered Nov 02 '22 by James World


I haven't looked into what is causing your exception, but I think the neatest way to write this is:

File.ReadLines(file)
  .ToObservable()
  .ObserveOn(Scheduler.Default)
  .Subscribe(Console.WriteLine);

Note: ReadLines differs from ReadAllLines in that it will start yielding without having read the entire file, which is the behavior that you want.
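You can see that laziness directly (a sketch; the helper name is mine):

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;

static class Lazily
{
    // ReadLines is lazy: Take(10) pulls only about the first ten lines
    // from disk, whereas ReadAllLines would materialize the whole file
    // into an array before you could take anything from it.
    public static List<string> FirstTenLines(string file) =>
        File.ReadLines(file).Take(10).ToList();
}
```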

answered Nov 02 '22 by Matthew Finlay