BigQueryIO - Write performance with streaming and FILE_LOADS

My pipeline: Kafka -> Dataflow streaming (Beam v2.3) -> BigQuery

Given that low-latency isn't important in my case, I use FILE_LOADS to reduce the costs, like this:

BigQueryIO.writeTableRows()
  .withJsonSchema(schema)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
  .withMethod(Method.FILE_LOADS)
  .withTriggeringFrequency(triggeringFrequency)      
  .withCustomGcsTempLocation(gcsTempLocation)
  .withNumFileShards(numFileShards) 
  .withoutValidation()
  .to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
    def apply(element: ValueInSingleWindow[TableRow]): TableDestination = {
      ...
    }
  })
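
For reference, here is a minimal sketch of how the elided parameters could be built (the actual values and bucket name below are placeholders, not my real settings): withTriggeringFrequency takes a joda Duration and withCustomGcsTempLocation a ValueProvider[String].

import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.joda.time.Duration

// Placeholder values for illustration only
val triggeringFrequency = Duration.standardMinutes(5)                    // how often load jobs are triggered
val numFileShards = 100                                                  // files written per destination per pane
val gcsTempLocation = StaticValueProvider.of("gs://my-bucket/bq-temp")   // staging location for the load files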

This Dataflow step introduces an ever-increasing delay in the pipeline, so that it can't keep up with the Kafka throughput (less than 50k events/s), even with 40 n1-standard-4 workers. As shown in the screenshot below, the system lag is very large (close to the pipeline up-time) for this step, whereas the Kafka system lag is only a few seconds.

System lag introduced by BigQueryIO.Write

If I understand correctly, Dataflow writes the elements into numFileShards files in gcsTempLocation, and every triggeringFrequency a load job is started to insert them into BigQuery. For instance, if I choose a triggeringFrequency of 5 minutes, I can see (with bq ls -a -j) that all the load jobs complete in less than 1 minute. But the step still introduces more and more delay, resulting in Kafka consuming fewer and fewer elements (due to backpressure). Increasing/decreasing numFileShards and triggeringFrequency doesn't correct the problem.

I don't manually specify any window, I just use the default one. Files are not accumulating in gcsTempLocation.

Any idea what's going wrong here?

asked by benjben


1 Answer

You mention that you don't explicitly specify a Window, which means that by default Dataflow will use the "Global window". The windowing documentation contains this warning:

Caution: Dataflow's default windowing behavior is to assign all elements of a PCollection to a single, global window, even for unbounded PCollections. Before you use a grouping transform such as GroupByKey on an unbounded PCollection, you must set a non-global windowing function. See Setting Your PCollection's Windowing Function.

If you don't set a non-global windowing function for your unbounded PCollection and subsequently use a grouping transform such as GroupByKey or Combine, your pipeline will generate an error upon construction and your Dataflow job will fail.

You can alternatively set a non-default Trigger for a PCollection to allow the global window to emit "early" results under some other conditions.
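
For example, a minimal sketch of applying a fixed window with an explicit trigger before the BigQuery write (assuming your PCollection of TableRow is called rows; the 5-minute duration is just a placeholder):

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, FixedWindows, Window}
import org.joda.time.Duration

// rows: PCollection[TableRow] read from Kafka (name assumed for illustration)
val windowedRows = rows.apply(
  "FixedWindows",
  Window.into[TableRow](FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterProcessingTime.pastFirstElementInPane()
      .plusDelayOf(Duration.standardMinutes(5)))
    .withAllowedLateness(Duration.standardMinutes(1))
    .discardingFiredPanes())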

It appears that your pipeline doesn't do any explicit grouping, but I wonder if the internal grouping done by the BigQuery write is causing issues.

Can you see in the UI if your downstream DropInputs has received any elements? If not, this is an indication that data is getting held up in the upstream BigQuery step.

answered by Scott Wegner