I am running calculations on 40 GB worth of data. Each file is a gzip-compressed file containing lines of JSON. Each file has a maximum of 500,000 lines, or about 500 MB. I have an Amazon instance running with 128 CPUs and 1952 GB of memory. What I'm trying to do is process each file as quickly as possible.
I'm using multiprocessing Pools like this:
import glob
from multiprocessing import Lock, Pool

def initializeLock(l):
    global lock
    lock = l

if __name__ == '__main__':
    directory = '/home/ubuntu/[directory_containing_files]/*.gz'
    file_names = glob.glob(directory)
    lock = Lock()
    pool = Pool(initializer=initializeLock, initargs=(lock,))
    pool.map(do_analysis, file_names)
    pool.close()
    pool.join()
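For reference, do_analysis isn't shown above. Roughly, it reads one gzipped file of JSON lines and groups the records by their ID before doing the per-ID work. This is only a simplified sketch; the field name 'id' and the plain dict are placeholders for what the real code does with pandas:

import gzip
import json
from collections import defaultdict

def do_analysis(file_name):
    # Simplified sketch: read one gzipped file of JSON lines and bucket the
    # records by their ID field ('id' is assumed here) before analysing them.
    records_by_id = defaultdict(list)
    with gzip.open(file_name, 'rt') as handle:
        for line in handle:
            record = json.loads(line)
            records_by_id[record['id']].append(record)
    # ... per-ID analysis and writing out results happens here ...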
What I'd expect to happen is for a large number of processes to be created, each handling one file. What actually happens is that over 100 processes are created initially. At that point I'm using about 85% of my memory, which is great! Then each of those completes. Eventually the number of running processes drops to about 10, and I'm only using 5% of the memory. Additional processes are started periodically, but it never gets back to running 100 or so. So I have this large instance with all this free memory, but I'm running at most 10 processes most of the time.
Any idea how to get it to keep running 100 processes until all the files are processed?
EDIT:
I added some logging to the application. Initially it starts 127 processes; I think this is because I have 128 CPUs and one was in use when the processes were started. Some of the processes finish successfully and their results are saved. Then at some point all but a few of the running processes end. When I check how many files have finished, only 22 of the 127 are complete. After that it just runs with 5-10 processes, and all of these finish successfully. I'm thinking maybe it's running out of memory and crashing. But why? I have so much memory and so many CPUs.
EDIT 2:
So I've found the issue. The problem was that I was acquiring a lock in the do_analysis method, and all of the processes were finishing around the same time and waiting for the lock to be released. The processes weren't stopped; they were sleeping. This brings me to another question: my main goal here is to take each file, which has many JSON lines, get the ID property from each JSON line, and then append that line to a file containing other lines with the same ID. If the file doesn't exist, I create it. What I did was acquire a lock while a file is being accessed, to avoid it being accessed by another process. Here is my code.
for key, value in dataframe.iteritems():
    if os.path.isfile(file_name):
        lock.acquire()
        value.to_csv(file_name, mode='a', header=False, encoding='utf-8')
        lock.release()
    else:
        value.to_csv(file_name, header=True, encoding='utf-8')
So now I'm trying to think of a creative way to append to files without blocking every other process. I'm dealing with a lot of data, and the chance that two processes need to touch the same file at the same time is low, but it will still happen. So I need to ensure that while a file is being appended to, another process doesn't try to open it.
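One idea would be lock striping: instead of a single global lock, create a fixed pool of locks up front and hash each file name to one of them, so two writers only block each other when their target files land in the same bucket. A rough sketch (the bucket count, helper names, and append_series are placeholders, not from my actual code):

import glob
import os
import zlib
from multiprocessing import Lock, Pool

N_LOCKS = 64  # arbitrary bucket count; more buckets means fewer collisions

def initialize_locks(locks):
    global file_locks
    file_locks = locks

def lock_for(file_name):
    # Map a file name to one of the N_LOCKS locks.
    return file_locks[zlib.crc32(file_name.encode('utf-8')) % N_LOCKS]

def append_series(value, file_name):
    with lock_for(file_name):
        # Do the existence check inside the lock so the header decision
        # can't race with another process creating the same file.
        value.to_csv(file_name, mode='a', header=not os.path.isfile(file_name), encoding='utf-8')

if __name__ == '__main__':
    file_names = glob.glob('/home/ubuntu/[directory_containing_files]/*.gz')
    locks = [Lock() for _ in range(N_LOCKS)]
    pool = Pool(initializer=initialize_locks, initargs=(locks,))
    # do_analysis would call append_series(value, file_name) instead of
    # taking the single global lock.
    pool.map(do_analysis, file_names)
    pool.close()
    pool.join()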
Thanks everyone for your input. Here is my current solution to the problem; I plan to make it more efficient in the coming week. I took Martin's advice and glue the files together once they're all done; however, I'd like to implement daphtdazz's solution of having a process do the gluing with a queue while I produce more files.
import glob
import multiprocessing
import os
import pandas
from multiprocessing import Pool

def do_analysis(file):
    # To keep the file names unique, I append the process id to the end
    process_id = multiprocessing.current_process().pid
    # doing analysis work...
    for key, value in dataframe.iteritems():
        # filename is the ID from the JSON lines with process_id appended (construction omitted)
        if os.path.isfile(filename):
            value.to_csv(filename, mode='a', header=False, encoding='utf-8')
        else:
            value.to_csv(filename, header=True, encoding='utf-8')

def merge_files(base_file_name):
    write_directory = 'write_directory'
    all_files = glob.glob('{0}*'.format(base_file_name))
    is_file_created = False
    for file in all_files:
        if is_file_created:
            print('File already exists, appending')
            dataframe = pandas.read_csv(file, index_col=0)
            dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8')
        else:
            print('File does not exist, creating.')
            dataframe = pandas.read_csv(file, index_col=0)
            dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8')
            is_file_created = True
if __name__ == '__main__':
    # Run the analysis and group the rows by the ID in the JSON lines
    directory = 'directory'
    file_names = glob.glob(directory)
    pool = Pool()
    pool.imap_unordered(do_analysis, file_names, 1)
    pool.close()
    pool.join()

    # Merge all of the per-process files together
    base_list = get_unique_base_file_names('file_directory')
    pool = Pool()
    pool.imap_unordered(merge_files, base_list, 100)
    pool.close()
    pool.join()
This saves each file with a unique process ID appended to the end of the file name, then goes back, collects all of the files by the ID from the JSON lines, and merges them together. While creating the files, CPU usage is between 60-70%. That's decent. While merging the files, CPU usage is around 8%. This is because the files merge so quickly that I don't need all of the CPU power I have. This solution works, but it could be more efficient. I'm going to work on doing both of these steps simultaneously. Any suggestions are welcome.
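For the simultaneous version (daphtdazz's queue idea), this is the rough shape I have in mind. It is only a sketch: merge_worker and the queue wiring are placeholder names of mine, and do_analysis would have to put a (base name, part file) pair on the queue for every part file it writes.

import glob
import multiprocessing
import os
import pandas
from multiprocessing import Pool

def initialize_worker(queue):
    # Give every pool worker a handle to the shared queue so do_analysis
    # can announce each part file it finishes.
    global part_queue
    part_queue = queue

def merge_worker(queue, write_directory):
    # Single consumer: this is the only process that writes the merged
    # files, so no locks are needed.
    while True:
        item = queue.get()
        if item is None:  # sentinel: all producers are done
            break
        base_name, part_file = item
        out_path = os.path.join(write_directory, os.path.basename(base_name) + '.csv')
        dataframe = pandas.read_csv(part_file, index_col=0)
        dataframe.to_csv(out_path, mode='a', header=not os.path.isfile(out_path), encoding='utf-8')

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    merger = multiprocessing.Process(target=merge_worker, args=(queue, 'write_directory'))
    merger.start()

    file_names = glob.glob('directory')
    pool = Pool(initializer=initialize_worker, initargs=(queue,))
    # do_analysis writes its part files as before, then calls
    # part_queue.put((base_file_name, part_file_name)) for each one.
    for _ in pool.imap_unordered(do_analysis, file_names, 1):
        pass
    pool.close()
    pool.join()

    queue.put(None)  # tell the merger there is nothing left to consume
    merger.join()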