list A: 25M hashes
list B: 175K hashes
I want to check each hash in list B for existence in list A; this is a deduplication process, so in my ParDo function I yield an element only when it is not matched.
How do I set up this ParDo efficiently? Right now I pass list A as a side input while processing list B. But shouldn't the side input go to setup() or start_bundle() of the DoFn, so the lookup list (A) is stored on the worker just once?
class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
If you have the answer, please include a link to the documentation, because I could not find any good documentation for the Python SDK.
current_data is a PCollection read from BigQuery.
new_records = transformed_records | 'Checknewrecords' >> beam.ParDo(Checknewrecords(), pvalue.AsList(current_data))
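As a side note on efficiency: membership tests against a Python list are O(n), so with 25M hashes in list A it is usually much faster to build a set from the side input once and test against that. A minimal plain-Python sketch of the dedup logic (outside Beam, with made-up hash values standing in for the real data):

```python
# Sketch of the membership check, independent of Beam.
# 'existing_hashes' stands in for the side input built from list A;
# converting it to a set gives O(1) lookups instead of O(n).
existing_hashes = ["a1", "b2", "c3"]                # list A (side input)
incoming = [{'TA_HASH': "b2"}, {'TA_HASH': "d4"}]   # list B elements

lookup = set(existing_hashes)                       # build once, reuse per element
new_records = [e for e in incoming if e['TA_HASH'] not in lookup]
print(new_records)  # only the unmatched element remains
```

Inside the DoFn, the same set could be built once per bundle in start_bundle from data passed to the constructor, or once per window as described in the answer below.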
Sorry, I initially misunderstood the question. Actually, I don't think it's possible to access a side input in start_bundle; it is only available in process. But you could instead do the work on the first call to process and get a similar result.
class DoFnMethods(beam.DoFn):
    def __init__(self):
        self.first_element_processed = False
        self.once_retrieved_side_input_data = None

    def called_once(self, side_input):
        if self.first_element_processed:
            return
        self.once_retrieved_side_input_data = side_input.get(...)
        self.first_element_processed = True

    def process(self, element, side_input):
        self.called_once(side_input)
        ...
Note: you do need to be aware that start_bundle and finish_bundle are called once per bundle across all windows, while the side input provided to process is different for each window computed. So if you are working with windows, you may need dicts (keyed by window) for the self.first_element_processed and self.once_retrieved_side_input_data variables, so that called_once does its work once per window.
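To make the per-window bookkeeping concrete, here is a plain-Python sketch (no Beam dependency; the window and side-input arguments are hypothetical stand-ins for what Beam would pass, and the class name is made up):

```python
class WindowedOnceCache:
    """Stand-in for a DoFn that caches side-input data once per window.

    'window' and 'side_input_data' are stand-ins for the objects Beam
    would provide; the point is the per-window dict bookkeeping.
    """
    def __init__(self):
        self.processed = {}   # window -> bool
        self.cached = {}      # window -> cached side-input lookup set

    def called_once(self, window, side_input_data):
        if self.processed.get(window):
            return
        # Expensive conversion, done once per window rather than per element.
        self.cached[window] = set(side_input_data)
        self.processed[window] = True

    def process(self, element, window, side_input_data):
        self.called_once(window, side_input_data)
        if element not in self.cached[window]:
            yield element
```

Driving it by hand: the first call for a window builds the set, later calls in the same window reuse it, and a new window key triggers a fresh build.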