How to open/stream .zip files through Spark?

I have zip files that I would like to open 'through' Spark. I can open gzip files with no problem because of Hadoop's native codec support, but I am unable to do the same with .zip files.

Is there an easy way to read a zip file in your Spark code? I've also searched for zip codec implementations to add to the CompressionCodecFactory, but have been unsuccessful so far.
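
For reference, this is the sort of thing that already works for gzip thanks to the built-in codec support, but not for zip (a minimal sketch; the paths are just placeholders):

# gzip input is decompressed transparently by Hadoop's codec support
lines = sc.textFile("hdfs:/some/path/logs.gz")

# the equivalent call on a .zip archive does not unpack the entries
# lines = sc.textFile("hdfs:/some/path/archive.zip")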

asked Feb 17 '15 by JeffLL


3 Answers

None of the existing answers use Python, and I recently had to read zips in PySpark. While searching for how to do that I came across this question, so hopefully this will help others.

import zipfile
import io

def zip_extract(x):
    # x is a (path, bytes) pair as produced by sc.binaryFiles()
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    # map each member name inside the zip to that member's uncompressed bytes
    return {name: file_obj.open(name).read() for name in file_obj.namelist()}


zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()

In the above code I return a dictionary with each filename inside the zip as a key and that file's contents as the value. You can change it however you want to suit your purposes.
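
If you would rather end up with one record per file inside the zips instead of one dictionary per archive, a flatMap variant could look like this (just a sketch along the same lines; the path is a placeholder):

def zip_extract_pairs(x):
    # yield one (member name, bytes) pair per file inside the archive
    with zipfile.ZipFile(io.BytesIO(x[1]), "r") as file_obj:
        for name in file_obj.namelist():
            yield name, file_obj.open(name).read()

files_rdd = sc.binaryFiles("hdfs:/Testing/*.zip").flatMap(zip_extract_pairs)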

answered Oct 14 '22 by TrigonaMinima


@user3591785 pointed me in the correct direction, so I marked his answer as correct.

For a bit more detail, searching for "ZipFileInputFormat Hadoop" led me to this link: http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

Taking the ZipFileInputFormat and its helper ZipfileRecordReader class, I was able to get Spark to perfectly open and read the zip file.

    JavaPairRDD<Text, Text> rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

The result was a map with one element: the file name as the key and the content as the value, so I needed to transform this into a JavaPairRDD. I'm sure you could replace Text with BytesWritable if you want, and replace the ArrayList with something else, but my goal was to get something running first.

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String, String>> newList = new ArrayList<Tuple2<String, String>>();

        // the value is the full content of one file inside the zip
        InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;
        // emit one (first column, whole line) pair per line of the file
        while ((line = br.readLine()) != null) {
            Tuple2<String, String> newTuple = new Tuple2<String, String>(line.split("\\t")[0], line);
            newList.add(newTuple);
        }
        return newList;
    }
});

answered Oct 14 '22 by JeffLL


Please try the code below, using the sparkContext.newAPIHadoopRDD API:

sparkContext.newAPIHadoopRDD(
    hadoopConf,
    InputFormat.class,
    ImmutableBytesWritable.class,
    Result.class)
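
For what it's worth, a similar call can be made from PySpark via the sibling newAPIHadoopFile method. The sketch below assumes the ZipFileInputFormat jar from the answer above is on the classpath, and the fully qualified class names are only guesses, not verified:

# hypothetical sketch: the input-format class name below is an assumption
rdd = sc.newAPIHadoopFile(
    "hdfs:/Testing/target_file.zip",
    "com.cotdp.hadoop.ZipFileInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.io.Text")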
answered Oct 14 '22 by Tinku