
Perform action after Dataflow pipeline has processed all data

Is it possible to perform an action once a batch Dataflow job has finished processing all data? Specifically, I'd like to move the text file that the pipeline just processed to a different GCS bucket. I'm not sure where to place that in my pipeline to ensure it executes once after the data processing has completed.

user01380121 asked Jun 01 '17 19:06





2 Answers

I don't see why this needs to happen after the pipeline has run. You could use side outputs to write the file's contents to multiple buckets from within the pipeline, and save yourself the copy once it finishes.
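For illustration, here is a rough sketch of that idea under a few assumptions. It uses plain branching of one PCollection rather than tagged side outputs, and the class name, the paths passed in, and the placeholder "Process" step are all made up for the example. Keep in mind that TextIO rewrites the copy line by line, so it is not guaranteed to be byte-identical to the original (or to keep the same line order), and the original file stays where it is.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WriteCopyWhileProcessing {

  // input and copyPrefix are hypothetical, e.g. "gs://example-input-bucket/data.txt"
  // and "gs://example-archive-bucket/data".
  static PipelineResult.State run(String input, String copyPrefix, String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> lines = p.apply("Read", TextIO.read().from(input));

    // Branch 1: stand-in for the real processing.
    lines.apply("Process",
        MapElements.into(TypeDescriptors.strings()).via((String s) -> s.toUpperCase()));

    // Branch 2: write the same lines to a second location as a single file
    // named copyPrefix + ".txt".
    lines.apply("WriteCopy",
        TextIO.write().to(copyPrefix).withSuffix(".txt").withoutSharding());

    return p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    System.out.println(run(
        "gs://example-input-bucket/data.txt", "gs://example-archive-bucket/data", args));
  }
}
```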

If that's not going to work for you (for whatever reason), you can simply run your pipeline in blocking mode, i.e. call pipeline.run().waitUntilFinish(), and put the code that does the copy right after that call.

[..]
// do some stuff before the pipeline runs
Pipeline pipeline = ...
pipeline.run().waitUntilFinish();
// do something after the pipeline finishes here
[..]
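A slightly fuller sketch of the same approach, assuming the move is done with Beam's own FileSystems utility and that the launching program stays alive until the job ends. The class name and bucket paths are hypothetical, and it is worth verifying that rename does what you want between your two buckets before relying on it. The sketch also checks the terminal state, so the input is left alone if the job failed or was cancelled.

```java
import java.io.IOException;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MoveInputAfterPipeline {
  // Hypothetical locations, for illustration only.
  static final String INPUT = "gs://example-input-bucket/data.txt";
  static final String ARCHIVE = "gs://example-archive-bucket/data.txt";

  // Moves a single file using whichever Beam filesystem matches the path scheme.
  static void moveFile(String from, String to) throws IOException {
    ResourceId src = FileSystems.matchNewResource(from, false /* isDirectory */);
    ResourceId dst = FileSystems.matchNewResource(to, false /* isDirectory */);
    FileSystems.rename(Collections.singletonList(src), Collections.singletonList(dst));
  }

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("ReadInput", TextIO.read().from(INPUT));
    // ... the rest of the transforms go here ...

    // Blocks until the job reaches a terminal state.
    PipelineResult.State state = pipeline.run().waitUntilFinish();

    if (state == PipelineResult.State.DONE) {
      // Make sure the gs:// filesystem is registered with these options.
      FileSystems.setDefaultPipelineOptions(options);
      moveFile(INPUT, ARCHIVE);
    } else {
      System.err.println("Pipeline ended in state " + state + "; input left in place.");
    }
  }
}
```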
Graham Polley answered Sep 24 '22 19:09



Here's a little trick I picked up from reading the source code of Apache Beam's PassThroughThenCleanup.java.

Right after your reader, create a side input that combines the entire collection (in the source code, this is the View.asIterable() PTransform) and feed it to a DoFn. Because the side input is only ready once the whole collection has been produced, this DoFn will be called only after the reader has finished reading ALL elements.

P.S. The code literally names this operation cleanupSignalView, which I found really clever.

Note that you can achieve the same effect using Combine.globally() (Java) or beam.CombineGlobally() (Python). For more info, check out section 4.2.4.3 here.
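A minimal sketch of the trick, not the actual PassThroughThenCleanup code. It assumes a batch pipeline in the default global window, uses Count.globally() (a global combine) to build the signal, and only prints where the real cleanup would go. The class name, step names, and the input path in main are made up for the example.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class CleanupAfterRead {

  static PipelineResult.State run(String input, String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> lines = p.apply("Read", TextIO.read().from(input));
    // ... apply the real processing to `lines` here ...

    // Collapse the whole collection into one value. The view cannot be
    // computed until every element of `lines` has been read.
    final PCollectionView<Long> doneSignal =
        lines.apply("CountAll", Count.<String>globally())
             .apply("AsSignal", View.<Long>asSingleton());

    // A one-element collection whose DoFn waits on the signal above.
    p.apply("CleanupTrigger", Create.of("cleanup"))
        .apply("Cleanup", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long count = c.sideInput(doneSignal);
            // Placeholder for the real cleanup, e.g. moving the input file.
            System.out.println("Read " + count + " lines; running cleanup.");
          }
        }).withSideInputs(doneSignal));

    return p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    System.out.println(run("gs://example-input-bucket/data.txt", args));
  }
}
```

Two things to watch with this approach: the signal only says the read has finished, not that downstream writes have, and a runner may retry the cleanup DoFn, so whatever it does should be safe to run more than once.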

Joshua H answered Sep 21 '22 19:09
