I have a very large dataset distributed across 10 big clusters. The task is to do some computations for each cluster and write (append) the results line by line into 10 files, one file per cluster. Each cluster can be computed independently, and I want to parallelize the code across ten CPUs (or threads) so that all the clusters are processed at once. A simplified pseudocode for my task is the following:
for c in range(1, 11):  # loop over the 10 clusters
    for l in lines_of_cluster(c):  # pseudocode: read the lines of cluster c
        # do some computations for line l in cluster c
        # append the results to the file named "cluster_c", one file per cluster c
You can use joblib to parallelize the analysis. If you have a function process_line:
from joblib import Parallel, delayed
data = Parallel(n_jobs=-1)(delayed(process_line)(line)
                           for line in open('bigfile'))
You want to save the information serially. Depending on the ratio of computation time to the size of the data to be saved, you can use different approaches:
When the results are small relative to the computing time, the overhead of communicating between processes is very small. The simplest option then is for each process to write to an independent file, and just cat the results together at the end. You can make sure you are not overwriting anything by passing an index and using it to build the file name.
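The one-file-per-worker idea could be sketched as follows for the ten-cluster case in the question. This is only an illustration using the standard library's concurrent.futures instead of joblib: process_line here is a hypothetical stand-in for the real computation, and the names process_cluster, run_all and the "cluster_<index>" output naming are assumptions made for the example.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def process_line(line):
    """Hypothetical stand-in for the real per-line computation."""
    return str(len(line.strip()))


def process_cluster(index, in_path, out_dir):
    """Process one cluster and write its results to its own file."""
    # The index makes the file name unique, so workers never collide.
    out_path = os.path.join(out_dir, "cluster_%d" % index)
    # "w" starts a fresh file on each run; use "a" to append across runs.
    with open(in_path) as src, open(out_path, "w") as out:
        for line in src:
            out.write(process_line(line) + "\n")
    return out_path


def run_all(in_paths, out_dir, executor_cls=ProcessPoolExecutor):
    """Submit one task per cluster and return the output paths in order."""
    with executor_cls(max_workers=len(in_paths)) as pool:
        futures = [pool.submit(process_cluster, i, path, out_dir)
                   for i, path in enumerate(in_paths, start=1)]
        return [f.result() for f in futures]
```

Because no two workers share a file, no locking is needed. On platforms that start workers by spawning a fresh interpreter, run_all should be called from under an `if __name__ == "__main__":` guard.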
A more advanced solution is to pass the file handle as an argument and write to the file only after acquiring a multiprocessing.Lock. The only problem is that if many processes try to acquire the lock at the same time, they will sit waiting for it instead of computing.
def process_line(line, outfile, lock):
    data = line[0]
    with lock:  # acquire the lock; it is released automatically, even on error
        outfile.write(str(data) + '\n')
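One caveat with the lock approach: an open file object cannot be pickled, which matters whenever arguments are sent to worker processes. A possible variation, sketched below, is to pass the file path instead and let each worker open the file in append mode while it holds the lock. The helper names append_result and run_threads are hypothetical. The demo driver uses threads and threading.Lock so that it stays short; with multiprocessing.Process workers, a multiprocessing.Lock passed as an argument can be used with the same `with lock:` pattern.

```python
import threading


def append_result(path, text, lock):
    """Append one line to a shared file while holding the lock."""
    with lock:  # only one worker writes at a time
        with open(path, "a") as out:
            out.write(text + "\n")


def run_threads(path, values):
    """Demo driver: one thread per value, all appending to the same file."""
    lock = threading.Lock()
    threads = [threading.Thread(target=append_result,
                                args=(path, str(v), lock))
               for v in values]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

Opening and closing the file on every write keeps the example simple, at the cost of some extra system calls per line.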
If you have more data, writing to a file could induce some overhead, especially if you are going to reload it into memory afterwards. Here you have two options:
The data does not fit in memory, so you have to consume it on the fly. You need a producer-consumer pattern. Something like:
# Python 2 syntax (print >> and xrange)
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def saver(q):
    # Consumer: write every value taken from the queue to the output file
    with open('out.txt', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            print >> out, val
            q.task_done()
        # Finish up
        q.task_done()

def foo(x):
    # Producer: push the result onto the shared queue
    q.put(x**3+2)

q = JoinableQueue()
p = Process(target=saver, args=(q,))
p.start()
Parallel(n_jobs=2, verbose=0)(delayed(foo)(i) for i in xrange(1000))
q.put(None) # Poison pill
q.join()
p.join()
If the amount of data is very big compared to the computing time, you will find a lot of overhead just transferring the data from one process to the others. If that is your bottleneck, then you should use more advanced technology, like OpenMP, and perhaps Cython to get rid of the GIL, and use threads instead of processes.
Note that I have not specified how small "small" is, as this depends very much on the configuration of your cluster: how fast the communication is, the underlying file system, and so on. But it is nothing you can't experiment with fairly easily, for example by timing how long it takes a dummy program to send a line to another process.
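A rough way to run that kind of experiment is sketched below. It assumes a child Python process connected through pipes as the "other process", which only approximates what joblib or a multiprocessing queue does. The names ECHO_CHILD and time_round_trip are made up for the illustration.

```python
import subprocess
import sys
import time

# Child program: echo every line from stdin back to stdout, flushing each time.
ECHO_CHILD = (
    "import sys\n"
    "while True:\n"
    "    line = sys.stdin.readline()\n"
    "    if not line:\n"
    "        break\n"
    "    sys.stdout.write(line)\n"
    "    sys.stdout.flush()\n"
)


def time_round_trip(n_messages=1000, payload="x" * 80):
    """Return the average seconds per line sent to a child and echoed back."""
    child = subprocess.Popen(
        [sys.executable, "-c", ECHO_CHILD],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
    try:
        start = time.perf_counter()
        for _ in range(n_messages):
            child.stdin.write(payload + "\n")
            child.stdin.flush()
            child.stdout.readline()  # block until the echo comes back
        elapsed = time.perf_counter() - start
    finally:
        child.stdin.close()  # end of input makes the child exit its loop
        child.stdout.close()
        child.wait()
    return elapsed / n_messages
```

Comparing `time_round_trip()` with the time one call to process_line takes gives a first idea of which regime you are in. The payload must not contain a newline, since each message is read back as a single line.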
Just like the answer from @Davidmh, but working in Python 3:
import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def saver(q):
    with open('out.txt', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
            q.task_done()
        # Finish up
        q.task_done()

def foo(x):
    q.put(str(os.getpid()) + '-' + str(x**3+2))

q = JoinableQueue()
p = Process(target=saver, args=(q,))
p.start()
Parallel(n_jobs=-1, verbose=0)(delayed(foo)(i) for i in range(1000))
q.put(None) # Poison pill
p.join()
PS: I've also added the PID to each output line in order to check that everything is working as expected ;-)