Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Dynamically fork Beam (Dataflow) Pipeline Based on number of TaggedOutputs

I'm currently attempting to fork a Beam pipeline running on Google Dataflow into many destinations based on a specific key contained within the data. I am able to get this working when hardcoding each of the endpoints of the 'fork' with the TaggedOutput tag. However, in the future, I will not always know what keys exist in the underlying data, so I am looking to dynamically create the subsequent steps in the process using a for loop similar to the below:

p = beam.Pipeline(options=pipeline_options)

pipe = p | 'ReadFromGCS' >> ReadFromText(args['input']) \
         | 'TagAllLines' >> beam.ParDo(produce_tagged_output_keys).with_outputs()

for client in pipe:
  client = pipe.client | client+'MapLinesToDicts' >> beam.Map(lambda line: dict(record=line)) \
                         | client+'WriteToBQTable' >> WriteToBigQuery(client+'_test', dataset=bq_dataset, project=project_id, schema='record:string')

My understanding is that the results of .with_outputs() should be iterable, no? When I run the above, it executes pipe without issue, but ignores the for loop entirely. Is there a way to dynamically do this that I am missing?

like image 470
andre622 Avatar asked Mar 07 '23 04:03

andre622


1 Answers

The result of .with_outputs() is only iterable if tags are specified

The result of .with_outputs() without specifying tags is not iterable, since the outputs are not yet known at pipeline construction time.
The result of .with_outputs(tag1, tag2, main=tag0) however is iterable:
Test the difference here

Remember that with apache beam, you first construct the pipeline, and then let the data flow through it. So you can't let your pipeline architecture depend on the data at runtime.

I'll propose another solution however:

Let's assume you have a pcollection collection with elements structured like this:

(key, bq_row)

Where the key is the feature deciding which table to write to and bq_row the data you want to write to a bigquery row.

You can then group the elements by key:

grouped = collection | beam.GroupByKey()

Now you get one element for every different key. So all the bq_row's in one element should be written to the same bigquery table, which depends on the element key.

You can then define a DoFn which writes all the rows in each element to the appropriate table. (example code)

class WriteToBigQueryFn(beam.DoFn):

    def __init__(self, dataset_name):
        super(BigQueryWriter, self).__init__()
        self.client = bigquery.Client()
        self.dataset = client.dataset(dataset_name)

    def process(self, (key, data)):
        table_name = get_table_name(key) # get table name based on key
        table = self.dataset.table(table_name)
        table.reload(self.client)

        table.insert_data(data)

grouped | beam.ParDo(WriteToBigQueryFn)
like image 93
Robbe Avatar answered Apr 27 '23 05:04

Robbe