
Common Cloud Dataflow pattern - is there a better way?

We frequently find ourselves using the following pattern in Dataflow:

  1. Perform a key-extraction ParDo on each BigQuery TableRow
  2. Perform a GroupByKey on the result of step 1
  3. Perform a flatten ParDo on the result of step 2
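To make the three steps concrete, here is a plain-Java sketch (no Dataflow SDK involved) of what the pattern computes on an in-memory list. It assumes rows are modelled as Map<String, Object> and that the grouping column is called "key"; both assumptions, and the row helper, are hypothetical and only for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyGroupFlattenSketch {

    // Hypothetical helper: builds a "row" with an optional "key" column and a "value" column.
    static Map<String, Object> row(String key, int value) {
        Map<String, Object> r = new LinkedHashMap<>();
        if (key != null) {
            r.put("key", key);
        }
        r.put("value", value);
        return r;
    }

    // Step 1: key extraction. Rows without a "key" field are dropped.
    static List<Map.Entry<String, Map<String, Object>>> extractKeys(List<Map<String, Object>> rows) {
        List<Map.Entry<String, Map<String, Object>>> keyed = new ArrayList<>();
        for (Map<String, Object> r : rows) {
            Object key = r.get("key");
            if (key != null) {
                keyed.add(new AbstractMap.SimpleEntry<>(key.toString(), r));
            }
        }
        return keyed;
    }

    // Step 2: group by key. Insertion order is kept only to make the output predictable;
    // a real GroupByKey makes no ordering guarantees.
    static Map<String, List<Map<String, Object>>> groupByKey(
            List<Map.Entry<String, Map<String, Object>>> keyed) {
        Map<String, List<Map<String, Object>>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> e : keyed) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return groups;
    }

    // Step 3: drop the keys and flatten the groups back into one list of rows.
    static List<Map<String, Object>> flatten(Map<String, List<Map<String, Object>>> groups) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (List<Map<String, Object>> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row("a", 1));
        rows.add(row("b", 2));
        rows.add(row("a", 3));
        rows.add(row(null, 4)); // no key: dropped in step 1
        System.out.println(flatten(groupByKey(extractKeys(rows))));
        // [{key=a, value=1}, {key=a, value=3}, {key=b, value=2}]
    }
}
```

Without any per-group work between steps 2 and 3, the output contains the same keyed rows as the input, regrouped; the pattern only pays off when something happens to each group before flattening.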

Is there an operation in Dataflow that achieves this in one step (at least from the API perspective)?

I've looked at the Combine operation, but it seems better suited to computing aggregate values, e.g. sums or averages.
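For comparison, a plain-Java sketch of the distinction drawn above. The class and method names are hypothetical and no Dataflow classes are used: a GroupByKey-style step keeps every value for each key, while a Combine-style step (such as Combine.perKey in the SDK) folds each key's values into a single result, here a sum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupVersusCombine {

    // GroupByKey-style: every value is kept, collected per key.
    static Map<String, List<Integer>> groupPerKey(List<String> keys, List<Integer> values) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            out.computeIfAbsent(keys.get(i), k -> new ArrayList<>()).add(values.get(i));
        }
        return out;
    }

    // Combine-style (a sum here): each key's values are reduced to one result.
    static Map<String, Integer> sumPerKey(List<String> keys, List<Integer> values) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            out.merge(keys.get(i), values.get(i), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a");
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(groupPerKey(keys, values)); // {a=[1, 3], b=[2]}
        System.out.println(sumPerKey(keys, values));   // {a=4, b=2}
    }
}
```

Because the pattern in the question needs every row back after grouping, the grouping form rather than the reducing form is what it relies on.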

asked by Graham Polley on Dec 06 '25 05:12



1 Answer

Without more details in your question, I can only give general advice.

You could create a PTransform that wraps the above pattern in a single composite transform. This lets you package frequently used operations into one reusable component that can be applied like any built-in transform.

The following code should give you an idea of what I mean:

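import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Values;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;


// Emits (key, row) pairs. "key" is a placeholder for the column you group on;
// rows that lack that column are dropped.
class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = c.element();
        Object key = row.get("key");
        if (key != null) {
            c.output(KV.of(key.toString(), row));
        }
    }
}

// Reusable composite: extract key -> GroupByKey -> flatten the groups back into rows.
class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {

    public CompositeTransform(String name) {
        super(name);
    }

    public static CompositeTransform named(String name) {
        return new CompositeTransform(name);
    }

    @Override
    public PCollection<TableRow> apply(PCollection<TableRow> input) {
        return input
            .apply(ParDo.named("ExtractKey").of(new ExtractKeyFn()))
            .apply(GroupByKey.<String, TableRow>create())
            // potentially more per-group transformations here
            .apply(Values.<Iterable<TableRow>>create()) // drop the keys, keep each group's rows
            .apply(Flatten.<TableRow>iterables());       // flatten the groups back into single rows
    }
}

public class Main {

    public static void run(PipelineOptions options) {
        Pipeline p = Pipeline.create(options);

        // read input
        p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))

         // apply the composite transform
         .apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))

         // write output; CREATE_NEVER assumes the output table already exists
         // (otherwise provide a schema with .withSchema(...))
         .apply(BigQueryIO.Write.to("outputTable...")
             .named("outputToBigQuery")
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

        p.run();
    }

    public static void main(String[] args) {
        run(PipelineOptionsFactory.fromArgs(args).withValidation().create());
    }
}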
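The composite above hard-codes ExtractKeyFn and leaves only a comment where per-group work would go. The following plain-Java sketch of the same shape is parameterized by a key function and a per-group function, which is what makes such a component reusable across pipelines. The class and its names are hypothetical and independent of the Dataflow SDK; in the SDK, the analogous change would be passing the key-extraction DoFn into CompositeTransform's constructor, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical reusable "key, group, per-group step, flatten" unit in plain Java.
// keyFn plays the role of ExtractKeyFn; perGroupFn fills the
// "potentially more transformations" slot between grouping and flattening.
public class ReusableKeyGroupFlatten<T> {

    private final Function<T, String> keyFn;
    private final UnaryOperator<List<T>> perGroupFn;

    public ReusableKeyGroupFlatten(Function<T, String> keyFn, UnaryOperator<List<T>> perGroupFn) {
        this.keyFn = keyFn;
        this.perGroupFn = perGroupFn;
    }

    public List<T> apply(List<T> input) {
        // Group: elements whose key is null are dropped, as ExtractKeyFn does.
        Map<String, List<T>> groups = new LinkedHashMap<>();
        for (T element : input) {
            String key = keyFn.apply(element);
            if (key != null) {
                groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
            }
        }
        // Run the per-group step, then flatten everything back into one list.
        List<T> out = new ArrayList<>();
        for (List<T> group : groups.values()) {
            out.addAll(perGroupFn.apply(group));
        }
        return out;
    }

    public static void main(String[] args) {
        // Example: key by first letter and keep only the first element of each group.
        ReusableKeyGroupFlatten<String> firstPerLetter = new ReusableKeyGroupFlatten<String>(
            s -> s.isEmpty() ? null : s.substring(0, 1),
            group -> group.subList(0, 1));
        System.out.println(firstPerLetter.apply(
            Arrays.asList("apple", "banana", "avocado", "", "blueberry"))); // [apple, banana]
    }
}
```

Passing an identity function (group -> group) as the per-group step reproduces the plain key/group/flatten pattern from the question.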
answered by Ankur Chauhan on Dec 10 '25 07:12
