I'm trying to parallelize an application using multiprocessing. It takes in a very large CSV file (64 MB to 500 MB), does some work line by line, and then outputs a small, fixed-size file.
Currently I do a list(file_obj), which unfortunately loads the whole file into memory (I think), and then I break that list up into n parts, n being the number of processes I want to run. I then do a pool.map() on the broken-up lists.
This seems to have a really, really bad runtime in comparison to a single-threaded, just-open-the-file-and-iterate-over-it approach. Can someone suggest a better solution?
Additionally, I need to process the rows of the file in groups which preserve the value of a certain column. These groups of rows can themselves be split up, but no group should contain more than one value for this column.
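For reference, here is a simplified sketch of what I'm doing now (the file name, chunk math, and per-line work are placeholders, not my real code):

import multiprocessing as mp

def work(lines):
    # stand-in for the real per-line computation
    return sum(len(line) for line in lines)

def main():
    n = 4                                     # number of worker processes
    with open('input.csv') as f:
        lines = list(f)                       # the whole file ends up in memory here
    # break the list into n roughly equal parts
    size = max(1, (len(lines) + n - 1) // n)
    parts = [lines[i:i + size] for i in range(0, len(lines), size)]
    with mp.Pool(n) as pool:
        print(pool.map(work, parts))

if __name__ == '__main__':
    main()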
We can use the file object itself as an iterator: it returns each line one by one, which can be processed as it is read. This does not load the whole file into memory, which makes it suitable for reading large files in Python.
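For example, a minimal sketch of that pattern (the file name and the per-line work are placeholders):

total = 0
with open('large_file.csv') as f:
    for line in f:              # the file object yields lines lazily, one at a time
        total += len(line)      # stand-in for the real per-line work
print(total)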
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
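As a minimal illustration (the function and inputs are just examples, not tied to the question):

import multiprocessing as mp

def square(x):
    # runs in a separate worker process, so the GIL is not a bottleneck
    return x * x

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:        # four worker processes
        print(pool.map(square, range(10)))    # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]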
A multiprocessing version can also be slower if each map call has to rebuild expensive state, because the mapped functions are assumed to be stateless. In some cases this can be avoided by using the initializer argument of multiprocessing.Pool to set up that state once per worker process.
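A minimal sketch of that pattern (the setup function and the state it builds are illustrative):

import multiprocessing as mp

_state = None

def init_worker():
    # runs once per worker process; build expensive state here
    global _state
    _state = {'offset': 42}           # stand-in for an expensive-to-build object

def work(x):
    # each call reuses the state built once in init_worker
    return x + _state['offset']

if __name__ == '__main__':
    with mp.Pool(processes=4, initializer=init_worker) as pool:
        print(pool.map(work, range(5)))   # [42, 43, 44, 45, 46]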
list(file_obj) can require a lot of memory when file_obj is large. We can reduce that memory requirement by using itertools to pull out chunks of lines as we need them.
In particular, we can use

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)

to split the file into processable chunks, and

groups = [list(chunk) for key, chunk in
          itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)

to have the multiprocessing pool work on num_chunks chunks at a time.
By doing so, we need roughly only enough memory to hold a few (num_chunks) chunks in memory, instead of the whole file.
import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)

def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()
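Note that itertools.groupby only groups consecutive rows that share the same key, so every chunk contains rows with a single value of the key column even if the file is not sorted; rows with the same value that are not adjacent simply land in separate chunks, which is fine since the question allows groups to be split up. If each value had to end up in exactly one chunk, the file would first have to be sorted on that column.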
I would keep it simple. Have a single program open the file and read it line by line. You can choose how many files to split it into, open that many output files, and write each line to the next file in turn. This will split the file into n roughly equal parts. You can then run a Python program against each of the files in parallel.
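A rough sketch of that splitting step (the file names and the number of parts are placeholders):

n = 4                                                      # number of parts
parts = [open('part_%d.csv' % i, 'w') for i in range(n)]
with open('input.csv') as f:
    for i, line in enumerate(f):
        parts[i % n].write(line)       # round-robin: each line goes to the next file
for p in parts:
    p.close()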