Working on reading files from multiple folders and then output the file contents with the file name like (filecontents, filename) to bigquery in apache beam using the python sdk and a dataflow runner.
Originally thought I could create A pcollection for each file then map the file contents with the filename.
def read_documents(pipeline):
"""Read the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
count = 0
with open(TESTIN) as uris:
for uri in uris:
#print str(uri).strip("[]/'")
pcolls.append(
pipeline
| 'Read: uri' + str(uri) >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
| 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
)
return pcolls | 'FlattenReadPColls' >> beam.Flatten()
This worked fine but was slow and wouldn't work on dataflow cloud after about 10000 files. It would suffer from a broken pipe if over 10000 or so files.
Currently trying to overload the ReadAllFromText function from Text.io. Text.io is designed to read tons of files quickly from a pcollection of filenames or patterns. There is a bug in this module if reading from Google cloud storage and the file has content encoding. Google Cloud storage automatically gunzips files and transcodes them but for some reason ReadAllFromText doesn't work with it. You have to change the metadata of the file to remove content encoding and set the compression type on ReadAllFromText to gzip. I'm including this issue url in case anyone else has problems with ReadAllFromText https://issues.apache.org/jira/browse/BEAM-1874
My current code looks like this
class ReadFromGs(ReadAllFromText):
def __init__(self):
super(ReadFromGs, self).__init__(compression_type="gzip")
def expand(self, pvalue):
files = self._read_all_files
return (
pvalue
| 'ReadAllFiles' >> files #self._read_all_files
| 'Map values' >> beam.Map( lambda v: (v, filename)) # filename is a placeholder for the input filename that im trying to figure out how to include in the output.
)
ReadAllFromText is contained in Text.io and calls ReadAllText from filebasedsource.py and inherits from PTransform.
I believe i'm just missing something simple missing.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py
As you found, ReadFromText
doesn't currently support dynamic filenames and you definitely don't want to create individual steps for the each URL. From your initial sentence I understand you want get the filename and the file content out as one item. That means you won't need or benefit from any streaming of parts of the file. You can simply read the file contents. Something like:
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
def read_all_from_url(url):
with FileSystems.open(url) as f:
return f.read()
def read_from_urls(pipeline, urls):
return (
pipeline
| beam.Create(urls)
| 'Read File' >> beam.Map(lambda url: (
url,
read_all_from_url(url)
))
)
You can customise it if you think you're having issues with metadata. The output will be a tuple (url, file contents). If your file contents is very large you might need a slightly different approach depending on your use case.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With