We are finding ourselves frequently using the following pattern in Dataflow:
1. ParDo from a BigQuery TableRow
2. GroupByKey on the result of 1
3. ParDo on the result of 2
Is there an operation in Dataflow to achieve this in one hit (at least from the API perspective)?
I've had a look at the Combine operation, but that seems better suited to calculating values, e.g. sums or averages.
Without more detail in your question, I can only give general advice.
You could create a PTransform that combines the above pattern into a single Composite Transform. This allows you to put together the frequently used operations into a single reusable component.
The following code should give you an idea of what I mean:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Step 1: turn each TableRow into a (key, row) pair; rows without a "key" field are dropped.
class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = c.element();
        Object key = row.get("key");
        if (key != null) {
            c.output(KV.of(key.toString(), row));
        }
    }
}

// Bundles the ParDo / GroupByKey / follow-up steps into one reusable transform.
class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
    public CompositeTransform(String name) {
        super(name);
    }

    public static CompositeTransform named(String name) {
        return new CompositeTransform(name);
    }

    @Override
    public PCollection<TableRow> apply(PCollection<TableRow> input) {
        return input.apply(ParDo.named("parse").of(new ExtractKeyFn()))
            .apply(GroupByKey.create())
            // potentially more transformations
            .apply(Values.create()) // get only the values (because we have a KV)
            .apply(Flatten.iterables()); // flatten them out
    }
}

public class Main {
    public static void run(PipelineOptions options) {
        Pipeline p = Pipeline.create(options);
        // read input
        p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))
            // apply fancy transform
            .apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))
            // write output
            .apply(BigQueryIO.Write.to("outputTable...").named("outputToBigQuery"));
        p.run();
    }
}
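To make it easier to see what the composite does to the data, here is a rough plain-Java sketch of the same key, group and flatten flow over an in-memory list. It does not use the Dataflow SDK at all. The class name GroupAndFlattenSketch, its helper methods, and the sample rows are all made up for illustration, and a plain Map stands in for TableRow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the pipeline above; not Dataflow code.
public class GroupAndFlattenSketch {

    // Roughly ExtractKeyFn + GroupByKey: bucket rows by their "key" field,
    // skipping rows where that field is missing.
    static Map<String, List<Map<String, Object>>> groupByKeyField(List<Map<String, Object>> rows) {
        Map<String, List<Map<String, Object>>> groups = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            Object key = row.get("key");
            if (key != null) {
                groups.computeIfAbsent(key.toString(), k -> new ArrayList<>()).add(row);
            }
        }
        return groups;
    }

    // Roughly Values + Flatten.iterables: drop the keys and concatenate the groups.
    static List<Map<String, Object>> flattenValues(Map<String, List<Map<String, Object>>> groups) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (List<Map<String, Object>> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }

    // The "composite": callers make one call and never see the intermediate steps.
    static List<Map<String, Object>> groupAndFlatten(List<Map<String, Object>> rows) {
        return flattenValues(groupByKeyField(rows));
    }

    // Builds a sample row; a null key means the "key" field is left out.
    static Map<String, Object> row(String key, int value) {
        Map<String, Object> r = new LinkedHashMap<>();
        if (key != null) {
            r.put("key", key);
        }
        r.put("value", value);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row("a", 1));
        rows.add(row("b", 2));
        rows.add(row("a", 3));
        rows.add(row(null, 4)); // no "key" field, so it is dropped
        System.out.println(groupAndFlatten(rows));
    }
}
```

In this sketch the rows that share a key come out next to each other and the keyless row disappears. In a real pipeline the runner makes no promise about element order in the output PCollection, so treat the ordering here as a property of the sketch only.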