Python Threading stdin/stdout

I have a file which contains a lot of data, and each row is a record. I am trying to do some ETL work against the whole file. Right now I am using standard input to read the data line by line because it won't load the whole file into memory, and the nice thing about this is that the script stays flexible enough to integrate with other scripts and shell commands. I write the result to standard output. For example:

$ cat input_file
line1 
line2
line3
line4
...

My current Python code (parse.py) looks like this:

import sys

for line in sys.stdin:
    result = ETL(line)    # ETL is some self-defined function which takes a while to execute
    print(result)

This is how it is invoked right now:

cat input_file | python parse.py > output_file

I have looked at Python's threading module and I am wondering whether performance would be dramatically improved if I used it.

Question 1: How should I plan the quota (the number of lines) for each thread, and why?

...
counter = 0
buffer = []
for line in sys.stdin:
    buffer.append(line)
    counter += 1
    if counter % 5 == 0:   # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine this?
        thread = parser(buffer)
        buffer = []
        thread.start()
if buffer:                 # don't drop any leftover lines at the end
    parser(buffer).start()

Question 2: Multiple threads might print their results back to stdout at the same time. How do I organize them and avoid the situation below?

import threading
import time

class parser(threading.Thread):
    def __init__(self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            print(elem + 'Finished')

work = ['a', 'b', 'c', 'd', 'e', 'f']

thread1 = parser(work[0:2])
thread2 = parser(work[2:4])
thread3 = parser(work[4:6])

thread1.start()
thread2.start()
thread3.start()

The output is really ugly; a single row can contain the output from two threads.

aFinished
cFinishedeFinished

bFinished
fFinished
dFinished
asked Aug 21 '13 by B.Mr.W.

2 Answers

Taking your second question first, this is what mutexes are for. You can get the cleaner output that you want by using a lock to coordinate among the parsers and ensure that only one thread has access to the output stream during a given period of time:

class parser(threading.Thread):
    output_lock = threading.Lock()

    def __init__(self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            with self.output_lock:    # only one thread prints at a time
                print(elem + 'Finished')

As regards your first question, note that it's probably the case that multi-threading will provide no benefit for your particular workload. It largely depends on whether the work you do with each input line (your ETL function) is primarily CPU-bound or IO-bound. If the former (which I suspect is likely), threads will be of no help, because of the global interpreter lock. In that case, you would want to use the multiprocessing module to distribute work among multiple processes instead of multiple threads.
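For example, here is a minimal sketch of what parse.py might look like with a process pool (the pool size and chunksize are guesses to tune, and the ETL body here is just a stand-in for your real function):

import sys
from multiprocessing import Pool

def ETL(line):
    # stand-in for the real transform; replace with your own function
    return line.strip() + ' transformed'

if __name__ == '__main__':
    with Pool(processes=4) as pool:              # tune the process count
        # imap streams results back in input order as workers finish
        for result in pool.imap(ETL, sys.stdin, chunksize=100):
            print(result)

The invocation stays the same: cat input_file | python parse.py > output_file.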

But you can get the same results with an easier-to-implement workflow: split the input file into n pieces (using, e.g., the split -l command); invoke the extract-and-transform script separately on each subfile, in parallel; then concatenate the resulting output files.

One nitpick: "using standard input to read the data line by line because it won't load the whole file into memory" involves a misconception. You can read a file line by line from within Python by, e.g., replacing sys.stdin with a file object in a construct like:

for line in sys.stdin:

See also the readline() method of file objects, and note that read() can take the maximum number of bytes to read as a parameter.
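For instance, this processes input_file lazily, one line at a time, without ever holding the whole file in memory (ETL again standing in for the real function):

with open('input_file') as f:
    for line in f:
        print(ETL(line))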

answered by Alp


Whether threading will be helpful to you is highly dependent on your situation. In particular, if your ETL() function involves a lot of disk access, then threading would likely give you a pretty significant speed improvement.

In response to your first question, I've always found that it just depends. There are a lot of factors at play when determining the ideal number of threads, and many of them are program-dependent. If you're doing a lot of disk access (which is pretty slow), for example, then you'll want more threads to take advantage of the downtime while waiting for disk access. If the program is CPU-bound, though, tons of threads may not be super helpful. So, while it may be possible to analyze all the factors to come up with an ideal number of threads, it's usually a lot faster to make an initial guess and then adjust from there.

More specifically, though, assigning a certain number of lines to each thread probably isn't the best way to go about divvying up the work. Consider, for example, if one line takes a particularly long time to process. It would be best if one thread could work away at that one line and the other threads could each do a few more lines in the meantime. The best way to handle this is to use a Queue. If you push each line into a Queue, then each thread can pull a line off the Queue, handle it, and repeat until the Queue is empty. This way, the work gets distributed such that no thread is ever without work to do (until the end, of course).
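Here is a rough sketch of that pattern, assuming Python 3's queue and threading modules (the worker count is just a starting guess, and ETL again stands in for the real function):

import queue
import sys
import threading

NUM_WORKERS = 4                        # an initial guess; adjust from there

def ETL(line):
    # stand-in for the real transform
    return line.strip() + ' transformed'

def worker(work_q, out_q):
    while True:
        line = work_q.get()
        if line is None:               # sentinel: no more work
            break
        out_q.put(ETL(line))

work_q = queue.Queue()
out_q = queue.Queue()                  # collected results; see the next paragraph
threads = [threading.Thread(target=worker, args=(work_q, out_q))
           for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for line in sys.stdin:
    work_q.put(line)
for _ in threads:
    work_q.put(None)                   # one sentinel per worker
for t in threads:
    t.join()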

Now, the second question. You're definitely right that writing to stdout from multiple threads at once isn't an ideal solution. Ideally, you would arrange things so that the writing to stdout happens in only one place. One great way to do that is to use a Queue. If you have each thread write its output to a shared Queue, then you can spawn an additional thread whose sole task is to pull items out of that Queue and print them to stdout. By restricting the printing to just one thread, you'll avoid the issues inherent in multiple threads trying to print at once.
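Continuing the sketch above, the printer thread could look like this (out_q is the same queue the workers fill):

import queue
import threading

out_q = queue.Queue()                  # shared with the workers from the previous sketch

def printer(out_q):
    while True:
        item = out_q.get()
        if item is None:               # sentinel: all workers have finished
            break
        print(item)

printer_thread = threading.Thread(target=printer, args=(out_q,))
printer_thread.start()

# ... feed work_q and join the workers as in the previous sketch ...

out_q.put(None)                        # signal the printer once the workers are done
printer_thread.join()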

answered by mculhane