 

Reactive Extensions - Deserializing stream from IObservable<byte[]> into individual delimited messages without the use of a Subject

I am taking the messages pumped to me from an IObservable<byte[]> and deserializing these into strings, which are then pumped out via an IObservable<string>. A Socket populates the IObservable<byte[]> via a FromEventPattern conversion. The deserialized messages from the Socket are linefeed-delimited strings. A single message received from the Socket is not required to be a single delimited string: it could be any portion of any number of messages, and partial messages are possible. The first way to solve this that came to mind was with a Subject and a closure like so:

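// Requires: using System; using System.Linq; using System.Reactive.Linq;
//           using System.Reactive.Subjects; using System.Text;
// Slice(offset, count) is a helper extension not shown here; it is assumed
// to return a new byte[] holding count bytes of the array starting at offset.
private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    const byte byteLineFeed = 10;
    var subject = new Subject<string>();
    byte[] leftovers = null; // partial message carried over from earlier arrays

    bytes.Subscribe(current =>
    {
        var lastPositionOfLineFeed = -1;
        for (var i = 0; i < current.Length; i++)
        {
            if (current[i] == byteLineFeed)
            {
                // Emit everything since the previous line feed, up to and including this one.
                if (leftovers != null)
                {
                    // Concat, not Union: Union is a set operation and would drop repeated bytes.
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            leftovers.Concat(current.Slice(lastPositionOfLineFeed + 1,
                                i - lastPositionOfLineFeed))
                                        .ToArray()));
                    leftovers = null;
                }
                else
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            current.Slice(lastPositionOfLineFeed + 1,
                                i - lastPositionOfLineFeed)));
                }
                lastPositionOfLineFeed = i;
            }
        }
        // Keep any bytes after the last line feed for the next array.
        if (lastPositionOfLineFeed != current.Length - 1)
        {
            if (leftovers != null)
            {
                leftovers = leftovers.Concat(current.Slice(lastPositionOfLineFeed + 1,
                    current.Length - lastPositionOfLineFeed - 1))
                                        .ToArray();
            }
            else
            {
                leftovers = current.Slice(lastPositionOfLineFeed + 1,
                    current.Length - lastPositionOfLineFeed - 1);
            }
        }
    });

    return subject.AsObservable();
}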

This works well, but I know that the use of Subjects is frowned upon for a variety of reasons, some of which are featured in this block of code. I feel like I might be reinventing the wheel here as I'm not thoroughly familiar with all of the methods in Rx. Could I do this without a closure and a Subject? If so, how would I go about it? Or does use of a Subject here make sense?

asked Nov 07 '14 19:11 by joelmdev


2 Answers

I would use SelectMany with a selector which returns an IEnumerable<string>.

For example:

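    // Requires: using System; using System.Collections.Generic; using System.Linq;
    //           using System.Reactive.Linq; using System.Text;
    public static IObservable<string> GetCompleteMessage(this IObservable<byte[]> source)
    {
        const byte byteLineFeed = 10;

        // Defer gives every subscriber its own remainder instead of one shared across subscriptions.
        return Observable.Defer(() =>
        {
            IEnumerable<byte> remainder = Enumerable.Empty<byte>();

            Func<byte[], IEnumerable<string>> selector = data =>
            {
                var result = new List<string>();
                var current = new ArraySegment<byte>(data);

                while (true)
                {
                    // Index of the next line feed, relative to the start of the segment
                    var dividerOffset = ((IList<byte>)current).IndexOf(byteLineFeed);

                    if (dividerOffset == -1) // No newline found
                    {
                        // Copy the tail now (ToArray) in case the caller reuses the data buffer
                        remainder = remainder.Concat(current).ToArray();
                        break;
                    }

                    var segment = new ArraySegment<byte>(current.Array, current.Offset, dividerOffset);
                    var lineBytes = remainder.Concat(segment).ToArray();
                    result.Add(Encoding.ASCII.GetString(lineBytes));

                    remainder = Enumerable.Empty<byte>();
                    // Continue with the bytes after the line feed
                    current = new ArraySegment<byte>(current.Array, current.Offset + dividerOffset + 1, current.Count - 1 - dividerOffset);
                }

                return result;
            };

            return source.SelectMany(selector);
        });
    }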
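The selector above still keeps the partial line in a captured variable. If you would rather hold that state inside the query itself, Rx's Scan operator can carry it as the accumulator. This is a swapped-in technique, not part of this answer: a minimal sketch assuming ASCII text and line-feed delimiters, where SplitLines and FrameState are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;

public static class ScanFraming
{
    // Accumulator for Scan: the unfinished tail of the stream plus the
    // lines completed by the most recent array.
    private sealed class FrameState
    {
        public FrameState(byte[] remainder, List<string> lines)
        {
            Remainder = remainder;
            Lines = lines;
        }

        public byte[] Remainder { get; }
        public List<string> Lines { get; }
    }

    // Hypothetical alternative: the partial line lives in Scan's accumulator
    // rather than a captured local, so each subscription gets its own state.
    public static IObservable<string> SplitLines(this IObservable<byte[]> source)
    {
        const byte lineFeed = 10;

        return source
            .Scan(new FrameState(new byte[0], new List<string>()), (state, data) =>
            {
                // Prepend the leftover bytes from the previous array.
                var buffer = state.Remainder.Concat(data).ToArray();
                var lines = new List<string>();
                var start = 0;
                int index;

                // Emit one string per line feed found, excluding the line feed itself.
                while ((index = Array.IndexOf(buffer, lineFeed, start)) != -1)
                {
                    lines.Add(Encoding.ASCII.GetString(buffer, start, index - start));
                    start = index + 1;
                }

                // Whatever follows the last line feed waits for the next array.
                var remainder = new byte[buffer.Length - start];
                Array.Copy(buffer, start, remainder, 0, remainder.Length);
                return new FrameState(remainder, lines);
            })
            .SelectMany(state => state.Lines);
    }
}
```

Because Scan's accumulator is per subscription, two subscribers do not share a remainder. The cost is copying the leftover bytes into a new array on every packet, which is probably negligible next to network latency.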

Alternatively, you could use a NetworkStream and StreamReader to achieve the same result:

    public static IObservable<string> ReadLineObservable(this TextReader reader)
    {
        return Observable.FromAsync(() => reader.ReadLineAsync())
            .Repeat()
            .TakeWhile(x => x != null);
    }
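To make the StreamReader route a little more concrete, here is a minimal sketch that wraps the same FromAsync/Repeat/TakeWhile pattern in Observable.Using, so the reader is created on subscribe and disposed when the sequence ends. It accepts any Stream (for a socket you would pass something like new NetworkStream(socket)); StreamLines and ReadLines are illustrative names and ASCII is assumed. Note that StreamReader.ReadLine treats "\r", "\n" and "\r\n" all as line terminators, which differs slightly from splitting on byte 10 alone.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;

public static class StreamLines
{
    // Reads lines until end of stream (ReadLineAsync returns null).
    // Disposing the StreamReader also disposes the stream it wraps, so this
    // sketch is meant for a single subscriber.
    public static IObservable<string> ReadLines(Stream stream)
    {
        return Observable.Using(
            () => new StreamReader(stream, Encoding.ASCII),
            reader => Observable.FromAsync(() => reader.ReadLineAsync())
                                .Repeat()
                                .TakeWhile(line => line != null));
    }
}
```

Here framing is handled by the reader's internal buffering, so partial lines split across socket reads are reassembled without any custom remainder logic.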
answered Oct 31 '22 07:10 by Foole


Setting aside the probably superior option of using BCL-provided constructs to create a TextReader (if that can be made to fit your scenario), I wondered how I would do this in an idiomatic Rx way, and came up with the following short query that avoids custom operators, subjects and all that:

var messages =
    arrayStream.Select(bytes => bytes.ToObservable()).Concat()
               .Publish(ps => ps.Where(p => p != 10)
                                .Buffer(() => ps.Where(p => p == 10)))
               .Select(ls => Encoding.ASCII.GetString(ls.ToArray()));

Breaking it down

I assume ASCII encoding (as you had in your question) so that a byte value of 10 can be treated as the line delimiter. With a multi-byte encoding this can be naïve and a more complex framing algorithm is required, which is one of the reasons why leaning on BCL-provided stream infrastructure is probably better. (UTF-8 is actually safe here, since the byte 10 never occurs inside a multi-byte UTF-8 sequence; UTF-16 is not.)
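A quick way to see the difference between encodings is to check whether a character that is not a line feed encodes to bytes containing 10. This is a minimal sketch; U+0A0A is just an example character, chosen because its UTF-16 code unit is 0x0A0A, and DelimiterCheck is an illustrative helper.

```csharp
using System;
using System.Linq;
using System.Text;

public static class DelimiterCheck
{
    // True if encoding `text` with `encoding` produces a byte with value 10 (0x0A).
    public static bool ContainsLineFeedByte(Encoding encoding, string text) =>
        encoding.GetBytes(text).Contains((byte)10);

    // Hex dump of the encoded bytes, formatted like "E0-A8-8A".
    public static string Dump(Encoding encoding, string text) =>
        BitConverter.ToString(encoding.GetBytes(text));
}
```

For U+0A0A, Dump(Encoding.UTF8, "\u0A0A") gives E0-A8-8A (no 0x0A), while Dump(Encoding.Unicode, "\u0A0A") gives 0A-0A, so a byte-10 splitter would cut that character in two. Even a real line feed is 0A-00 in little-endian UTF-16, which would leave a stray zero byte at the start of the next frame.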

So, given a stream of byte arrays, IObservable<byte[]> arrayStream, we can flatten it to an IObservable<byte> like this:

arrayStream.Select(bytes => bytes.ToObservable()).Concat()
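To try the flattening step in isolation, here is a minimal sketch that runs it on a few small arrays next to a SelectMany variant. The variant uses Rx's SelectMany overload with an IEnumerable selector, which enumerates each array synchronously inside OnNext and so also emits bytes in arrival order; this is an alternative, not what the answer uses, and Flattening is an illustrative name.

```csharp
using System;
using System.Reactive.Linq;

public static class Flattening
{
    // The answer's approach: Concat subscribes to one inner observable at a time,
    // so bytes from a later array cannot be emitted before an earlier one finishes.
    public static IObservable<byte> WithConcat(IObservable<byte[]> arrays) =>
        arrays.Select(bytes => bytes.ToObservable()).Concat();

    // Alternative: SelectMany with an IEnumerable selector (byte[] is IEnumerable<byte>)
    // enumerates each array synchronously inside OnNext.
    public static IObservable<byte> WithEnumerableSelectMany(IObservable<byte[]> arrays) =>
        arrays.SelectMany(bytes => bytes);
}
```

Both produce 1, 2, 3, 4, 5 for the arrays {1, 2}, {3}, {4, 5}; the Concat version keeps the per-byte IObservable shape that the rest of the query builds on.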

This uses Select + Concat rather than SelectMany in order to guarantee that the bytes are streamed out in strict order. I edited this in on a hunch: I haven't analyzed the code closely enough to be comfortable that, without this extra guard, a subsequent array can't overlap with a previous one, and I think it will come down to the scheduler being used. If you are interested, check the Rx source. Anyway, better to be safe. With that done, we can buffer up a list of line-delimited bytes like this:

.Publish(ps => ps.Where(p => p != 10)
                 .Buffer(() => ps.Where(p => p == 10)))

Here we publish because we subscribe to the byte stream twice, so it must be multicast. We buffer a byte stream stripped of line feeds, and supply a buffer-closing function that watches for line feeds. The result is one list of bytes per line-delimited message.

Finally, we decode the messages with a simple projection:

.Select(ls => Encoding.ASCII.GetString(ls.ToArray()));

Here's a fully working example demonstrating some messages whose framing is split between several packets:

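// Needs: using System; using System.Linq; using System.Reactive.Linq;
//        using System.Reactive.Subjects; using System.Text;
var rawMessages = new byte[][] {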
    Encoding.ASCII.GetBytes("This\ni"),
    Encoding.ASCII.GetBytes("s\na"),
    Encoding.ASCII.GetBytes("\ntest!\n")
};

var arrayStream = new Subject<byte[]>();

var messages =
    arrayStream.Select(bytes => bytes.ToObservable()).Concat()
               .Publish(ps => ps.Where(p => p != 10)
                                .Buffer(() => ps.Where(p => p == 10)))
               .Select(ls => Encoding.ASCII.GetString(ls.ToArray())); 

messages.Subscribe(b => Console.WriteLine(b));

foreach(var array in rawMessages)
{
    arrayStream.OnNext(array);
}    

Output:

This
is
a
test!

Notes and Caveats

The Rx may be idiomatic, but there's a lot to think about here. I've ignored performance in stripping down the byte arrays and building them up again (though the network is miles slower than compute, so that may not be a concern). I've assumed that error handling is all dealt with upstream in producing the IObservable<byte[]>, and I haven't dealt with decoding errors; there's no attention to timeouts, network issues, etc. I've also assumed that the last message is terminated by a line feed.

If multiple observers will subscribe to this stream, multicast it (for example via Publish) so that you don't create multiple subscriptions to the underlying IObservable<byte[]>.
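A minimal sketch of that advice, assuming the query from this answer is the thing being shared: appending Publish().RefCount() makes all observers share a single subscription to arrayStream, connecting on the first Subscribe and disconnecting when the last observer unsubscribes. SharedMessages.Create is an illustrative name.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Text;

public static class SharedMessages
{
    // Same framing query as above, followed by Publish().RefCount() so that
    // every observer is served by one upstream subscription.
    public static IObservable<string> Create(IObservable<byte[]> arrayStream) =>
        arrayStream.Select(bytes => bytes.ToObservable()).Concat()
                   .Publish(ps => ps.Where(p => p != 10)
                                    .Buffer(() => ps.Where(p => p == 10)))
                   .Select(ls => Encoding.ASCII.GetString(ls.ToArray()))
                   .Publish()
                   .RefCount();
}
```

Observers that subscribe after some messages have already gone by will not see those earlier messages; if late subscribers need them, a replaying multicast would be required instead.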

answered Oct 31 '22 06:10 by James World