Animate chart with Observable based on infinite sequence

I'm having trouble creating an animated visualisation of some data of mine using FSharp.Charting and FSharp.Control.Reactive.

Essentially, I have an infinite generator of points. My real generator is more complicated than the following example, but this simplified example reproduces the problem:

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

This move function has the type (int * int) list -> seq<(int * int) list>. Element i of the resulting sequence is the input list with every point translated by i in both x and y, so the points move up and to the right by one unit per iteration.
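As a quick sanity check, taking the first few elements of the sequence shows the translation. This is only an illustrative sketch: the name firstFrames is made up for the example, and move is repeated so that the snippet runs on its own.

```fsharp
// Same definition as in the question, repeated so the snippet is self-contained.
let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Take the first three elements of the infinite sequence.
// firstFrames is a hypothetical name used only for this illustration.
let firstFrames =
    [(1, 0); (0, 1)]
    |> move
    |> Seq.take 3
    |> Seq.toList
// firstFrames = [[(1, 0); (0, 1)]; [(2, 1); (1, 2)]; [(3, 2); (2, 3)]]
```

The first element is the input unchanged (i = 0), and each later element is shifted by one more unit along both axes.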

I want to show this on a LiveChart.Point. The sequence generated by move can be converted to an appropriate Observable, but by itself it runs quite fast, so I first slow it down a bit:

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
        duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
        yield x }

This enables me to create an Observable out of the sequence of points:

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

I can also see that if I print to the console, it works:

obs.Subscribe (fun x -> printfn "%O" x)

By printing to the console, it's also quite clear that this blocks the execution environment; if, for example, you send the script to F# Interactive (FSI), it keeps printing, and you'll have to cancel evaluation or reset the session in order to stop it.

My theory is that it's because obs is running on the same thread as the execution environment.
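One way to probe part of that theory without involving Rx at all is to check which thread the delayed sequence does its work on. The sketch below tags each element with the managed thread id that produced it and compares it with the caller's. The names callerThread, producerThreads and sameThread are made up for the illustration, and the 50 ms delay is arbitrary. It only shows that the sequence blocks whichever thread enumerates it; that toObservable enumerates on the subscribing thread is an assumption that matches the blocking seen in FSI, not something this snippet proves.

```fsharp
open System
open System.Threading

// Same helper as in the question, with the loop body indented.
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
        duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
        yield x }

// Hypothetical names, used only for this check.
let callerThread = Thread.CurrentThread.ManagedThreadId

// Record the id of the thread on which each element is produced.
let producerThreads =
    Seq.initInfinite id
    |> delay (TimeSpan.FromMilliseconds 50.0)
    |> Seq.map (fun _ -> Thread.CurrentThread.ManagedThreadId)
    |> Seq.take 3
    |> Seq.toList

// True when every element was produced on the thread that enumerated the sequence.
let sameThread = producerThreads |> List.forall (fun tid -> tid = callerThread)
```

Since Async.RunSynchronously blocks the calling thread until the sleep has finished, every element is produced on the enumerating thread, so anything that enumerates the sequence on the FSI thread will hold that thread for as long as the sequence runs.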

The same happens if I try to make a LiveChart.Point out of it:

let lc = obs |> LiveChart.Point
lc.ShowChart()

If I send this to FSI, nothing happens (no chart is shown), and FSI blocks.

This seems consistent with my theory that the Observer is running on the same thread as the chart.

How can I make the Observer run on a different thread?

I've discovered Observable.observeOn, which takes an IScheduler. Browsing MSDN, I've discovered NewThreadScheduler, ThreadPoolScheduler, and TaskPoolScheduler, all of which implement IScheduler. The names of these classes sound promising, but I can't find them!

According to the documentation, they're all defined in System.Reactive.dll, but while I have all the dependencies of FSharp.Control.Reactive, I don't have that assembly anywhere. Searching the internet hasn't revealed where to get it, either.

Is it an older or newer version of Reactive Extensions? Am I even looking in the right direction?

How can I visualise my infinite sequence of points on a LiveChart?


Here's a complete script to reproduce the issue:

#r @"../packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"../packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"../packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"../packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"../packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
        duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
        yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

//obs.Subscribe (fun x -> printfn "%O" x)

let lc = obs |> LiveChart.Point
lc.ShowChart()

Installed NuGet packages:

Id                                  Versions
--                                  --------
FSharp.Charting                     {0.90.12}
FSharp.Control.Reactive             {3.2.0}
FSharp.Core                         {3.1.2}
Rx-Core                             {2.2.5}
Rx-Interfaces                       {2.2.5}
Rx-Linq                             {2.2.5}
Mark Seemann, asked Aug 21 '15 07:08


2 Answers

The trick is to use subscribeOn. From introtorx.com on subscribeOn and observeOn:

One pitfall I want to point out here is, the first few times I used these overloads, I was confused as to what they actually do. You should use the SubscribeOn method to describe how you want any warm-up and background processing code to be scheduled. For example, if you were to use SubscribeOn with Observable.Create, the delegate passed to the Create method would be run on the specified scheduler.

The ObserveOn method is used to declare where you want your notifications to be scheduled to. I would suggest the ObserveOn method is most useful when working with STA systems, most commonly UI applications.

Complete script below:

#r @"packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"packages/Rx-PlatformServices.2.2.5/lib/net45/System.Reactive.PlatformServices.dll"
#r @"packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting
open System.Reactive.Concurrency

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
        duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
        yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
    |> Observable.subscribeOn NewThreadScheduler.Default 

let lc = obs |> LiveChart.Point
lc.ShowChart()

Usually, in a UI application, you would pair subscribeOn with observeOn to ensure that results are delivered back on the UI thread. That doesn't seem to be needed here, as it looks like the chart handles this for you (it works for me).
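If the chart did not marshal updates itself, the pairing might look roughly like the sketch below. It is an assumption-laden illustration rather than something the script above needs: deliverOn is a hypothetical helper name, the #r paths are copied from the script above and depend on your package layout, and the SynchronizationContext passed in is assumed to be the UI thread's context (for example, SynchronizationContext.Current captured on that thread).

```fsharp
#r @"packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"packages/Rx-PlatformServices.2.2.5/lib/net45/System.Reactive.PlatformServices.dll"
#r @"packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"

open System
open System.Threading
open System.Reactive.Concurrency
open FSharp.Control.Reactive

// deliverOn is a hypothetical helper, not part of any library.
// The subscription (and so the enumeration of the source) runs on a new thread,
// while notifications are posted to the supplied context.
let deliverOn (ui : SynchronizationContext) (source : IObservable<'a>) =
    source
    |> Observable.subscribeOn NewThreadScheduler.Default
    |> Observable.observeOn (SynchronizationContextScheduler ui)
```

With the obs from the script, usage would be along the lines of obs |> deliverOn uiContext, where uiContext is a context you captured on the UI thread.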

Kevin, answered Sep 27 '22 16:09


The schedulers are defined in System.Reactive.PlatformServices.dll, which is installed by the Rx-PlatformServices package.

I've found them here: https://github.com/Reactive-Extensions/Rx.NET/tree/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency

For example, to use NewThreadScheduler: (observable).ObserveOn(System.Reactive.Concurrency.NewThreadScheduler.Default)

Dawid Kowalski, answered Sep 27 '22 17:09