As far as I know, a semaphore protects countable resources that are vulnerable to race conditions. But while reading the SBCL documentation on semaphores, I could not figure out how to properly use the provided implementation to protect a resource.
A usual workflow, as I recall, would be:
A process wants to retrieve some of the data protected by the semaphore (for the sake of the example, a trivial queue). As the semaphore counter is 0, the process waits.
Another process puts something in the queue and, as the semaphore is incremented, a signal is sent to the waiting processes.
Given the possibility of interleaving, one has to protect each of those resource accesses, as they might not happen in that order, or in any linear order at all. That is why Java, for example, treats every object as an implicit monitor and provides a synchronized keyword with which a programmer can define a protected region that only one thread can enter at a time.
How do I emulate this functionality in Common Lisp? I am pretty sure my current code is no more thread-safe than it would be without the semaphore, as the semaphore has no clue what code to protect.
    ;; the package
    (defpackage :tests (:use :cl :sb-thread))
    (in-package :tests)

    (defclass thread-queue ()
      ((semaphore
        :initform (make-semaphore :name "thread-queue-semaphore"))
       (in-stack
        :initform nil)
       (out-stack
        :initform nil)))

    (defgeneric enqueue-* (queue element)
      (:documentation "adds an element to the queue"))

    (defgeneric dequeue-* (queue &key timeout)
      (:documentation "removes and returns the first element to get out"))

    (defmethod enqueue-* ((queue thread-queue) element)
      (signal-semaphore (slot-value queue 'semaphore))
      (setf (slot-value queue 'in-stack) (push element (slot-value queue 'in-stack))))

    (defmethod dequeue-* ((queue thread-queue) &key timeout)
      (wait-on-semaphore (slot-value queue 'semaphore) :timeout timeout)
      (when (= (length (slot-value queue 'out-stack)) 0)
        (setf (slot-value queue 'out-stack) (reverse (slot-value queue 'in-stack)))
        (setf (slot-value queue 'in-stack) nil))
      (let ((first (car (slot-value queue 'out-stack))))
        (setf (slot-value queue 'out-stack) (cdr (slot-value queue 'out-stack)))
        first))

    (defparameter *test* (make-instance 'thread-queue))

    (dequeue-* *test* :timeout 5)
    (enqueue-* *test* 42)
    (enqueue-* *test* 41)
    (enqueue-* *test* 40)
    (dequeue-* *test* :timeout 5)
    (dequeue-* *test* :timeout 5)
    (dequeue-* *test* :timeout 5)
    (dequeue-* *test* :timeout 5)
What you already have is a semaphore of count = 0, on which consumers wait.
What you also need is an exclusive lock around access to your stacks (perhaps one for each), or alternatively a lock-free queue. If you want to (or must) use semaphores, a binary semaphore can serve as an exclusive lock.
EDIT: SBCL already provides lock-free queues (in the sb-concurrency contrib); you might want to use one of these instead of two stacks. Another possibility is to use atomic operations.
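As a rough illustration of the lock-free option, here is a minimal sketch using the queue from the sb-concurrency contrib. The function name demo-lock-free-queue and its keyword parameters are made up for the example. Note that this queue never blocks: dequeue on an empty queue returns immediately, so it does not by itself replace the waiting behaviour of a semaphore.

```lisp
(require :sb-concurrency)

;; Hypothetical demo function: several threads enqueue concurrently,
;; with no explicit lock, then the main thread drains the queue.
(defun demo-lock-free-queue (&key (producers 4) (items-per-producer 1000))
  "Fill a queue from several threads, then drain it and return the item count."
  (let* ((queue (sb-concurrency:make-queue :name "demo-queue"))
         (threads (loop for id below producers
                        collect (let ((id id)) ; fresh binding for each thread
                                  (sb-thread:make-thread
                                   (lambda ()
                                     (dotimes (i items-per-producer)
                                       (sb-concurrency:enqueue (cons id i) queue))))))))
    ;; Wait until every producer has finished.
    (mapc #'sb-thread:join-thread threads)
    ;; DEQUEUE returns two values: the element and a flag that is NIL
    ;; once the queue is empty.
    (loop with count = 0
          do (multiple-value-bind (item present-p) (sb-concurrency:dequeue queue)
               (declare (ignore item))
               (if present-p
                   (incf count)
                   (return count))))))
```

If no element is lost to a race, the returned count equals producers times items-per-producer.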
Finally, if that still doesn't suit you, use a mutex, wrapping the code that accesses and updates the stacks inside with-mutex or with-recursive-lock.
Be sure to take the lock/mutex after waking up from the semaphore, not around the wait on the semaphore; otherwise you lose the advantage that semaphores give you, which is the possibility of waking up multiple waiters in a row instead of one at a time.
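To make the ordering concrete, here is a minimal sketch of a two-stack queue that waits on the semaphore first and takes the mutex only afterwards. The class safe-queue, its accessors, and the functions safe-enqueue and safe-dequeue are hypothetical names chosen for this example, not part of SBCL or of the code in the question.

```lisp
(defclass safe-queue ()
  ((items-available :initform (sb-thread:make-semaphore :name "items-available")
                    :reader items-available)
   (lock :initform (sb-thread:make-mutex :name "safe-queue-lock")
         :reader queue-lock)
   (in-stack :initform nil :accessor in-stack)
   (out-stack :initform nil :accessor out-stack)))

(defun safe-enqueue (queue element)
  "Store ELEMENT, then wake one waiting consumer."
  ;; Publish the element first, under the lock ...
  (sb-thread:with-mutex ((queue-lock queue))
    (push element (in-stack queue)))
  ;; ... and only then announce it through the semaphore.
  (sb-thread:signal-semaphore (items-available queue))
  element)

(defun safe-dequeue (queue &key timeout)
  "Return (values element t), or (values nil nil) if TIMEOUT expires."
  ;; Wait outside the lock, so a blocked consumer never holds the mutex.
  (if (sb-thread:wait-on-semaphore (items-available queue) :timeout timeout)
      (values (sb-thread:with-mutex ((queue-lock queue))
                ;; Refill the out-stack from the in-stack when it runs dry.
                (when (null (out-stack queue))
                  (setf (out-stack queue) (nreverse (in-stack queue))
                        (in-stack queue) nil))
                (pop (out-stack queue)))
              t)
      (values nil nil)))
```

The second return value lets a caller tell a timeout apart from a stored NIL element.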
You can read all about these things in the SBCL manual.
Also, I think some work has been done to rename every lock-like thing in SBCL to lock, according to this blog post, but I don't know the status of it, and it states that the old names will be supported for a while.
You'll almost surely also need a semaphore of count = limit for producers, to not exceed your queue limit.
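A bounded variant could look roughly like the following sketch, which pairs the consumer semaphore with a second one that starts at the queue limit. All names here (bounded-queue, make-bounded-queue, bounded-enqueue, bounded-dequeue) are invented for the illustration, and a plain list stands in for the two stacks to keep it short.

```lisp
(defstruct (bounded-queue (:constructor %make-bounded-queue))
  items   ; list of elements, oldest first
  lock    ; mutex guarding ITEMS
  filled  ; semaphore counting elements ready for consumers
  free)   ; semaphore counting slots left for producers

(defun make-bounded-queue (limit)
  (%make-bounded-queue
   :items '()
   :lock (sb-thread:make-mutex :name "bounded-queue-lock")
   :filled (sb-thread:make-semaphore :name "filled" :count 0)
   :free (sb-thread:make-semaphore :name "free" :count limit)))

(defun bounded-enqueue (queue element &key timeout)
  "Return T if ELEMENT was stored, NIL if no slot became free within TIMEOUT."
  ;; Producers wait for a free slot before touching the list.
  (when (sb-thread:wait-on-semaphore (bounded-queue-free queue) :timeout timeout)
    (sb-thread:with-mutex ((bounded-queue-lock queue))
      ;; APPEND is O(n); acceptable for a sketch with a small limit.
      (setf (bounded-queue-items queue)
            (append (bounded-queue-items queue) (list element))))
    (sb-thread:signal-semaphore (bounded-queue-filled queue))
    t))

(defun bounded-dequeue (queue &key timeout)
  "Return (values element t), or (values nil nil) if TIMEOUT expires."
  (if (sb-thread:wait-on-semaphore (bounded-queue-filled queue) :timeout timeout)
      (let ((element (sb-thread:with-mutex ((bounded-queue-lock queue))
                       (pop (bounded-queue-items queue)))))
        ;; Hand the slot back to the producers.
        (sb-thread:signal-semaphore (bounded-queue-free queue))
        (values element t))
      (values nil nil)))
```

Each side waits on its own semaphore and signals the other one, so the number of stored elements stays between 0 and the limit.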
In your enqueue-*, you should signal the semaphore after updating the queue, not before; otherwise a consumer can wake up before the element is there. The setf is not needed: push already stores the new head of the list in place.
In your dequeue-*, length has to walk the whole list, so it is O(n), whereas checking whether a list is empty is cheap with null or endp. Instead of taking the car and storing the cdr, you can use pop, which does exactly that.
You need to hold a mutual exclusion semaphore (aka a 'mutex') for the duration of your queue operations. Use an SBCL mutex like this:
    (defclass thread-queue ()
      ((lock :initform (sb-thread:make-mutex :name "thread-queue-lock"))
       ...))

    (defmethod enqueue-* ((queue thread-queue) element)
      ;; Hold the lock while the stack is modified; PUSH already updates the place.
      (sb-thread:with-recursive-lock ((slot-value queue 'lock))
        (push element (slot-value queue 'in-stack))))
    * (defvar lock (sb-thread:make-mutex))
    LOCK
    * lock
    #S(SB-THREAD:MUTEX
       :NAME NIL
       :%OWNER NIL
       :LUTEX #<unknown pointer object, widetag=#x5E {11CEB15F}>)
    * (sb-thread:with-recursive-lock (lock) 'foo)
    FOO
    * (sb-thread:with-recursive-lock (lock) (sb-thread:with-recursive-lock (lock) 'foo))
    FOO
Presumably the with-recursive-lock macro will do the right thing (unlock the lock, using unwind-protect or some such) for a non-local exit.
This is the equivalent of Java's synchronized: the above protects the enqueue-* method; you'd need to do the same for every other method that can be called asynchronously.