I have searched here for how to do threading in Python, but so far I haven't been able to find the answer I need. I'm not very familiar with Python's Queue and threading classes, and for that reason some of the answers here make no sense to me.
I want to create a pool of threads to which I can give different tasks, and, when all of them have finished, collect their return values and process them. So far I have tried the following, but I can't get the results back:
```python
from threading import Thread
from queue import Queue


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.result = None
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                self.result = func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()

    def get_result(self):
        return self.result


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""

    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.results = []
        for _ in range(num_threads):
            w = Worker(self.tasks)
            # get_result() is called before any task has run, so this stores None
            self.results.append(w.get_result())

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def get_results(self):
        return self.results


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
for i in range(0, len(words)):
    pool.add_task(foo, words[i], numbers[i])
pool.wait_completion()
results = pool.get_results()
print(results)
```
The output prints each word repeated the given number of times, but the results list is full of None values. Where should I put the return values of func?
Or is the easier way to pass a dictionary (or some other mutable variable) to my function as an extra argument for storing the result, and, after adding each task to the Queue, append that argument to a list of results?
```python
# Uses the ThreadPool class from the code above
def foo(word, number, r):
    print(word * number)
    r[(word, number)] = number
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
results = []
for i in range(0, len(words)):
    r = {}
    pool.add_task(foo, words[i], numbers[i], r)
    results.append(r)
pool.wait_completion()  # wait until the tasks have filled in the dicts
print(results)
```
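A minimal sketch of one way to keep the structure of the pool above and still collect return values, assuming Python 3. This is an illustrative fix, not the only one: each task is queued together with a hypothetical index into a shared results list, and the worker writes the return value into that slot, instead of into a single self.result attribute that is read before any task runs. Storing a raised exception in the slot is also an assumption of this sketch.

```python
from queue import Queue
from threading import Thread


class Worker(Thread):
    """Daemon thread running (index, func, args, kwargs) tasks from a queue."""

    def __init__(self, tasks, results):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.results = results  # shared list owned by the pool
        self.start()

    def run(self):
        while True:
            index, func, args, kwargs = self.tasks.get()
            try:
                # Write the return value into this task's own slot.
                self.results[index] = func(*args, **kwargs)
            except Exception as e:
                # Assumption: keep the exception as the task's "result".
                self.results[index] = e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of worker threads keeping one result slot per submitted task."""

    def __init__(self, num_threads):
        self.tasks = Queue()
        self.results = []
        for _ in range(num_threads):
            Worker(self.tasks, self.results)

    def add_task(self, func, *args, **kwargs):
        # Reserve the slot before queuing, so results keep submission order.
        self.results.append(None)
        self.tasks.put((len(self.results) - 1, func, args, kwargs))

    def wait_completion(self):
        self.tasks.join()

    def get_results(self):
        return self.results


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
for word, number in zip(words, numbers):
    pool.add_task(foo, word, number)
pool.wait_completion()
print(pool.get_results())  # [1, 2, 3, 4, 5]
```

Because every task gets its own slot, the order of the results matches the order of add_task calls even when the threads finish in a different order.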
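For example:

```python
from threading import Thread


def foo(bar, result, index):
    print('hello {0}'.format(bar))
    result[index] = "foo"


threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# Wait for every thread to finish before reading the results
for i in range(len(threads)):
    threads[i].join()

print(results)
```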
A thread pool is a pattern for managing multiple threads efficiently. In Python 3.2+, use the ThreadPoolExecutor class from the concurrent.futures module to manage a thread pool. Call the submit() method of the ThreadPoolExecutor to submit a task to the thread pool for execution.
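Applied to the question's foo, a sketch assuming Python 3 might look like this. submit() returns a Future right away, and Future.result() later hands back the function's return value (or re-raises its exception), which is exactly the piece missing from the hand-written pool.

```python
from concurrent.futures import ThreadPoolExecutor


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=5) as executor:
    # submit() schedules foo(w, n) on a pool thread and returns a Future.
    futures = [executor.submit(foo, w, n) for w, n in zip(words, numbers)]

# Leaving the with-block calls shutdown(wait=True), so all tasks are done here.
results = [f.result() for f in futures]
print(results)  # [1, 2, 3, 4, 5]
```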
ThreadPoolExecutor is an Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
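A short sketch of the max_workers limit, assuming Python 3. Here foo is modified (for illustration only) to also report which pool thread ran it. Executor.map accepts several iterables like the built-in map, so no wrapper is needed for two arguments, and it yields results in input order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def foo(word, number):
    print(word * number)
    # Illustrative addition: also return the name of the thread that ran us.
    return number, threading.current_thread().name


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=2) as executor:
    # Results come back in the order of the inputs, not completion order.
    results = list(executor.map(foo, words, numbers))

values = [value for value, _ in results]
thread_names = {name for _, name in results}
print(values)  # [1, 2, 3, 4, 5]
print(thread_names)  # at most two distinct names, since max_workers=2
```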
To create a thread using the threading module, define a new subclass of the Thread class. Override the __init__(self [,args]) method to add additional arguments (calling Thread.__init__() first), and then override the run(self [,args]) method to implement what the thread should do when started.
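Those steps also answer the original question if the subclass keeps the return value as an attribute. A sketch assuming Python 3, where ResultThread is a hypothetical name; note that it starts one thread per task rather than reusing a pool.

```python
from threading import Thread


class ResultThread(Thread):
    """Hypothetical Thread subclass that stores its function's return value."""

    def __init__(self, func, *args):
        super().__init__()  # the base constructor must run first
        self.func = func
        self.args = args
        self.result = None

    def run(self):
        # Runs in the new thread once start() is called.
        self.result = self.func(*self.args)


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

threads = [ResultThread(foo, w, n) for w, n in zip(words, numbers)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join() returns, t.result has been set
results = [t.result for t in threads]
print(results)  # [1, 2, 3, 4, 5]
```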
Python actually has a built-in thread pool you can use; it's just not well documented:
```python
from multiprocessing.pool import ThreadPool


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
results = []
for i in range(0, len(words)):
    # apply_async returns an AsyncResult immediately
    results.append(pool.apply_async(foo, args=(words[i], numbers[i])))
pool.close()
pool.join()
# get() returns each task's return value
results = [r.get() for r in results]
print(results)
```
Or, using map instead of apply_async:
```python
from multiprocessing.pool import ThreadPool


def foo(word, number):
    print(word * number)
    return number


def starfoo(args):
    """
    We need this because map only supports calling functions with one arg.
    We need to pass two args, so we use this little wrapper function to
    expand a zipped list of all our arguments.
    """
    return foo(*args)


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
# We need to zip together the two lists because map only supports calling functions
# with one argument. In Python 3.3+, you can use starmap instead.
results = pool.map(starfoo, zip(words, numbers))
print(results)
pool.close()
pool.join()
```
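For reference, the starmap variant mentioned in the comment might look like this on Python 3.3+. It is a sketch that also uses the pool as a context manager; since starmap blocks until every task has finished, the terminate() that runs on leaving the with-block does not cut any work short.

```python
from multiprocessing.pool import ThreadPool


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

with ThreadPool(5) as pool:
    # starmap unpacks each (word, number) tuple into foo's two arguments,
    # so the starfoo wrapper is not needed.
    results = pool.starmap(foo, zip(words, numbers))

print(results)  # [1, 2, 3, 4, 5]
```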