Rx 2.1: How to subscribe and observe on Dispatcher correctly

First of all, I'm using the most recent version of Rx, which is 2.1. As far as I understand, lots of things changed when Rx turned 2, so I'm really looking forward to receiving an up-to-date answer. Thanks in advance.

I'm implementing a classic task for Rx: observing the text of a TextBox (an AutoCompleteBox from WPToolkit, to be exact) in order to provide a list of suggestions to the user. Suggestions are fetched from the network, and I want to use the usual Rx goodies, like Throttle, DistinctUntilChanged, etc.

I'm also using the recently released portable HttpClient for Windows Phone 8, since it provides a task-based asynchronous API, which is nice.

The problem I'm having is a cross-thread access exception when reading the Text value of the AutoCompleteBox. Here is the code:

var http = new HttpClient();
var searchFunc = Observable.FromAsync<HttpResponseMessage>(() => 
            http.GetAsync(FormatUrl(SEARCH_URL, "DE", new GeoCoordinate(51, 13), searchBox.Text /* <-- causes exception */, 10, "")));

var uithread = new SynchronizationContextScheduler(SynchronizationContext.Current);
var textChange = Observable.FromEventPattern<RoutedEventArgs>(searchBox, "TextChanged")                             
        .Throttle(TimeSpan.FromMilliseconds(800))
        .DistinctUntilChanged()     
        .SubscribeOn(uithread)           
        .SelectMany(searchFunc)                
        .Select(async (resp) => SearchResultsParser.ParseSearchResults(await resp.Content.ReadAsStreamAsync(), new GeoCoordinate(51, 13)))
        .Select(async (results) => searchBox.ItemsSource = await results)
        .ObserveOn(uithread)
        .Subscribe();

The exception happens when searchFunc is executed. I can see in VS that it executes on a worker thread, even though I use SubscribeOn.

The example above uses SynchronizationContextScheduler, but I've also tried plain SubscribeOnDispatcher, with the same result. It looks like I'm missing something important about ObserveOn, or maybe about Observable.FromAsync. Could you kindly point out my mistake?

Haspemulator asked Feb 18 '23 08:02



1 Answer

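SubscribeOn is almost never what you want. You might think it means "where my Subscribe callback runs", but it actually means "where the subscription to the source is wired up (and where its IDisposable is later disposed)". ObserveOn is the one that means "this is where I want my notifications, and therefore my actual Subscribe code, to execute". In your pipeline, Throttle delivers its notifications on a timer thread, so every operator after it, including the searchFunc call inside SelectMany, runs on a worker thread until an ObserveOn moves things back. Put ObserveOn(uithread) before the operators that touch the UI, not only at the end of the chain.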
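To see the difference in isolation, here is a minimal console sketch, not the asker's actual code. It makes a few assumptions: an EventLoopScheduler running on a thread named "UI" stands in for the dispatcher, a hand-written Observable.Create source stands in for the TextChanged event, and the Demo class and Where helper are names invented purely for illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

static class Demo
{
    // Hypothetical helper: reports whether we are on the stand-in UI thread.
    static string Where()
    {
        return Thread.CurrentThread.Name == "UI" ? "UI" : "worker";
    }

    static void Main()
    {
        // Assumption: one dedicated thread named "UI" plays the role of the dispatcher.
        var ui = new EventLoopScheduler(
            start => new Thread(start) { Name = "UI", IsBackground = true });

        // Stand-in for the TextChanged source: logs where it is subscribed,
        // then emits one value on that same thread and never completes.
        var source = Observable.Create<string>(observer =>
        {
            Console.WriteLine("subscribed on: " + Where());
            observer.OnNext("abc");
            return Disposable.Empty;
        });

        var done = new CountdownEvent(2);

        // SubscribeOn moves the subscription to the UI thread, but Throttle
        // still forwards the value from its own timer.
        source.SubscribeOn(ui)
              .Throttle(TimeSpan.FromMilliseconds(100))
              .Subscribe(_ =>
              {
                  Console.WriteLine("SubscribeOn only, OnNext on: " + Where());
                  done.Signal();
              });

        // ObserveOn placed after Throttle marshals the notification back.
        source.SubscribeOn(ui)
              .Throttle(TimeSpan.FromMilliseconds(100))
              .ObserveOn(ui)
              .Subscribe(_ =>
              {
                  Console.WriteLine("with ObserveOn, OnNext on: " + Where());
                  done.Signal();
              });

        done.Wait(); // keep the process alive until both callbacks have run
    }
}
```

Both subscriptions should report being wired up on the "UI" thread, while only the second one should run its callback there. The same reasoning applies to the original chain: anything that reads searchBox.Text or sets ItemsSource needs an ObserveOn on the UI scheduler upstream of it.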

Ref: Observable.SubscribeOn and Observable.ObserveOn

JerKimball answered Feb 20 '23 00:02
