 

How to read gz files in Spark using wholeTextFiles

I have a folder which contains many small .gz files (compressed csv text files). I need to read them in my Spark job, but the thing is I need to do some processing based on info which is in the file name. Therefore, I did not use:

JavaRDD<String> input = sc.textFile(...);

since to my understanding I do not have access to the file name this way. Instead, I used:

JavaPairRDD<String, String> files_and_content = sc.wholeTextFiles(...);

because this way I get a pair of the file name and its content. However, it seems that the input reader fails to decompress the gz files and instead reads binary gibberish.

So, I would like to know whether I can somehow make it read the text, or alternatively access the file name when using sc.textFile(...).

Yaniv Donenfeld asked Jun 25 '14

People also ask

How do I read a TXT GZ file in Spark?

The Spark documentation clearly specifies that you can read gz files automatically: "All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")."

Can Spark read GZip files?

While a text file in GZip, BZip2, and other supported compression formats can be configured to be automatically decompressed in Apache Spark as long as it has the right file extension, you must perform additional steps to read zip files.

How do I read a zip file into a PySpark DataFrame?

Taking the ZipFileInputFormat and its helper ZipFileRecordReader class, I was able to get Spark to perfectly open and read the zip file: rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat, ...)


1 Answer

You cannot read gzipped files with wholeTextFiles because it uses CombineFileInputFormat, which cannot read gzipped files since they are not splittable (source proving it):

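  // From Spark's WholeTextFileInputFormat, which extends CombineFileInputFormat: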
  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[String, String] = {

    new CombineFileRecordReader[String, String](
      split.asInstanceOf[CombineFileSplit],
      context,
      classOf[WholeTextFileRecordReader])
  }

You may be able to use newAPIHadoopFile with a WholeFileInputFormat (not built into Hadoop, but available all over the internet) to get this to work correctly.
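For illustration, here is a rough sketch of that call, assuming you have pulled in some WholeFileInputFormat implementation keyed by Text (the file path) and BytesWritable (the raw file contents); both that class and the input path are placeholders, and note that the values come back as raw bytes, which matters for the update below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;

JavaPairRDD<Text, BytesWritable> rawFiles = sc.newAPIHadoopFile(
    "...",                       // your input directory
    WholeFileInputFormat.class,  // hypothetical: not part of Hadoop, bring your own
    Text.class,
    BytesWritable.class,
    new Configuration());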

UPDATE 1: I don't think WholeFileInputFormat will work, since it just gets the raw bytes of the file, meaning you may have to write your own class, possibly extending WholeFileInputFormat, to make sure it decompresses the bytes.

Another option would be to decompress the bytes yourself using GZIPInputStream.
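A minimal sketch of that approach, assuming you already have the compressed bytes of a single file in memory (for example, the BytesWritable values from the sketch above); the gunzip helper name is made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

// Hypothetical helper, not from the original answer: decompress the raw
// bytes of one .gz file into a UTF-8 string.
public static String gunzip(byte[] compressed) throws IOException {
    GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    int n;
    while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
    }
    in.close();
    return out.toString("UTF-8");
}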

UPDATE 2: If you have access to the directory name, like in the OP's comment below, you can get all the files like this:

import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Path path = new Path("..."); // the directory containing the .gz files
FileSystem fileSystem = path.getFileSystem(new Configuration()); // just uses the default FileSystem
FileStatus[] fileStatuses = fileSystem.listStatus(path);
ArrayList<Path> paths = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) paths.add(fileStatus.getPath());
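Not part of the original answer, but one way to put those paths to use: since textFile does decompress .gz files, you can read each file separately and tag every line with its source path (a sketch; assumes sc and the paths list from above are in scope):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

JavaRDD<Tuple2<String, String>> all = null;
for (Path p : paths) {
    final String name = p.toString();
    // textFile handles gzip decompression for a single .gz file
    JavaRDD<Tuple2<String, String>> tagged =
        sc.textFile(name).map(new Function<String, Tuple2<String, String>>() {
            public Tuple2<String, String> call(String line) {
                return new Tuple2<String, String>(name, line);
            }
        });
    all = (all == null) ? tagged : all.union(tagged);
}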
aaronman answered Nov 03 '22