First of all, I'm using the most recent of Rx, that is 2.1. As far as I understand, lots of things have changed when Rx turned 2, so I'm really looking forward to receive an up-to-date answer. Thanks in advance.
I'm implementing a classic task for Rx: observing text of TextBox (AutoCompleteBox from WPToolkit to be exact), in order to provide a list of suggestions to user. Suggestions are fetched from the network, and I want to use these ordinary Rx goodies, like Throttle, DistinctUntilChanged, etc.
I'm also using recently released portable HttpClient for Windows Phone 8, since it provides task-based asynchronous API, which is nice.
The problem I'm having is the cross-thread access while reading the Text
value of 'AutoCompleteBox`. Here is the code:
var http = new HttpClient();
var searchFunc = Observable.FromAsync<HttpResponseMessage>(() =>
http.GetAsync(FormatUrl(SEARCH_URL, "DE", new GeoCoordinate(51, 13), searchBox.Text /* <-- causes exception */, 10, "")));
var uithread = new SynchronizationContextScheduler(SynchronizationContext.Current);
var textChange = Observable.FromEventPattern<RoutedEventArgs>(searchBox, "TextChanged")
.Throttle(TimeSpan.FromMilliseconds(800))
.DistinctUntilChanged()
.SubscribeOn(uithread)
.SelectMany(searchFunc)
.Select(async (resp) => SearchResultsParser.ParseSearchResults(await resp.Content.ReadAsStreamAsync(), new GeoCoordinate(51, 13)))
.Select(async (results) => searchBox.ItemsSource = await results)
.ObserveOn(uithread)
.Subscribe();
Exception happens when searchFunc
is executed. I see from VS that it executes on a Worker Thread, despite I use SubscribeOn.
Here's the example using SynchronizationContextScheduler
, but I've also tried just SubscribeOnDispatcher
, with the same result. Looks like I'm missing something important with this ObserveOn
stuff, or maybe regarding Observable.FromAsync
. Could you kindly point me to my mistake?
SubscribeOn
is almost never what you want - you might think it means "Where my Subscribe
method runs", but it actually means "Where the actual wiring to the IDisposable
(and disposal) runs" - ObserveOn
is the equivalent for "This is where I want my actual Subscribe
code to execute"
Ref: Observable.SubscribeOn and Observable.ObserveOn
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With