I'm using Reactive for the first time on a project and I ran into a problem where performance is very important.
I'm retrieving a large amount of data via a TCP socket, which I have to parse into objects and insert into a database. Each message has the following signature:
<payload-size> <payload>
Where size is uint32 (4kb) which describes the size of the following payload in bytes.
I want to use the functionality which the Reactive Framework provides to parallelize the following steps (seen below) to maximize performance and avoid being the bottleneck. Furthermore, I'm asking for a 'best practices' for implementing this.
TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)
I've already implemented the following code which provides me with an Observable (ArraySegment<byte>).
IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));
I now want to transform the Observable (ArraySegment<byte>) to an Observable (Message). Where my first solution looked kinda like this because I though I could use an observable like a stream.
Read continous bytestream from Stream using TcpClient and Reactive Extensions
Will it be possible (and how) to create an observable using the following method? Or is there a better approach which you would recommend? I would really appreciate a good example.
Note: The Observable (ArraySegment) behave like a stream, so I do not know the size of the data it pushes to me. (Do I need to implement some kind of buffer or can the Reactive Framework help me?)
Observable (ArraySegment<byte>)
--> Buffer(4kb)
--> ReadSize --> Buffer(payload-size)
--> ReadPayload
--> Parse Payload
--> (Start over)
Thanks in advance! :)
EDIT: After Dimitri's comments, I present a revised solution below. There is one line in need of desperate refactoring, but it seems to work..
Window overload is used so we can write custom buffering.
var hinge = new Subject<Unit>();
observableSocket
.SelectMany(i => i) // to IObservable<byte>
.Window(() => hinge) // kinda-like-buffer
.Select(buff =>
{
return
from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
from payload in buff.Buffer(size)
//Refactor line below! Window must be closed somehow..
from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default))
select payload;
})
.SelectMany(i=>i)
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(ConvertToMessage);
EDIT 2: Removed old solution
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With