BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS

My workflow: Kafka -> Dataflow streaming -> BigQuery

Given that low latency isn't important in my case, I use FILE_LOADS to reduce costs. I'm using BigQueryIO.Write with a DynamicDestination (one new table every hour, with the current hour as a suffix).

This BigQueryIO.Write is configured like this:

.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withNumFileShards(100)
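
For reference, the destination is a table function roughly along these lines (a simplified sketch, not my exact code: the project, dataset and table names match the error below, but the way the hour suffix is derived and the triggering frequency value are only illustrative):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;

BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
    // One table per hour, with the hour suffix taken from the element timestamp (illustrative)
    .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input ->
        new TableDestination(
            "myproject-id:dev_mydataset.mytable_"
                + DateTimeFormat.forPattern("yyyyMMdd_HH").print(input.getTimestamp()),
            "Hourly table"))
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withMethod(Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))  // placeholder value
    .withNumFileShards(100);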

The first table is successfully created and written to, but the following tables are never created and I get these exceptions:

(99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_NEVER",
      "destinationTable" : {
        "datasetId" : "dev_mydataset",
        "projectId" : "myproject-id",
        "tableId" : "mytable_20180302_16"
      },

For the first table the CreateDisposition used is CREATE_IF_NEEDED as specified, but for the subsequent tables this parameter is not taken into account and CREATE_NEVER is used instead.

I have also created an issue on JIRA.

asked by benjben

1 Answer

According to the documentation of Apache Beam's BigQueryIO, BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED requires that a table schema be supplied with .withSchema().

As also stated in the Dataflow documentation:

Note that if you specify CREATE_IF_NEEDED as the CreateDisposition and you don't supply a TableSchema, the transform may fail at runtime with a java.lang.IllegalArgumentException if the target table does not exist.

The error mentioned in the documentation is not the same one you are receiving (you get a java.lang.RuntimeException), but according to the BigQueryIO.Write configuration you shared, you are not specifying any table schema, so the job is prone to fail whenever a destination table is missing.

So, as a first step towards solving your issue, you should build a TableSchema that matches the data you will load into BigQuery, and then pass it to the write with .withSchema(schema):

List<TableFieldSchema> fields = new ArrayList<>();
// Add fields like:
fields.add(new TableFieldSchema().setName("<FIELD_NAME>").setType("<FIELD_TYPE>"));
TableSchema schema = new TableSchema().setFields(fields);

// BigQueryIO.Write configuration plus:
    .withSchema(schema)
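
Putting it together, the write would then look roughly like this (a sketch only: rows, hourlyTableFunction and triggeringFrequency are placeholder names for your PCollection of TableRows, your dynamic destination and your Duration; schema is the TableSchema built above):

rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to(hourlyTableFunction)               // your dynamic destination (one table per hour)
        .withSchema(schema)                    // lets CREATE_IF_NEEDED create the missing tables
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withMethod(Method.FILE_LOADS)
        .withTriggeringFrequency(triggeringFrequency)
        .withNumFileShards(100));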
answered by dsesto