
Python producer/consumer with exception handling

I'm trying to write a seemingly simple implementation of the classic producer-consumer idiom in Python. There is one comparatively fast producer feeding multiple slower consumers. In principle, this is easy to do using the Queue module, and the library documentation has an example spanning only a few lines of code.

However, I also want the code to work properly in case exceptions occur. Both the producer and all consumers should stop in case any of the following things happen:

  • the producer fails with an exception
  • any consumer fails with an exception
  • the user stops the program (causing a KeyboardInterrupt)

After that, the whole process should fail raising the initial exception to inform the caller about what went wrong.

The main challenge seems to be to cleanly terminate the consumer threads without ending up in a blocking join(). It appears to be popular to set Thread.daemon = True, but to my understanding this causes resource leaks in case the producer fails with an exception.

I managed to write an implementation that fulfills my requirements (see below). However, I find the code to be a lot more complex than expected.

Is there a leaner way to deal with these scenarios?

Here are a couple of example calls and the resulting final log message from my current implementation:

Produce and consume 10 items:

$ python procon.py
INFO:root:processed all items

Produce no items:

$ python procon.py --items 0
INFO:root:processed all items

Produce 5 items for 10 consumers, thus using only some of the available consumers:

$ python procon.py --items 5 --consumers 10
INFO:root:processed all items

Interrupt by pressing Control-C:

$ python procon.py
^CWARNING:root:interrupted by user

Fail to produce item 3:

$ python procon.py --producer-fails-at 3
ERROR:root:cannot produce item 3

Fail to consume item 3:

$ python procon.py --consumer-fails-at 3
ERROR:root:cannot consume item 3

Fail to consume the last item:

$ python procon.py --items 10 --consumer-fails-at 9
ERROR:root:cannot consume item 9

And here is the probably overly complex source code:

"""
Consumer/producer to test exception handling in threads. Both the producer
and the consumer can be made to fail deliberately when processing a certain
item using command line options.
"""
import logging
import optparse
import Queue
import threading
import time

_PRODUCTION_DELAY = 0.1
_CONSUMPTION_DELAY = 0.3

# Delay for ugly hacks and polling loops.
_HACK_DELAY = 0.05

class _Consumer(threading.Thread):
    """
    Thread to consume items from an item queue filled by a producer, which can
    be told to terminate in two ways:

    1. using `finish()`, which keeps processing the remaining items on the
       queue until it is empty
    2. using `cancel()`, which finishes consuming the current item and then
       terminates
    """
    def __init__(self, name, itemQueue, failedConsumers):
        super(_Consumer, self).__init__(name=name)
        self._log = logging.getLogger(name)
        self._itemQueue = itemQueue
        self._failedConsumers = failedConsumers
        self.error = None
        self.itemToFailAt = None
        self._log.info(u"waiting for items to consume")
        self._isFinishing = False
        self._isCanceled = False

    def finish(self):
        self._isFinishing = True

    def cancel(self):
        self._isCanceled = True

    def consume(self, item):
        self._log.info(u"consume item %d", item)
        if item == self.itemToFailAt:
            raise ValueError("cannot consume item %d" % item)
        time.sleep(_CONSUMPTION_DELAY)

    def run(self):
        try:
            while not (self._isFinishing and self._itemQueue.empty()) \
                    and not self._isCanceled:
                # HACK: Use a timeout when getting the item from the queue
                # because between `empty()` and `get()` another consumer might
                # have removed it.
                try:
                    item = self._itemQueue.get(timeout=_HACK_DELAY)
                    self.consume(item)
                except Queue.Empty:
                    pass
            if self._isCanceled:
                self._log.info(u"canceled")
            if self._isFinishing:
                self._log.info(u"finished")
        except Exception as error:
            self._log.error(u"cannot continue to consume: %s", error)
            self.error = error
            self._failedConsumers.put(self)


class Worker(object):
    """
    Controller for interaction between producer and consumers.
    """
    def __init__(self, itemsToProduceCount, itemProducerFailsAt,
            itemConsumerFailsAt, consumerCount):
        self._itemsToProduceCount = itemsToProduceCount
        self._itemProducerFailsAt = itemProducerFailsAt
        self._itemConsumerFailsAt = itemConsumerFailsAt
        self._consumerCount = consumerCount
        self._itemQueue = Queue.Queue()
        self._failedConsumers = Queue.Queue()
        self._log = logging.getLogger("producer")
        self._consumers = []

    def _possiblyRaiseConsumerError(self):
        if not self._failedConsumers.empty():
            failedConsumer = self._failedConsumers.get()
            self._log.info(u"handling failed %s", failedConsumer.name)
            raise failedConsumer.error

    def _cancelAllConsumers(self):
        self._log.info(u"canceling all consumers")
        for consumerToCancel in self._consumers:
            consumerToCancel.cancel()
        self._log.info(u"waiting for consumers to be canceled")
        for possiblyCanceledConsumer in self._consumers:
            # In this case, we ignore possible consumer errors because there
            # already is an error to report.
            possiblyCanceledConsumer.join(_HACK_DELAY)
            if possiblyCanceledConsumer.isAlive():
                self._consumers.append(possiblyCanceledConsumer)

    def work(self):
        """
        Launch consumer threads and produce items. In case any consumer or the
        producer raises an exception, fail by raising this exception.
        """
        self._consumers = []
        for consumerId in range(self._consumerCount):
            consumerToStart = _Consumer(u"consumer %d" % consumerId,
                self._itemQueue, self._failedConsumers)
            # Configure the failure point before starting the thread so the
            # consumer never runs without it.
            if self._itemConsumerFailsAt is not None:
                consumerToStart.itemToFailAt = self._itemConsumerFailsAt
            self._consumers.append(consumerToStart)
            consumerToStart.start()

        self._log = logging.getLogger("producer  ")
        self._log.info(u"producing %d items", self._itemsToProduceCount)

        for itemNumber in range(self._itemsToProduceCount):
            self._possiblyRaiseConsumerError()
            self._log.info(u"produce item %d", itemNumber)
            if itemNumber == self._itemProducerFailsAt:
                raise ValueError(u"cannot produce item %d" % itemNumber)
            # Do the actual work.
            time.sleep(_PRODUCTION_DELAY)
            self._itemQueue.put(itemNumber)

        self._log.info(u"telling consumers to finish the remaining items")
        for consumerToFinish in self._consumers:
            consumerToFinish.finish()
        self._log.info(u"waiting for consumers to finish")
        for possiblyFinishedConsumer in self._consumers:
            self._possiblyRaiseConsumerError()
            possiblyFinishedConsumer.join(_HACK_DELAY)
            if possiblyFinishedConsumer.isAlive():
                self._consumers.append(possiblyFinishedConsumer)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = optparse.OptionParser()
    parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER",
        type="long", help="number of items at which consumer fails (default: %default)")
    parser.add_option("-i", "--items", metavar="NUMBER", type="long",
        help="number of items to produce (default: %default)", default=10)
    parser.add_option("-n", "--consumers", metavar="NUMBER", type="long",
        help="number of consumers (default: %default)", default=2)
    parser.add_option("-p", "--producer-fails-at", metavar="NUMBER",
        type="long", help="number of items at which producer fails (default: %default)")
    options, others = parser.parse_args()
    worker = Worker(options.items, options.producer_fails_at,
        options.consumer_fails_at, options.consumers)
    try:
        worker.work()
        logging.info(u"processed all items")
    except KeyboardInterrupt:
        logging.warning(u"interrupted by user")
        worker._cancelAllConsumers()
    except Exception as error:
        logging.error(u"%s", error)
        worker._cancelAllConsumers()
roskakori asked Jan 01 '12 11:01




1 Answer

You need a queue with a cancel() method that empties the internal queue, sets a cancelled flag, and then wakes up every thread blocked on the queue. The worker will wake up from join(), check the cancelled flag on the queue and act appropriately. The consumers will wake up from get(), check the cancelled flag on the queue and print an error. A failing consumer then only needs to call cancel() in the event of an exception.

Unfortunately the Python Queue doesn't have a cancel method. A few choices jump to mind:

  • Roll your own queue (can be tricky to get it right)
  • Extend the Python Queue and add the cancel method (couples your code to the internal implementation of the Python Queue class)
  • Proxy the queue class and overload join/get with your busy wait logic (still a busy-wait hack, but confines it to one spot and cleans up the producer/consumer code)
  • Find another queue implementation/library out there
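
As a rough illustration of the first option (rolling your own queue), here is a minimal sketch. The names `CancellableQueue`, `Cancelled` and `run` are made up for this example and are not part of the standard library. The queue is unbounded and built on `threading.Condition` instead of the Queue module. The sketch assumes that a failing consumer calls `cancel()`, and that the producer treats `Cancelled` from `put()` or `join()` as the signal to stop.

```python
import collections
import threading


class Cancelled(Exception):
    """Raised by put(), get() and join() after cancel() was called."""


class CancellableQueue(object):
    """Hypothetical unbounded FIFO queue with a cancel() method."""

    def __init__(self):
        self._items = collections.deque()
        self._unfinished = 0  # items put but not yet marked task_done()
        self._cancelled = False
        self._changed = threading.Condition()

    def cancel(self):
        # Drop pending items, set the flag and wake every blocked thread.
        with self._changed:
            self._cancelled = True
            self._items.clear()
            self._unfinished = 0
            self._changed.notify_all()

    def put(self, item):
        with self._changed:
            if self._cancelled:
                raise Cancelled()
            self._items.append(item)
            self._unfinished += 1
            self._changed.notify_all()

    def get(self):
        with self._changed:
            while not self._items and not self._cancelled:
                self._changed.wait()
            if self._cancelled:
                raise Cancelled()
            return self._items.popleft()

    def task_done(self):
        with self._changed:
            if not self._cancelled:
                self._unfinished -= 1
                self._changed.notify_all()

    def join(self):
        with self._changed:
            while self._unfinished and not self._cancelled:
                self._changed.wait()
            if self._cancelled:
                raise Cancelled()


def run(item_count, consumer_count, fail_at=None):
    """Produce item_count items; re-raise the first consumer error, if any."""
    queue = CancellableQueue()
    errors, processed = [], []

    def consume():
        try:
            while True:
                item = queue.get()
                if item == fail_at:
                    raise ValueError("cannot consume item %d" % item)
                processed.append(item)
                queue.task_done()
        except Cancelled:
            pass  # normal shutdown, or another thread failed
        except Exception as error:
            errors.append(error)
            queue.cancel()  # tell the producer and the other consumers

    consumers = [threading.Thread(target=consume) for _ in range(consumer_count)]
    for consumer in consumers:
        consumer.start()
    try:
        for item in range(item_count):
            queue.put(item)
        queue.join()  # returns once all items are done, raises if cancelled
    except Cancelled:
        pass  # a consumer failed; its error is raised below
    finally:
        queue.cancel()  # also wakes consumers idling in get()
        for consumer in consumers:
            consumer.join()
    if errors:
        raise errors[0]
    return sorted(processed)
```

With this sketch, `run(5, 3)` returns the five processed items, while `run(10, 2, fail_at=3)` re-raises the consumer's `ValueError` in the calling thread. A producer exception propagates out of `run` after the `finally` clause has cancelled the queue and joined the consumers. How Control-C interacts with a blocking `wait()` depends on the Python version and platform, so that case is not covered here.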
Pace answered Nov 15 '22 00:11