I am experimenting with Reactive Extensions to fetch a bunch of RSS items. I based my code on a blog post by Tim Greenfield, "Silverlight Rx DataClient within MVVM".
I'm using it within a desktop application, but the code is similar.
The problem I'm having is understanding the Retry() function. It doesn't seem to do what I expect, or to apply to what I expect it to apply to.
    var items = new List<RssItem>();
    WebHelper.DownloadXmlFileAsync<RssItem>(new Uri(URI), "item")
        .Retry(2)
        .Finally(PublishResults)
        .Subscribe(items.Add, ProcessError, () => ProcessCompleted(items));
When I pass in a valid URI, this works without any issues. When I make a typo in the URI, it reports a 404 error through the ProcessError() function, as one would expect, but the error is only reported once. I would have expected it to show this error twice.
So it seems that the Retry() function is not operating on my web request; it looks like it actually applies to the functions which are passed to Subscribe(). I could be wrong here though.
How can I make sure the Retry() call applies to the web request?
Extra code:
    public static class WebHelper
    {
        public static HttpWebRequest CreateHttp(Uri uri)
        {
            return CreateHttp(uri, "GET");
        }

        public static HttpWebRequest CreateHttp(Uri uri, string method)
        {
            if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps)
            {
                throw new ArgumentException("The specified URI does not use HTTP or HTTPS.", "uri");
            }
            var request = (HttpWebRequest)WebRequest.Create(uri);
            request.Method = method;
            return request;
        }

        public static IObservable<T> DownloadXmlFileAsync<T>(Uri uri, string elementName) where T : class
        {
            return (from request in Observable.Return(CreateHttp(uri))
                    from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
                    let stream = response.GetResponseStream()
                    where stream != null
                    from item in XmlReader.Create(stream).GetXmlItem<T>(elementName).ToObservable()
                    select item);
        }
    }

    public static class XmlExtensions
    {
        public static IEnumerable<T> GetXmlItem<T>(this XmlReader reader, string elementName) where T : class
        {
            var serializer = new XmlSerializer(typeof (T));
            while (reader.GoToElement(elementName))
            {
                yield return serializer.Deserialize(reader) as T;
            }
        }

        public static bool GoToElement(this XmlReader reader, string elementName)
        {
            do
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName)
                {
                    return true;
                }
            } while (reader.Read());
            return false;
        }
    }
    [XmlRoot("item")]
    public class RssItem
    {
        [XmlElement("description")]
        public string Description { get; set; }

        [XmlElement("link")]
        public string Link { get; set; }

        [XmlElement("pubDate")]
        public string PublishDate { get; set; }

        [XmlElement("title")]
        public string Title { get; set; }

        public override string ToString()
        {
            return string.Format("Title: {0}", Title);
        }
    }
The Rx grammar for sequences is defined as:
OnNext* (OnError | OnCompleted)?
Receiving either an OnError or an OnCompleted signals the end of the sequence, and subscriptions on the pipeline are expected to be torn down.
In the context of the operators:
observable.Retry(n) is: re-subscribe to observable when an OnError is received, so that observable is subscribed to at most n times in total.
observable.Finally(action) is: execute action on receiving OnError or OnCompleted.
Retry is meant to be used with cold observables (Lee Campbell has a good post on this) where subscription essentially causes the source to start.
Similarly, Repeat is exactly like Retry, except that it resubscribes on receiving an OnCompleted.
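As a rough illustration of that difference, the sketch below subscribes to the same cold source once through Repeat and once through Retry. It assumes the System.Reactive package is referenced; the class name RepeatVersusRetry and the source itself are made up for the example and do not come from the question.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class RepeatVersusRetry
{
    static void Main()
    {
        // A cold source (hypothetical): the body runs once per subscription
        // and always completes successfully.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            Console.WriteLine("Subscribed");
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        Console.WriteLine("-- Repeat(2) --");
        // Repeat reacts to OnCompleted by subscribing to the source again.
        source.Repeat(2).Subscribe(
            value => Console.WriteLine("OnNext({0})", value),
            () => Console.WriteLine("OnCompleted()"));

        Console.WriteLine("-- Retry(2) --");
        // Retry only reacts to OnError, so a source that completes
        // normally is not subscribed to a second time.
        source.Retry(2).Subscribe(
            value => Console.WriteLine("OnNext({0})", value),
            () => Console.WriteLine("OnCompleted()"));
    }
}
```

If the count is the total number of subscriptions, as the Retry output further down suggests, the Repeat(2) half should log "Subscribed" twice and the Retry(2) half only once.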
To see this in action, we can create an observable which "fails" the first n times it is subscribed to and then succeeds. Now for some code:
    private static IObservable<int> ErrorProducer(int i)
    {
        int count = 0;
        return Observable.Create<int>(observer =>
        {
            Console.WriteLine("Doing work");
            if (count++ < i)
            {
                Console.WriteLine("Failed");
                observer.OnError(new Exception());
            }
            else
            {
                Console.WriteLine("Done");
                observer.OnNext(count);
                observer.OnCompleted();
            }
            return Disposable.Empty;
        });
    }
For a producer which fails more often than Retry allows (it would only succeed on the fourth attempt, but Retry(2) permits two):
print(ErrorProducer(3).Retry(2));
Gives:
Doing work <-- Subscription
Failed
Doing work <-- Resubscription
Failed
OnError(System.Exception)
Finally
For a producer which eventually succeeds:
print(ErrorProducer(2).Retry(3));
Gives:
Doing work
Failed
Doing work
Failed
Doing work
Done
OnNext(3) <-- Succeeded
OnCompleted()
Finally
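The print helper used in these two runs is not shown. A minimal sketch of what it might look like follows, assuming the System.Reactive package is referenced; the names Printer and Print, and the two sample sequences in Main, are hypothetical stand-ins rather than the original code.

```csharp
using System;
using System.Reactive.Linq;

static class Printer
{
    // Hypothetical stand-in for the print helper: logs every notification
    // and logs "Finally" once the pipeline has been torn down.
    public static void Print<T>(IObservable<T> source)
    {
        source
            .Finally(() => Console.WriteLine("Finally"))
            .Subscribe(
                value => Console.WriteLine("OnNext({0})", value),
                ex => Console.WriteLine("OnError({0})", ex.GetType()),
                () => Console.WriteLine("OnCompleted()"));
    }

    static void Main()
    {
        // A sequence that completes normally.
        Print(Observable.Return(42));

        // A sequence that terminates with an error.
        Print(Observable.Throw<int>(new Exception()));
    }
}
```

Passing ErrorProducer(3).Retry(2) or ErrorProducer(2).Retry(3) to a helper like this should give traces of the same shape as the ones shown above.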
If you want your error-processing function to be called once for every failed attempt, it should be placed before the Retry, i.e., seq.Do(value => { }, exception => { }).Retry(n)
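A sketch of that placement, reusing the idea of ErrorProducer from above, is shown here. It assumes the System.Reactive package is referenced; the class name DoBeforeRetry and the log messages are invented for illustration.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class DoBeforeRetry
{
    // Same idea as ErrorProducer above: the first `failures`
    // subscriptions fail, later ones succeed.
    static IObservable<int> ErrorProducer(int failures)
    {
        int count = 0;
        return Observable.Create<int>(observer =>
        {
            if (count++ < failures)
            {
                observer.OnError(new InvalidOperationException("attempt " + count));
            }
            else
            {
                observer.OnNext(count);
                observer.OnCompleted();
            }
            return Disposable.Empty;
        });
    }

    static void Main()
    {
        ErrorProducer(3)
            // Do sits upstream of Retry, so it sees the error of every attempt.
            .Do(value => { }, ex => Console.WriteLine("Attempt failed: " + ex.Message))
            .Retry(2)
            // The subscriber sits downstream of Retry, so it only sees
            // the error that is left once the retries are used up.
            .Subscribe(
                value => Console.WriteLine("OnNext({0})", value),
                ex => Console.WriteLine("Gave up: " + ex.Message),
                () => Console.WriteLine("OnCompleted()"));
    }
}
```

Here the "Attempt failed" line should appear once per attempt, while "Gave up" appears a single time, which is the behaviour the question observed for ProcessError.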
You can read up on using hot/cold observables, and using the async pattern with Rx to clarify your understanding.