Is it currently possible to read from a gzip file in Python using Apache Beam? My pipeline is pulling gzip files from GCS with this line of code:
beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP'))
But I am getting this error:
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
We noticed in the Python Beam source code that compressed files seem to be handled when writing to a sink: https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445
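For reference, this is the write-side usage we inferred from that source, which we were hoping to mirror on the read side (a sketch only; the exact TextFileSink arguments are our assumption from reading fileio.py, not verified behavior):

import apache_beam as beam

# Assumption based on our reading of fileio.py: the sink appears to
# accept a compression_type, so writes can produce gzip output.
p | beam.io.Write(
    beam.io.TextFileSink('gs://bucket/output', compression_type='GZIP'))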
More Detailed Traceback:
Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read
    read_values(reader)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values
    read_result = [GlobalWindows.windowed_value(e) for e in reader]
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__
    yield self.source.coder.decode(line)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode
    return value.decode('utf-8')
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
UPDATE: TextIO in the Python SDK now supports reading from compressed files.

Original answer: today, TextIO in the Python SDK does not actually support reading from compressed files.
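With a recent SDK you can read the gzip file directly. A minimal sketch, assuming a Beam 2.x SDK where ReadFromText and CompressionTypes are available (CompressionTypes.AUTO would also infer gzip from the .gz extension):

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

# Sketch assuming a Beam 2.x SDK: ReadFromText decompresses the input
# when given an explicit compression_type.
with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText(
        'gs://bucket/file.gz',
        compression_type=CompressionTypes.GZIP)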
I ran into a similar problem. I have a custom binary source that I wanted to parse and grab data out of. The issue is that the fileio API is built around CSV or Avro, and no matter what I tried it wouldn't give me the lines without trying to split them on line breaks. As you can imagine, a binary file doesn't handle this well.
At first I tried a custom source; that ended up being three classes long to implement, and it duplicated core Dataflow/Beam code. In the end I wrote this WONDERFUL little bit of monkeypatching to get what I needed done (deep-level source code testing here).
import apache_beam as beam
from apache_beam.io.fileio import coders

def _TextFileReader__iter(self):
    # The full data file is available here and can be read like a normal file.
    # You can even limit how many bytes you read (I read 9 to grab the file format).
    data = self._file.read()
    # Now you can either yield the whole file as a single data entry
    # and run a ParDo to split it, or you can iterate in here and
    # yield each row. I chose the latter, but I'm showing an example
    # of the former.
    yield data

# Apply the monkeypatch.
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter
To call this source, and make sure it's BINARY, I did the following:
pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource('gs://MY_BUCKET/sample.bin',
                           coder=coders.BytesCoder()))
Notice the coders.BytesCoder()? Without that, it tried to convert the bytes to something non-binary, which wasn't good for my parsing engine. ;)
It took me a good chunk of a day to figure this out. However, if you use this method you can do almost anything with the fileio classes in Dataflow. ;)
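And if you yield the whole file as a single blob (as in the patch above), here's a hedged sketch of the downstream split it alludes to; RECORD_DELIMITER is a hypothetical stand-in for whatever separates records in your binary format:

import apache_beam as beam
from apache_beam.io.fileio import coders

RECORD_DELIMITER = b'\x00'  # hypothetical: adjust for your binary format

# Split the single whole-file blob yielded by the patched reader
# into individual records downstream.
records = (pipeline
           | 'read_blob' >> beam.io.Read(
               beam.io.TextFileSource('gs://MY_BUCKET/sample.bin',
                                      coder=coders.BytesCoder()))
           | 'split_records' >> beam.FlatMap(
               lambda blob: blob.split(RECORD_DELIMITER)))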
I ran into the same problem. I was trying to read binary GZ files from GCS, decompress them, and then ship them off to somewhere else for processing. I solved it in two steps.
First, make sure you're using the right Python library; my original library was out of date (use at least v0.4):

pip install --upgrade google-cloud-dataflow

Second, I constructed my pipeline as follows:
import apache_beam as beam
from apache_beam import (coders, io, transforms)

raw_logs = (p
            | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
                "gs://my-bucket/logs-*.gz",
                coder=coders.BytesCoder()))
            | transforms.Map(lambda x: x)
            | io.Write("WriteToLocalhost", io.textio.WriteToText(
                "/tmp/flattened-logs",
                file_name_suffix=".json")))
p.run()
You should have a file called /tmp/flattened-logs.json after running the pipeline.