I have the following code (simplified for posting purposes).
public class SomeDataObject
{
public delegate void ReadyEventHandler;
public delegate void ErrorEventHandler;
public event ReadyEventHandler Ready;
public event ErrorEventHandler Error;
...
}
pubic class ConsumerClass
{
private SomeDataObject dataObject;
private Task<List<string>> GetStrings()
{
List<string> results = new List<string>();
var tcs = new TaskCompletionSource<List<string>>();
SomeDataObject.ReadyEventHandler ReadyHandler = null;
SomeDataObject.ErrorEventHandler ErrorHandler = null;
ReadyHandler += () =>
{
for (int i =0; i < dataObject.ItemCount; i++)
results.Add(dataObject[i].ToString());
tcs.TrySetResult(results);
}
ErrorHandler += ()
{
tcs.TrySetException(new Exception("oops!");
}
dataObject.Ready += ReadyHandler;
dataObject.Error += ErrorHandler;
dataObject.DoRequest();
}
}
The idea is that when DoRequest call is made, SomeDataObject will get some data and raise either the Ready or Error events (details not important!). If data is available, then the ItemCount indicates how many items are available.
I am new to Rx and cannot find any comparable example. So is it possible to convert this into Rx so that IObservable<string>
is returned instead of Task<List<string>>
using Observable.Create somehow?
Regards Alan
Matthew's answer is close but has some problems. First, it is eager, which is not normally in the spirit of Rx/Functional programming. Next I think that you will want to be able to release the event handles when the consumer disposes. Finally the usage of a subject should be a code smell, and this case it points to the two problems above :-)
Here I use Observable.Create (which should be your #1 goto tool in the tool box, with subjects being your last resort) to lazily connect, and also offer disconnection/releasing events when the subscription is disposed.
private IObservable<string> GetStrings()
{
return Observable.Create<string>(o=>
{
SomeDataObject.ReadyEventHandler ReadyHandler = null;
SomeDataObject.ErrorEventHandler ErrorHandler = null;
ReadyHandler += () =>
{
for (int i =0; i < dataObject.ItemCount; i++)
o.OnNext(dataObject[i].ToString());
o.OnCompleted();
}
ErrorHandler += () =>
{
o.OnError(new Exception("oops!"));
}
dataObject.Ready += ReadyHandler;
dataObject.Error += ErrorHandler;
dataObject.DoRequest();
return Disposable.Create(()=>
{
dataObject.Ready -= ReadyHandler;
dataObject.Error -= ErrorHandler;
});
}
}
I would also consider moving dataObject
to a parameter to the method too. Sharing state in an Async system is a source of problems.
In response to your comments on Lee's (perfectly lovely and tick-worthy) answer, here's how to modify his answer to get a single List<string>
response and block for it:
private IObservable<List<string>> GetStrings(SomeDataObject dataObject)
{
return Observable.Create<List<string>>(o=>
{
SomeDataObject.ReadyEventHandler ReadyHandler = null;
SomeDataObject.ErrorEventHandler ErrorHandler = null;
ReadyHandler = () =>
{
var results = new List<string>(dataObject.ItemCount);
for (int i =0; i < dataObject.ItemCount; i++)
results.Add(dataObject[i].ToString());
o.OnNext(results);
o.OnCompleted();
};
ErrorHandler = () =>
{
o.OnError(new Exception("oops!"));
};
dataObject.Ready += ReadyHandler;
dataObject.Error += ErrorHandler;
dataObject.DoRequest();
return Disposable.Create(()=>
{
dataObject.Ready -= ReadyHandler;
dataObject.Error -= ErrorHandler;
});
});
}
Now you can block on this with:
var results = GetStrings().Wait();
If using .NET 4.5, then in an async
method you can also do:
var results = await GetStrings();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With