I'm using TextIO to read from Cloud Storage. Since I want the job to run continuously, I use watchForNewFiles.
For completeness: the data reads fine if I use bounded PCollections (no watchForNewFiles, and BigQueryIO in batch mode), so there is no data issue.
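For reference, the bounded variant looks roughly like this (a minimal sketch; without watchForNewFiles the collection is bounded, so BigQueryIO defaults to batch loads):

// Bounded read: no watchForNewFiles, so the PCollection is bounded.
PCollection<String> batch = p.apply("Read File (bounded)",
    TextIO.read().from(options.getInput()));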
I have p.run().waitUntilFinish(); in my code, so the pipeline runs, and it does not give any error. The Apache Beam version is 2.8.0.
PCollection<String> stream =
p.apply("Read File", TextIO
.read()
.from(options.getInput())
.watchForNewFiles(
Duration.standardMinutes(1),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
)
.withCompression(Compression.AUTO));
This works perfectly fine and reads files as soon as they become available. The resulting PCollection is unbounded and contains the lines of text from these files.
I then apply some transformations:
PCollection<List<String>> lines = stream.apply("Parse CSV",
ParDo.of(new ParseCSV())
);
PCollection<TableRow> rows = lines.apply("Convert to BQ",
ParDo.of(new BigQueryConverter(schema))
);
The ParseCSV step assigns a timestamp to every element it emits via outputWithTimestamp.
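For context, ParseCSV is roughly shaped like the sketch below; the CSV splitting is simplified, and the assumption that the event time comes from the first column is mine:

static class ParseCSV extends DoFn<String, List<String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Naive split; the real implementation would handle quoting etc.
        List<String> fields = Arrays.asList(c.element().split(","));
        // Assumption: the first column carries the event timestamp.
        Instant eventTime = Instant.parse(fields.get(0));
        // Note: moving a timestamp backwards past the element's current
        // timestamp requires overriding getAllowedTimestampSkew().
        c.outputWithTimestamp(fields, eventTime);
    }
}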
I end up with a PCollection of TableRows ready to stream to BigQuery. For that, I use
WriteResult result = rows.apply("WriteToBigQuery",
BigQueryIO.<TableRow>write()
.withFormatFunction(input -> input)
.withSchema(bqSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(options.getOutput())
);
This never writes data to BigQuery. If I look at the job in the UI, I can see that inside BigQueryIO, data enters and leaves the first two steps but never the Reshuffle: that step reads data but never passes any on. The transform inside Reshuffle that causes this is GroupByKey.
As the collection is unbounded, I tried to configure the window with
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
);
which should force anything doing a GroupByKey to release a window after 10 seconds. But it does not.
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.standardSeconds(0))
.discardingFiredPanes()
);
Adding a specific trigger on processing time also did not help. Any clue? Thanks in advance!
One workaround that worked for me is to assign a new key to every element and force Dataflow to decouple the transformations with a Reshuffle or a GroupByKey.
stream
    // Key every element; with a lambda the coder cannot be inferred, so set it explicitly.
    .apply(WithKeys.of(input -> 1))
    .setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
    // Reshuffle materializes the elements and breaks fusion with the upstream steps.
    .apply(Reshuffle.of())
    // Drop the key again.
    .apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
        @Override
        public String apply(KV<Integer, String> input) {
            return input.getValue();
        }
    }))
    .apply("convertToTableRow", ...)
    .apply("WriteToBigQuery", ...)
The key can be a constant, as in the example, or random. If you choose a random key, keep the range small enough that the number of distinct keys fits in JVM memory, e.g. ThreadLocalRandom.current().nextInt(0, 5000).
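A random-key version of the same chain might look like this (a sketch; the coder still has to be set explicitly because of the lambda):

stream
    // Random key in a small, bounded range keeps the number of distinct keys low.
    .apply(WithKeys.of(input -> ThreadLocalRandom.current().nextInt(0, 5000)))
    .setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
    .apply(Reshuffle.of())
    // ...then drop the key and continue as above.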