
Write data to hdf file using multiprocessing

This seems like a simple issue, but I can't get my head around it.

I have a simulation which runs in a double for loop and writes the results to an HDF file. A simple version of this program is shown below:

import tables as pt

a = range(10)
b = range(5)

def Simulation():
    hdf = pt.openFile('simulation.h5',mode='w')
    for ii in a:
        print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
    hdf.close()
    return
Simulation()

This code does exactly what I want, but since the process can take quite a while to run, I tried to use the multiprocessing module with the following code:

import multiprocessing
import tables as pt

a = range(10)
b = range(5)

def Simulation(ii):
    hdf = pt.openFile('simulation.h5',mode='w')
    print(ii)
    hdf.createGroup('/','A%s'%ii)
    for i in b:
        hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
    hdf.close()
    return

if __name__ == '__main__':
    jobs = []
    for ii in a:
        p = multiprocessing.Process(target=Simulation, args=(ii,))
        jobs.append(p)       
        p.start()

This, however, only writes the last simulation to the HDF file; somehow it overwrites all the other groups.

asked Mar 29 '13 by user2143958

1 Answer

Each time you open a file in write (w) mode, a new file is created -- so the contents of the file are lost if it already exists. Only the last file handle can successfully write to the file. Even if you changed that to append mode, you should not try to write to the same file from multiple processes -- the output will get garbled if two processes try to write at the same time.
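
As a quick illustration (a minimal sketch, not part of the original answer; the file name demo.h5 and the array names are just placeholders), reopening a file with mode='w' throws away whatever a previous open wrote:

import tables as pt

# First open in write mode: creates the file and stores one array.
hdf = pt.openFile('demo.h5', mode='w')
hdf.createArray('/', 'first', [1, 2, 3])
hdf.close()

# Second open in write mode: truncates the file, so 'first' is gone.
hdf = pt.openFile('demo.h5', mode='w')
hdf.createArray('/', 'second', [4, 5, 6])
hdf.close()

hdf = pt.openFile('demo.h5', mode='r')
print(hdf.root._v_children.keys())  # only 'second' survives
hdf.close()

This is exactly what happens when every worker process calls pt.openFile('simulation.h5', mode='w'): each new open wipes the groups written by the others.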

Instead, have all the worker processes put output in a queue, and have a single dedicated process (either a subprocess or the main process) handle the output from the queue and write to the file:


import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None


def Simulation(inqueue, output):
    # Workers never touch the file; they pull indices until the sentinel
    # arrives and send (method name, args) tuples to the writer process.
    for ii in iter(inqueue.get, sentinel):
        output.put(('createGroup', ('/', 'A%s' % ii)))
        for i in range(num_arrays):
            output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))


def handle_output(output):
    # A single dedicated writer: the only process that opens simulation.h5.
    hdf = pt.openFile('simulation.h5', mode='w')
    while True:
        args = output.get()
        if args:
            method, args = args
            getattr(hdf, method)(*args)
        else:
            # A None value signals that all workers have finished.
            break
    hdf.close()

if __name__ == '__main__':
    output = mp.Queue()
    inqueue = mp.Queue()
    jobs = []
    proc = mp.Process(target=handle_output, args=(output, ))
    proc.start()
    for i in range(num_processes):
        p = mp.Process(target=Simulation, args=(inqueue, output))
        jobs.append(p)
        p.start()
    for i in range(num_simulations):
        inqueue.put(i)
    for i in range(num_processes):
        # Send the sentinel to tell Simulation to end
        inqueue.put(sentinel)
    for p in jobs:
        p.join()
    output.put(None)
    proc.join()

For comparison, here is a version which uses mp.Pool:

import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000


def Simulation(ii):
    result = []
    result.append(('createGroup', ('/', 'A%s' % ii)))
    for i in range(num_arrays):
        result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
    return result


def handle_output(result):
    hdf = pt.openFile('simulation.h5', mode='a')
    for args in result:
        method, args = args
        getattr(hdf, method)(*args)
    hdf.close()


if __name__ == '__main__':
    # clear the file
    hdf = pt.openFile('simulation.h5', mode='w')
    hdf.close()
    pool = mp.Pool(num_processes)
    for i in range(num_simulations):
        pool.apply_async(Simulation, (i, ), callback=handle_output)
    pool.close()
    pool.join()

It looks simpler, doesn't it? However, there is one significant difference. The original code used output.put to send args to handle_output, which was running in its own subprocess. handle_output would take args from the output queue and handle them immediately. With the Pool code above, Simulation accumulates a whole bunch of args in result, and result is not sent to handle_output until after Simulation returns.

If Simulation takes a long time, there will be a long waiting period while nothing is being written to simulation.h5.
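
If those gaps are a problem, one possible workaround (a sketch under assumptions, not part of the original answer; one_array and the up-front group creation are illustrative choices) is to submit smaller tasks -- here one task per array -- so that callbacks fire, and the file grows, throughout the run:

import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000


def one_array(ii, i):
    # Each task returns the command for a single array, so results stream
    # back as soon as each array has been computed.
    return [('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))]


def handle_output(result):
    # Callbacks run one at a time in the main process, so writes cannot
    # interleave.
    hdf = pt.openFile('simulation.h5', mode='a')
    for method, args in result:
        getattr(hdf, method)(*args)
    hdf.close()


if __name__ == '__main__':
    # Create the file and all groups up front, then fill them in parallel.
    hdf = pt.openFile('simulation.h5', mode='w')
    for ii in range(num_simulations):
        hdf.createGroup('/', 'A%s' % ii)
    hdf.close()
    pool = mp.Pool(num_processes)
    for ii in range(num_simulations):
        for i in range(num_arrays):
            pool.apply_async(one_array, (ii, i), callback=handle_output)
    pool.close()
    pool.join()

Opening and closing the file for every callback adds overhead, so this trades throughput for more frequent progress; the queue-based version above avoids that cost by keeping a single writer with the file open for the whole run.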

answered Oct 27 '22 by unutbu