Opening a gzip file in Python with Apache Beam

Is it currently possible to read from a gzip file in Python using Apache Beam? My pipeline is pulling gzip files from GCS with this line of code:

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 

But I am getting this error:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte

We noticed in the Python Beam source code that compressed files seem to be handled when writing to a sink: https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445 (The 0x8b in the error is the second byte of the gzip magic number \x1f\x8b, which suggests the raw compressed bytes are being handed straight to the UTF-8 coder.)

More Detailed Traceback:

Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run() 
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read
    read_values(reader)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values
    read_result = [GlobalWindows.windowed_value(e) for e in reader]
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__
    yield self.source.coder.decode(line)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode
    return value.decode('utf-8')
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
asked Aug 22 '16 by agsolid


3 Answers

UPDATE: TextIO in the Python SDK now supports reading from compressed files.

Original answer: at the time this was written, TextIO in the Python SDK did not support reading from compressed files.
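
With a recent SDK, reading a gzip file looks roughly like this (a minimal sketch, assuming a Beam version where CompressionTypes lives in apache_beam.io.filesystem; the default AUTO setting would also infer gzip from the .gz extension):

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

with beam.Pipeline() as p:
    # Read and decompress the file; each element is one decoded text line.
    lines = p | 'ReadGzip' >> beam.io.ReadFromText(
        'gs://bucket/file.gz',
        compression_type=CompressionTypes.GZIP)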

answered by Kenn Knowles


I ran into a similar problem. I have a custom binary source that I wanted to parse and grab data out of. The issue is that the fileio API is built around CSV or Avro, and no matter what I tried it wouldn't give me the data without trying to split it on line breaks. As you can imagine, a binary file doesn't handle this well.

At first I tried a custom source, but that ended up requiring three classes to implement and duplicated core Dataflow/Beam code. In the end I wrote this WONDERFUL little bit of monkeypatching to get what I needed done (deep-level source hacking here).

import apache_beam as beam
from apache_beam.io.fileio import coders

def _TextFileReader__iter(self):
    # The full data file is available here and can be read like a normal file.
    # You can even limit how many bytes you read (I read 9 to grab the file format).
    data = self._file.read()
    # Now you can either yield the whole file as a single data entry
    # and run a ParDo to split it, or you can iterate in here and 
    # yield each row. I chose the latter, but I'm showing an example 
    # of the former.
    yield data

# This monkeypatch works!
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter

To call this source, and make sure it's BINARY, I did the following:

pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource( 'gs://MY_BUCKET/sample.bin',
        coder=coders.BytesCoder()
    )
)

Notice the coders.BytesCoder()? Without that it tried to convert the bytes to something non-binary, which wasn't good for my parsing engine. ;)
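
If you yield the whole file as one blob (as the patched reader above does), you still need a downstream step to split it into records. Here is a rough sketch of that idea; split_records and the fixed record_size are hypothetical and only for illustration:

import apache_beam as beam
from apache_beam.io.fileio import coders

def split_records(blob, record_size=1024):
    # record_size is an assumed fixed record length, purely for illustration.
    for i in range(0, len(blob), record_size):
        yield blob[i:i + record_size]

records = (pipeline
           | 'start_3' >> beam.io.Read(
                 beam.io.TextFileSource('gs://MY_BUCKET/sample.bin',
                                        coder=coders.BytesCoder()))
           | 'split' >> beam.FlatMap(split_records))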

It took me a good chunk of a day to figure this out. However, if you use this method you can do almost anything with the fileio module in Dataflow. ;)

answered by Dovy


I ran into the same problem. I was trying to read binary GZ files from GCS, decompress them, and then ship them off to somewhere else for processing. I solved it in two steps.

First, make sure you're using the right version of the Python library; my original install was out of date (use at least v0.4): pip install --upgrade google-cloud-dataflow.

Second, I constructed my pipeline as follows:

import apache_beam as beam
from apache_beam import (coders, io, transforms)

# Construct the pipeline first (assumed here: default runner and options).
p = beam.Pipeline()

raw_logs = (p
            | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
                      "gs://my-bucket/logs-*.gz",
                      coder=coders.BytesCoder()))
            | transforms.Map(lambda x: x)
            | io.Write("WriteToLocalhost", io.textio.WriteToText(
                       "/tmp/flattened-logs",
                       file_name_suffix=".json")))
p.run()

After running the pipeline you should have a file like /tmp/flattened-logs-00000-of-00001.json (WriteToText shards its output by default).
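
For comparison, on newer Beam Python SDKs the same read-and-rewrite can be expressed without the BytesCoder workaround. This is only a sketch with the same placeholder paths; it assumes ReadFromText's default AUTO compression detection handles the .gz extension:

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'ReadLogsFromGCS' >> beam.io.ReadFromText('gs://my-bucket/logs-*.gz')
     | 'WriteToLocalhost' >> beam.io.WriteToText('/tmp/flattened-logs',
                                                 file_name_suffix='.json'))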

answered by Jason Vertrees