There is no documentation on how to convert PCollections into the keyed PCollections that CoGroupByKey() requires as input
Context: Essentially, I have two large PCollections and I need to find the differences between them for type II ETL changes (if a record doesn't exist in pColl1, then add it to a nested field found in pColl2), so that I can retain the history of these records from BigQuery.
Pipeline Architecture:
Any help would be appreciated. I found a Java question on SO that does the same thing I need to accomplish (but there's nothing for the Python SDK):
Convert from PCollection<TableRow> to PCollection<KV<K,V>>
Is there any documentation or support for Apache Beam, especially for the Python SDK?
To get CoGroupByKey() working, you need PCollections of tuples in which the first element is the key and the second is the data.
In your case, you said that you are using BigQuerySource, which in the current version of Apache Beam outputs a PCollection of dictionaries in which every entry represents a row of the table that was read. You need to map these PCollections to tuples as described above. This is easy to do using ParDo:
import apache_beam as beam

class MapBigQueryRow(beam.DoFn):
    def process(self, element, key_column):
        # Emit a (key, row) tuple so the PCollection can be co-grouped.
        key = element.get(key_column)
        yield key, element

# `p` is an existing beam.Pipeline object.
data1 = (p
         | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
         | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))

data2 = (p
         | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
         | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))

co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())
# do your processing with co_grouped here
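As a rough illustration of what that processing step could look like, here is a plain-Python sketch that runs without Beam. It assumes each co-grouped element has the shape (key, {"data1": [...], "data2": [...]}), which is what CoGroupByKey produces for a dict of keyed PCollections. The helpers key_rows, co_group_locally and classify, and the sample rows with an "id" column, are hypothetical names made up for this example; co_group_locally is only a local stand-in for CoGroupByKey, not part of the Beam API.

```python
def key_rows(rows, key_column):
    """Mirror of MapBigQueryRow: turn row dicts into (key, row) tuples."""
    return [(row.get(key_column), row) for row in rows]


def co_group_locally(**tagged):
    """Plain-Python stand-in for CoGroupByKey, for illustration only.

    Returns (key, {tag: [values...]}) pairs, sorted by key.
    """
    grouped = {}
    for tag, pairs in tagged.items():
        for key, value in pairs:
            # Every key gets an (initially empty) list for every tag.
            entry = grouped.setdefault(key, {t: [] for t in tagged})
            entry[tag].append(value)
    return sorted(grouped.items(), key=lambda kv: kv[0])


def classify(element):
    """Label one co-grouped element by which side(s) contain the key."""
    key, groups = element
    # Materialize the grouped values before testing for emptiness.
    rows1 = list(groups["data1"])
    rows2 = list(groups["data2"])
    if rows1 and rows2:
        status = "in_both"
    elif rows2:
        status = "only_in_data2"
    else:
        status = "only_in_data1"
    return key, status, {"data1": rows1, "data2": rows2}


# Hypothetical sample rows standing in for the two BigQuery tables.
table1 = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
table2 = [{"id": 2, "name": "b"}, {"id": 3, "name": "c"}]

co_grouped_local = co_group_locally(
    data1=key_rows(table1, "id"),
    data2=key_rows(table2, "id"),
)
statuses = [(key, status) for key, status, _ in map(classify, co_grouped_local)]
print(statuses)
# [(1, 'only_in_data1'), (2, 'in_both'), (3, 'only_in_data2')]
```

In the real pipeline, a function like classify could probably be applied with co_grouped | beam.Map(classify), and the "only in one side" records would then drive the type II update.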
BTW, the documentation for the Apache Beam Python SDK can be found here.