Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to return stream from WCF service?

I'm playing with protobuf-net and WCF. Here is code I created:

public class MobileServiceV2
{
    [WebGet(UriTemplate = "/some-data")]
    [Description("returns test data")]
    public Stream GetSomeData()
    {
        WebOperationContext.Current.OutgoingResponse.ContentType = "application/x-protobuf";

        var ms = new MemoryStream();
        ProtoBuf.Serializer.Serialize(ms, new MyResponse { SomeData = "Test data here" });
        return ms;
    }
}

[DataContract]
public class MyResponse
{
    [DataMember(Order = 1)] 
    public string SomeData { get; set; }
}

When I look in Fiddler - I can see proper outgoing content-type and all looks good, but I get empty response. IE prompts to download file and this file is empty. Is serializer not working? Or I just don't do it right?

EDIT:

I added following code to method and yes, it serializes correctly. Something wrong with how I return stream from WCF..

using (var file = File.Create("C:\\test.bin"))
        {
            Serializer.Serialize(file, new MyResponse { SomeData = "Test data here" });
        }
like image 875
katit Avatar asked Jul 20 '12 15:07

katit


2 Answers

Just write to a MemoryStream, and rewind it. Do not Dispose() it in this case:

var ms = new MemoryStream();
Serializer.Serialize(ms, obj);
ms.Position = 0;
return ms;

This does, however, mean that it buffers in memory. I could try and come up with some voodoo to avoid that, but it would be very complex.

like image 74
Marc Gravell Avatar answered Nov 01 '22 11:11

Marc Gravell


If I understand the question correctly, you're trying to use a streaming WCF binding. In that case, you could try splitting the data into chunks that are serialized individually and deserialized in the same way on the client. The only caveat is the WCF supplied Stream implementation on the receiving side - you'll need to wrap it and manage read on your own. The following is a class I use to facilitate this:

    public static class StreamingUtility
{

    public static IEnumerable<T> FromStream<T>(this Stream value, Action<T> perItemCallback = null)
    {

        List<T> result = new List<T>();
        StreamProxy sp = new StreamProxy(value);
        try
        {
            while (sp.CanRead)
            {

                T v = ProtoBuf.Serializer.DeserializeWithLengthPrefix<T>((Stream)sp, ProtoBuf.PrefixStyle.Base128);
                if (perItemCallback != null)
                    perItemCallback(v);

                result.Add(v);
            }
        }
        catch { }

        return result;
    }

    public static StreamingContent<T> SingleToStream<T>(this T value)
    {
        return new StreamingContent<T>(new T[] { value });
    }

    public static StreamingContent<T> ToStream<T>(this IEnumerable<T> value)
    {
        return new StreamingContent<T>(value);
    }

    public class StreamingContent<T> : Stream
    {
        private bool _canRead = true;
        private ManualResetEventSlim _dataIsReady = new ManualResetEventSlim(false);
        private bool _noMoreAdditions = false;
        private long _readingOffset = 0;

        //private IFormatter _serializer = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.CrossMachine));
        private IEnumerable<T> _source = null;

        private MemoryStream _stream = new MemoryStream();

        public static StreamingContent<T> Clone(Stream origin)
        {
            return new StreamingContent<T>(origin);
        }

        private StreamingContent(Stream origin)
        {
            byte[] b = new byte[65536];

            while (true)
            {
                int count = origin.Read(b, 0, b.Length);

                if (count > 0)
                {
                    _stream.Write(b, 0, count);
                }
                else
                    break;
            }
            _noMoreAdditions = true;
        }

        public StreamingContent(IEnumerable<T> source)
        {
            if (!s_initialized)
            {
                StreamingUtility.Initialize();

                StreamingUtility.s_initialized = true;
            }

            _source = source.ToList();
            if (source.Count() > 0)
            {
                new Thread(new ParameterizedThreadStart(obj =>
                {
                    StreamingContent<T> _this = obj as StreamingContent<T>;
                    foreach (T item in _this._source)
                    {
                        lock (_this._stream)
                        {
                            if (_this._noMoreAdditions) break;
                            _stream.Seek(0, SeekOrigin.End);

                            ProtoBuf.Serializer.SerializeWithLengthPrefix<T>(_this._stream, item, ProtoBuf.PrefixStyle.Base128);

                            //_serializer.Serialize(_this._stream, item);
                            _dataIsReady.Set();
                        }
                    }

                    lock (_this._stream)
                    {
                        _this._noMoreAdditions = true;
                        _dataIsReady.Set();
                    }
                })) { IsBackground = true }.Start(this);
            }
            else
            {
                _canRead = false;
            }
        }

