
Convert Event based code to Rx

I have the following code (simplified for posting purposes).

public class SomeDataObject
{
    public delegate void ReadyEventHandler();
    public delegate void ErrorEventHandler();

    public event ReadyEventHandler Ready;
    public event ErrorEventHandler Error;
    ...
}

public class ConsumerClass
{
    private SomeDataObject dataObject;

    private Task<List<string>> GetStrings()
    {
        List<string> results = new List<string>();
        var tcs = new TaskCompletionSource<List<string>>();

        SomeDataObject.ReadyEventHandler ReadyHandler = null;
        SomeDataObject.ErrorEventHandler ErrorHandler = null;

        ReadyHandler += () =>
        {
            for (int i = 0; i < dataObject.ItemCount; i++)
                results.Add(dataObject[i].ToString());

            tcs.TrySetResult(results);
        };

        ErrorHandler += () =>
        {
            tcs.TrySetException(new Exception("oops!"));
        };

        dataObject.Ready += ReadyHandler;
        dataObject.Error += ErrorHandler;

        dataObject.DoRequest();

        return tcs.Task;
    }
}

The idea is that when DoRequest call is made, SomeDataObject will get some data and raise either the Ready or Error events (details not important!). If data is available, then the ItemCount indicates how many items are available.

I am new to Rx and cannot find any comparable example. So is it possible to convert this into Rx so that IObservable<string> is returned instead of Task<List<string>> using Observable.Create somehow?

Regards Alan

Alan Rutter asked Feb 11 '14 02:02


2 Answers

Matthew's answer is close but has some problems. First, it is eager (work starts before anyone subscribes), which is not normally in the spirit of Rx or functional programming. Next, I think you will want to release the event handlers when the consumer disposes its subscription. Finally, using a Subject should be treated as a code smell, and in this case it points to the two problems above :-)

Here I use Observable.Create (which should be your number one go-to tool in the toolbox, with subjects being your last resort) to connect lazily, and to detach the event handlers when the subscription is disposed.

private IObservable<string> GetStrings()
{
    return Observable.Create<string>(o=>
    {
        SomeDataObject.ReadyEventHandler ReadyHandler = null;
        SomeDataObject.ErrorEventHandler ErrorHandler = null;

        ReadyHandler += () =>
        {
            for (int i = 0; i < dataObject.ItemCount; i++)
                o.OnNext(dataObject[i].ToString());

            o.OnCompleted();
        };

        ErrorHandler += () =>
        {
            o.OnError(new Exception("oops!"));
        };

        dataObject.Ready += ReadyHandler;
        dataObject.Error += ErrorHandler;

        dataObject.DoRequest();

        return Disposable.Create(()=>
            {
                dataObject.Ready -= ReadyHandler;
                dataObject.Error -= ErrorHandler;
            });
    });
}

I would also consider passing dataObject in as a parameter to the method. Shared state in an asynchronous system is a common source of problems.
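To make the lazy connection and the clean-up concrete, here is a minimal sketch of how a consumer might subscribe to such an observable. It assumes the System.Reactive package is referenced and a compiler recent enough for `?.`. The body of SomeDataObject is a hypothetical stand-in (the question does not show it): it completes synchronously and raises Error only when a made-up Fail flag is set. SubscribeExample is an illustrative name as well.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-in for the question's SomeDataObject; the real class is not shown.
public class SomeDataObject
{
    public delegate void ReadyEventHandler();
    public delegate void ErrorEventHandler();

    public event ReadyEventHandler Ready;
    public event ErrorEventHandler Error;

    private readonly List<object> items = new List<object> { "a", "b", "c" };

    public bool Fail { get; set; }              // assumption: lets the sketch exercise the error path
    public int ItemCount { get { return items.Count; } }
    public object this[int index] { get { return items[index]; } }

    // Assumption: the request completes synchronously on the calling thread.
    public void DoRequest()
    {
        if (Fail) Error?.Invoke();
        else Ready?.Invoke();
    }
}

public static class SubscribeExample
{
    // Same shape as the Observable.Create version above, with dataObject passed in.
    public static IObservable<string> GetStrings(SomeDataObject dataObject)
    {
        return Observable.Create<string>(o =>
        {
            SomeDataObject.ReadyEventHandler readyHandler = () =>
            {
                for (int i = 0; i < dataObject.ItemCount; i++)
                    o.OnNext(dataObject[i].ToString());
                o.OnCompleted();
            };
            SomeDataObject.ErrorEventHandler errorHandler =
                () => o.OnError(new Exception("oops!"));

            dataObject.Ready += readyHandler;
            dataObject.Error += errorHandler;
            dataObject.DoRequest();

            // Runs when the subscription is disposed or the sequence terminates.
            return Disposable.Create(() =>
            {
                dataObject.Ready -= readyHandler;
                dataObject.Error -= errorHandler;
            });
        });
    }

    public static void Main()
    {
        // Building the observable does not call DoRequest; subscribing does.
        IObservable<string> source = GetStrings(new SomeDataObject());

        IDisposable subscription = source.Subscribe(
            s => Console.WriteLine("Item: " + s),
            ex => Console.WriteLine("Failed: " + ex.Message),
            () => Console.WriteLine("Done"));

        // Disposing detaches the handlers if the sequence has not already ended.
        subscription.Dispose();
    }
}
```

Because the request is issued inside the Create delegate, each new subscription triggers its own DoRequest call, which is worth keeping in mind if the underlying request is expensive.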

Lee Campbell answered Sep 22 '22 12:09


In response to your comments on Lee's (perfectly lovely and tick-worthy) answer, here's how to modify his answer to get a single List<string> response and block for it:

private IObservable<List<string>> GetStrings(SomeDataObject dataObject)
{
    return Observable.Create<List<string>>(o=>
    {
        SomeDataObject.ReadyEventHandler ReadyHandler = null;
        SomeDataObject.ErrorEventHandler ErrorHandler = null;

        ReadyHandler = () =>
        {
            var results = new List<string>(dataObject.ItemCount);
            for (int i =0; i < dataObject.ItemCount; i++)
                results.Add(dataObject[i].ToString());

            o.OnNext(results);
            o.OnCompleted();
        };

        ErrorHandler = () =>
        {
            o.OnError(new Exception("oops!"));
        };

        dataObject.Ready += ReadyHandler;
        dataObject.Error += ErrorHandler;

        dataObject.DoRequest();

        return Disposable.Create(()=>
            {
                dataObject.Ready -= ReadyHandler;
                dataObject.Error -= ErrorHandler;
            });
    });
}

Now you can block on this with:

var results = GetStrings(dataObject).Wait();

If using .NET 4.5, then in an async method you can also do:

var results = await GetStrings(dataObject);
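As an alternative sketch, if you would rather keep the per-item IObservable<string> version, Rx's ToList operator can do the collecting for you. The GetStrings below is a hypothetical stand-in that just emits three fixed strings, ToListExample is an illustrative name, and the async Main assumes C# 7.1 or later with the System.Reactive package referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ToListExample
{
    // Hypothetical stand-in for a per-item GetStrings(); emits three items, then completes.
    private static IObservable<string> GetStrings()
    {
        return new[] { "a", "b", "c" }.ToObservable();
    }

    public static async Task Main()
    {
        // ToList buffers every item and emits a single IList<string> when the source completes.
        IList<string> blocking = GetStrings().ToList().Wait();
        Console.WriteLine(blocking.Count);

        // Awaiting an observable yields its last element, which here is the single list.
        IList<string> awaited = await GetStrings().ToList();
        Console.WriteLine(string.Join(",", awaited));
    }
}
```

This keeps the source observable item-based, so callers that want streaming and callers that want one list can share the same method.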
James World answered Sep 20 '22 12:09