Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Transformation of observable byte array to objects

I'm using Reactive for the first time on a project and I ran into a problem where performance is very important.

Overview:

I'm retrieving a large amount of data via a TCP socket, which I have to parse into objects and insert into a database. Each message has the following signature:

<payload-size> <payload>

Where size is uint32 (4kb) which describes the size of the following payload in bytes.

Problem:

I want to use the functionality which the Reactive Framework provides to parallelize the following steps (seen below) to maximize performance and avoid being the bottleneck. Furthermore, I'm asking for a 'best practices' for implementing this.

TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)

I've already implemented the following code which provides me with an Observable (ArraySegment<byte>).

IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));

I now want to transform the Observable (ArraySegment<byte>) to an Observable (Message). Where my first solution looked kinda like this because I though I could use an observable like a stream.

Read continous bytestream from Stream using TcpClient and Reactive Extensions

Question:

Will it be possible (and how) to create an observable using the following method? Or is there a better approach which you would recommend? I would really appreciate a good example.

Note: The Observable (ArraySegment) behave like a stream, so I do not know the size of the data it pushes to me. (Do I need to implement some kind of buffer or can the Reactive Framework help me?)

    Observable (ArraySegment<byte>) 
    --> Buffer(4kb) 
    --> ReadSize --> Buffer(payload-size) 
    --> ReadPayload 
    --> Parse Payload
    --> (Start over)

Thanks in advance! :)

like image 239
SOK Avatar asked Jun 08 '26 11:06

SOK


1 Answers

EDIT: After Dimitri's comments, I present a revised solution below. There is one line in need of desperate refactoring, but it seems to work..

Window overload is used so we can write custom buffering.

var hinge = new Subject<Unit>();

observableSocket
.SelectMany(i => i) // to IObservable<byte> 
.Window(() => hinge) // kinda-like-buffer
.Select(buff =>
{    
    return
        from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
        from payload in buff.Buffer(size)
        //Refactor line below! Window must be closed somehow..
        from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default)) 
        select payload;                     
})
.SelectMany(i=>i)
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(ConvertToMessage);

EDIT 2: Removed old solution

like image 90
supertopi Avatar answered Jun 10 '26 09:06

supertopi



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!