Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

When using unbounded PCollection from TextIO to BigQuery, data is stuck in Reshuffle/GroupByKey inside of BigQueryIO

I'm using TextIO for reading from Cloud Storage. As I want to have the job running continously, I use watchForNewFiles.

For completeness, the data I read is working fine if I use bounded PCollections (no watchForNewFiles and BigQueryIO in batch mode), so there is no data issue.

I have p.run().waitUntilFinish(); in my code, so the pipeline runs. And it does not give any error.

Apache beam version is 2.8.0

PCollection<String> stream =
        p.apply("Read File", TextIO
                .read()
                .from(options.getInput())
                .watchForNewFiles(
                        Duration.standardMinutes(1),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
                )
                .withCompression(Compression.AUTO));

This works perfectly fine and reads files as soon as they are available. the PCollection is unbounded and contains lines of text from these files.

After doing some transformations

PCollection<List<String>> lines = stream.apply("Parse CSV",
        ParDo.of(new ParseCSV())
);

PCollection<TableRow> rows = lines.apply("Convert to BQ",
        ParDo.of(new BigQueryConverter(schema))
);

The ParseCSV step adds timestamps to its receiver via outputWithTimestamp.

I end up with a PCollection of TableRows ready to stream to BigQuery. For that, I use

WriteResult result = rows.apply("WriteToBigQuery",
        BigQueryIO.
                <TableRow>write()
                .withFormatFunction(input -> input)
                .withSchema(bqSchema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo()
                .to(options.getOutput())

);

This never writes data to BigQuery. If I take a look into the UI, I see that BigQueryIO does

  • ShardTableWrites
  • TagWithUniqueId
  • Reshuffle
    • Window.into
    • GroupByKey

Data enters and leaves the first two steps. But never the Reshuffle. This only reads data but never passes data on. The step inside Reshuffle which causes that is GroupByKey.

As the collection is unbounded, I tried to configure the window with

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
);

which should force anything doing a GroupByKey to release a window after 10 seconds. But it does not.

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
        .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(0))
        .discardingFiredPanes()
);

Adding a specific trigger on processing time also did not help. Any clue? Thanks in advance!

like image 609
Florian Baumert Avatar asked Nov 12 '18 16:11

Florian Baumert


1 Answers

One workaround could be (which worked for me) to assagin a new key to every element and force the Dataflow to decouple transformations with a Reshuffle or a GroupByKey.

streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
       .apply(Reshuffle.of())
       .apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
           @Override
           public String apply(KV<Integer, String> input) {
               return input.getValue();
           }
       }))
       .apply("convertToTableRow", ...)
       .apply("WriteToBigQuery", ...)

The key can be a constant like in the example or a random. If you choose random then you have to set the range small enough to fit in the JVM memory. Like ThreadLocalRandom.current().nextInt(0, 5000)

like image 116
tade Avatar answered Oct 26 '22 17:10

tade