Lazy, stream driven object serialization with protobuf-net

We are developing a WCF service for streaming large amounts of data, so we have chosen to use WCF streaming combined with protobuf-net serialization.

Context:

The general idea is to serialize objects in the service, write them into a stream and send the stream. On the other end, the caller receives a Stream object and can read all the data from it.

Currently the service method looks roughly like this:

public Result TestMethod(Parameter parameter)
{
    // Create response
    var responseObject = new BusinessResponse { Value = "some very large data"};

    // The response has to be serialized in advance into an intermediate MemoryStream
    var stream = new MemoryStream();
    serializer.Serialize(stream, responseObject);
    stream.Position = 0;

    // ResultBody is a stream, Result is a MessageContract
    return new Result {ResultBody = stream};
}

The BusinessResponse object is serialized to a MemoryStream, which is then returned from the method. On the client side, the calling code looks like this:

var parameter = new Parameter();

// Call the service method
var methodResult = channel.TestMethod(parameter);

// The protobuf-net deserializer reads from the stream received from the service.
// While protobuf-net is reading, WCF on the service side is actually
// reading from the MemoryStream in which the serialized message is stored.
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody);
return result;

So when serializer.Deserialize() is called, it reads from the stream methodResult.ResultBody; at the same time, on the service side, WCF is reading the MemoryStream that was returned from TestMethod.

Problem:

We would like to get rid of the MemoryStream and of serializing the whole object up front on the service side. Since we use streaming, we want to avoid keeping the entire serialized object in memory before sending it.

Idea:

The ideal solution would be to return (from TestMethod()) an empty, custom-made Stream object holding a reference to the object that is to be serialized (the BusinessResponse object in my example). Then, whenever WCF calls the Read() method of my stream, I would internally serialize the next piece of the object using protobuf-net and return it to the caller without storing the whole serialized object in memory.

This is where the problem lies: what we actually need is the ability to serialize an object piece by piece at the moment the stream is read. I understand that this is a completely different way of serializing: instead of pushing an object into a serializer, I'd like to request the serialized content piece by piece.

Is this kind of serialization possible with protobuf-net?
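To make the pull model described above concrete, here is a minimal sketch that does not involve protobuf-net at all: a read-only Stream that pulls byte chunks from a lazily evaluated sequence, so each chunk is produced only when Read needs more data. `ChunkPullStream`, `LazyPayload` and `SerializeItemsLazily` are hypothetical names, and the length-prefixed UTF-8 encoding is only a stand-in for real per-item serialization. It assumes the payload can be split into pieces (for example the items of a list) that can each be serialized on their own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical read-only Stream that pulls byte chunks from a lazy sequence.
// No chunk is produced until Read() actually needs more data.
public sealed class ChunkPullStream : Stream
{
    private readonly IEnumerator<byte[]> chunks;
    private byte[] current = Array.Empty<byte>();
    private int position; // read position inside `current`

    public ChunkPullStream(IEnumerable<byte[]> source)
    {
        chunks = source.GetEnumerator(); // does not run the producer yet
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        // Ask the producer for the next non-empty chunk only when the current one is drained
        while (position == current.Length)
        {
            if (!chunks.MoveNext()) return 0; // producer finished: end of stream
            current = chunks.Current ?? Array.Empty<byte>();
            position = 0;
        }
        int n = Math.Min(count, current.Length - position);
        Buffer.BlockCopy(current, position, buffer, offset, n);
        position += n;
        return n;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) chunks.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override void Flush() { }
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

public static class LazyPayload
{
    // Stand-in for per-item serialization (hypothetical format): each item becomes
    // a 4-byte length prefix (BitConverter, machine byte order) followed by its UTF-8 bytes.
    // The iterator body runs step by step, only as the stream is read.
    public static IEnumerable<byte[]> SerializeItemsLazily(IEnumerable<string> items)
    {
        foreach (var item in items)
        {
            byte[] body = Encoding.UTF8.GetBytes(item);
            byte[] chunk = new byte[4 + body.Length];
            BitConverter.GetBytes(body.Length).CopyTo(chunk, 0);
            body.CopyTo(chunk, 4);
            yield return chunk;
        }
    }
}
```

Because the producer is a C# iterator, the consumer's Read call drives it directly: no second thread is needed, and only the chunk currently being read is held in memory. This only helps if the payload really can be cut into independently serializable pieces.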

Asked by Hubert R. Skrzypek, Nov 04 '22 15:11


1 Answer

I cooked up some code that is probably along the lines of Marc's "gate" idea.

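using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;

public class PullStream : Stream
{
    private byte[] internalBuffer;
    private bool ended;
    // Instance fields (not static), so separate PullStream instances don't share their signals
    private readonly ManualResetEvent dataAvailable = new ManualResetEvent(false);
    private readonly ManualResetEvent dataEmpty = new ManualResetEvent(true);

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override void Flush()
    {
        // Nothing to flush: every Write hands its data straight to the reader
    }

    public override long Length
    {
        get { throw new NotSupportedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Block until the writer has handed over a chunk (or called End)
        dataAvailable.WaitOne();

        // After End() the buffer is empty and dataAvailable stays set,
        // so this and every later Read report end of stream instead of blocking
        if (ended && internalBuffer.Length == 0)
            return 0;

        if (count >= internalBuffer.Length)
        {
            // The whole chunk fits: copy it (honouring offset) and let the writer continue
            var retVal = internalBuffer.Length;
            Array.Copy(internalBuffer, 0, buffer, offset, retVal);
            internalBuffer = null;
            dataAvailable.Reset();
            dataEmpty.Set();
            return retVal;
        }
        else
        {
            // Only part of the chunk fits: return it and keep the rest for the next Read
            Array.Copy(internalBuffer, 0, buffer, offset, count);
            internalBuffer = internalBuffer.Skip(count).ToArray(); // i know
            return count;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (ended)
            throw new InvalidOperationException("End() has already been called.");
        // An empty chunk would look like end of stream to the reader, so skip it
        if (count == 0)
            return;

        // Block until the reader has consumed the previous chunk
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        // Copy from the caller's offset, not from index 0
        internalBuffer = new byte[count];
        Array.Copy(buffer, offset, internalBuffer, 0, count);

        Debug.WriteLine("Writing some data");

        dataAvailable.Set();
    }

    public void End()
    {
        // Wait until the reader has consumed the last chunk
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        internalBuffer = new byte[0];
        ended = true;

        Debug.WriteLine("Ending writes");

        dataAvailable.Set();
    }
}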

This is a simple Stream subclass that implements only Read and Write (plus End). Read blocks while no data is available, and Write blocks while data is still waiting to be read. This way only one byte buffer is involved. The LINQ copying of the remainder is open for optimization ;-) The End method exists so that a Read performed after the last Write does not block forever: it signals that no more data will be written, and Read then returns 0.

You have to write to this stream from a separate thread, as shown below:

// create a large object
// (ToSerialize is a protobuf-net contract class with a string member Test, not shown here)
var obj = new List<ToSerialize>();
for (int i = 0; i <= 1000; i++)
    obj.Add(new ToSerialize { Test = "This is my very loooong message" });

// create my special stream to read from
var ms = new PullStream();

// serialize on a background thread; each Write blocks until the reader has consumed the previous chunk
new Thread(() =>
{
    try
    {
        ProtoBuf.Serializer.Serialize(ms, obj);
    }
    finally
    {
        // always signal the end of data, so the reading loop below cannot block forever
        ms.End();
    }
}).Start();

var buffer = new byte[100];
// stream to write back to (just to show deserialization is working too)
var ws = new MemoryStream();
int read;
while ((read = ms.Read(buffer, 0, buffer.Length)) != 0)
{
    ws.Write(buffer, 0, read);
    Debug.WriteLine("read some data");
}
ws.Position = 0;
var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws);
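The same one-chunk handoff that PullStream implements with two ManualResetEvents can also be built on `BlockingCollection<T>` from System.Collections.Concurrent. The sketch below is a hypothetical variant (`HandoffStream` is an illustrative name, not part of any library): Write adds a copy of each chunk to a collection with a bounded capacity of 1, End maps to `CompleteAdding()`, and Read uses `TryTake`, which returns false once adding has been completed and the collection is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical PullStream variant built on BlockingCollection<byte[]>.
// One thread calls Write/End, another thread calls Read.
public sealed class HandoffStream : Stream
{
    // boundedCapacity 1: Add blocks while a chunk is still waiting to be taken
    private readonly BlockingCollection<byte[]> chunks = new BlockingCollection<byte[]>(1);
    private byte[] current = new byte[0]; // chunk currently being read
    private int position;                 // read position inside `current`

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return; // nothing to hand over
        var chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        chunks.Add(chunk); // throws InvalidOperationException after End()
    }

    // Signals that no more data will be written
    public void End() => chunks.CompleteAdding();

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        if (position == current.Length)
        {
            // Waits for the next chunk; false means End() was called and every chunk was taken
            if (!chunks.TryTake(out byte[] next, Timeout.Infinite)) return 0;
            current = next;
            position = 0;
        }
        int n = Math.Min(count, current.Length - position);
        Buffer.BlockCopy(current, position, buffer, offset, n);
        position += n;
        return n;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) chunks.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override void Flush() { }
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

One difference from PullStream: the writer may run one chunk ahead (one chunk queued while the reader still works through the previous one), so up to two chunks can be in memory at once. Reads after End keep returning 0, and a Write after End throws, because `Add` throws `InvalidOperationException` once adding has been completed.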

I hope this solves your problem :-) It was fun to code this anyway.

Regards, Jacco

Answered by Jacco, Nov 08 '22 05:11