 

Loading compressed gzipped csv file in Spark 2.0

How can I load a gzip-compressed CSV file in PySpark on Spark 2.0?

I know that an uncompressed CSV file can be loaded as follows:

spark.read.format("csv").option("header",          
                                "true").load("myfile.csv")

or

spark.read.option("header", "true").csv("myfile.csv")
asked Nov 02 '16 by femibyte


People also ask

Can Spark read compressed data?

Spark supports all compression formats that are supported by Hadoop.

How do I read a compressed file in PySpark?

The Spark documentation clearly specifies that you can read .gz files automatically: "All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")."
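For illustration, here is a minimal PySpark sketch of what that documentation describes, assuming a gzip-compressed file named myfile.csv.gz exists in the working directory (the filename is just an example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-gz-example").getOrCreate()

# textFile decompresses the .gz file transparently, based on its extension
lines = spark.sparkContext.textFile("myfile.csv.gz")
print(lines.take(5))  # first few decompressed lines of the CSV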




1 Answer

I am unsure whether this changed between the earlier answers here and when I came to this question, but I would like to record my findings for future reference, for myself and for others who hit the same issue. I was loading gzip-compressed CSV files into a PySpark DataFrame on Spark 2.4.7 and Python 3.7.4 inside Google's managed Spark-as-a-service offering, aka "Dataproc". The underlying Dataproc image version is 1.5-debian10 if you want to investigate the specs further.

My problem was that I could not read the CSV without the output coming back garbled. One small tweak fixed it: changing the filename so that it ends in .gz, after which everything worked perfectly. Here is the code to reproduce the issue.

# Shell script to create the same gzipped dummy file under two different names
echo 'foo,bar,baz' > test.csv
gzip test.csv                 # produces test.csv.gz and removes test.csv
# Copy the gzipped file to a name without the .gz suffix
cp test.csv.gz test_csv

I can then run a PySpark job, or even an interactive pyspark session (shown below), to verify that Spark does not detect the file type by inspecting the contents; it infers the compression from the filename extension.

$ pyspark
Python 3.7.4 (default, Aug 13 2019, 20:35:49) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.7.4 (default, Aug 13 2019 20:35:49)
SparkSession available as 'spark'.
>>> filename_noend = 'test_csv'
>>> filename_end = 'test.csv.gz'
>>> schema = 'field1 string,field2 string,field3 string'
>>> df_noend = spark.read.csv(path=filename_noend, schema=schema, header=False)
>>> df_noend.show()
+--------------------+-------------+------+
|              field1|       field2|field3|
+--------------------+-------------+------+
���`test.cs...|�*.�+T+
                      |  null|
+--------------------+-------------+------+

>>> df_end = spark.read.csv(path=filename_end, schema=schema, header=False)
>>> df_end.show()
+------+------+------+
|field1|field2|field3|
+------+------+------+
|   foo|   bar|   baz|
+------+------+------+
>>> exit()

Sadly there is no read option like compression='gzip' to force the codec; Spark picks the codec based on the file extension. So save your gzip-compressed files with a .gz extension and you are good to go!
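Putting it together, here is a minimal sketch of loading a gzip-compressed CSV into a DataFrame, assuming the file is named myfile.csv.gz (an example name) and has a header row:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("load-gzipped-csv").getOrCreate()

# Because the filename ends in .gz, Spark decompresses it transparently
df = spark.read.option("header", "true").csv("myfile.csv.gz")
df.show()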

answered Sep 29 '22 by Roman Czerwinski