My workflow: Kafka -> Dataflow streaming -> BigQuery
Since low latency isn't important in my case, I use FILE_LOADS to reduce costs. I'm using BigQueryIO.Write with DynamicDestinations (one new table every hour, with the current hour as a suffix).
The BigQueryIO.Write is configured like this:
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withNumFileShards(100)
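For context, the hourly suffix can be derived roughly as follows. This is only a minimal sketch, not the actual pipeline code: the helper class HourlyTableName, its method name, and the choice of UTC are assumptions made for illustration. The format matches the table ID mytable_20180302_16 that appears in the error below.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Beam): builds an hourly table ID
// such as "mytable_20180302_16" from a base name and a timestamp.
final class HourlyTableName {
    // Date and hour, separated by a literal underscore.
    // Using UTC here is an assumption; use whatever zone your tables follow.
    private static final DateTimeFormatter SUFFIX =
        DateTimeFormatter.ofPattern("yyyyMMdd'_'HH").withZone(ZoneOffset.UTC);

    private HourlyTableName() {}

    static String of(String baseName, Instant timestamp) {
        return baseName + "_" + SUFFIX.format(timestamp);
    }
}
```

A DynamicDestinations implementation could call a helper like this from its destination or table lookup, so that every element falling within the same hour maps to the same table ID.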
The first table is created and written to successfully, but the subsequent tables are never created, and I get exceptions like this one:
(99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_NEVER",
      "destinationTable" : {
        "datasetId" : "dev_mydataset",
        "projectId" : "myproject-id",
        "tableId" : "mytable_20180302_16"
      },
For the first table, the CreateDisposition used is CREATE_IF_NEEDED, as specified. For the later tables, however, this setting is ignored and CREATE_NEVER is used instead.
I have also filed an issue for this on JIRA.
According to the documentation of Apache Beam's BigQueryIO, when BigQueryIO.Write.CreateDisposition is set to CREATE_IF_NEEDED, a table schema must be provided, for example with .withSchema().
As stated also in the Dataflow documentation:
Note that if you specify CREATE_IF_NEEDED as the CreateDisposition and you don't supply a TableSchema, the transform may fail at runtime with a java.lang.IllegalArgumentException if the target table does not exist.
The error the documentation mentions is not the one you are getting (you get a java.lang.RuntimeException). However, the BigQueryIO.Write configuration you shared does not specify any table schema, so the job is prone to failure whenever a target table is missing.
So, as a first step towards solving your issue, build a TableSchema that matches the data you will load into BigQuery, and pass it to the write with .withSchema(schema):
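import java.util.ArrayList;
import java.util.List;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

List<TableFieldSchema> fields = new ArrayList<>();
// Add one entry per column of the destination table, for example:
fields.add(new TableFieldSchema().setName("<FIELD_NAME>").setType("<FIELD_TYPE>"));
TableSchema schema = new TableSchema().setFields(fields);
// Keep the existing BigQueryIO.Write configuration and add:
    .withSchema(schema)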