The basic discussion of queues in the lparallel library at https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel-fundamentals/#channels says that queues "enable message passing between worker threads." The test below uses a shared queue to coordinate a main thread and a subordinate thread, where the main thread simply waits for the subordinate to complete before exiting:
(defun foo (q)
  (sleep 1)
  (lparallel.queue:pop-queue q))  ; q is now empty
(defun test ()
  (setf lparallel:*kernel* (lparallel:make-kernel 1))
  (let ((c (lparallel:make-channel))
        (q (lparallel.queue:make-queue)))
    (lparallel.queue:push-queue 0 q)
    (lparallel:submit-task c #'foo q)
    (loop do (sleep .2)
             (print (lparallel.queue:peek-queue q))
          when (lparallel.queue:queue-empty-p q)
            do (return)))
  (lparallel:end-kernel :wait t))
This works as expected, producing the output:
* (test)
0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)
My question is about whether I'm using the queue functionality of lparallel correctly or fully. It would seem that a queue is simply a substitute for using a global variable to hold a thread-shared object. What is the design advantage of using a queue? Is it generally good practice to assign one queue to each submitted task (assuming the task needs to communicate)? Thanks for any deeper insights.
Multithreaded work is commonly done by managing concurrent access to mutable shared state, i.e. you have a lock around a common data structure, and each thread reads from or writes to it.
It is, however, recommended to minimize the amount of data being accessed concurrently. Queues are a way to decouple workers from each other: each thread manages its own local state and exchanges data only through messages. This is thread-safe because access to the queues is controlled by locks and condition variables.
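To make the message-passing idea concrete, here is a minimal sketch, assuming lparallel is installed and loadable via Quicklisp. The names `summing-worker` and `sum-via-messages` are hypothetical, and so is the convention of sending the keyword `:stop` to end the worker's loop (it is not part of lparallel). The worker's running total lives only in its own thread; numbers arrive through one queue and the total leaves through another. Because `pop-queue` blocks until an item is available, neither side polls.

```lisp
(ql:quickload :lparallel)

(defun summing-worker (in out)
  ;; Hypothetical worker: SUM is local to this thread; no other thread can touch it.
  (let ((sum 0))
    ;; POP-QUEUE blocks until a message arrives; :STOP is an assumed sentinel.
    (loop for msg = (lparallel.queue:pop-queue in)
          until (eq msg :stop)
          do (incf sum msg))
    ;; The only way the result leaves this thread is as a message.
    (lparallel.queue:push-queue sum out)))

(defun sum-via-messages (numbers)
  (let ((lparallel:*kernel* (lparallel:make-kernel 1)))
    (unwind-protect
         (let ((c (lparallel:make-channel))
               (in (lparallel.queue:make-queue))
               (out (lparallel.queue:make-queue)))
           (lparallel:submit-task c #'summing-worker in out)
           ;; Send the data, then the sentinel, then block for the reply.
           (dolist (n numbers)
             (lparallel.queue:push-queue n in))
           (lparallel.queue:push-queue :stop in)
           (lparallel.queue:pop-queue out))
      (lparallel:end-kernel :wait t))))
```

Evaluating `(sum-via-messages '(1 2 3 4))` should return 10.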
What you are doing in your main thread is polling until the queue is empty. This might work, but it is counter-productive: queues are meant to be the synchronization mechanism, yet here you are doing the synchronization yourself.
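For the specific situation in the question, where the main thread only needs to wait until the task has finished, the lparallel channel already provides a blocking wait: `receive-result` returns the value of a task submitted to that channel, blocking until one is available. A minimal sketch of the question's `test` rewritten this way, assuming lparallel via Quicklisp; `pop-after-delay` and `test-blocking` are hypothetical names chosen so as not to clash with the question's `foo` and `test`:

```lisp
(ql:quickload :lparallel)

(defun pop-after-delay (q)
  ;; Same body as the question's FOO.
  (sleep 1)
  (lparallel.queue:pop-queue q))

(defun test-blocking ()
  (let ((lparallel:*kernel* (lparallel:make-kernel 1)))
    (unwind-protect
         (let ((c (lparallel:make-channel))
               (q (lparallel.queue:make-queue)))
           (lparallel.queue:push-queue 0 q)
           (lparallel:submit-task c #'pop-after-delay q)
           ;; RECEIVE-RESULT blocks until the task has returned,
           ;; so no SLEEP/PEEK-QUEUE polling loop is needed.
           (lparallel:receive-result c))
      (lparallel:end-kernel :wait t))))
```

`(test-blocking)` should return 0, the element the task popped from the queue.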
(ql:quickload :lparallel)
(defpackage :so
  (:use :cl
        :lparallel
        :lparallel.queue
        :lparallel.kernel-util))
(in-package :so)
Let's change foo so that it takes two queues, one for incoming requests and one for replies. Here, we apply a simple transformation to the data being sent, and for each input message there is exactly one output message, but this need not always be the case.
(defun foo (in out)
  (push-queue (1+ (pop-queue in)) out))
Change test so that the control flow is based only on reading from and writing to queues:
(defun test ()
(with-temp-kernel (1)
(let ((c (make-channel))
(foo-in (make-queue))
(foo-out (make-queue)))
(submit-task c #'foo foo-in foo-out)
;; submit data to task (could be blocking)
(push-queue 0 foo-in)
;; wait for message from task (could be blocking too)
(pop-queue foo-out))))
But how can you avoid polling in test if there are multiple tasks running? Don't you need to continuously check when any one of them is done so you can push-queue more work to it?
You could use a different concurrency mechanism, similar to listen and poll/epoll, where you watch multiple sources of events and react whenever one of them is ready. In languages like Go (select) and Erlang (receive) this is quite natural to express. On the Lisp side, the Calispel library provides a similar alternation mechanism (pri-alt and fair-alt). For example, the following is taken from Calispel's test code:
(pri-alt ((? control msg)
          (ecase msg
            (:clean-up (setf cleanup? t))
            (:high-speed (setf slow? nil))
            (:low-speed (setf slow? t))))
         ((? channel msg)
          (declare (type fixnum msg))
          (vector-push-extend msg out))
         ((otherwise :timeout (if cleanup? 0 nil))
          (! reader-results out)
          (! thread-expiration (bt:current-thread))
          (return)))
In the case of lparallel, there is no such mechanism, but you can go pretty far with queues only, provided you tag your messages with identifiers.
If you need to react as soon as either task t1 or t2 gives a result, make both of those tasks write to the same result queue:
(let ((t1 (foo :id 1 :in i1 :out res))
      (t2 (bar :id 2 :in i2 :out res)))
  (destructuring-bind (id message) (pop-queue res)
    (case id
      (1 ...)
      (2 ...))))
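The snippet above is schematic. A runnable version of the same shared-result-queue pattern is sketched below, assuming lparallel via Quicklisp; `tagged-task` and `first-reply` are hypothetical names, and the delays only exist to make one task slower than the other. Each task pushes an `(id result)` list onto the same queue, so the main thread blocks on a single `pop-queue` and uses the tag to tell the replies apart.

```lisp
(ql:quickload :lparallel)

(defun tagged-task (id delay out)
  ;; Hypothetical task: wait DELAY seconds, then reply with (ID RESULT).
  (sleep delay)
  (lparallel.queue:push-queue (list id (* 10 id)) out))

(defun first-reply ()
  ;; Two workers, so both tasks run concurrently.
  (let ((lparallel:*kernel* (lparallel:make-kernel 2)))
    (unwind-protect
         (let ((c (lparallel:make-channel))
               (res (lparallel.queue:make-queue)))  ; one shared result queue
           (lparallel:submit-task c #'tagged-task 1 0.5 res)
           (lparallel:submit-task c #'tagged-task 2 0.1 res)
           ;; Blocks until either task has replied; the tag says which one.
           (destructuring-bind (id message) (lparallel.queue:pop-queue res)
             (case id
               (1 (list :t1-first message))
               (2 (list :t2-first message)))))
      ;; Waits for the slower task too; its reply is simply left unread.
      (lparallel:end-kernel :wait t))))
```

With these delays, task 2 will normally answer first, so `(first-reply)` is expected to return `(:T2-FIRST 20)`, although the exact order ultimately depends on thread scheduling.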
If you need to synchronize on both t1 and t2 having emitted a result, let them write to different queues:
(let ((t1 (foo :id 1 :in i1 :out o1))
      (t2 (bar :id 2 :in i2 :out o2)))
  (list (pop-queue o1)
        (pop-queue o2)))
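Applied to the follow-up question about keeping several tasks busy without polling, one common pattern (a shared work queue, which the answer above does not spell out) is to start a fixed number of long-lived workers that all pop from one job queue. An idle worker simply blocks in `pop-queue`, so the main thread never has to check which worker is free: it pushes all the jobs, then one `:stop` per worker, then pops exactly one tagged reply per job. A sketch assuming lparallel via Quicklisp; `pool-worker`, `run-pool`, the `(id . x)` job format and the `:stop` sentinel are all hypothetical choices:

```lisp
(ql:quickload :lparallel)

(defun pool-worker (jobs results)
  ;; Hypothetical worker: take (ID . X) jobs from the shared JOBS queue
  ;; until :STOP, replying with (ID RESULT) on the shared RESULTS queue.
  (loop for job = (lparallel.queue:pop-queue jobs)
        until (eq job :stop)
        do (destructuring-bind (id . x) job
             (lparallel.queue:push-queue (list id (* x x)) results))))

(defun run-pool (inputs &key (workers 3))
  (let ((lparallel:*kernel* (lparallel:make-kernel workers)))
    (unwind-protect
         (let ((c (lparallel:make-channel))
               (jobs (lparallel.queue:make-queue))
               (results (lparallel.queue:make-queue)))
           (loop repeat workers
                 do (lparallel:submit-task c #'pool-worker jobs results))
           ;; Push all the work up front; whichever worker is idle takes the next job.
           (loop for x in inputs
                 for id from 0
                 do (lparallel.queue:push-queue (cons id x) jobs))
           ;; One sentinel per worker, so every worker eventually exits.
           (loop repeat workers
                 do (lparallel.queue:push-queue :stop jobs))
           ;; Collect exactly one tagged reply per job, then restore input order.
           (let ((replies (loop repeat (length inputs)
                                collect (lparallel.queue:pop-queue results))))
             (mapcar #'second (sort replies #'< :key #'first))))
      (lparallel:end-kernel :wait t))))
```

`(run-pool '(1 2 3 4 5))` should return `(1 4 9 16 25)`: replies arrive in whatever order the workers finish, and sorting on the tag restores the input order.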