
How can I read in a binary file from hdfs into a Spark dataframe?

I am trying to port some code from pandas to (py)Spark. Unfortunately, I am already failing at the input step, where I want to read in binary data and put it into a Spark DataFrame.

So far I am using fromfile from numpy:

import numpy as np
import pandas as pd

dt = np.dtype([('val1', '<i4'), ('val2', '<i4'), ('val3', '<i4'), ('val4', 'f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data = data[1:]                                         # throw away the header record
df_bin = pd.DataFrame(data, columns=data.dtype.names)

But for Spark I couldn't find out how to do it. My workaround so far has been to use CSV files instead of the binary files, but that is not an ideal solution. I am aware that I shouldn't use numpy's fromfile with Spark. How can I read in a binary file that is already loaded into HDFS?

I tried something like

fileRDD = sc.parallelize(['hdfs:///user/bin_file1.bin', 'hdfs:///user/bin_file2.bin'])
fileRDD.map(lambda x: ???)

But it is giving me a No such file or directory error.

I have seen this question: spark in python: creating an rdd by loading binary data with numpy.fromfile, but that only works if I have the files stored in the home directory of the driver node.

Asked May 24 '16 by WilliamEllisWebb



3 Answers

So, for anyone who, like me, is just starting with Spark and stumbles upon binary files: here is how I solved it:

import numpy as np
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# big-endian record layout of the binary files
dt = np.dtype([('idx_metric', '>i4'), ('idx_resource', '>i4'), ('date', '>i4'),
               ('value', '>f8'), ('pollID', '>i2')])
schema = StructType([StructField('idx_metric', IntegerType(), False),
                     StructField('idx_resource', IntegerType(), False),
                     StructField('date', IntegerType(), False),
                     StructField('value', DoubleType(), False),
                     StructField('pollID', IntegerType(), False)])

filenameRdd = sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')

def read_array(rdd):
    # rdd is a (path, content) pair; rdd[1] holds the whole file as bytes
    # output = zlib.decompress(bytes(rdd[1]), 15 + 32)  # in case the files are also zipped
    array = np.frombuffer(bytes(rdd[1])[20:], dtype=dt)  # strip the 20-byte header
    array = array.newbyteorder().byteswap()              # convert big-endian to native order
    return array.tolist()

unzipped = filenameRdd.flatMap(read_array)
bin_df = sqlContext.createDataFrame(unzipped, schema)

And now you can do whatever fancy stuff you want with your DataFrame in Spark.

Answered by WilliamEllisWebb


Edit: Please review the use of sc.binaryFiles as mentioned here: https://stackoverflow.com/a/28753276/5088142


Try using a fully qualified HDFS URI:

hdfs://machine_host_name:8020/user/bin_file1.bin

You can find the host name in the fs.defaultFS property in core-site.xml.
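
For example, a minimal sketch (the host name and port here are placeholders; substitute the authority from your own fs.defaultFS):

# host name and port are placeholders -- take them from fs.defaultFS in core-site.xml
fileRDD = sc.binaryFiles('hdfs://machine_host_name:8020/user/bin_file1.bin')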

Answered by Yaron


Since Spark 3.0, Spark supports a binary file data source, which reads binary files and converts each file into a single record that contains the raw content and metadata of the file.

https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
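
A minimal sketch of using that data source from PySpark (the directory path and glob pattern are placeholders; per the linked docs, the resulting DataFrame has path, modificationTime, length and content columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# pathGlobFilter restricts the load to matching file names
df = (spark.read.format("binaryFile")
      .option("pathGlobFilter", "*.bin")
      .load("hdfs:///user/"))             # placeholder directory

# each row holds one file's path, modificationTime, length and raw bytes
df.select("path", "length").show(truncate=False)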

Answered by Aydin K.