Unzip File in Dataflow Before Reading

Our client is uploading files into GCS, but they are zipped. Is there any way, using the Java Dataflow SDK, to run through all the zipped files, unzip each one, combine all the resulting .csv files into one file, and only then do the TextIO transforms?
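
Roughly, the end state we are after is a plain TextIO read over decompressed CSVs. Here is a minimal sketch of that goal, assuming the archives have somehow already been expanded into plain .csv objects (the class name and bucket paths are placeholders, and the imports assume the Dataflow Java SDK 1.x):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// Sketch of the desired end state: once the archives are plain .csv objects,
// the read side is just a TextIO glob. Paths and class name are placeholders.
public class ReadUnzippedCsv {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());
        p.apply(TextIO.Read.named("ReadCsv").from("gs://our-bucket/unzipped/*.csv"))
            // ... the usual per-line transforms would follow here ...
            .apply(TextIO.Write.named("WriteOut").to("gs://our-bucket/output/part"));
        p.run();
    }
}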

EDIT

To answer jkff's questions:

  1. Well, I don't really need to combine them all into a single file; it would just be much easier from a reading perspective.
  2. They are ZIP files, not GZ or BZ or anything else. Each ZIP contains multiple files. The file names are not really significant, and yes, I would actually prefer it if TextIO transparently decompressed and concatenated all the files on a per-archive basis.

Hope that helps!

asked by iLikeBreakfast


1 Answer

I'm posting this because I had the same problem and only came across this one-year-old and rather incomplete solution. Here is a full example of how to unzip files on Google Dataflow:

// Imports below assume the (pre-Beam) Google Cloud Dataflow Java SDK 1.x package layout.
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.GcsUtil.GcsUtilFactory;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;

public class SimpleUnzip {

private static final Logger LOG = LoggerFactory.getLogger(SimpleUnzip.class);

public static void main(String[] args){
    Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // GcsUtil gives direct access to GCS from pipeline code (glob expansion, read/write channels).
    GcsUtilFactory factory = new GcsUtilFactory();
    GcsUtil util = factory.create(p.getOptions());
    try{
        // Expand the glob into the concrete list of ZIP objects currently in the bucket.
        List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri("gs://tlogdataflow/test/*.zip"));
        List<String> paths = new ArrayList<String>();

        for(GcsPath gcsp: gcsPaths){
            paths.add(gcsp.toString());
        }
        // Turn the list of ZIP paths into a PCollection and unzip each archive in parallel.
        p.apply(Create.of(paths))
            .apply(ParDo.of(new UnzipFN()));
        p.run();
    }
    catch(Exception e){
        LOG.error(e.getMessage());
    }


}

// DoFn that takes a gs:// path to a ZIP archive, streams every entry back out to GCS
// as its own object, and emits how many files it has extracted.
public static class UnzipFN extends DoFn<String,Long>{
    private static final long serialVersionUID = 2015166770614756341L;
    // Running total for this DoFn instance (not reset between elements).
    private long filesUnzipped=0;
    @Override
    public void processElement(ProcessContext c){
        String p = c.element();
        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        // Copy buffer used when streaming entries out of the archive
        // (100 MB is generous; a much smaller buffer would also work).
        byte[] buffer = new byte[100000000];
        try{
            // Open the ZIP object as a stream straight from GCS (no local staging).
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            ZipInputStream zis = new ZipInputStream(bis);
            ZipEntry ze = zis.getNextEntry();
            while(ze!=null){
                LOG.info("Unzipping File {}",ze.getName());
                // Write the entry back to GCS as its own object, next to the source archives.
                WritableByteChannel wri = u.create(GcsPath.fromUri("gs://tlogdataflow/test/" + ze.getName()), getType(ze.getName()));
                OutputStream os = Channels.newOutputStream(wri);
                int len;
                while((len=zis.read(buffer))>0){
                    os.write(buffer,0,len);
                }
                os.close();
                filesUnzipped++;
                ze=zis.getNextEntry();

            }
            zis.closeEntry();
            zis.close();

        }
        catch(Exception e){
            e.printStackTrace();
        }
        c.output(filesUnzipped);
    }

    private String getType(String fName){
        if(fName.endsWith(".zip")){
            return "application/x-zip-compressed";
        }
        else {
            return "text/plain";
        }
    }
}

}
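
For completeness, here is one possible way to connect this back to the original question (a sketch only, not part of the tested answer above): run SimpleUnzip first, then read the extracted entries with ordinary TextIO in a follow-up pipeline and, if desired, combine them into a single output file. The class name is hypothetical; the glob assumes the ZIP entries are .csv files written next to the archives, which is what UnzipFN does.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// Hypothetical follow-up pipeline: after SimpleUnzip has run, the extracted entries
// live next to the archives and can be read with plain TextIO.
public class ReadExtractedCsv {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());
        p.apply(TextIO.Read.named("ReadUnzippedCsv").from("gs://tlogdataflow/test/*.csv"))
            .apply(TextIO.Write.named("CombineToOneFile")
                .to("gs://tlogdataflow/test/combined")
                .withNumShards(1));  // a single shard behaves like "one file", at the cost of write parallelism
        p.run();
    }
}

Forcing a single shard with withNumShards(1) is the closest built-in equivalent of "combine everything into one file", though it serializes the write step.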

answered by bigdataclown