Read Files from multiple folders in Apache Beam and map outputs to filenames

I'm working on reading files from multiple folders and then outputting the file contents with the file name, like (filecontents, filename), to BigQuery in Apache Beam using the Python SDK and a Dataflow runner.

I originally thought I could create a PCollection for each file and then map the file contents to the filename.

import apache_beam as beam
from apache_beam.io.textio import ReadFromText


def read_documents(pipeline):
    """Read the documents at the provided URIs and return (line, uri) pairs."""
    pcolls = []
    with open(TESTIN) as uris:  # TESTIN lists one URI per line
        for uri in uris:
            uri = uri.strip()  # drop the trailing newline
            pcolls.append(
                pipeline
                | 'Read: uri' + uri >> ReadFromText(
                    uri.strip("[]/'"), compression_type='gzip')
                # Pass uri as a side argument so each lambda binds its own value.
                | 'WithKey: uri' + uri >> beam.Map(
                    lambda v, uri: (v, uri.strip("[]")), uri)
            )
    return pcolls | 'FlattenReadPColls' >> beam.Flatten()
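
For context, here is a minimal sketch (my addition, not part of the original post) of how this helper could feed BigQuery; the table spec and schema are hypothetical placeholders:

with beam.Pipeline() as p:  # runner options omitted
    (read_documents(p)
     | 'ToRow' >> beam.Map(lambda kv: {'content': kv[0], 'filename': kv[1]})
     | 'WriteToBQ' >> beam.io.WriteToBigQuery(
         'my-project:my_dataset.documents',  # hypothetical table
         schema='content:STRING,filename:STRING'))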

This worked fine, but it was slow and broke on Dataflow beyond roughly 10,000 files: once the graph contained that many read steps, the pipeline would fail with a broken pipe.

Currently I'm trying to overload the ReadAllFromText transform from textio. textio is designed to read tons of files quickly from a PCollection of filenames or patterns. There is a bug in this module when reading from Google Cloud Storage if the file has a Content-Encoding set: GCS automatically gunzips and transcodes such files, but for some reason ReadAllFromText doesn't work with that. You have to change the file's metadata to remove the content encoding and set the compression type on ReadAllFromText to gzip. I'm including the issue URL in case anyone else has problems with ReadAllFromText: https://issues.apache.org/jira/browse/BEAM-1874
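
For anyone hitting the same problem, here is a minimal sketch of that metadata workaround using the google-cloud-storage client (my addition, not from the original post; the bucket and object names are placeholders):

from google.cloud import storage

client = storage.Client()
blob = client.bucket('my-bucket').get_blob('docs/input.gz')  # hypothetical object

# Clearing Content-Encoding stops GCS from transcoding the object on download,
# so ReadAllFromText(compression_type='gzip') can do the decompression itself.
blob.content_encoding = None
blob.patch()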

My current code looks like this:

from apache_beam.io.textio import ReadAllFromText


class ReadFromGs(ReadAllFromText):

    def __init__(self):
        # Force gzip so files whose Content-Encoding metadata was removed
        # (see the workaround above) are still decompressed.
        super(ReadFromGs, self).__init__(compression_type='gzip')

    def expand(self, pvalue):
        return (
            pvalue
            | 'ReadAllFiles' >> self._read_all_files
            # 'filename' is a placeholder for the input filename that I'm
            # trying to figure out how to include in the output.
            | 'Map values' >> beam.Map(lambda v: (v, filename))
        )

ReadAllFromText lives in textio; it uses the ReadAllFiles transform from filebasedsource.py and inherits from PTransform.

I believe I'm just missing something simple.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py

asked Jul 20 '18 by theintense

1 Answer

As you found, ReadFromText doesn't currently support dynamic filenames, and you definitely don't want to create an individual step for each URL. From your first sentence I understand you want to get the filename and the file content out as one item. That means you won't need, or benefit from, streaming parts of the file; you can simply read the file contents. Something like:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


def read_all_from_url(url):
    # FileSystems resolves the right filesystem (GCS, local, ...) from the scheme.
    with FileSystems.open(url) as f:
        return f.read()


def read_from_urls(pipeline, urls):
    return (
        pipeline
        | beam.Create(urls)
        | 'Read File' >> beam.Map(lambda url: (
            url,
            read_all_from_url(url)
        ))
    )
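
For example, it could be wired up like this (my addition; the bucket paths and output location are placeholders):

with beam.Pipeline() as p:
    (read_from_urls(p, ['gs://my-bucket/a.txt', 'gs://my-bucket/b.txt'])
     | 'Dump' >> beam.io.WriteToText('/tmp/url_contents'))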

You can customise it if you think you're having issues with metadata. The output will be a tuple of (url, file contents). If your file contents are very large, you might need a slightly different approach depending on your use case, such as the streaming sketch below.
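
If the files are too big to hold in memory as single elements, one possible alternative (my addition, assuming a Beam SDK new enough to include apache_beam.io.fileio) is to stream each file line by line while keeping the originating path:

import apache_beam as beam
from apache_beam.io import fileio


def lines_with_path(readable_file):
    # readable_file.metadata.path is the matched filename.
    path = readable_file.metadata.path
    with readable_file.open() as f:
        for line in iter(f.readline, b''):
            yield (path, line.decode('utf-8').rstrip('\n'))


with beam.Pipeline() as p:
    (p
     | fileio.MatchFiles('gs://my-bucket/docs/*')  # hypothetical pattern
     | fileio.ReadMatches()
     | 'LineWithPath' >> beam.FlatMap(lines_with_path))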

answered Sep 18 '22 by de1