I have pulled down a copy of the Pub/Sub to BigQuery Dataflow template from Google's github repository. I am running it on my local machine using the direct-runner.
In testing I confirmed that the template only writes failures to the "deadletter" table if an error occurs during UDF processing or conversion from JSON to TableRow.
I wish to also handle failures that occur at time of insert into BigQuery more gracefully by sending them into a separate TupleTag as well so they can also be sent to the deadletter table or another output for review and processing. Currently, when executing with the dataflow-runner those errors only getting written to the Stackdriver logs and continue to be retried indefinitely until the issue is resolved.
Question One: While testing locally and publishing a message with a format not matching the destination table's schema an insert is retried 5 times and then the pipeline crashes with a RuntimeException along with the error returned from the HTTP response to Google's API. I believe this behavior is being set within BigQueryServices.Impl here:
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
However, based on Google's documentation,
"When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."
As Beam's Pub/Sub.IO,
create and consume unbounded PCollections
I am under the impression that streaming mode should be enabled by default when reading from Pub/Sub. I even went as far as adding the Streaming_Inserts method on my call to writeTableRows() and it did not impact this behavior.
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
Question Two:
I ask because I do not see how I can capture the insert related errors without creating my own static class that overrides the expand method and uses a ParDo and DoFn where I can add my own custom logic to create separate TupleTags for successful records and failure records, similar to how this was done within JavascriptTextTransformer for FailsafeJavascriptUdf.
Update:
public static PipelineResult run(DirectOptions options) {
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<PubsubMessage, String> coder =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
PCollectionTuple transformOut =
pipeline
//Step #1: Read messages in from Pub/Sub
.apply(
"ReadPubsubMessages",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
//Step #2: Transform the PubsubMessages into TableRows
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
WriteResult writeResult = null;
try {
writeResult =
transformOut
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to("myproject:MyDataSet.MyTable"));
} catch (Exception e) {
System.out.print("Cause of the Standard Insert Failure is: ");
System.out.print(e.getCause());
}
try {
writeResult
.getFailedInserts()
.apply(
"WriteFailedInsertsToDeadLetter",
BigQueryIO.writeTableRows()
.to(options.getOutputDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
} catch (Exception e) {
System.out.print("Cause of the Error Insert Failure is: ");
System.out.print(e.getCause());
}
PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
.and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
Error:
Cause of the Error Insert Failure is: null[WARNING]
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
In the latest versions of Beam, the BigQueryIO.Write transform returns back a WriteResult object which enables you to retrieve a PCollection of TableRows that failed output to BigQuery. Using this, you can easily retrieve the failures, format them in the structure of your deadletter output, and resubmit the records to BigQuery. This eliminates the need for a separate class to manage successful and failed records.
Below is an example of what that could look like for your pipeline.
// Attempt to write the table rows to the output table.
WriteResult writeResult =
pipeline.apply(
"WriteRecordsToBigQuery",
BigQueryIO.writeTableRows()
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
/*
* 1) Get the failed inserts
* 2) Transform to the deadletter table format.
* 3) Output to the deadletter table.
*/
writeResult
.getFailedInserts()
.apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
.apply(
"WriteFailedInsertsToDeadletter",
BigQueryIO.writeTableRows()
.to(options.getDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
Additionally, to answer your questions:
streaming
option to true
for the DirectRunner.If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With