
Exception Handling in Apache Beam pipelines using Python

I'm building a simple pipeline with Apache Beam in Python (on GCP Dataflow) that reads from Pub/Sub and writes to BigQuery, but I can't handle exceptions in the pipeline to create alternative flows.

Take a simple WriteToBigQuery example:

output = json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery('some-project:dataset.table_name')

I tried wrapping this in a try/except block, but it doesn't work: when it fails, the exception seems to be thrown in a Java layer outside my Python execution:

INFO:root:2019-01-29T15:49:46.516Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
    request.instruction_id)
...
...
...
    self.signature.finish_bundle_method.method_value())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
    self._flush_batch()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
    self.table_id, errors))
RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u''
 message: u'Missing required field: object.teste.'
 reason: u'invalid'>]
 index: 0>] [while running 'generatedPtransform-63']

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:276)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:119)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 237, in process_bundle
    bundle_processor.process_bundle(instruction_id)
...
...
...
    self.signature.finish_bundle_method.method_value())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
    self._flush_batch()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
    self.table_id, errors))

I even tried to handle this specific error:

RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u''
 message: u'Missing required field: object.teste.'
 reason: u'invalid'>]
 index: 0>] [while running 'generatedPtransform-63']

Using:

try:
    ...
except RuntimeError as e:
    ...

Catching the generic Exception didn't work either.

I could find plenty of examples of error handling in Apache Beam using Java, but none handling errors in Python.

Does anyone know how to handle this?

asked Jan 29 '19 by Marcelo Santino




2 Answers

I've only been able to catch exceptions at the DoFn level, with something like this:

import apache_beam as beam
from apache_beam import pvalue


class MyPipelineStep(beam.DoFn):

    def process(self, element, *args, **kwargs):
        try:
            # do stuff...
            yield pvalue.TaggedOutput('main_output', output_element)
        except Exception as e:
            # emit the error on a separate tagged output instead of crashing
            yield pvalue.TaggedOutput('exception', str(e))
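
To consume those tagged outputs, apply the DoFn with with_outputs and route each tag separately. A minimal sketch, assuming the tag names from the snippet above (inputs and handle_error are hypothetical stand-ins for your upstream PCollection and failure handler):

results = (
    inputs  # hypothetical upstream PCollection
    | 'MyStep' >> beam.ParDo(MyPipelineStep()).with_outputs(
        'main_output', 'exception'))

# the happy path and the dead-letter path are now separate PCollections
main_output = results.main_output
exceptions = results.exception

# e.g. log or persist the failures instead of crashing the pipeline
_ = exceptions | 'HandleErrors' >> beam.Map(handle_error)  # hypothetical handler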

However, WriteToBigQuery is a PTransform that wraps the DoFn BigQueryWriteFn, so you may need to do something like this:

from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery


class MyBigQueryWriteFn(BigQueryWriteFn):

    def process(self, *args, **kwargs):
        try:
            # super() must name the subclass here, otherwise
            # BigQueryWriteFn.process itself would be skipped
            return super(MyBigQueryWriteFn, self).process(*args, **kwargs)
        except Exception as e:
            # Do something here
            pass


class MyWriteToBigQuery(WriteToBigQuery):
    # Copy the source code of `WriteToBigQuery` here,
    # but replace `BigQueryWriteFn` with `MyBigQueryWriteFn`
    pass

https://beam.apache.org/releases/pydoc/2.9.0/_modules/apache_beam/io/gcp/bigquery.html#WriteToBigQuery
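
As an aside, newer Beam releases (roughly 2.15 and later; the question predates this) let you skip the subclassing for this particular case: with streaming inserts, WriteToBigQuery exposes the rows that failed insertion on its result, which you can route to a dead-letter flow. A minimal sketch, assuming those newer APIs:

import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

result = (
    json_output
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        'some-project:dataset.table_name',
        # surface bad rows instead of retrying them forever
        insert_retry_strategy=RetryStrategy.RETRY_NEVER))

# each element is a (destination table, row) tuple
failed_rows = result[BigQueryWriteFn.FAILED_ROWS]
_ = failed_rows | 'LogFailedRows' >> beam.Map(
    lambda row: logging.error('failed insert: %s', row))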

answered Oct 19 '22 by Alex


You can also use the generator flavor of FlatMap:

This is similar to the other answer in that you can use a DoFn in place of something else (e.g. a CombineFn) to produce no output when there is an exception or some other kind of failed precondition.

import logging
from typing import Generator, List

import apache_beam as beam


def sum_values(values: List[int]) -> Generator[int, None, None]:
    if not values or len(values) < 10:
        logging.error(f'received invalid inputs: {...}')
        return
    yield sum(values)


# Instead of using CombinePerKey:
(inputs
  | 'WithKey' >> beam.Map(lambda x: (x.key, x))
  | 'GroupByKey' >> beam.GroupByKey()
  | 'Values' >> beam.Values()
  | 'MaybeSum' >> beam.FlatMap(sum_values))
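
Because FlatMap flattens whatever the generator yields, returning early without yielding simply produces no output for that element, so invalid inputs are dropped (and logged) instead of failing the whole bundle.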
answered Oct 19 '22 by dayfine