I'm using a custom Stream implementation that streams an IEnumerable<T>. I'm using this EnumerableStream implementation to perform the conversion.
I'm using it to stream over WCF in streaming mode. I can convert the IEnumerable to a stream without problems. On the client side I can deserialize and get all the data, but I can't find the condition for stopping the loop over my stream. I'm getting:
System.Runtime.Serialization.SerializationException: End of Stream encountered before parsing was completed.
Here's a sample of what I'm trying to achieve:
class Program
{
    public static void Main()
    {
        var ListToSend = new List<List<string>>();
        var ListToReceive = new List<List<string>>();
        ListToSend = SimulateData().ToList();
        using (Stream stream = GetStream(ListToSend))
        {
            var formatter = new BinaryFormatter();
            while (stream.CanRead || 1 == 1 || true...) // What should I put in here to stop once I read everything???
            {
                List<string> row = formatter.Deserialize(stream) as List<string>;
                ListToReceive.Add(row);
            }
            Printer(ListToReceive);
            Console.WriteLine("Done");
        }
    }

    private static void Printer(List<List<string>> data)
    {
        Console.WriteLine("Printing");
        foreach (var row in data)
        {
            foreach (var cell in row)
            {
                Console.Write(cell + "\t");
            }
            Console.WriteLine("-------------------------------------------------------------------------------");
        }
    }

    private static Stream GetStream(IEnumerable<List<string>> data)
    {
        return EnumerableStream.Create(data, DeserializerCallback);
    }

    private static List<byte> DeserializerCallback(object obj)
    {
        var binFormatter = new BinaryFormatter();
        var mStream = new MemoryStream();
        binFormatter.Serialize(mStream, obj);
        return mStream.ToArray().ToList();
    }

    private static IEnumerable<List<string>> SimulateData()
    {
        Random randomizer = new Random();
        for (var i = 0; i < 10; i++)
        {
            var row = new List<string>();
            for (var j = 0; j < 1000; j++)
            {
                row.Add((randomizer.Next(100)).ToString());
            }
            yield return row;
        }
    }
}
I did not include the custom stream. I created a fiddle for those that want to see the entire code.
Do I need to add something in the custom stream itself to notify that all the data have been read?
You can, but that wouldn't help in the WCF scenario, where the received Stream is a different class.
There are two standard (official, by design) ways of determining the end of the Stream data:
(1) ReadByte returning -1
Returns
The unsigned byte cast to an Int32, or -1 if at the end of the stream.
(2) Read returning 0 when called with count > 0
Returns
The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
Unfortunately, both of them consume the current byte (advancing to the next one), which breaks the deserializer.
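To make the two end-of-stream signals concrete, here is a minimal sketch on a plain MemoryStream. The class and method names (EndOfStreamDemo, Probe) are made up for illustration. It also shows the problem: each ReadByte probe returns a real data byte and moves past it.

```csharp
using System;
using System.IO;

public static class EndOfStreamDemo
{
    // Probes a two-byte stream and returns what each call reported.
    public static int[] Probe()
    {
        using (var stream = new MemoryStream(new byte[] { 10, 20 }))
        {
            int first = stream.ReadByte();             // 10: the probe consumed the first byte
            int second = stream.ReadByte();            // 20: and now the second one
            int end = stream.ReadByte();               // -1: end of stream
            int read = stream.Read(new byte[4], 0, 4); // 0: end of stream
            return new[] { first, second, end, read };
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Probe())); // prints "10, 20, -1, 0"
    }
}
```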
What are the possible solutions?
First, you can implement a serialization/deserialization format (protocol) that lets you know whether there are more elements to deserialize. For instance, List<T> stores Count before its elements, T[] stores Length before its elements, and so on. Since the EnumerableStream<T> does not know the count in advance, one simple solution would be to emit a single fake byte before each element:
private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;
    _buf.Enqueue(1); // <-- marker byte: one more element follows
    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);
    return true;
}
This would allow you to use:
while (stream.ReadByte() != -1)
{
    // ...
}
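The marker-byte idea above can be sketched end to end as follows. This is only an illustration under stated assumptions: BinaryWriter and BinaryReader stand in for the per-element serializer (BinaryFormatter is marked obsolete in recent .NET versions), and MarkerByteDemo, Encode and Decode are hypothetical names, not part of EnumerableStream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class MarkerByteDemo
{
    // Hypothetical stand-in for the element serializer: a marker byte (1)
    // followed by a length-prefixed string written by BinaryWriter.
    public static byte[] Encode(IEnumerable<string> items)
    {
        using (var buffer = new MemoryStream())
        using (var writer = new BinaryWriter(buffer))
        {
            foreach (var item in items)
            {
                writer.Write((byte)1); // marker: one more element follows
                writer.Write(item);    // the element itself
            }
            writer.Flush();
            return buffer.ToArray();
        }
    }

    public static List<string> Decode(Stream stream)
    {
        var result = new List<string>();
        using (var reader = new BinaryReader(stream))
        {
            // ReadByte consumes only the marker, never element data,
            // and returns -1 once the source is exhausted.
            while (stream.ReadByte() != -1)
                result.Add(reader.ReadString());
        }
        return result;
    }

    public static void Main()
    {
        var bytes = Encode(new[] { "a", "bb", "ccc" });
        var items = Decode(new MemoryStream(bytes));
        Console.WriteLine(string.Join(",", items)); // prints "a,bb,ccc"
    }
}
```

The marker costs one extra byte per element, but the reader never needs to know the element count in advance.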
Second, if you want to keep the current format, a more general solution would be to implement a custom stream that wraps another stream and adds a PeekByte method with the same semantics as the standard ReadByte, but without consuming the current byte:
public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte; // null = nothing buffered, -1 = end of stream seen

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            // Hand out the peeked byte first, then read the rest from the source.
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
}
This basically implements a read-only, forward-only stream with 0 or 1 byte of read-ahead.
The usage will be like this:
using (var stream = new SequentialStream(GetStream(ListToSend)))
{
    // ...
    while (stream.PeekByte() != -1)
    {
        // ...
    }
    // ...
}
P.S. What about
I also want to know why when I put a break point in the read function, the buffer size is changing randomly.
It's not random. BinaryFormatter internally uses BinaryReader to read typed values like Int32, Byte, String etc., passing the desired size as count: for example 4, 1, or the number of encoded bytes of a string. It knows the string's byte count because that count is stored in the stream before the actual data and is read first.
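One way to observe this yourself is to wrap the source in a stream that records the count passed to each Read call. The sketch below is an illustration only: CountLoggingStream and ReadSizeDemo are hypothetical names, and it drives BinaryReader directly rather than BinaryFormatter. The exact counts recorded can differ between .NET versions, so none are asserted here.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical wrapper that records the count passed to each Read call.
public sealed class CountLoggingStream : Stream
{
    private readonly Stream _inner;
    public List<int> RequestedCounts { get; } = new List<int>();

    public CountLoggingStream(Stream inner) { _inner = inner; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        RequestedCounts.Add(count); // what the caller asked for
        return _inner.Read(buffer, offset, count);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

public static class ReadSizeDemo
{
    public static List<int> Run()
    {
        // Write an Int32, a Byte and a String, then read them back
        // through the logging wrapper.
        var data = new MemoryStream();
        using (var writer = new BinaryWriter(data, Encoding.UTF8, leaveOpen: true))
        {
            writer.Write(42);
            writer.Write((byte)7);
            writer.Write("hello");
        }
        data.Position = 0;

        var logging = new CountLoggingStream(data);
        using (var reader = new BinaryReader(logging))
        {
            reader.ReadInt32();
            reader.ReadByte();
            reader.ReadString();
        }
        return logging.RequestedCounts;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The recorded counts follow the sizes of the typed values being read, which is why the buffer size seen in a breakpoint varies from call to call.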