I am using simple threading modules to do concurrent jobs. Now I would like to take advantages of concurrent futures modules. Can some put me a example of using a queue with concurrent library?
I am getting TypeError: 'Queue' object is not iterable I dont know how to iterate queues
code snippet:
def run(item):
self.__log.info(str(item))
return True
<queue filled here>
with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
furtureIteams = { executor.submit(run, item): item for item in list(queue)}
for future in concurrent.futures.as_completed(furtureIteams):
f = furtureIteams[future]
print(f)
The concurrent. futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor , or separate processes, using ProcessPoolExecutor .
From Python 3.2 onwards a new class called ThreadPoolExecutor was introduced in Python in concurrent. futures module to efficiently manage and create threads.
It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.
How to create a ThreadPoolExecutor? With the help of concurrent. futures module and its concrete subclass Executor, we can easily create a pool of threads. For this, we need to construct a ThreadPoolExecutor with the number of threads we want in the pool.
I would suggest something like this:
def run(queue):
item = queue.get()
self.__log.info(str(item))
return True
<queue filled here>
workerThreadsToStart = 10
with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
furtureIteams = { executor.submit(run, queue): index for intex in range(workerThreadsToStart)}
for future in concurrent.futures.as_completed(furtureIteams):
f = furtureIteams[future]
print(f)
The problem you will run in is that a queue is thought to be endless and as a medium to decouple the threads that put something into the queue and threads that get items out of the queue.
When
and afterwards process them in parallel, a queue makes no sense. A ThreadPoolExecutor makes a queue obsolete in these cases.
I had a look at the ThreadPoolExecutor source:
def submit(self, fn, *args, **kwargs): # line 94
self._work_queue.put(w) # line 102
A Queue is used inside.
As commented above, you can use the iter()
function to execute a ThreadPool on a queue object. A very general code for this would look something like this:
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(run, iter(queue.get, None))
Where the run method executes the aspired work on the items of the queue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With