
How to use queue with concurrent future ThreadPoolExecutor in python 3?

I am using the plain threading module to run concurrent jobs. Now I would like to take advantage of the concurrent.futures module. Can someone give me an example of using a queue with concurrent.futures?

I am getting TypeError: 'Queue' object is not iterable. I don't know how to iterate over a queue.

code snippet:

def run(item):
    self.__log.info(str(item))
    return True

<queue filled here>

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    # list(queue) is the call that raises TypeError: 'Queue' object is not iterable
    furtureIteams = {executor.submit(run, item): item for item in list(queue)}
    for future in concurrent.futures.as_completed(furtureIteams):
        f = furtureIteams[future]
        print(f)
user2433024 asked Jun 04 '13 09:06



2 Answers

I would suggest something like this:

def run(queue):
    # Each call takes exactly one item; get() blocks while the queue is empty.
    item = queue.get()
    self.__log.info(str(item))
    return True

<queue filled here>

workerThreadsToStart = 10
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    # Submit one task per worker; each future maps to its worker index.
    furtureIteams = {executor.submit(run, queue): index for index in range(workerThreadsToStart)}
    for future in concurrent.futures.as_completed(furtureIteams):
        f = furtureIteams[future]
        print(f)

The problem you will run into is that a queue is meant to be endless: it is a medium that decouples the threads that put items into the queue from the threads that take items out of it.
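The decoupling described above can be sketched as follows. This is only an illustration under assumptions: the names worker, consume and SENTINEL are hypothetical, the "work" is a placeholder multiplication, and a None marker stands in for "no more items" because a real queue has no natural end.

```python
import concurrent.futures
import queue

SENTINEL = None  # hypothetical end-of-work marker, not part of the original post


def worker(work_queue, results):
    """Take items off the queue until the sentinel arrives."""
    while True:
        item = work_queue.get()
        if item is SENTINEL:
            break
        results.append(item * 2)  # placeholder for the real work


def consume(items, num_workers=4):
    work_queue = queue.Queue()
    results = []  # appended to from several threads; relies on CPython's list.append
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(worker, work_queue, results)
                   for _ in range(num_workers)]
        # The producer (here the main thread) adds items while the workers run.
        for item in items:
            work_queue.put(item)
        # One sentinel per worker so that every worker loop terminates.
        for _ in range(num_workers):
            work_queue.put(SENTINEL)
        for future in futures:
            future.result()  # re-raise any exception from a worker
    return results
```

The order of the results is not guaranteed, since whichever worker is free takes the next item.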

When

  1. you have a finite number of items or
  2. you compute all items at once

and afterwards process them in parallel, a queue makes no sense. A ThreadPoolExecutor makes a queue obsolete in these cases.
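For the finite case, a minimal sketch could hand the items straight to the executor without any explicit queue. The function names run and process_all and the squaring "work" are assumptions made for illustration.

```python
import concurrent.futures


def run(item):
    # Placeholder for the real work on one item.
    return item * item


def process_all(items, max_workers=8):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields the results in the same order as the input items.
        return list(executor.map(run, items))
```

Calling process_all([1, 2, 3]) would return the squared values in input order; the executor does the queuing internally.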

I had a look at the ThreadPoolExecutor source:

def submit(self, fn, *args, **kwargs): # line 94
    self._work_queue.put(w) # line 102

A Queue is used inside.

User answered Sep 19 '22 11:09


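As commented above, you can use the two-argument form of iter() to run a ThreadPoolExecutor over a queue object. iter(queue.get, None) calls queue.get() repeatedly and stops as soon as it returns the sentinel None, so a None has to be put on the queue after the last item; otherwise the call blocks forever. A very general version would look something like this: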

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(run, iter(queue.get, None))

Here, run performs the desired work on each item taken from the queue.
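A self-contained version of this idea might look like the sketch below. The names drain and work_queue, the sample strings, and the upper-casing "work" are assumptions for illustration; the important detail is the None sentinel that ends the iteration.

```python
import concurrent.futures
import queue


def run(item):
    # Placeholder for the real work on one item.
    return str(item).upper()


def drain(work_queue):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # iter(callable, sentinel) keeps calling work_queue.get()
        # until it returns None.
        return list(executor.map(run, iter(work_queue.get, None)))


work_queue = queue.Queue()
for word in ("a", "b", "c"):
    work_queue.put(word)
work_queue.put(None)  # sentinel: without it, get() would block forever

results = drain(work_queue)
```

Note that None can then no longer be used as a regular work item; if it can occur in the data, a dedicated sentinel object is a safer choice.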

ffent answered Sep 17 '22 11:09