How can I catch exceptions thrown by BigQueryIO.Write and rescue the data that failed to be written?

I want to read data from Cloud Pub/Sub and write it to BigQuery with Cloud Dataflow. Each record contains the ID of the table it should be written to.

Writing to BigQuery can fail for various reasons:

  • The table ID has an invalid format.
  • The dataset does not exist.
  • The dataset does not allow the pipeline to access it.
  • A network failure occurs.

When one of these failures occurs, the streaming job retries the task and stalls. I tried using WriteResult.getFailedInserts() to rescue the bad data and avoid stalling, but it did not work well. Is there a good way to do this?

Here is my code:

import java.io.Serializable;
import java.util.ArrayList;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.gson.Gson;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  // Static so Gson can instantiate it and Beam can serialize it without an outer reference.
  public static class MyData implements Serializable {
    String table_id;
  }

  public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    ValueProvider<String> getInputTopic();
    void setInputTopic(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<MyData> input = p
        .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        .apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class))
            .via((String text) -> new Gson().fromJson(text, MyData.class)));
    // Route each element to the table named by its own table_id field.
    WriteResult writeResult = input
        .apply("WriteToBigQuery", BigQueryIO.<MyData>write()
            .to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() {
              @Override
              public TableDestination apply(ValueInSingleWindow<MyData> input) {
                MyData myData = input.getValue();
                return new TableDestination(myData.table_id, null);
              }
            })
            .withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{
              add(new TableFieldSchema().setName("table_id").setType("STRING"));
            }}))
            .withFormatFunction(new SerializableFunction<MyData, TableRow>() {
              @Override
              public TableRow apply(MyData myData) {
                return new TableRow().set("table_id", myData.table_id);
              }
            })
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));
    // Rows that BigQuery rejected come back here (no retries, per the policy above).
    writeResult.getFailedInserts()
        .apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            TableRow row = c.element();
            LOG.info(row.get("table_id").toString());
          }
        }));

    p.run();
  }
}
asked Dec 28 '17 by hmmnrst


1 Answer

There is no easy way to catch exceptions thrown while writing to an output sink inside a pipeline definition. You could probably do it by writing a custom PTransform that wraps the BigQuery write, but there is no native way to do it in Apache Beam, and I recommend against it because it undermines Cloud Dataflow's automatic retry functionality.
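For reference only, here is a rough sketch of what such a hand-rolled write could look like: a DoFn that calls the google-cloud-bigquery client directly and sends failures to a side output. The tag names, the DoFn name, and the "dataset.table" parsing of table_id are my own illustrative assumptions, and this approach gives up BigQueryIO's batching and retry handling.

// Sketch only -- members that could be added to StarterPipeline.
// Needs org.apache.beam.sdk.values.TupleTag/TupleTagList/PCollectionTuple and the
// google-cloud-bigquery client classes in addition to the imports already used above.
static final TupleTag<MyData> WRITTEN_ROWS = new TupleTag<MyData>() {};
static final TupleTag<MyData> FAILED_ROWS = new TupleTag<MyData>() {};

static class CatchingBigQueryWriteFn extends DoFn<MyData, MyData> {
  private transient com.google.cloud.bigquery.BigQuery bigquery;

  @Setup
  public void setup() {
    bigquery = com.google.cloud.bigquery.BigQueryOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    MyData data = c.element();
    try {
      // Assumes table_id looks like "dataset.table"; the parsing here is illustrative only.
      String[] parts = data.table_id.split("\\.");
      java.util.Map<String, Object> row = new java.util.HashMap<>();
      row.put("table_id", data.table_id);
      com.google.cloud.bigquery.InsertAllResponse response = bigquery.insertAll(
          com.google.cloud.bigquery.InsertAllRequest
              .newBuilder(com.google.cloud.bigquery.TableId.of(parts[0], parts[1]))
              .addRow(row)
              .build());
      if (response.hasErrors()) {
        c.output(FAILED_ROWS, data);
      } else {
        c.output(WRITTEN_ROWS, data);
      }
    } catch (Exception e) {
      // Bad table IDs, missing datasets, permission problems, and network errors all land here.
      c.output(FAILED_ROWS, data);
    }
  }
}

// Usage inside main():
// PCollectionTuple results = input.apply("CatchingWrite",
//     ParDo.of(new CatchingBigQueryWriteFn()).withOutputTags(WRITTEN_ROWS, TupleTagList.of(FAILED_ROWS)));
// results.get(FAILED_ROWS).apply("HandleFailures", /* e.g. log or send to a dead-letter topic */ ...);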

In your code example, the failed-insert retry policy is set to never retry. You can set the policy to always retry instead, but this only helps with transient problems such as an intermittent network failure (your 4th bullet point).

.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
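
Depending on the Beam SDK version you are using, InsertRetryPolicy.retryTransientErrors() may also be available as a middle ground, retrying only the errors BigQuery marks as transient (worth checking against your SDK version):

.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())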

If the table ID format is incorrect (1st bullet point), then the CREATE_IF_NEEDED create disposition configuration should allow the Dataflow job to automatically create a new table without error, even if the table ID is incorrect.

If the dataset does not exist or the pipeline lacks permission to access it (2nd and 3rd bullet points), then in my opinion the streaming job should stall and ultimately fail. There is no way to proceed without manual intervention.
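
If you would rather fail fast than let the streaming job stall, one option is a pre-flight check before p.run(). This is only a sketch assuming the google-cloud-bigquery client library; the class, method, and dataset names are made up for illustration.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;

// Pre-flight check: fail immediately if the dataset is missing or unreadable.
public class DatasetPreflightCheck {
  public static void requireDataset(String datasetName) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    try {
      Dataset dataset = bigquery.getDataset(datasetName);  // returns null if the dataset does not exist
      if (dataset == null) {
        throw new IllegalStateException("Dataset not found: " + datasetName);
      }
    } catch (BigQueryException e) {
      // Usually indicates the caller lacks permission to access the dataset.
      throw new IllegalStateException("Cannot access dataset: " + datasetName, e);
    }
  }
}

// Usage, before building the pipeline:
// DatasetPreflightCheck.requireDataset("my_dataset");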

answered by Andrew Nguonly