        public override bool CanRead
        {
            get { return _canRead; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override long Length
        {
            get
            {
                while (!_noMoreAdditions) Thread.Sleep(20);
                return _stream.Length;
            }
        }

        public override long Position
        {
            get
            {
                throw new Exception("This stream does not support getting the Position property.");
            }
            set
            {
                throw new Exception("This stream does not support setting the Position property.");
            }
        }

        public override void Close()
        {
            lock (_stream)
            {
                _noMoreAdditions = true;
                _stream.Close();
            }
        }

        public override void Flush()
        {
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (!CanRead) return 0;

            bool wait = false;

            lock (_stream)
            {
                wait = !_dataIsReady.IsSet && !_noMoreAdditions;
            }

            if (wait)
            {
                _dataIsReady.Wait();
            }

            lock (_stream)
            {
                if (!_noMoreAdditions)
                    _dataIsReady.Reset();

                if (_stream.Length > _readingOffset)
                {
                    _stream.Seek(_readingOffset, SeekOrigin.Begin);
                    int res = _stream.Read(buffer, 0, count);

                    if (_noMoreAdditions && count + _readingOffset >= _stream.Length)
                        _canRead = false;

                    _readingOffset += res;

                    return res;
                }
            }

            return 0;
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new Exception("This stream does not support seeking.");
        }

        public override void SetLength(long value)
        {
            throw new Exception("This stream does not support setting the Length.");
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new Exception("This stream does not support writing.");
        }

        protected override void Dispose(bool disposing)
        {
            try
            {
                lock (_stream)
                {
                    _noMoreAdditions = true;
                    _stream.Close();
                }
            }
            catch { }
        }
    }

    private class StreamProxy : Stream
    {
        private bool _canRead = true;
        private bool _endOfMessage = false;
        private Stream _internalStream;
        private int _readPosition = 0;
        private MemoryStream _storage = new MemoryStream();
        private int _writePosition = 0;

        public StreamProxy(Stream internalStream)
        {
            _internalStream = internalStream;
            byte[] initialRequest = new byte[1000];

            int length = _internalStream.Read(initialRequest, 0, 1000);

            if (length != 0)
                _storage.Write(initialRequest, 0, length);
            else
                _canRead = false;

            _writePosition = length;
        }

        public override bool CanRead
        {
            get { return _canRead; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override long Length
        {
            get { throw new NotImplementedException(); }
        }

        public override long Position
        {
            get
            {
                return _readPosition;
            }
            set
            {
                throw new NotImplementedException();
            }
        }

        public override void Flush()
        {
        }

        public override int ReadByte()
        {
            byte[] res = new byte[1];
            int g = Read(res, 0, 1);
            return res[0];
        }

        public override int Read(byte[] buffer, int offset, int count)
        {


            int res = 0;
            if (_readPosition + count > _writePosition)
            {
                /// add extra bytes to see if more data is available and we need to allow next read
                int readSize = _readPosition + count - _writePosition;

                if (readSize < 1024)
                    readSize = 1024;

                byte[] read = new byte[readSize];
                res = _internalStream.Read(read, 0, readSize);
                if (res > 0)
                {
                    _storage.Seek(_writePosition, SeekOrigin.Begin);
                    _writePosition += res;
                    _storage.Write(read, 0, res);

                }
                else if (res == 0)/// If the read returned 0, we're at the end
                {
                    _endOfMessage = true;

                }

                if (res > 0 && res < readSize)
                {
                    read = new byte[1024];
                    res = _internalStream.Read(read, 0, 1024);
                    if (res > 0)
                    {
                        _storage.Seek(_writePosition, SeekOrigin.Begin);
                        _writePosition += res;
                        _storage.Write(read, 0, res);

                    }
                    else if (res == 0)/// If the read returned 0, we're at the end
                    {
                        _endOfMessage = true;

                    }
                }
            }

            _storage.Seek(_readPosition, SeekOrigin.Begin);
            res = _storage.Read(buffer, offset, count);
            _readPosition += res;


            /// If end of message was reached and all the data was read from the
            /// internal storage, mark CanRead as false
            if (_readPosition >= _writePosition && _endOfMessage)
                _canRead = false;

            return res;
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
    }
}

To use it simply call a WCF interface method like this:

IEnumerable<SomeType> collection = ...
clannel.Method(collection.ToStream());

And read it on the receiving end like this:

public void Method(Stream stream){
   IEnumerable<SomeType> coll = stream.FromStream<SomeType>();
}

This implementation is still in testing, so I'll appreciate any input.

like image 3
John K Avatar answered Nov 01 '22 12:11

John K