A producer thread queries a data store and puts objects into a queue. One or more consumer threads then each pull an object off the shared queue and make a very long call to an external service. When the call returns, the consumer marks the object as completed.
My problem is that I basically have to wait until the queue is empty before the producer can add to it again; otherwise I risk sending duplicates through.
[edit] Someone asked a good question over IRC, and I figured I would add the answer here. The question was, "Why does your producer produce duplicates?" The answer is that we track only a "sent" or "unsent" state for each object, not a "sending" state, so an object that a consumer is still working on still looks unsent and the producer queues it again.
Is there a way that I can check for duplicates in the queue?
It seems to me that having duplicate objects in the queue is not really a problem; you just need to make sure each object is processed only once.
EDIT: I originally suggested using a set or OrderedDict to keep track of the objects, but Python has a ready-made solution: functools.lru_cache.
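Use @functools.lru_cache as a decorator on your worker function, and it will manage a cache for you, keyed on the function's arguments (which must be hashable). You can set a maximum size with maxsize, and the cache will not grow beyond that size; an object whose entry has been evicted will be processed again if it shows up later. If you use an ordinary set and don't manage it, it could grow very large and use a lot of memory. One caveat: lru_cache does not hold a lock while the wrapped function runs, so if two threads call it with the same argument before the first call returns, both calls execute. With a very long external call that window is large, so if concurrent duplicates are likely, guard the check with a lock instead.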
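A minimal sketch of the decorator approach, assuming each queued object can be identified by a hashable ID and that the worker is called with that ID. The names `process` and `calls` are hypothetical, and the external call is simulated by appending to a list.

```python
import functools

calls = []  # records every time the (simulated) external service is really called


@functools.lru_cache(maxsize=10_000)
def process(object_id):
    """Hypothetical worker: the slow external call would go here.

    lru_cache keys its cache on object_id, so a repeated ID returns the
    cached result instead of running the body again (until evicted).
    Not a lock: concurrent calls with the same ID can both run the body.
    """
    calls.append(object_id)  # stand-in for the long external call
    return "sent"


for obj_id in [1, 2, 1, 3, 2]:
    process(obj_id)

print(calls)                 # [1, 2, 3]
print(process.cache_info())  # CacheInfo(hits=2, misses=3, maxsize=10000, currsize=3)
```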
If you are using multiple worker processes instead of threads, you need a solution that works across processes, because each process would get its own separate set or lru_cache. Instead, you could use a shared dict where the key is the unique ID you use to detect duplicates and the value is a timestamp for when the object went into the dict; then, from time to time, delete the really old entries. Here's a Stack Overflow answer about shared dict objects:
multiprocessing: How do I share a dict among multiple processes?
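A minimal sketch of the shared-dict idea, assuming a `multiprocessing.Manager` dict keyed by object ID with a first-seen timestamp as the value. The helper names `claim`, `prune` and `worker` are hypothetical. The check and the insert are done under a manager lock because "is it there? then add it" is two separate proxy calls, and another process could slip in between them.

```python
import multiprocessing
import time


def claim(shared, lock, key):
    """Return True if the caller is the first to see `key`, recording when."""
    with lock:  # makes "check, then insert" atomic across processes
        if key in shared:
            return False
        shared[key] = time.time()  # timestamp: when the key went into the dict
        return True


def prune(shared, lock, max_age_seconds):
    """Delete entries whose timestamp is older than max_age_seconds."""
    cutoff = time.time() - max_age_seconds
    with lock:
        for key, seen_at in list(shared.items()):
            if seen_at < cutoff:
                del shared[key]


def worker(shared, lock, ids, handled):
    """Hypothetical consumer: only handles IDs it managed to claim."""
    for key in ids:
        if claim(shared, lock, key):
            handled.append(key)  # stand-in for the long external call


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared = manager.dict()  # object ID -> first-seen timestamp
        lock = manager.Lock()
        handled = manager.list()
        ids = [1, 2, 3, 2, 1]    # every process sees the same (duplicated) IDs
        procs = [
            multiprocessing.Process(target=worker, args=(shared, lock, ids, handled))
            for _ in range(3)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(sorted(handled))   # [1, 2, 3]: each ID handled by exactly one process
        prune(shared, lock, max_age_seconds=3600)  # run periodically in practice
```

The helpers accept any dict-like object and any lock with a context-manager interface, so the same code can be exercised with a plain dict in a single process.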
And the rest of my original answer follows:
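If so, I suggest you have the consumer thread(s) use a set to keep track of objects that have been seen. If an object is not in the set, add it and process it; if it is in the set, ignore it as a duplicate. Do the membership check and the add under a lock, so two consumers can't both see the same object as new at the same moment.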
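A minimal sketch of the set approach for threads, assuming each object exposes some hashable unique ID. The names `seen`, `seen_lock` and `should_process` are hypothetical; the lock is what makes the check-and-add safe when several consumers run at once.

```python
import threading

seen = set()                   # IDs that some consumer has already claimed
seen_lock = threading.Lock()   # makes check-and-add atomic across threads


def should_process(object_id):
    """Return True the first time object_id is seen, False for duplicates."""
    with seen_lock:
        if object_id in seen:
            return False
        seen.add(object_id)
        return True


# A consumer would call should_process(<object's unique ID>) before the long call.
print([i for i in [1, 2, 1, 3, 2] if should_process(i)])  # [1, 2, 3]
```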
If this will be a long-running system, instead of a set, use an OrderedDict to track seen objects. Then from time to time clean out the oldest entries in the OrderedDict.
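A minimal sketch of the OrderedDict variant, assuming entries can be forgotten after a fixed age. `SeenTracker` and `first_time` are hypothetical names, and the optional `now` argument exists only to make the behaviour easy to test. Because entries are appended in time order, the oldest ones are always at the front and can be popped cheaply with `popitem(last=False)`.

```python
import threading
import time
from collections import OrderedDict


class SeenTracker:
    """Hypothetical helper: remembers IDs in insertion order, forgets old ones."""

    def __init__(self, max_age_seconds=3600.0):
        self._seen = OrderedDict()  # object_id -> time first seen, oldest first
        self._lock = threading.Lock()
        self._max_age = max_age_seconds

    def first_time(self, object_id, now=None):
        """Return True if object_id has not been seen within max_age_seconds."""
        now = time.monotonic() if now is None else now
        with self._lock:
            self._evict_older_than(now - self._max_age)
            if object_id in self._seen:
                return False
            self._seen[object_id] = now
            return True

    def _evict_older_than(self, cutoff):
        # The oldest entry is at the front, so stop at the first one still fresh.
        while self._seen:
            oldest_id = next(iter(self._seen))
            if self._seen[oldest_id] >= cutoff:
                break
            self._seen.popitem(last=False)

    def __len__(self):
        with self._lock:
            return len(self._seen)


tracker = SeenTracker(max_age_seconds=3600)
print(tracker.first_time(42), tracker.first_time(42))  # True False
```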
If you're talking about the classes in the Queue module (renamed queue in Python 3): its public API offers no way to check whether a queue contains a given object.
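Since `queue.Queue` cannot answer "is this object already in here?", one workaround is to keep that information yourself. The sketch below is a hypothetical wrapper (`DedupQueue`, `put_if_absent` and `done` are not part of the `queue` module) that records the key of every object that is queued or still being processed, which also plays the role of the missing "sending" state. It assumes an unbounded queue and that consumers call `done()` once they have marked the object as sent.

```python
import queue
import threading


class DedupQueue:
    """Hypothetical wrapper: a queue.Queue plus the keys queued or in flight."""

    def __init__(self, key=lambda obj: obj):
        self._queue = queue.Queue()  # unbounded, so put() never blocks
        self._pending = set()        # keys that are queued or being processed
        self._lock = threading.Lock()
        self._key = key              # maps an object to its unique, hashable ID

    def put_if_absent(self, obj):
        """Enqueue obj unless its key is pending; return True if enqueued."""
        k = self._key(obj)
        with self._lock:
            if k in self._pending:
                return False
            self._pending.add(k)
        self._queue.put(obj)
        return True

    def get(self, block=True, timeout=None):
        return self._queue.get(block, timeout)

    def done(self, obj):
        """Call after obj is marked sent; its key may then be queued again."""
        with self._lock:
            self._pending.discard(self._key(obj))
        self._queue.task_done()


q = DedupQueue()
print(q.put_if_absent("a"), q.put_if_absent("a"))  # True False
```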