Our client uploads files to GCS, but they are zipped. Is there a way, using the Java Dataflow SDK, to go through all the zipped files, unzip each one, combine the resulting .csv files into one file, and only then apply the TextIO
transforms?
EDIT
To answer jkffs's questions,
Hope that helps!
I had the same problem and only found this one-year-old, quite incomplete solution, so here is a full example of how to unzip files on Google Dataflow:
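// Imports assume the Dataflow SDK for Java 1.x package layout (com.google.cloud.dataflow.sdk).
import java.io.BufferedInputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.GcsUtil.GcsUtilFactory;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;

public class SimpleUnzip {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleUnzip.class);

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil util = factory.create(p.getOptions());
        try {
            // Expand the glob up front so that each zip file becomes one pipeline element.
            List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri("gs://tlogdataflow/test/*.zip"));
            List<String> paths = new ArrayList<String>();
            for (GcsPath gcsp : gcsPaths) {
                paths.add(gcsp.toString());
            }
            p.apply(Create.of(paths))
             .apply(ParDo.of(new UnzipFN()));
            p.run();
        } catch (Exception e) {
            // Log the full stack trace, not only the message.
            LOG.error("Unzip pipeline failed", e);
        }
    }

    public static class UnzipFN extends DoFn<String, Long> {
        private static final long serialVersionUID = 2015166770614756341L;
        // A 64 KB copy buffer is enough for streaming and avoids allocating 100 MB per element.
        private static final int BUFFER_SIZE = 64 * 1024;

        @Override
        public void processElement(ProcessContext c) {
            String p = c.element();
            GcsUtilFactory factory = new GcsUtilFactory();
            GcsUtil u = factory.create(c.getPipelineOptions());
            byte[] buffer = new byte[BUFFER_SIZE];
            // Counted per archive, so each output is the number of entries in that zip.
            long filesUnzipped = 0;
            // try-with-resources closes the streams even if an entry fails halfway.
            try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(
                    Channels.newInputStream(u.open(GcsPath.fromUri(p)))))) {
                ZipEntry ze;
                while ((ze = zis.getNextEntry()) != null) {
                    LOG.info("Unzipping File {}", ze.getName());
                    // Each entry is written to the same prefix as the archives.
                    try (OutputStream os = Channels.newOutputStream(u.create(
                            GcsPath.fromUri("gs://tlogdataflow/test/" + ze.getName()),
                            getType(ze.getName())))) {
                        int len;
                        while ((len = zis.read(buffer)) > 0) {
                            os.write(buffer, 0, len);
                        }
                    }
                    filesUnzipped++;
                }
            } catch (Exception e) {
                LOG.error("Failed to unzip " + p, e);
            }
            c.output(filesUnzipped);
        }

        // Picks the content type for the GCS object from the entry's file extension.
        private String getType(String fName) {
            if (fName.endsWith(".zip")) {
                return "application/x-zip-compressed";
            } else {
                return "text/plain";
            }
        }
    }
}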
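The example above only unzips. The question also asks to combine the extracted .csv files into one file before running TextIO, so here is a minimal sketch of that merge step. It uses only java.util.zip and in-memory streams, so it runs without GCS. The class name CsvZipMerger and the method mergeCsvEntries are hypothetical names for illustration and are not part of the Dataflow SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper, not part of the Dataflow SDK.
public class CsvZipMerger {

    /**
     * Copies every .csv entry of the zip stream into out, in archive order,
     * and returns the number of entries copied.
     */
    public static int mergeCsvEntries(InputStream zipped, OutputStream out) throws IOException {
        int merged = 0;
        byte[] buffer = new byte[8192];
        try (ZipInputStream zis = new ZipInputStream(zipped)) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory() || !entry.getName().endsWith(".csv")) {
                    continue; // skip folders and non-CSV files
                }
                int len;
                int last = '\n';
                while ((len = zis.read(buffer)) > 0) {
                    out.write(buffer, 0, len);
                    last = buffer[len - 1];
                }
                if (last != '\n') {
                    out.write('\n'); // keep rows of consecutive files on separate lines
                }
                merged++;
            }
        }
        return merged;
    }

    public static void main(String[] args) throws IOException {
        // Build a small zip in memory so the sketch can be run locally.
        ByteArrayOutputStream zipBytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(zipBytes)) {
            zos.putNextEntry(new ZipEntry("a.csv"));
            zos.write("1,2\n".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("notes.txt"));
            zos.write("not a csv".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("b.csv"));
            zos.write("3,4".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        int count = mergeCsvEntries(new ByteArrayInputStream(zipBytes.toByteArray()), merged);
        System.out.println(count + " CSV entries merged");
        System.out.print(merged.toString("UTF-8"));
    }
}
```

Inside UnzipFN, the OutputStream could presumably be the one obtained from u.create(...) for a single target object, so that the merged file lands in GCS and can then be read with TextIO. Note that this sketch keeps every file's header row; dropping repeated headers would need an extra check.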