Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apache Beam/Google Dataflow PubSub to BigQuery Pipeline: Handling Insert Errors and Unexpected Retry Behavior

I have pulled down a copy of the Pub/Sub to BigQuery Dataflow template from Google's github repository. I am running it on my local machine using the direct-runner.

In testing I confirmed that the template only writes failures to the "deadletter" table if an error occurs during UDF processing or conversion from JSON to TableRow.

I wish to also handle failures that occur at time of insert into BigQuery more gracefully by sending them into a separate TupleTag as well so they can also be sent to the deadletter table or another output for review and processing. Currently, when executing with the dataflow-runner those errors only getting written to the Stackdriver logs and continue to be retried indefinitely until the issue is resolved.

Question One: While testing locally and publishing a message with a format not matching the destination table's schema an insert is retried 5 times and then the pipeline crashes with a RuntimeException along with the error returned from the HTTP response to Google's API. I believe this behavior is being set within BigQueryServices.Impl here:

private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

However, based on Google's documentation,

"When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."

As Beam's Pub/Sub.IO,

create and consume unbounded PCollections

I am under the impression that streaming mode should be enabled by default when reading from Pub/Sub. I even went as far as adding the Streaming_Inserts method on my call to writeTableRows() and it did not impact this behavior.

.apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
  1. Is this behavior somehow being influenced by which runner I am using? If not, where is the flaw in my understanding?

Question Two:

  1. Is there a difference in performance when using BigQueryIO.write vs BigQueryIO.writeTableRows?

I ask because I do not see how I can capture the insert related errors without creating my own static class that overrides the expand method and uses a ParDo and DoFn where I can add my own custom logic to create separate TupleTags for successful records and failure records, similar to how this was done within JavascriptTextTransformer for FailsafeJavascriptUdf.

Update:

public static PipelineResult run(DirectOptions options) {

options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<PubsubMessage, String> coder =
        FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

     PCollectionTuple transformOut =
        pipeline
             //Step #1: Read messages in from Pub/Sub
            .apply(
                "ReadPubsubMessages",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

             //Step #2: Transform the PubsubMessages into TableRows
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult = null;

    try {
      writeResult = 
            transformOut
        .get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .to("myproject:MyDataSet.MyTable"));
    } catch (Exception e) {
        System.out.print("Cause of the Standard Insert Failure is: ");
        System.out.print(e.getCause());
    }

    try {
        writeResult
            .getFailedInserts()
            .apply(
                    "WriteFailedInsertsToDeadLetter",
                    BigQueryIO.writeTableRows()
                        .to(options.getOutputDeadletterTable())
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    } catch (Exception e) {
        System.out.print("Cause of the Error Insert Failure is: ");
        System.out.print(e.getCause());
    }

     PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
        .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

Error:

Cause of the Error Insert Failure is: null[WARNING] 
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)
like image 757
HendPro12 Avatar asked Aug 27 '18 17:08

HendPro12


1 Answers

In the latest versions of Beam, the BigQueryIO.Write transform returns back a WriteResult object which enables you to retrieve a PCollection of TableRows that failed output to BigQuery. Using this, you can easily retrieve the failures, format them in the structure of your deadletter output, and resubmit the records to BigQuery. This eliminates the need for a separate class to manage successful and failed records.

Below is an example of what that could look like for your pipeline.

// Attempt to write the table rows to the output table.
WriteResult writeResult =
    pipeline.apply(
        "WriteRecordsToBigQuery",
        BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

/*
 * 1) Get the failed inserts
 * 2) Transform to the deadletter table format.
 * 3) Output to the deadletter table.
*/
writeResult
  .getFailedInserts()
    .apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
    .apply(
        "WriteFailedInsertsToDeadletter",
        BigQueryIO.writeTableRows()
            .to(options.getDeadletterTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

Additionally, to answer your questions:

  1. According to the beam docs, you must set the streaming option to true for the DirectRunner.
  2. There should be no performance difference. In either case, you'll need to convert the input records to TableRow objects. It should make no difference if you do that in a ParDo beforehand or within a serializable function using the BigQueryIO.Write.withFormatFunction.
like image 84
Ryan McDowell Avatar answered Oct 03 '22 01:10

Ryan McDowell