
What is the safest way to queue multiple threads originating in a loop?

My script loops through each line of an input file and performs some actions using the string in each line. Since the tasks performed on each line are independent of each other, I decided to separate the task into threads so that the script doesn't have to wait for the task to complete to continue with the loop. The code is given below.


import sys
import threading

def myFunction(line, param):
    # Does something with line and param:
    # sends multiple HTTP requests, parses the responses, and produces output.
    # Returns nothing.

param = sys.argv[1]
threads = []
with open(targets, "r") as listfile:
    for line in listfile:
        print("Starting a thread for:", line)
        t = threading.Thread(target=myFunction, args=(line, param))
        threads.append(t)
        t.start()

I realized that this was a bad idea once the number of lines in the input file grew large: with this code, there would be as many threads as lines. I researched a bit and figured that queues would be the way to go.

I want to understand the optimal way of using queues for this scenario and if there are any alternatives which I can use.

asked Sep 29 '20 by hax

People also ask

What is a thread-safe queue?

A queue, as implemented by Thread::Queue, is a thread-safe data structure much like a list. Any number of threads can safely add elements to the end of the list or remove elements from the head of the list. (Queues don't permit adding or removing elements from the middle of the list.)

Is Queue.put() thread-safe?

Luckily, Python's Queue() class has a thread-safe implementation with all the required locking mechanisms, so producers and consumers in different threads can work with the same queue instance safely and easily.
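As a minimal illustration of that point (the item count and the None sentinel here are arbitrary choices for the sketch):

import queue
import threading

q = queue.Queue()

def producer():
    for i in range(5):   # arbitrary number of items
        q.put(i)         # put() is thread-safe
    q.put(None)          # sentinel telling the consumer to stop

def consumer():
    while True:
        item = q.get()   # get() blocks until an item is available
        if item is None:
            break
        print("consumed", item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()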

What are the advantages of using multiple threads over multiple processes?

On a multiprocessor system, multiple threads can run concurrently on multiple CPUs, so multithreaded programs can run much faster than they would on a uniprocessor system. They can also be faster than a program using multiple processes, because threads require fewer resources and generate less overhead.

Is it a good idea to use multithreading to speed up your Python code?

Both multithreading and multiprocessing allow Python code to run concurrently. Only multiprocessing will allow your code to be truly parallel. However, if your code is IO-heavy (like HTTP requests), then multithreading will still probably speed up your code.
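A quick sketch of why that is, simulating the network wait of an HTTP request with time.sleep (fake_io_task and the counts are made up for illustration):

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io_task(n):
    time.sleep(1)   # stands in for waiting on an HTTP response
    return n

start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fake_io_task, range(5)))
# The five 1-second waits overlap, so this takes about 1 second, not 5.
print(results, "in", round(time.time() - start, 1), "s")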




2 Answers

To get around this problem, you can use the concept of a thread pool, where you define a fixed number of threads/workers, for example 5 workers, and whenever a thread finishes executing, another submitted task automatically takes its place.

Example:

import concurrent.futures

def myFunction(line, param):
    print("Done with :", line, param)

param = "param_example"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    with open("targets", "r") as listfile:
        for line in listfile:
            print("Starting a thread for: ", line)
            futures.append(executor.submit(myFunction, line=line, param=param))

    # wait for the threads to finish and maybe print a result:
    for future in concurrent.futures.as_completed(futures):
        print(future.result())  # an Exception should be handled here!!!
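Since the comment above flags it, one way the exception handling could look (just a sketch; printing is an arbitrary choice, a real script might log or retry):

    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())  # result() re-raises any exception from the worker
        except Exception as exc:    # e.g. a failed HTTP request inside myFunction
            print("a task failed:", exc)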
answered Oct 17 '22 by Pixel_teK

Queues are one way to do it. The way to use them is to put function parameters on a queue, and use threads to get them and do the processing.

The queue size doesn't matter much in this case because reading the next line is fast. In other cases, a more optimized solution is to set the queue size to at least twice the number of threads; that way, even if all threads finish processing an item at the same time, each will find the next item already waiting in the queue.

To avoid complicating the code, the threads can be made daemonic so that they don't stop the program from finishing after the processing is done; they are terminated when the main process exits.

The alternative is to put a special sentinel item (like None) on the queue for each thread, make each thread exit after getting it from the queue, and then join the threads (see the sketch below).

For the examples below, the number of worker threads is set using the workers variable.
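A sketch of that sentinel variant, assuming the same myFunction, param, targets, and workers names used in the examples below:

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)

def work():
    while True:
        item = queue.get()
        if item is None:          # sentinel: no more work
            queue.task_done()
            break
        myFunction(*item)
        queue.task_done()

threads = [Thread(target=work) for _ in range(workers)]
for t in threads:
    t.start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))
for _ in range(workers):
    queue.put(None)               # one sentinel per thread
for t in threads:
    t.join()                      # non-daemonic threads exit cleanly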

Here is an example of the daemonic solution using a queue.

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)
def work():
    while True:
        myFunction(*queue.get())  # get() blocks until an item is available
        queue.task_done()         # mark the item as processed for queue.join()

for _ in range(workers):
    Thread(target=work, daemon=True).start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))
queue.join()  # block until every queued item has been processed

A simpler solution might be using ThreadPoolExecutor. It is especially simple in this case because the function being called doesn't return anything that needs to be used in the main thread.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=workers) as executor:
    with open(targets, 'r') as listfile:
        for line in listfile:
            executor.submit(myFunction, line, param)
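Since the return values are not used here, executor.map would be an equally compact variant of the same idea (a sketch under the same assumptions about myFunction, targets, and param):

from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

with ThreadPoolExecutor(max_workers=workers) as executor:
    with open(targets, 'r') as listfile:
        # submits one task per line; list() drains the results
        # and re-raises any exception from the workers
        list(executor.map(myFunction, listfile, repeat(param)))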

Also, if it's not a problem to have all the lines stored in memory, there is a solution that uses nothing but threads. The work is split so that each thread processes some lines from the list and skips the others; with two threads, for example, one thread would process the odd lines and the other the even lines.

from threading import Thread

with open(targets, 'r') as listfile:
    lines = listfile.readlines()

def work_split(n):
    for line in lines[n::workers]:  # every workers-th line, starting at offset n
        myFunction(line, param)

threads = []
for n in range(workers):
    t = Thread(target=work_split, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

I have done a quick benchmark and the Queue is slightly faster than the ThreadPoolExecutor, but the solution with the split work is faster than both.
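For anyone who wants to reproduce that comparison, a rough harness might look like this (a hedged sketch: the no-op myFunction and the line count are placeholders, and actual timings depend on the real workload):

import time
from queue import Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

workers = 4
param = None
lines = ["line %d\n" % i for i in range(100000)]  # placeholder input

def myFunction(line, param):
    pass  # no-op, so only the dispatch overhead is measured

def with_queue():
    queue = Queue(workers * 2)
    def work():
        while True:
            myFunction(*queue.get())
            queue.task_done()
    for _ in range(workers):
        Thread(target=work, daemon=True).start()
    for line in lines:
        queue.put((line, param))
    queue.join()

def with_executor():
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for line in lines:
            executor.submit(myFunction, line, param)

def with_split():
    def work_split(n):
        for line in lines[n::workers]:
            myFunction(line, param)
    threads = [Thread(target=work_split, args=(n,)) for n in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

for label, fn in [("queue", with_queue), ("executor", with_executor), ("split", with_split)]:
    start = time.perf_counter()
    fn()
    print(label, round(time.perf_counter() - start, 3), "s")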

answered Oct 17 '22 by GitFront