PyTables/Pandas: Combining (reading?) multiple HDF5 stores split by rows

In "write once, read many" workflow, i frequently parse large text files (20GB-60GB) dumped from Teradata using FastExport utility and load them into Pytables using Pandas. I am using multiprocessing to chunk the text files and distributing them to different processes to write a .H5 files split based on row count around 5MM each to support parallel writing. This is quite fast around 12 minutes for writing multiple hdf5 files in parallel as compared two 22 minutes for writing a single hdf5 file for 25MM rows x 64 columns.

%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop

For the case of writing multiple .h5 files split by rows, I end up with multiple files of the same structure that I wish to combine into a single .h5 file under root/data/table.

To test the combining functionality, here is the code snippet:

import tables as tb
import pandas as pd

tb.setBloscMaxThreads(15)
store = pd.HDFStore('temp15.h5', complib='blosc')

filenames = ['part_1.h5', 'part_2.h5', 'part_3.h5', 'part_4.h5', 'part_5.h5']

for f in filenames:
    s = pd.HDFStore(f)
    df = s.select('data')           # read the whole part into memory
    store.append(key='data', value=df, format='t', chunksize=200000)
    s.close()                       # close each part store after reading

store.close()

Here is the %timeit result for this:

1 loops, best of 3: 8min 22s per loop

This basically eats up most of the time that I gained by writing multiple .h5 files in parallel. I have a two-part question:

  1. Is there a way to combine (append) .h5 files with the same table format more efficiently (SQL UNION-like functionality)? I tried this SO answer but couldn't get it to append tables. (See the PyTables-level sketch just after this list.)

  2. If not, is splitting on rows a reasonable thing to do when most of the queries are select-from-where over all the columns? I am thinking about writing a map/combine function that looks in all the parts of a table for select-from-where queries. Pandas' select_as_multiple() function does this for splitting based on columns.
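
For reference, here is a rough sketch of what a PyTables-level append (skipping the pandas round trip) might look like for question 1. It copies the first part's /data group so the pandas metadata comes along, then appends raw rows from the other parts. The file names are placeholders, the /data/table path assumes the default pandas table layout, and I have not verified that pandas reads the merged result cleanly:

import tables as tb

filenames = ['part_1.h5', 'part_2.h5', 'part_3.h5', 'part_4.h5', 'part_5.h5']

out = tb.open_file('merged_raw.h5', mode='w')

# seed the output with the first part so the group/table attributes
# (pandas metadata) are carried over
with tb.open_file(filenames[0], mode='r') as first:
    first.copy_node('/data', newparent=out.root, recursive=True)

dst = out.root.data.table

# append the remaining parts' rows as raw structured arrays, in chunks
for fname in filenames[1:]:
    with tb.open_file(fname, mode='r') as part:
        src = part.root.data.table
        step = 1000000                      # rows per read, to bound memory use
        for start in range(0, src.nrows, step):
            dst.append(src.read(start, start + step))

out.flush()
out.close()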


Update Based on Jeff's Suggestions:

Great call-out on removing the indexing and compression in the pre-merge file writing process. After removing the indexing and compression and setting the max row count per pre-merge file to 1MM rows:

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 9min 37s per loop

This is a little over 2 minutes faster than before and pretty much as fast as I can parse the data. After setting the data columns to the desired fields (3 in my case):

for f in filenames:
    s = pd.HDFStore(f)
    df = s.select('data')
    dc = df.columns[1:4]            # the three fields to use as data columns
    store.append(key='data', value=df, format='t', data_columns=dc)
    s.close()

This is about 2 minutes slower than before: 1 loops, best of 3: 10min 23s per loop. After removing compression from the above code, I get 1 loops, best of 3: 8min 48s per loop (almost identical to the first try with compression and no data-column index). To give you an idea of how well the compression works, the uncompressed store is around 13.5GB while the compressed version using blosc is around 3.7GB.

In summary, my process takes 18 minutes 15 seconds to create a merged, uncompressed HDF5 file. Compared to the single-file (compressed) write, this is about 4 minutes 7 seconds faster.

This brings me to the second part of my question: what if I don't merge the files and instead process the pre-merge files in a map/combine way, could that be a reasonable way to approach this? How should I think about implementing it?
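
For concreteness, here is a rough sketch of what I imagine the map/combine select could look like. The part file names are placeholders, and the where clause is passed through in whatever form the pandas version at hand accepts (and should hit one of the data columns):

import pandas as pd
from multiprocessing import Pool

PART_FILES = ['part_1.h5', 'part_2.h5', 'part_3.h5', 'part_4.h5', 'part_5.h5']

def select_from_part(args):
    """ map step: run the same where clause against one pre-merge file """
    fname, where = args
    store = pd.HDFStore(fname, mode='r')
    try:
        return store.select('data', where=where)
    finally:
        store.close()

def select_all_parts(where, processes=4):
    """ combine step: concatenate the per-part results into one frame """
    pool = Pool(processes)
    try:
        pieces = pool.map(select_from_part, [(f, where) for f in PART_FILES])
    finally:
        pool.close()
        pool.join()
    return pd.concat(pieces, ignore_index=True)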

For full disclosure, I am on Pandas version 0.12.0 and PyTables version 3.0.0, and my data processing workflow is as follows (pseudo-code):

import numpy as np
import pandas as pd


def generate_chunks_from_text_file(reader, chunksize=50000):
    """ generator that yields processed text chunks """

    for i, line in enumerate(reader):   # iterate lazily rather than readlines() the whole file
        ---- process data and yield chunk ----


def data_reader(reader, queue):
    """ read data from file and put it into a queue for multiprocessing """

    for chunk in generate_chunks_from_text_file(reader):
        queue.put(chunk)  # put data in the queue for the writer


def data_processor(queue, filename, dtype, min_size):
    """ subprocess that reads the next value in the queue and appends it to the hdf store """

    store = pd.HDFStore(filename)

    while True:

        results = queue.get()
        ---- when queue exhausts - break ----

        array = np.array(results, dtype=dtype)  # convert to a structured numpy array
        df = pd.DataFrame(array)                # convert to a pandas DataFrame

        store.append(key='data', value=df, format='t',
                     min_itemsize=dict(min_size), data_columns=[], index=False)

    store.close()
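
To show how I think of the pieces hanging together, here is a rough sketch of the wiring (illustrative, not my exact code; it assumes data_processor breaks out of its loop when it pulls a None sentinel off the queue):

import multiprocessing as mp

def run_parallel_write(reader, n_writers, dtype, min_size):
    """ one reader feeding n_writers writer processes round-robin,
        each writing its own pre-merge .h5 file """
    queues = [mp.Queue(maxsize=10) for _ in range(n_writers)]
    writers = [mp.Process(target=data_processor,
                          args=(q, 'part_%d.h5' % i, dtype, min_size))
               for i, q in enumerate(queues, start=1)]
    for w in writers:
        w.start()

    # distribute parsed chunks across the writer queues
    for j, chunk in enumerate(generate_chunks_from_text_file(reader)):
        queues[j % n_writers].put(chunk)

    for q in queues:
        q.put(None)      # sentinel: no more data
    for w in writers:
        w.join()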
Asked Mar 30 '14 by Hussain Sultan

1 Answer

I use a very similar split-process-combine method, using multiple processes to create intermediate files, then a single process to merge the resulting files. Here are some tips to get better performance:

  • Turn off indexing while you are writing the files by passing index=False; see here for the docs. I believe that PyTables incrementally updates the index, which in this case is completely unnecessary (as you are going to merge them afterwards). Index only the final file; this should speed up the writing quite a bit (see the sketch after this list).

  • You might consider changing the default indexing scheme / level, depending on what your queries are (assuming you follow the advice several points below to NOT create too many data columns).

  • In a similar vein, don't create a compressed file when writing the pre-merged files; rather, create it AFTER the indexed file is written (in an uncompressed state), so this ends up being your final step. See the docs here. Furthermore, it is very important to pass --chunkshape=auto when using ptrepack, which recomputes the PyTables chunksize (i.e. how much data is read/written in a single block), as it will take into account the entire table (also shown in the sketch after this list).

  • Regarding compression, your mileage may vary here, depending on how well your data actually compresses and what kinds of queries you are doing. I have some types of data that I find are faster NOT to compress at all, even though in theory they should compress well. You just have to experiment (though I always use blosc). Blosc only has one compression level (it's either on for levels 1-9 or off for level 0), so changing the level will not change anything.

  • I merge the files in the indexed order, basically by reading a subset of the pre-merge files into memory (a constant number, to use only a constant amount of memory), then appending them one-by-one to the final file. (Not 100% sure this makes a difference, but it seems to work well.)

  • You will find that the vast majority of your time is spent creating the index.

  • Furthermore, only index the columns that you actually need, by making sure to specify data_columns=a_small_subset_of_columns when writing each file.

  • I find that writing a lot of smallish files and then merging them into a largish file works better than writing a few large files, but YMMV here (e.g. say 100 100MB pre-merge files to yield a 10GB file, rather than five 2GB files). Though this may be a function of my processing pipeline, as I tend to bottleneck on the processing rather than the actual writing.

  • I have not used one, but hear amazing things about using an SSD (solid-state drive) for this kind of thing, even if it's relatively small. You can get an order of magnitude of speedup using one (and compression may change this result).
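
Putting the indexing and compression points together, the final steps might look roughly like this (a sketch only; the key, file names and column names are placeholders, and the columns you index must have been written as data_columns):

import subprocess
import pandas as pd

store = pd.HDFStore('merged.h5')

# ... all the appends happen with index=False and only the needed data_columns, e.g.
# store.append('data', df, format='t', index=False, data_columns=['field_a', 'field_b'])

# index once, at the very end, and only the columns you will query on
store.create_table_index('data', columns=['field_a', 'field_b'], kind='full')
store.close()

# compress as the last step with ptrepack; --chunkshape=auto recomputes the
# chunksize now that the full table size is known
subprocess.check_call(['ptrepack', '--chunkshape=auto', '--propindexes',
                       '--complevel=9', '--complib=blosc',
                       'merged.h5', 'merged_compressed.h5'])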

Answered by Jeff