I am trying to port some code from pandas to (py)Spark. Unfortunately I am already failing with the input part, where I want to read in binary data and put it in a Spark Dataframe.
So far I am using fromfile
from numpy:
dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:] #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
But for Spark I couldn't find how to do it. My workaround so far was to use csv-Files instead of the binary file, but that is not an ideal solution. I am aware that I shouldn't use numpy's fromfile
with spark.
How can I read in a binary file that is already loaded into hdfs?
I tried something like
fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)
But it is giving me a No such file or directory
error.
I have seen this question: spark in python: creating an rdd by loading binary data with numpy.fromfile but that only works if I have the files stored in the home of the driver node.
Use textFile() and wholeTextFiles() method of the SparkContext to read files from any file system and to read from HDFS, you need to provide the hdfs path as an argument to the function.
So, for anyone that starts with Spark as me and stumbles upon binary files. Here is how I solved it:
dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'),
('value','>f8'),('pollID','>i2')])
schema=StructType([StructField('idx_metric',IntegerType(),False),
StructField('idx_resource',IntegerType(),False),
StructField('date',IntegerType),False),
StructField('value',DoubleType(),False),
StructField('pollID',IntegerType(),False)])
filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')
def read_array(rdd):
#output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped
array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes)
array=array.newbyteorder().byteswap() # big Endian
return array.tolist()
unzipped=filenameRdd.flatMap(read_array)
bin_df=sqlContext.createDataFrame(unzipped,schema)
And now you can do whatever fancy stuff you want in Spark with your dataframe.
Edit: Please review the use of sc.binaryFiles as mentioned here: https://stackoverflow.com/a/28753276/5088142
try using:
hdfs://machine_host_name:8020/user/bin_file1.bin
you the host-name in fs.defaultFS in core-site.xml
Since Spark 3.0, Spark supports binary file data source, which reads binary files and converts each file into a single record that contains the raw content and metadata of the file.
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With