Is it possible to perform an action once a batch Dataflow job has finished processing all data? Specifically, I'd like to move the text file that the pipeline just processed to a different GCS bucket. I'm not sure where to place that in my pipeline to ensure it executes once after the data processing has completed.
Hi @sɐunıɔןɐqɐp, there is a Clone option at the top of the job page in the Dataflow dashboard. It simply clones the current job and lets you edit and run it again.
To either drain or cancel a Dataflow job, you can use the gcloud dataflow jobs command in Cloud Shell or in a local terminal that has the gcloud CLI installed.
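For example (JOB_ID and the region are placeholders you would replace with your own values):

# find the ID of the running job
gcloud dataflow jobs list --region=us-central1 --status=active
# drain it (finish in-flight work, accept no new input) or cancel it outright
gcloud dataflow jobs drain JOB_ID --region=us-central1
gcloud dataflow jobs cancel JOB_ID --region=us-central1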
I don't see why you need to do this as a post-pipeline-execution step. You could use side outputs to write the file to multiple buckets, and save yourself the copy after the pipeline finishes.
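A rough sketch of that side-output idea in Beam Java, assuming a Pipeline named pipeline already exists; the bucket names and the toUpperCase() "processing" are just placeholders:

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// One tag per output: the processed records and an untouched copy of the input.
final TupleTag<String> processedTag = new TupleTag<String>() {};
final TupleTag<String> rawCopyTag = new TupleTag<String>() {};

PCollection<String> lines =
    pipeline.apply(TextIO.read().from("gs://source-bucket/input.txt"));

PCollectionTuple outputs = lines.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());   // main output: placeholder "processing"
        c.output(rawCopyTag, c.element());     // side output: unmodified copy of the input
      }
    }).withOutputTags(processedTag, TupleTagList.of(rawCopyTag)));

// Each output is written to its own bucket in the same pipeline run,
// so there is nothing left to copy after the job finishes.
outputs.get(processedTag).apply(TextIO.write().to("gs://results-bucket/processed"));
outputs.get(rawCopyTag).apply(TextIO.write().to("gs://archive-bucket/raw-copy"));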
If that's not going to work for you (for whatever reason), then you can simply run your pipeline in blocking execution mode, i.e. use pipeline.run().waitUntilFinish(), and then just write the rest of your code (which does the copy) after that.
[..]
// do some stuff before the pipeline runs
Pipeline pipeline = ...
pipeline.run().waitUntilFinish();
// do something after the pipeline finishes here
[..]
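Expanding that skeleton into a rough sketch of what the question asks for (moving the processed file once the job is done): the bucket and object names below are made up, and the copy/delete uses the google-cloud-storage client library rather than anything Dataflow-specific.

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

Pipeline pipeline = ...  // build the pipeline that reads gs://source-bucket/input.txt

// Block until the Dataflow job has finished processing all data.
PipelineResult.State state = pipeline.run().waitUntilFinish();

if (state == PipelineResult.State.DONE) {
  // "Move" the processed file: copy it to the other bucket, then delete the original.
  Storage storage = StorageOptions.getDefaultInstance().getService();
  BlobId source = BlobId.of("source-bucket", "input.txt");
  BlobId target = BlobId.of("archive-bucket", "input.txt");
  storage.copy(Storage.CopyRequest.of(source, target)).getResult();
  storage.delete(source);
}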
A little trick I got from reading the source code of Apache Beam's PassThroughThenCleanup.java.

Right after your reader, create a side input that "combines" the entire collection (in the source code it is the View.asIterable() PTransform) and connect its output to a DoFn. This DoFn will be called only after the reader has finished reading ALL elements.

P.S. The code literally names the operation cleanupSignalView, which I found really clever.
Note that you can achieve the same effect using Combine.globally() (Java) or beam.CombineGlobally() (Python). For more info, check out section 4.2.4.3 here.
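A minimal sketch of that pattern in Java, assuming a Pipeline named pipeline already exists and the paths are placeholders; the cleanup DoFn takes the View.asIterable() result as a side input, so the runner cannot invoke it until the read has completed:

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

PCollection<String> lines =
    pipeline.apply(TextIO.read().from("gs://source-bucket/input.txt"));

// Side input over the whole collection; it only becomes available once the
// reader has emitted every element.
final PCollectionView<Iterable<String>> cleanupSignalView =
    lines.apply(View.<String>asIterable());

// A single dummy element triggers the cleanup DoFn; the side input forces the
// runner to wait until reading has finished before invoking it.
pipeline
    .apply("CleanupTrigger", Create.of("go"))
    .apply("Cleanup", ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.sideInput(cleanupSignalView);  // establishes the dependency on the completed read
        // Move or copy the processed file to another bucket here.
      }
    }).withSideInputs(cleanupSignalView));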