I'm trying to write a seemingly simple implementation of the classic producer-consumer idiom in Python. There is one comparatively quick producer feeding multiple slower consumers. In principle, this is easy to do using the Queue module, and the library documentation has an example spanning only a few lines of code.
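For reference, the happy-path pattern from the library documentation looks roughly like this minimal sketch (written to run on both Python 2 and 3; the module is named `Queue` on Python 2 and `queue` on Python 3, and the doubling "work" is a stand-in):

```python
import threading
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

q = queue.Queue()
results = []

def consumer():
    while True:
        item = q.get()
        results.append(item * 2)  # stand-in for the real, slower work
        q.task_done()

# One quick producer, two slower consumers.
for _ in range(2):
    t = threading.Thread(target=consumer)
    t.daemon = True  # so the blocked consumers do not keep the process alive
    t.start()

for item in range(5):
    q.put(item)
q.join()  # blocks until every produced item has been processed
print(sorted(results))  # -> [0, 2, 4, 6, 8]
```

This is fine as long as nothing goes wrong; the complications below start once exceptions enter the picture.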
However, I also want the code to work properly in case exceptions occur. Both the producer and all consumers should stop if any of the following happens:

- the producer raises an exception
- any consumer raises an exception
- the user interrupts the program with Control-C

After that, the whole process should fail, raising the initial exception to inform the caller about what went wrong.
The main challenge seems to be terminating the consumer threads cleanly without ending up in a blocking join(). Setting Thread.daemon=True appears to be popular, but to my understanding this causes resource leaks in case the producer fails with an exception.
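A common alternative to daemon threads, sketched here under the assumption that a plain "poison pill" is acceptable: the producer puts one sentinel per consumer on the queue, and each consumer exits its loop when it receives one, so join() returns promptly instead of blocking:

```python
import threading
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

_SENTINEL = object()  # unique marker telling a consumer to shut down

def consume(q, results):
    while True:
        item = q.get()
        if item is _SENTINEL:
            break  # leave the loop instead of blocking in get() forever
        results.append(item * 2)  # stand-in for the real, slower work

q = queue.Queue()
results = []
workers = [threading.Thread(target=consume, args=(q, results))
           for _ in range(2)]
for worker in workers:
    worker.start()
for item in range(5):
    q.put(item)
for _ in workers:
    q.put(_SENTINEL)  # one sentinel per consumer
for worker in workers:
    worker.join()  # returns promptly: every consumer eventually sees a sentinel
print(sorted(results))  # -> [0, 2, 4, 6, 8]
```

This handles the orderly-shutdown case without daemon threads, but by itself it does not yet propagate exceptions from a failed consumer back to the producer.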
I managed to write an implementation that fulfills my requirements (see below). However, I find the code to be a lot more complex than expected. Is there a leaner way to deal with this scenario?
Here are a couple of example calls and the resulting final log message from my current implementation:
Produce and consume 10 items:
$ python procon.py
INFO:root:processed all items
Produce no items:
$ python procon.py --items 0
INFO:root:processed all items
Produce 5 items for 10 consumers, thus using only some of the available consumers:
$ python procon.py --items 5 --consumers 10
INFO:root:processed all items
Interrupt by pressing Control-C:
$ python procon.py
^CWARNING:root:interrupted by user
Fail to produce item 3:
$ python procon.py --producer-fails-at 3
ERROR:root:cannot produce item 3
Fail to consume item 3:
$ python procon.py --consumer-fails-at 3
ERROR:root:cannot consume item 3
Fail to consume the last item:
$ python procon.py --items 10 --consumer-fails-at 9
ERROR:root:cannot consume item 9
And here is the probably overly complex source code:
"""
Consumer/producer to test exception handling in threads. Both the producer
and the consumer can be made to fail deliberately when processing a certain
item using command line options.
"""
import logging
import optparse
import Queue
import threading
import time
_PRODUCTION_DELAY = 0.1
_CONSUMPTION_DELAY = 0.3
# Delay for ugly hacks and polling loops.
_HACK_DELAY = 0.05
class _Consumer(threading.Thread):
"""
Thread to consume items from an item queue filled by a producer, which can
be told to terminate in two ways:
1. using `finish()`, which keeps processing the remaining items on the
queue until it is empty
2. using `cancel()`, which finishes consuming the current item and then
terminates
"""
def __init__(self, name, itemQueue, failedConsumers):
super(_Consumer, self).__init__(name=name)
self._log = logging.getLogger(name)
self._itemQueue = itemQueue
self._failedConsumers = failedConsumers
self.error = None
self.itemToFailAt = None
self._log.info(u"waiting for items to consume")
self._isFinishing = False
self._isCanceled = False
def finish(self):
self._isFinishing = True
def cancel(self):
self._isCanceled = True
def consume(self, item):
self._log.info(u"consume item %d", item)
if item == self.itemToFailAt:
raise ValueError("cannot consume item %d" % item)
time.sleep(_CONSUMPTION_DELAY)
def run(self):
try:
while not (self._isFinishing and self._itemQueue.empty()) \
and not self._isCanceled:
# HACK: Use a timeout when getting the item from the queue
# because between `empty()` and `get()` another consumer might
# have removed it.
try:
item = self._itemQueue.get(timeout=_HACK_DELAY)
self.consume(item)
except Queue.Empty:
pass
if self._isCanceled:
self._log.info(u"canceled")
if self._isFinishing:
self._log.info(u"finished")
except Exception, error:
self._log.error(u"cannot continue to consume: %s", error)
self.error = error
self._failedConsumers.put(self)
class Worker(object):
"""
Controller for interaction between producer and consumers.
"""
def __init__(self, itemsToProduceCount, itemProducerFailsAt,
itemConsumerFailsAt, consumerCount):
self._itemsToProduceCount = itemsToProduceCount
self._itemProducerFailsAt = itemProducerFailsAt
self._itemConsumerFailsAt = itemConsumerFailsAt
self._consumerCount = consumerCount
self._itemQueue = Queue.Queue()
self._failedConsumers = Queue.Queue()
self._log = logging.getLogger("producer")
self._consumers = []
def _possiblyRaiseConsumerError(self):
if not self._failedConsumers.empty():
failedConsumer = self._failedConsumers.get()
self._log.info(u"handling failed %s", failedConsumer.name)
raise failedConsumer.error
def _cancelAllConsumers(self):
self._log.info(u"canceling all consumers")
for consumerToCancel in self._consumers:
consumerToCancel.cancel()
self._log.info(u"waiting for consumers to be canceled")
for possiblyCanceledConsumer in self._consumers:
# In this case, we ignore possible consumer errors because there
# already is an error to report.
possiblyCanceledConsumer.join(_HACK_DELAY)
if possiblyCanceledConsumer.isAlive():
self._consumers.append(possiblyCanceledConsumer)
def work(self):
"""
Launch consumer thread and produce items. In case any consumer or the
producer raise an exception, fail by raising this exception
"""
self.consumers = []
for consumerId in range(self._consumerCount):
consumerToStart = _Consumer(u"consumer %d" % consumerId,
self._itemQueue, self._failedConsumers)
self._consumers.append(consumerToStart)
consumerToStart.start()
if self._itemConsumerFailsAt is not None:
consumerToStart.itemToFailAt = self._itemConsumerFailsAt
self._log = logging.getLogger("producer ")
self._log.info(u"producing %d items", self._itemsToProduceCount)
for itemNumber in range(self._itemsToProduceCount):
self._possiblyRaiseConsumerError()
self._log.info(u"produce item %d", itemNumber)
if itemNumber == self._itemProducerFailsAt:
raise ValueError("ucannot produce item %d" % itemNumber)
# Do the actual work.
time.sleep(_PRODUCTION_DELAY)
self._itemQueue.put(itemNumber)
self._log.info(u"telling consumers to finish the remaining items")
for consumerToFinish in self._consumers:
consumerToFinish.finish()
self._log.info(u"waiting for consumers to finish")
for possiblyFinishedConsumer in self._consumers:
self._possiblyRaiseConsumerError()
possiblyFinishedConsumer.join(_HACK_DELAY)
if possiblyFinishedConsumer.isAlive():
self._consumers.append(possiblyFinishedConsumer)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = optparse.OptionParser()
parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER",
type="long", help="number of items at which consumer fails (default: %default)")
parser.add_option("-i", "--items", metavar="NUMBER", type="long",
help="number of items to produce (default: %default)", default=10)
parser.add_option("-n", "--consumers", metavar="NUMBER", type="long",
help="number of consumers (default: %default)", default=2)
parser.add_option("-p", "--producer-fails-at", metavar="NUMBER",
type="long", help="number of items at which producer fails (default: %default)")
options, others = parser.parse_args()
worker = Worker(options.items, options.producer_fails_at,
options.consumer_fails_at, options.consumers)
try:
worker.work()
logging.info(u"processed all items")
except KeyboardInterrupt:
logging.warning(u"interrupted by user")
worker._cancelAllConsumers()
except Exception, error:
logging.error(u"%s", error)
worker._cancelAllConsumers()
You need a queue with a cancel method that empties the internal queue, sets a cancelled flag, and then wakes everyone up. The worker will wake up from join(), check the cancelled flag on the queue and act appropriately. The consumers will wake up from get() and check the cancelled flag on the queue and print an error. Then your consumer would just need to call the cancel() method in the event of an exception.
Unfortunately the Python Queue doesn't have a cancel method. A few choices jump to mind:
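The cancel behaviour described above is not hard to build by hand, though. Here is a minimal sketch of one of those choices, a small hand-rolled queue (the `CancellableQueue` and `Cancelled` names are my own invention, and this is deliberately not a drop-in replacement for `Queue.Queue`):

```python
import threading

class Cancelled(Exception):
    """Raised by get() and put() once the queue has been cancelled."""

class CancellableQueue(object):
    """Unbounded queue whose cancel() empties the internal queue, sets a
    cancelled flag, and wakes every thread blocked in get()."""

    def __init__(self):
        self._items = []
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._cancelled = False

    def put(self, item):
        with self._lock:
            if self._cancelled:
                raise Cancelled()
            self._items.append(item)
            self._not_empty.notify()

    def get(self):
        with self._not_empty:
            while not self._items and not self._cancelled:
                self._not_empty.wait()
            if self._cancelled:
                raise Cancelled()
            return self._items.pop(0)

    def cancel(self):
        with self._lock:
            self._cancelled = True
            del self._items[:]  # drop all pending work
            self._not_empty.notify_all()  # wake every blocked get()
```

A consumer loop then shrinks to `try: item = q.get() ... except Cancelled: return`, and whichever thread hits an error simply calls `q.cancel()`; no polling timeouts or daemon threads are needed.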