Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Loading bigger than memory hdf5 file in pyspark

I have a big file (say 20 Gb) stored in HDF5 format. The file is basically a set of 3D coordinates that evolve over time (a molecular simulation trajectory). This basically is an array of shape (8000 (frames), 50000 (particles), 3 (coordinates))

In regular python I would simply load the hdf5 datafile using for h5py or pytables and index the datafile like if it was a numpy (the library lazily loads whatever data it needs).

However, if I try to load this file in Spark using SparkContext.parallelize it obviously clogs the memory:

sc.parallelize(data, 10)

How can I handle this problem? Is there a preferred data format for huge arrays? Can I make the rdd to be written on disk without passing by memory?

like image 470
pygabriel Avatar asked Jun 23 '15 17:06

pygabriel


People also ask

Why is HDF5 file so large?

This is probably due to your chunk layout - the more chunk sizes are small the more your HDF5 file will be bloated. Try to find an optimal balance between chunk sizes (to solve your use-case properly) and the overhead (size-wise) that they introduce in the HDF5 file.

Is HDF5 compressed?

One of the most powerful features of HDF5 is its ability to store and modify compressed data. The HDF5 Library comes with two pre-defined compression methods, GNUzip (Gzip) and Szip and has the capability of using third-party compression methods as well.

Does spark load all data in memory?

Does my data need to fit in memory to use Spark? No. Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data.


1 Answers

Spark (and Hadoop) doesn't have support for reading parts of the HDF5 binary files. (I suspect that the reason for this is that HDF5 is a container format for storing documents and it allows to specify tree like hierarchy for the documents).

But if you need to read file from the local disk - it is doable with Spark especially if you know internal structure of your HDF5 file.

Here is an example - it assumes that you'll run local spark job, and you know in advance that your HDF5 dataset '/mydata' consists out of 100 chunks.

h5file_path="/absolute/path/to/file"

def readchunk(v):
    empty = h5.File(h5file_path)
    return empty['/mydata'][v,:]

foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()

Going further you can modify the program to detect the number of chunks using f5['/mydata'].shape[0]

The next step would be to iterate over multiple datasets (you can list data sets with f5.keys()).

Also there is another article "From HDF5 Datasets to Apache Spark RDDs" that describe similar approach.

The same approach would work on a distributed cluster, but it gets little inefficient. h5py requires the file to in on a local file system. So this can be achieved in several ways: copy the file to all workers and keep it under the same location on worker's disk; or put the file to HDFS and mount HDFS using fusefs - so workers could access the file. Both ways have some inefficiencies, but it should be good enough for ad-hoc tasks.

Here is optimized version that opens h5 only once on every executor:

h5file_path="/absolute/path/to/file"

_h5file = None    
def readchunk(v):
    # code below will be executed on executor - in another python process on remote server
    # original value for _h5file (None) is sent from driver
    # and on executor is updated to h5.File object when the `readchunk` is called for the first time
    global _h5file
    if _h5file is None:
         _h5file = h5.File(h5file_path)
    return _h5file['/mydata'][v,:]

foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
like image 185
vvladymyrov Avatar answered Sep 28 '22 09:09

vvladymyrov