Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I create a job queue using a MailboxProcessor?

Tags:

f#

I'm trying to model a asynchronous job processing framework using MailboxProcessor. My requirements are to Start, Stop, Pause, and Resume the job processor. Can I build Pause / Resume functionality with MailboxProcessor? Also I should be able to Stop and Start? I'm trying to model after Windows Service.

I've a system in C#, implemented using Queue / Threads. I was looking for design alternatives, that's when I saw MailboxProcessor. I believe I can use it but couldnt figure out how to handle the above scenarios. So is it possible to achieve this functionality?

like image 966
Vasu Balakrishnan Avatar asked Dec 17 '22 07:12

Vasu Balakrishnan


2 Answers

Sure :) Just hold an internal queue of jobs and enumerate through the queue when the job processor is in Start mode. In any other mode, just enqueue new jobs until the processor goes into start mode.

type 'a msg =       // '
    | Start
    | Stop
    | Pause
    | Job of (unit -> unit)

type processQueue() =        
    let mb = MailboxProcessor.Start(fun inbox ->
        let rec loop state (jobs : System.Collections.Generic.Queue<_>) =
            async {
                if state = Start then
                    while jobs.Count > 0 do
                        let f = jobs.Dequeue()
                        f()

                let! msg = inbox.Receive()
                match msg with
                | Start -> return! loop Start jobs
                | Pause -> return! loop Pause jobs
                | Job(f) -> jobs.Enqueue(f); return! loop state jobs
                | Stop -> return ()
            }
        loop Start (new System.Collections.Generic.Queue<_>()))

    member this.Resume() = mb.Post(Start)
    member this.Stop() = mb.Post(Stop)
    member this.Pause() = mb.Post(Pause)
    member this.QueueJob(f) = mb.Post(Job f)

This class behaves as expected: You can enqueue jobs in the Pause state, but they'll only run in the Start state. Once the processQueue is stopped, it can't be restarted, and none of the enqueued jobs will run (its easy enough to change this behavior so that, rather than killing the queue, it just doesn't enqueue a job in the Stop state).

Use MailboxProcessor.PostAndReply if you need two-way communication between the mailbox processor and your code.

like image 145
Juliet Avatar answered Dec 19 '22 21:12

Juliet


You may want to check out Luca's blog, as I think it has some recent relevant stuff.

like image 39
Brian Avatar answered Dec 19 '22 21:12

Brian