Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Efficient ParDo setup or start_bundle for side input

list A: 25M hashes
list B: 175K hashes

I want to check each hash in list B for existence in list A. For this I have a ParDo function and I yield when it's not matched. This is a deduplication process.

How do I set up this ParDo efficiently, now I do a side input of list A while processing list B. But shouldnt the side input go to setup() or start_bundle() of the ParDo so I store the lookup list (A) in the worker just once?

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
        else:
            pass

If you have a the answer please include a link to the documentation because I did not find any good documentation for the Python version.

  • transformed_records is a PCollection from a previous transformation
  • current_data is a PCollection from a BigQuery.read

    new_records = transformed_records | 'Checknewrecords' >> beam.ParDo(Checknewrecords(), pvalue.AsList(current_data))

like image 380
Thijs Avatar asked Dec 18 '25 07:12

Thijs


1 Answers

Sorry, I initially misunderstood the question. Actually I don't think its possible to have a side input in start_bundle. It is only accessible in process_bundle. But you could instead do the work on the first call to process bundle and get a similar result.

class DoFnMethods(beam.DoFn):
  def __init__(self):
    self.first_element_processed = False
    self.once_retrieved_side_input_data = None

  def called_once(self, side_input):
    if self.first_element_processed:
      return
    self.once_retrieved_side_input_data = side_input.get(...)
    self.first_element_processed = True

  def process(self, element, side_input):
    self.called_once(side_input)
    ...

Note: You do need to be aware of the fact that start bundle and finish bundle will be called once for the bundle across all windows, and the side input is provided to process is different for each window computed. So if you are working with windows you may need to use a dict(keyed by window) for the self.first_element_processed and self.once_retrieved_side_input_data variables so you can called_onc once for each window.

like image 189
Alex Amato Avatar answered Dec 20 '25 21:12

Alex Amato