
How do I convert table row PCollections to key,value PCollections in Python?

There is NO documentation on how to convert PCollections into the key/value PCollections required as input to CoGroupByKey().

Context: I have two large PCollections and need to find the differences between them for type II ETL changes (if a record doesn't exist in pColl1, add it to a nested field found in pColl2), so that I can retain the history of these records from BigQuery.

Pipeline Architecture:

  1. Read BQ Tables into 2 pCollections: dwsku and product.
  2. Apply a CoGroupByKey() to the two sets to return --> Results
  3. Parse results to find and nest all changes in dwsku into product.

Any help would be appreciated. I found a Java question on SO that does the same thing I need to accomplish (but there's nothing for the Python SDK):

Convert from PCollection<TableRow> to PCollection<KV<K,V>>

Is there any documentation or support for Apache Beam, especially the Python SDK?

codebrotherone asked Jan 30 '23 02:01


1 Answer

To get CoGroupByKey() working, you need PCollections of tuples in which the first element is the key and the second is the data.
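To make that shape concrete, here is a minimal plain-Python sketch (no Beam required) that emulates how co-grouping combines collections of (key, value) tuples. The helper `co_group`, the `sku` column, and the sample rows are all made up for illustration; this is not Beam's API, only an approximation of the result's structure on small in-memory lists.

```python
from collections import defaultdict


def co_group(**tagged_collections):
    """Emulate the shape of a co-grouped result for small in-memory lists.

    Each keyword argument is a list of (key, value) tuples. The result maps
    every key to a dict holding, per tag, the list of values seen for that key.
    """
    tags = list(tagged_collections)
    grouped = defaultdict(lambda: {tag: [] for tag in tags})
    for tag, pairs in tagged_collections.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


# Hypothetical rows keyed by a made-up "sku" column.
dwsku = [("A1", {"sku": "A1", "price": 10}), ("B2", {"sku": "B2", "price": 7})]
product = [("A1", {"sku": "A1", "price": 9})]

result = co_group(dwsku=dwsku, product=product)
```

A key that appears in only one collection still shows up in the result, with an empty list under the other tag. That is what makes this grouping useful for finding differences between two tables.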

In your case, you said that you are using BigQuerySource, which in the current version of Apache Beam outputs a PCollection of dictionaries, in which every entry represents a row of the table that was read. You need to map these PCollections to tuples as described above. This is easy to do using ParDo:

import apache_beam as beam


class MapBigQueryRow(beam.DoFn):
    """Turn a BigQuery row (a dict) into a (key, row) tuple."""

    def process(self, element, key_column):
        # key_column arrives as the extra keyword argument given to beam.ParDo.
        key = element.get(key_column)
        yield key, element


p = beam.Pipeline()

data1 = (p
            | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
            | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))

data2 = (p
            | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
            | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))

# Each output element is (key, {"data1": [rows...], "data2": [rows...]}).
co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())

# do your processing with co_grouped here
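As one possible way to do that processing, the sketch below picks out rows that exist under the "data1" tag but have no counterpart under "data2". The function name `rows_missing_from_data2` and the sample rows are hypothetical, and the rule for what counts as a change is an assumption you would adapt to your own tables. It is written as a plain generator so it can be tried without a pipeline.

```python
def rows_missing_from_data2(element):
    """Yield (key, row) for rows present under "data1" but absent under "data2"."""
    key, grouped = element
    # Materialize the grouped values so they can be checked for emptiness.
    data1_rows = list(grouped["data1"])
    data2_rows = list(grouped["data2"])
    if data1_rows and not data2_rows:
        for row in data1_rows:
            yield key, row


# Stand-in elements shaped like the co-grouped output, with made-up rows.
sample = [
    ("A1", {"data1": [{"id": "A1"}], "data2": [{"id": "A1"}]}),
    ("B2", {"data1": [{"id": "B2"}], "data2": []}),
]
missing = [pair for element in sample for pair in rows_missing_from_data2(element)]
```

Inside a pipeline, the same function could be applied with something like `co_grouped | beam.FlatMap(rows_missing_from_data2)`, since FlatMap emits each yielded item as a separate element.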

BTW, the documentation for the Apache Beam Python SDK can be found on the Apache Beam website.

Marcin Zablocki answered May 03 '23 02:05