
How to read a zip containing multiple files in Apache Spark

I have a zipped file containing multiple text files. I want to read each of the files and build a List of RDDs containing the content of each file.

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

will just read the entire file as one record, but how do I iterate through each entry of the zip and then save its content in an RDD using Spark?

I am fine with Scala or Python.

A possible starting point in Python (plain zipfile, not yet wired into Spark):

import zipfile

# Open the archive and list every file it contains.
archive = zipfile.ZipFile(archive_path, 'r')
file_paths = archive.namelist()
for file_path in file_paths:
    # Derive an id from the file name, e.g. "dir/123_page.html" -> "123".
    urls = file_path.split("/")
    urlId = urls[-1].split('_')[0]
asked Aug 18 '15 by Abhishek Choudhary


3 Answers

Apache Spark default compression support

I have written all the necessary theory in another answer that you might want to refer to: https://stackoverflow.com/a/45958182/1549135
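In short: codecs such as gzip and bzip2 are decompressed transparently by sc.textFile through Hadoop's input formats, but zip is not among them, which is why the manual handling below is needed. A quick illustration (the file paths are hypothetical):

// Works out of the box: Hadoop picks the codec from the .gz extension.
val gzipped = sc.textFile("/data/logs/events.log.gz")

// Does NOT work for zip: the bytes come back still compressed,
// so archives need the ZipInputStream approach shown below.
val broken = sc.textFile("/data/archives/sample.zip")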

Read zip containing multiple files

I followed the advice given by @Herman and used ZipInputStream. This gave me the following solution, which returns an RDD[String] of the zip content.

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.endsWith(".zip")) {
      // Zip archives are not splittable, so read each one as a single binary blob.
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          // Walk the archive entry by entry; close the stream once
          // getNextEntry returns null (end of archive).
          Stream.continually(zis.getNextEntry)
                .takeWhile {
                  case null => zis.close(); false
                  case _    => true
                }
                .flatMap { _ =>
                  // Read the current entry line by line. ZipInputStream signals
                  // end-of-entry as end-of-stream, so readLine returns null there.
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
                }
        }
    } else {
      // Plain text (or natively supported codecs like gzip): delegate to textFile.
      sc.textFile(path, minPartitions)
    }
  }
}

Simply import the implicit class and call the readFile method on the SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
answered Sep 21 '22 by Atais


If you are reading binary files, use sc.binaryFiles. This returns an RDD of tuples containing the file name and a PortableDataStream. You can feed the latter into a ZipInputStream.
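A minimal sketch of that hand-off (the path is hypothetical; this only shows the tuple shape and feeding the stream into ZipInputStream):

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

// Each element is (path, PortableDataStream); open() yields an InputStream.
val files: RDD[(String, PortableDataStream)] = sc.binaryFiles("/data/archives/*.zip")

// For example, list the name of the first entry in each archive.
val firstEntries: RDD[String] = files.map { case (_, stream) =>
  val zis = new ZipInputStream(stream.open())
  try Option(zis.getNextEntry).map(_.getName).getOrElse("<empty archive>")
  finally zis.close()
}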

answered Sep 22 '22 by Herman


Here's a working version of @Atais' solution (enhanced to actually close the streams):

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.toLowerCase.contains("zip")) {
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (zipFilePath, zipContent) =>
          val zipInputStream = new ZipInputStream(zipContent.open())
          // One RDD element per zip entry, holding that entry's full text.
          Stream.continually(zipInputStream.getNextEntry)
            .takeWhile(_ != null)
            .map { _ =>
              scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString("\n")
            } #::: { zipInputStream.close(); Stream.empty[String] }
            // The lazily appended empty Stream closes the ZipInputStream
            // only after every entry has been consumed.
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Then all you have to do to read a zip file is:

sc.readFile(path)
answered Sep 22 '22 by mahmoud mehdi