Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Write to open FileStream using reactive programming

I am writing a small logger and I want to open the log file once, keep writing reactively as log messages arrive, and dispose of everything on program termination.

I am not sure how I can keep the FileStream open and reactively write the messages as they arrive.

I would like to update the design from my previous solution where I had a ConcurrentQueue acting as a buffer, and a loop inside the using statements that consumed the queue.

Specifically, I want to simultaneously take advantage of the using statement construct, so I don't have to explicitly close the stream and writer, and of the reactive, loopless programming style. Currently I only know how to use one of these constructs at once: either the using/loop combination, or the explicit-stream-close/reactive combination.

Here's my code:

    BufferBlock<LogEntry> _buffer = new BufferBlock<LogEntry>();


    // CONSTRUCTOR
    public DefaultLogger(string folder)
    {
        var filePath = Path.Combine(folder, $"{DateTime.Now.ToString("yyyy.MM.dd")}.log");

        _cancellation = new CancellationTokenSource();

        var observable = _buffer.AsObservable();

        using (var stream = File.Create(_filePath))
        using (var writer = new StreamWriter(stream))
        using (var subscription = observable.Subscribe(entry =>
                                    writer.Write(GetFormattedString(entry))))
        {
            while (!_cancellation.IsCancellationRequested)
            {
                // what do I do here?
            }
        }
    }
like image 642
heltonbiker Avatar asked Sep 04 '17 20:09

heltonbiker


1 Answers

You need to use Observable.Using. It's designed to create an IDisposble resource that gets disposed when the sequence ends.

Try something like this:

IDisposable subscription = 
    Observable.Using(() => File.Create(_filePath),
        stream => Observable.Using(() => new StreamWriter(stream),
            writer => _buffer.AsObservable().Select(entry => new { entry, writer })))
        .Subscribe(x => x.writer.Write(GetFormattedString(x.entry)));
like image 168
Enigmativity Avatar answered Sep 23 '22 10:09

Enigmativity