
Dataflow/Apache Beam - how to access current filename when passing in pattern?

I have seen this question answered before on Stack Overflow (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), but not since Apache Beam added Splittable DoFn functionality for Python. How would I access the filename of the file currently being processed when passing a file pattern to a GCS bucket?

I want to pass the filename into my transform function:

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )

Ultimately, what I want to do is pass the filename into my transform function when I transform each row of the JSON (see this), and then use the filename to do a lookup in a different BQ table to get a value. I think once I manage to figure out how to get the filename, I will be able to work out the side input part in order to do the lookup in the BQ table and get the unique value.

asked Nov 21 '18 by WIT



1 Answer

I tried to implement a solution based on the previously cited case. There, as well as in other approaches such as this one, they also get a list of file names but load the whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record.

As input I used two csv files:

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france

Using GCSFileSystem.match, we can access metadata_list to retrieve FileMetadata objects containing the file path and size in bytes. In my example:

[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
 FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]

The code is:

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)  # flatten into a single list of FileMetadata
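
Each FileMetadata exposes the file path and size, so a quick sanity check could look like this (just an illustrative snippet):

for f in result:
    print(f.path, f.size_in_bytes)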

We will read each of the matching files into a different PCollection. As we don't know the number of files a priori, we need to programmatically create a list of names for each PCollection (p0, p1, ..., pN-1) and ensure that we have unique labels for each step ('Read file 0', 'Read file 1', etc.):

variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

Then we read each file into its corresponding PCollection with ReadFromText and call the AddFilenamesFn ParDo to associate each record with the filename.

for i in range(len(result)):   
  globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

where AddFilenamesFn is:

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename':file_name, 'row':element}

My first approach was using a Map function directly, which results in simpler code. However, because the lambda binds the loop variable late, result[i].path was only resolved at the end of the loop and each record was incorrectly mapped to the last file of the list:

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
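
If you prefer the Map version, a common workaround (sketched here, not tested as part of this answer) is to bind the path as a default argument of the lambda, so it is evaluated when the lambda is defined rather than at the end of the loop:

# Sketch: default argument 'path' captures the current value of result[i].path
globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem, path=result[i].path: (path, elem))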

Finally, we flatten all the PCollections into one:

merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()

and we check the results by logging the elements:

INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}

I tested this with both DirectRunner and DataflowRunner for Python SDK 2.8.0.
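
For reference, the invocations would look roughly like this (the script name, project and bucket locations are placeholders):

$ python filenames_pipeline.py --runner DirectRunner
$ python filenames_pipeline.py --runner DataflowRunner --project PROJECT_ID \
    --temp_location gs://BUCKET_NAME/tmp --staging_location gs://BUCKET_NAME/staging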

I hope this addresses the main issue, and you can now continue by integrating BigQuery into your full use case. You might need to use the Python Client Library for that; I wrote a similar Java example.
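
As a rough, untested sketch of that next step (the lookup table name and its filename/value columns are assumptions), you could read the lookup table once, turn it into key-value pairs and pass it as an AsDict side input to enrich each record:

# Hypothetical lookup table with 'filename' and 'value' columns
lookup = (p
          | 'Read lookup table' >> beam.io.Read(beam.io.BigQuerySource(
              query='SELECT filename, value FROM dataset_id.lookup_table'))
          | 'To KV' >> beam.Map(lambda row: (row['filename'], row['value'])))

# Add the looked-up value to each record via the side input
enriched = merged | 'Add lookup value' >> beam.Map(
    lambda elem, lookup_dict: dict(elem, lookup_value=lookup_dict.get(elem['filename'])),
    lookup_dict=beam.pvalue.AsDict(lookup))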

Full code:

import argparse, logging
from operator import add
from functools import reduce  # built-in on Python 2, needed from functools on Python 3

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename':file_name, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  run()
answered Oct 15 '22 by Guillem Xercavins