There is NO documentation regarding how to convert PCollections into the keyed PCollections required as input to CoGroupByKey().
Context: Essentially, I have two large PCollections and I need to be able to find the differences between the two for type II ETL changes (if a record doesn't exist in pColl1, then add it to a nested field in pColl2), so that I am able to retain the history of these records from BigQuery.
Pipeline Architecture: (diagram not reproduced here)
Any help would be appreciated. I found a Java answer on SO that does the same thing I need to accomplish (but there's nothing on the Python SDK):
Convert from PCollection<TableRow> to PCollection<KV<K,V>>
Is there documentation/support for Apache Beam, especially the Python SDK?
In order to get CoGroupByKey() working, you need to have PCollections of tuples, in which the first element is the key and the second is the data.
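For illustration, here is a minimal, self-contained sketch (using made-up in-memory data via beam.Create rather than the BigQuery tables from the question) showing the (key, value) tuple shape CoGroupByKey() consumes and the (key, dict of lists) shape it emits:

import apache_beam as beam

with beam.Pipeline() as p:
    # Two keyed PCollections of (key, value) tuples.
    emails = p | "Create emails" >> beam.Create([("amy", "amy@example.com"),
                                                 ("bob", "bob@example.com")])
    phones = p | "Create phones" >> beam.Create([("amy", "555-1234"),
                                                 ("amy", "555-5678")])
    # Each output element is (key, {"emails": [...], "phones": [...]}).
    joined = ({"emails": emails, "phones": phones}
              | beam.CoGroupByKey()
              | beam.Map(print))

Running this prints, e.g., ('amy', {'emails': ['amy@example.com'], 'phones': ['555-1234', '555-5678']}) and ('bob', {'emails': ['bob@example.com'], 'phones': []}).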
In your case, you said that you have BigQuerySource, which in the current version of Apache Beam outputs a PCollection of dictionaries (code), in which every entry represents a row of the table that was read. You need to map these PCollections to tuples as stated above. This is easy to do using ParDo:
import apache_beam as beam

# Map each BigQuery row (a dict) to a (key, row) tuple so that
# CoGroupByKey() can join the two PCollections on that key.
class MapBigQueryRow(beam.DoFn):
    def process(self, element, key_column):
        key = element.get(key_column)
        yield key, element

data1 = (p
         | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
         | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))

data2 = (p
         | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
         | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))

# Join both keyed PCollections; each output element is
# (key, {"data1": [rows from #1], "data2": [rows from #2]}).
co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())

# do your processing with co_grouped here
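As a hypothetical follow-up for the type II use case in the question (the DoFn name and the "missing from table #1" rule below are illustrative assumptions, not a confirmed implementation), the processing step could look something like this:

# Hypothetical sketch: flag keys that exist in table #2 but not in
# table #1, per the question's "if it doesn't exist in pColl1" rule.
class DetectNewRecords(beam.DoFn):
    def process(self, element):
        key, grouped = element
        rows1 = grouped["data1"]  # list of rows from table #1 for this key
        rows2 = grouped["data2"]  # list of rows from table #2 for this key
        if not rows1:
            # Key is missing from table #1: emit the table #2 rows as
            # candidates for the nested history field.
            for row in rows2:
                yield key, row

new_records = co_grouped | "Detect new records" >> beam.ParDo(DetectNewRecords())

From there you would merge the emitted rows into the nested history field and write the result back to BigQuery with a sink such as beam.io.WriteToBigQuery.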
BTW, documentation for the Apache Beam Python SDK can be found at https://beam.apache.org/documentation/sdks/python/.