Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Continuously reading from a stream?

I have a Stream object that occasionally gets some data on it, but at unpredictable intervals. Messages that appear on the Stream are well-defined and declare the size of their payload in advance (the size is a 16-bit integer contained in the first two bytes of each message).

I'd like to have a StreamWatcher class which detects when the Stream has some data on it. Once it does, I'd like an event to be raised so that a subscribed StreamProcessor instance can process the new message.

Can this be done with C# events without using Threads directly? It seems like it should be straightforward, but I can't get quite get my head around the right way to design this.

like image 379
Damien Wildfire Avatar asked May 06 '10 22:05

Damien Wildfire


2 Answers

When you say not use threads directly, I assume you still want to use them indirectly via async calls, otherwise this wouldn't be very useful.

All you need to do is wrap the async methods of the Stream and store the result in a buffer. First, let's define the event part of the spec:

public delegate void MessageAvailableEventHandler(object sender,
    MessageAvailableEventArgs e);

public class MessageAvailableEventArgs : EventArgs
{
    public MessageAvailableEventArgs(int messageSize) : base()
    {
        this.MessageSize = messageSize;
    }

    public int MessageSize { get; private set; }
}

Now, read one 16-bit integer from the stream asynchronously and report back when it's ready:

public class StreamWatcher
{
    private readonly Stream stream;

    private byte[] sizeBuffer = new byte[2];

    public StreamWatcher(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.stream = stream;
        WatchNext();
    }

    protected void OnMessageAvailable(MessageAvailableEventArgs e)
    {
        var handler = MessageAvailable;
        if (handler != null)
            handler(this, e);
    }

    protected void WatchNext()
    {
        stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
            null);
    }

    private void ReadCallback(IAsyncResult ar)
    {
        int bytesRead = stream.EndRead(ar);
        if (bytesRead != 2)
            throw new InvalidOperationException("Invalid message header.");
        int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
        OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
        WatchNext();
    }

    public event MessageAvailableEventHandler MessageAvailable;
}

I think that's about it. This assumes that whichever class is handling the message also has access to the Stream and is prepared to read it, synchronously or asynchronously, based on the message size in the event. If you want the watcher class to actually read the entire message then you'll have to add some more code to do that.

like image 192
Aaronaught Avatar answered Sep 21 '22 15:09

Aaronaught


Yes, this can be done. Use the non-blocking Stream.BeginRead method with an AsyncCallback. The callback is called asynchronously when data becomes available. In the callback, call Stream.EndRead to get the data, and call Stream.BeginRead again to get the next chunk of data. Buffer incoming data in a byte array that is large enough to hold the message. Once the byte array is full (multiple callback calls may be needed), raise the event. Then read the next message size, create a new buffer, repeat, done.

like image 28
dtb Avatar answered Sep 20 '22 15:09

dtb