I wonder if this is too broad a question, but I recently came across a piece of code that I'd like to be certain how to translate from C# into proper F#. The journey starts here (1) (the original problem with TPL-F# interaction) and continues here (2) (some example code I'm contemplating translating into F#).
The example code is too long to reproduce here, but the interesting functions are ActivateAsync, RefreshHubs and AddHub. In particular, the interesting points are:
AddHub has a signature of private async Task AddHub(string address).
RefreshHubs calls AddHub in a loop and collects a list of tasks, which it then awaits at the very end with await Task.WhenAll(tasks); consequently the return value matches its signature of private async Task RefreshHubs(object _).
RefreshHubs is called by ActivateAsync simply as await RefreshHubs(null), and at the end there is a call to await base.ActivateAsync(), matching the function signature public override async Task ActivateAsync().
Question:
What would be the correct translation of such function signatures to F# that still maintains the interface and functionality and respects the default, custom scheduler? And I'm not otherwise too sure of this "async/await in F#" either. As in how to do it "mechanically". :)
The reason is that in the link "here (1)" there seems to be a problem (I haven't verified this) in that F# async operations do not respect a custom, cooperative scheduler set by the (Orleans) runtime. Also, it's stated here that TPL operations escape the scheduler and go to the task pool, and their use is therefore prohibited.
One way I can think of dealing with this is with an F# function as follows:
    //Sorry for the inconvenience of shortened code, for context see the link "here (1)"...
    override this.ActivateAsync() =
        this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore
        if RoleEnvironment.IsAvailable then
            this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
        else
            this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously
        //Return value comes from here.
        base.ActivateAsync()

    member private this.RefreshHubs(_) =
        //Code omitted, in case more context is needed, take a look at the link "here (2)", sorry for the inconvenience...
        //The return value is Task.
        //In the C# version the tasks provided by AddHub are collected and then
        //on the last line there is return await Task.WhenAll(newHubAdditionTasks)
        newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll

    member private this.AddHub(address) =
        //Code omitted, in case more context is needed, take a look at the link "here (2)", sorry for the inconvenience...
        //In the C# version:
        //...
        //hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
        //}
        //so this is "void" and could perhaps be Async<void> in F#...
        //The return value is Task.
        hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
        TaskDone.Done
The startAsPlainTask function is by Sacha Barber, from here. Another interesting option could be the one from here:
    module Async =
        let AwaitTaskVoid : (Task -> Async<unit>) =
            Async.AwaitIAsyncResult >> Async.Ignore
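One caveat with the AwaitIAsyncResult-based helper, as far as I can tell, is that it only waits for completion and does not surface the exception of a faulted task. The following is a minimal sketch of an alternative; the name awaitTaskOrThrow is hypothetical and not from any library, and it assumes that re-raising the base exception is what the caller wants. FSharp.Core 4.0 and later also ship an Async.AwaitTask overload that accepts a plain Task, which may make a hand-written helper unnecessary.

```fsharp
open System
open System.Threading.Tasks

module Async =
    /// Hypothetical helper: awaits a non-generic Task and propagates
    /// faults and cancellation instead of silently ignoring them.
    let awaitTaskOrThrow (task: Task) : Async<unit> =
        Async.FromContinuations(fun (onSuccess, onError, onCancel) ->
            task.ContinueWith(Action<Task>(fun t ->
                if t.IsFaulted then onError (t.Exception.GetBaseException())
                elif t.IsCanceled then onCancel (OperationCanceledException())
                else onSuccess ()))
            |> ignore)

// Usage: a task that has already faulted with a known exception.
let faulted = TaskCompletionSource<int>()
faulted.SetException(InvalidOperationException "boom")

let outcome =
    try
        (faulted.Task :> Task) |> Async.awaitTaskOrThrow |> Async.RunSynchronously
        "completed"
    with :? InvalidOperationException as e ->
        "failed: " + e.Message
```

Note that ContinueWith without an explicit scheduler argument schedules the continuation on TaskScheduler.Current, so where the continuation runs depends on where the helper is called from.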
<edit: I just noticed that the Task.WhenAll would need to be awaited too. But what would be the proper way? Uh, time to sleep (a bad pun)...
<edit 2: At here (1) (the original problem with TPL-F# interaction) on Codeplex it was mentioned that F# uses synchronization contexts to push work to threads, whereas TPL does not. This is a plausible explanation, I feel (although I'd still have problems translating these snippets properly regardless of the custom scheduler). Some interesting additional information could be had from where a SingleThreadSynchronizationContext is provided, which looks like it queues the work to be executed. Maybe this ought to be used? I think I need to mention Hopac in this context as an interesting tangent, and also mention that I'm out of reach for the next 50-odd hours or so in case all my cross-postings get out of hand.
<edit 3: Daniel and svick give good advice in the comments to use a custom task builder. Daniel provides a link to one that's already defined in FSharpx.
Looking at the source, I see that the constructor and its parameters are defined as
    type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
        let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
        let scheduler = defaultArg scheduler TaskScheduler.Default
        let cancellationToken = defaultArg cancellationToken CancellationToken.None
If one were to use this in Orleans, it looks like the TaskScheduler ought to be TaskScheduler.Current, as per the documentation here:
Orleans has it's own task scheduler which provides the single threaded execution model used within grains. It's important that when running tasks the Orleans scheduler is used, and not the .NET thread pool.
Should your grain code require a subtask to be created, you should use Task.Factory.StartNew:
await Task.Factory.StartNew(() =>{ /* logic */ });
This technique will use the current task scheduler, which will be the Orleans scheduler.
You should avoid using Task.Run, which always uses the .NET thread pool, and therefore will not run in the single-threaded execution model.
It looks like there's a subtle difference between TaskScheduler.Current and TaskScheduler.Default. Maybe this warrants a question on which example cases would show an undesired difference. As the Orleans documentation says not to use Task.Run and instead points to Task.Factory.StartNew, I wonder if one ought to specify TaskCreationOptions.DenyChildAttach, as is recommended by such authorities as Stephen Toub in Task.Run vs Task.Factory.StartNew and Stephen Cleary in StartNew is Dangerous. Hmm, it looks like the .Default will be .DenyChildAttach unless I'm mistaken.
Moreover, as there is a problem with Task.Run versus Task.Factory.StartNew regarding the custom scheduler, I wonder if this particular problem could be removed by using a custom TaskFactory, as explained in Task Scheduler (Task.Factory) and controlling the number of threads and How to: Create a Task Scheduler That Limits Concurrency.
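To make the difference between the two entry points concrete, here is a small sketch. It uses the exclusive scheduler of the built-in ConcurrentExclusiveSchedulerPair purely as a stand-in for a runtime-provided scheduler (this is an assumption for illustration, not the Orleans scheduler), and the helper names are hypothetical. Each probe is started from inside a task that already runs on the stand-in scheduler and reports whether it stayed there.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Stand-in for a scheduler supplied by a runtime (assumption for illustration).
let custom = ConcurrentExclusiveSchedulerPair().ExclusiveScheduler

// True when the calling code is executing on the stand-in scheduler.
let isOnCustom = Func<bool>(fun () -> obj.ReferenceEquals(TaskScheduler.Current, custom))

/// Starts `probe` from inside a task that runs on `custom` and returns its result.
let fromInsideCustom (probe: unit -> Task<bool>) : bool =
    let outer =
        Task.Factory.StartNew(
            Func<Task<bool>>(fun () -> probe ()),
            CancellationToken.None,
            TaskCreationOptions.None,
            custom)
    outer.Unwrap().Result

// StartNew without a scheduler argument uses TaskScheduler.Current.
let startNewStaysOnCustom = fromInsideCustom (fun () -> Task.Factory.StartNew(isOnCustom))

// Task.Run always targets TaskScheduler.Default (the thread pool).
let runStaysOnCustom = fromInsideCustom (fun () -> Task.Run(isOnCustom))

printfn "StartNew stays on custom scheduler: %b" startNewStaysOnCustom
printfn "Task.Run stays on custom scheduler: %b" runStaysOnCustom
```

The outer task is unwrapped and waited on from the calling thread rather than from inside the stand-in scheduler, so the sketch does not depend on how that scheduler handles blocking waits.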
Hmm, this is becoming quite a long "pondering" already. I wonder how I should close this? Maybe svick and Daniel could post their comments as answers, and I'd upvote both and accept svick's?
You can use TaskBuilder in FSharpx and pass in TaskScheduler.Current. Here's my attempt at translating RefreshHubs. Note that Task<unit> is used in lieu of Task.
    let RefreshHubs _ =
        let task = TaskBuilder(scheduler = TaskScheduler.Current)
        task {
            let addresses =
                RoleEnvironment.Roles.["GPSTracker.Web"].Instances
                |> Seq.map (fun instance ->
                    let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
                    sprintf "http://%O" endpoint.IPEndpoint
                )
                |> Seq.toList
            let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
            // materialize the list first so that hubs is not modified
            // while its keys are still being enumerated
            let deadHubs =
                hubs.Keys
                |> Seq.filter (fun x -> not (List.exists ((=) x) addresses))
                |> Seq.toList
            // remove dead hubs
            deadHubs |> List.iter (hubs.Remove >> ignore)
            // add new hubs
            let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
            return ()
        }
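For readers on F# 6 or later, the same shape can also be written with the task builder that ships in FSharp.Core, without FSharpx. The following is only a sketch of the "mechanical" translation of the two signatures: the hubs dictionary, the Task.Delay call standing in for hubConnection.Start(), and the lower-case function names are all assumptions for illustration, and whether continuations stay on a custom scheduler should be verified against the runtime in use.

```fsharp
open System.Collections.Generic
open System.Threading.Tasks

// Hypothetical stand-in for the hub table of the original grain.
let hubs = Dictionary<string, string>()

// Shape of: private async Task AddHub(string address)
let addHub (address: string) : Task =
    task {
        do! Task.Delay 10   // stands in for: await hubConnection.Start()
        // continuations may run concurrently here, so guard the dictionary
        lock hubs (fun () -> hubs.[address] <- "connected")
    } :> Task               // Task<unit> upcast to the plain Task of the C# signature

// Shape of: private async Task RefreshHubs(object _)
let refreshHubs (addresses: string list) : Task =
    task {
        // start every addition first, then await them all together
        let pending = addresses |> List.map addHub
        do! Task.WhenAll(pending :> seq<Task>)
    } :> Task

// Usage: block only at the outermost edge of the program.
(refreshHubs [ "http://a"; "http://b" ]).Wait()
printfn "hubs registered: %d" hubs.Count
```

The upcast to Task is what lets a Task<unit> produced by the builder satisfy an interface that expects a plain Task.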