I want to read data from Cloud Pub/Sub and write it to BigQuery with Cloud Dataflow. Each data contains a table ID where the data itself will be saved.
There are various factors that writing to BigQuery fails:
When one of the failures occurs, a streaming job will retry the task and stall. I tried using WriteResult.getFailedInserts()
in order to rescue the bad data and avoid stalling, but it did not work well. Is there any good way?
Here is my code:
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public class MyData implements Serializable {
String table_id;
}
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<MyData> input = p
.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class))
.via((String text) -> new Gson().fromJson(text, MyData.class)));
WriteResult writeResult = input
.apply("WriteToBigQuery", BigQueryIO.<MyData>write()
.to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<MyData> input) {
MyData myData = input.getValue();
return new TableDestination(myData.table_id, null);
}
})
.withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{
add(new TableFieldSchema().setName("table_id").setType("STRING"));
}}))
.withFormatFunction(new SerializableFunction<MyData, TableRow>() {
@Override
public TableRow apply(MyData myData) {
return new TableRow().set("table_id", myData.table_id);
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));
writeResult.getFailedInserts()
.apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
LOG.info(row.get("table_id").toString());
}
}));
p.run();
}
}
There is no easy way to catch exceptions when writing to output in a pipeline definition. I suppose you could do it by writing a custom PTransform
for BigQuery. However, there is no way to do it natively in Apache Beam. I also recommend against this because it undermines Cloud Dataflow's automatic retry functionality.
In your code example, you have the failed insert retry policy set to never retry. You can set the policy to always retry. This is only effective during something like an intermittent network failure (4th bullet point).
.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
If the table ID format is incorrect (1st bullet point), then the CREATE_IF_NEEDED
create disposition configuration should allow the Dataflow job to automatically create a new table without error, even if the table ID is incorrect.
If the dataset does not exist or there is an access permission issue to the dataset (2nd and 3rd bullet points), then my opinion is that the streaming job should stall and ultimately fail. There is no way to proceed under any circumstances without manual intervention.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With