I'm building a simple pipeline with Apache Beam in Python (on GCP Dataflow) that reads from Pub/Sub and writes to BigQuery, but I can't handle exceptions in the pipeline to create alternative flows.
Take a simple WriteToBigQuery example:
output = json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery('some-project:dataset.table_name')
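For context, my pipeline looks roughly like this (the topic name and the JSON parsing step are simplified placeholders, not the real ones):

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    json_output = (p
        | 'Read from PubSub' >> beam.io.ReadFromPubSub(
            topic='projects/some-project/topics/some-topic')
        | 'Parse JSON' >> beam.Map(json.loads))
    # The write step from above:
    json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        'some-project:dataset.table_name')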
I tried to wrap this write step in a try/except block, but it doesn't work: when it fails, the exception seems to be thrown in a Java layer outside my Python code:
INFO:root:2019-01-29T15:49:46.516Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
request.instruction_id)
...
...
...
self.signature.finish_bundle_method.method_value())
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
self._flush_batch()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
self.table_id, errors))
RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: u''
location: u''
message: u'Missing required field: object.teste.'
reason: u'invalid'>]
index: 0>] [while running 'generatedPtransform-63']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:276)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:119)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 237, in process_bundle
bundle_processor.process_bundle(instruction_id)
...
...
...
self.signature.finish_bundle_method.method_value())
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
self._flush_batch()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
self.table_id, errors))
I even tried to handle this error:
RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: u''
location: u''
message: u'Missing required field: object.teste.'
reason: u'invalid'>]
index: 0>] [while running 'generatedPtransform-63']
Using:

try:
    ...
except RuntimeError as e:
    ...

or even a generic Exception didn't work.
I could find plenty of examples of error handling in Apache Beam using Java, but none handling errors in Python.
Does anyone know how to do this?
I've only been able to catch exceptions at the DoFn level, so something like this:
import apache_beam as beam
from apache_beam import pvalue

class MyPipelineStep(beam.DoFn):
    def process(self, element, *args, **kwargs):
        try:
            # do stuff...
            yield pvalue.TaggedOutput('main_output', output_element)
        except Exception as e:
            # Route the failure to a separate output instead of crashing.
            yield pvalue.TaggedOutput('exception', str(e))
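Downstream you can split the two outputs with with_outputs. A minimal sketch, where pcoll stands in for whatever PCollection feeds this step and the tag names match the snippet above:

results = (pcoll
           | 'MyStep' >> beam.ParDo(MyPipelineStep()).with_outputs(
               'main_output', 'exception'))
main_output = results.main_output
exceptions = results.exception  # e.g. write these to a dead-letter table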
However, WriteToBigQuery is a PTransform that wraps the DoFn BigQueryWriteFn, so you may need to do something like this:
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery

class MyBigQueryWriteFn(BigQueryWriteFn):
    def process(self, *args, **kwargs):
        try:
            # Note: super() must name the subclass here so that
            # BigQueryWriteFn.process is the method actually called.
            return super(MyBigQueryWriteFn, self).process(*args, **kwargs)
        except Exception as e:
            # Do something here
            pass

class MyWriteToBigQuery(WriteToBigQuery):
    # Copy the source code of `WriteToBigQuery` here,
    # but replace `BigQueryWriteFn` with `MyBigQueryWriteFn`.
    pass
https://beam.apache.org/releases/pydoc/2.9.0/_modules/apache_beam/io/gcp/bigquery.html#WriteToBigQuery
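As an aside: newer releases of the Python SDK (roughly 2.15 and later; check your version) expose failed streaming inserts directly on the result of WriteToBigQuery, which avoids subclassing altogether. A rough sketch, reusing json_output from the question and assuming your SDK has the 'FailedRows' output and RetryStrategy:

import logging
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

result = (json_output
          | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
              'some-project:dataset.table_name',
              insert_retry_strategy=RetryStrategy.RETRY_NEVER))

# Rows that failed to insert come back as a PCollection.
_ = (result['FailedRows']
     | 'LogFailedRows' >> beam.Map(lambda row: logging.error('failed: %s', row)))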
You can also use the generator flavor of FlatMap:
This is similar to the other answer, in that you can use a DoFn in place of something else, e.g. a CombineFn, to produce no outputs when there is an exception or some other kind of failed precondition.
import logging
from typing import Generator, List

import apache_beam as beam

def sum_values(values: List[int]) -> Generator[int, None, None]:
    # Emit nothing (instead of raising) when the inputs are invalid.
    if not values or len(values) < 10:
        logging.error(f'received invalid inputs: {...}')
        return
    yield sum(values)

# Now, instead of using CombinePerKey:
(inputs
 | 'WithKey' >> beam.Map(lambda x: (x.key, x))
 | 'GroupByKey' >> beam.GroupByKey()
 | 'Values' >> beam.Values()
 | 'MaybeSum' >> beam.FlatMap(sum_values))
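For reference, a quick way to exercise this locally on the DirectRunner (the input data is made up, and the Map(list) step materializes the grouped values so that len() works in sum_values):

with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create([('k', i) for i in range(12)])
     | 'GroupByKey' >> beam.GroupByKey()
     | 'Values' >> beam.Values()
     | 'AsList' >> beam.Map(list)
     | 'MaybeSum' >> beam.FlatMap(sum_values)
     | 'Print' >> beam.Map(print))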