Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Consuming a custom stream (IEnumerable<T>)

I'm using a custom implementation of a Stream that will stream an IEnumerable<T> into a stream. I'm using this EnumerableStream implementation to perform the conversion.

I'm using it to perform streaming over WCF in streaming mode. I'm able to convert the IEnumerable to a stream without problem. Once, I'm in the client side, I can deserialize and get all the data, however I'm not able to find the condition to stop looping over my stream. I'm getting:

System.Runtime.Serialization.SerializationException: End of Stream encountered before parsing was completed.

Here's sample example of what I'm trying to achieve:

class Program
{
    public static void Main()
    {
        var ListToSend = new List<List<string>>();
        var ListToReceive = new List<List<string>>();
        ListToSend = SimulateData().ToList();
        using (Stream stream = GetStream(ListToSend))
        {
            var formatter = new BinaryFormatter();
            while (stream.CanRead || 1 == 1 || true...) // What should I put in here to stop once I read everything???
            {
                List<string> row = formatter.Deserialize(stream) as List<string>;
                ListToReceive.Add(row);
            }
            Printer(ListToReceive);
            Console.WriteLine("Done");
        }
    }

    private static void Printer(List<List<string>> data)
    {
        Console.WriteLine("Printing");
        foreach (var row in data)
        {
            foreach (var cell in row)
            {
                Console.Write(cell + "\t");
            }
            Console.WriteLine("-------------------------------------------------------------------------------");
        }
    }
    private static Stream GetStream(IEnumerable<List<string>> data)
    {
        return EnumerableStream.Create(data, DeserializerCallback);
    }

    private static List<byte> DeserializerCallback(object obj)
    {
        var binFormatter = new BinaryFormatter();
        var mStream = new MemoryStream();
        binFormatter.Serialize(mStream, obj);
        return mStream.ToArray().ToList();
    }

    private static IEnumerable<List<string>> SimulateData()
    {
        Random randomizer = new Random();
        for (var i = 0; i < 10; i++)
        {
            var row = new List<string>();
            for (var j = 0; j < 1000; j++)
            {
                row.Add((randomizer.Next(100)).ToString());
            }
            yield return row;
        }
    }
}

I did not include the custom stream. I created a fiddle for those that want to see the entire code.

  • Do I need to add something in the custom stream itself to notify that all the data have been read?
  • Is it because the format of the deserializer and serialiser are not the same (I don't think so).
  • I also want to know why when I put a break point in the read function, the buffer size is changing randomly.
  • I would prefer not to wrap the code with a try and catch, I want a clean solution that does not crash.
like image 389
billybob Avatar asked Aug 23 '18 19:08

billybob


People also ask

Is there a read-only stream implementation that uses an IEnumerable as input?

Here's a read-only Stream implementation that uses an IEnumerable<byte> as input: What you then still need is a function that converts IEnumerable<string> to IEnumerable<byte>:

How do I iterate over an IEnumerable collection asynchronously?

So let’s get going. Async Streams or IAsyncEnumerable<T> provides a way to iterate over an IEnumerable collection asynchronously while using the yield operator to return data as it comes in. For instance, let’s consider a scenario of retrieving pages of data from a database or an API, or listening to data signals from a bunch of IoT sensors.

What is IEnumerable<T>?

IEnumerable<T>is the base interface for collections in the System.Collections.Genericnamespace such as List<T>, Dictionary<TKey,TValue>, and Stack<T>and other generic collections such as ObservableCollection<T>and ConcurrentStack<T>. Collections that implement IEnumerable<T>can be enumerated by using the foreachstatement.

What is the difference between public interface IEnumerable and IEnumerable?

Public Interface IEnumerable(Of Out T) Implements IEnumerable Public Interface IEnumerable(Of T) Implements IEnumerable Type Parameters T The type of objects to enumerate. This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived.


1 Answers

Do I need to add something in the custom stream itself to notify that all the data have been read?

You can, but that wouldn't help in the WCF scenario where the received Stream is a different class.

There are two standard (official, by design) ways of determining the end of the Stream data:

(1) ReadByte returning -1

Returns

The unsigned byte cast to an Int32, or -1 if at the end of the stream.

(2) Read returning 0 when called with count > 0

Returns

The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.

Unfortunately both they consume the current byte (advance to next) and will break the deserializer.

What are the possible solutions?

First, implementing some serialization/deserialization format (protocol) which allows you to know if there are more elements to deserialize. for instance, List<T> stores Count before elements, T[] stores Length before elements etc. Since the EnumerableStream<T> does not know the count in advance, one simple solution would be to emit a single fake byte before each element:

private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;

    buf.Enqueue(1); // <--
    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);

    return true;
}

This would allow you to use

while (stream.ReadByte() != -1)
{
    // ...
}

Second, if you want to keep the current format, a more general solution would be to implement a custom stream, which wraps another stream and implements PeekByte method with the same semantics as the standard ReadByte, but without consuming the current byte:

public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte;

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
} 

This basically implements read only forward only stream with 0 or 1 byte read ahead functionality.

The usage will be like this:

using (var stream = new SequentialStream(GetStream(ListToSend)))
{
    // ...
    while (stream.PeekByte() != -1) 
    {
        // ...
    }
    // ...
}

P.S. What about

I also want to know why when I put a break point in the read function, the buffer size is changing randomly.

It's not randomly. BinaryFormatter internally uses BinaryReader to read typed values like Int32, Byte, String etc., passing the desired size as count, e.g. 4, 1, number of the string encoded bytes (which it knows because stores them in the stream before actual the data and reads it before trying to read the actual data) etc.

like image 144
Ivan Stoev Avatar answered Oct 15 '22 20:10

Ivan Stoev