For example, suppose I have a Dataflow streaming job with a 5-minute window that reads from PubSub. As I understand it, if I assign a timestamp two days in the past to an element, there will be a window containing this element. If I then use the example from BigQueryIO.java that outputs daily tables to BigQuery, the job will write that two-day-old element to the BigQuery table for the current date.
I would like to write past elements to BigQuery tables named after the timestamps of the elements in the window, rather than the time of the current window. Is this possible?
Now I'm following the example described in DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java:
    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
          .apply(BigQueryIO.Write
            .named("Write")
            .withSchema(schema)
            .to(new SerializableFunction<BoundedWindow, String>() {
              public String apply(BoundedWindow window) {
                String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
                  ((DaysWindow) window).getStartDate());
                return "my-project:output.output_table_" + dayString;
              }
            }));
If I understand correctly, you would like BigQuery tables to be created according to the inherent timestamps of the elements (quotes), rather than the wall-clock time at which your pipeline runs.
TL;DR: the code should already do what you want; if it doesn't, please post more details.
Longer explanation: One of the key innovations in processing in Dataflow is event-time processing. This means that data processing in Dataflow is almost completely decoupled from when the processing happens - what matters is when the events being processed happened. This is a key element of enabling exactly the same code to run on batch or streaming data sources (e.g. processing real-time user click events using the same code that processes historical click logs). It also enables flexible handling of late-arriving data.
Please see the section "Event time vs. processing time" of The world beyond batch for a description of this aspect of Dataflow's processing model (the whole article is very much worth a read). For a deeper description, see the VLDB paper. This is also described in a more user-facing way in the official documentation on windowing and triggers.
Accordingly, there is no such thing as a "current window" because the pipeline may be concurrently processing many different events that happened at different times and belong to different windows. In fact, as the VLDB paper points out, one of the important parts of the execution of a Dataflow pipeline is "group elements by window".
In the pipeline you showed, we will group the records you want to write to BigQuery into windows using the timestamps provided on the records, and write each window to its own table, creating the table for a newly encountered window if necessary. If late data arrives for a window (see the documentation on windowing and triggers for a discussion of late data), we will append it to the already existing table.
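To make the grouping step concrete, here is a minimal sketch in plain Java that does not use the Dataflow SDK at all. The class EventTimeTables, its nested Element type, and the sample timestamps are hypothetical and exist only for illustration; java.time stands in for Joda-Time, and the day boundaries are assumed to be in UTC. It shows that the destination table depends only on each element's own timestamp, not on when the element arrives:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the pipeline: plain Java, no Dataflow SDK involved.
final class EventTimeTables {
    // Assumption: daily windows aligned to midnight UTC.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy_MM_dd").withZone(ZoneOffset.UTC);

    // An element carrying its own event-time timestamp.
    static final class Element {
        final Instant timestamp;
        final String payload;

        Element(String isoTimestamp, String payload) {
            this.timestamp = Instant.parse(isoTimestamp);
            this.payload = payload;
        }
    }

    // Table name for the daily window that contains the given event timestamp.
    static String tableFor(Instant eventTime) {
        return "my-project:output.output_table_" + DAY.format(eventTime);
    }

    // Groups payloads by destination table; arrival order does not affect the table.
    static Map<String, List<String>> group(List<Element> arrivals) {
        Map<String, List<String>> tables = new TreeMap<>();
        for (Element e : arrivals) {
            tables.computeIfAbsent(tableFor(e.timestamp), k -> new ArrayList<>())
                  .add(e.payload);
        }
        return tables;
    }

    public static void main(String[] args) {
        List<Element> arrivals = Arrays.asList(
            new Element("2016-03-10T12:00:00Z", "on-time quote"),
            new Element("2016-03-08T23:59:59Z", "quote from two days earlier"),
            new Element("2016-03-10T12:04:00Z", "another on-time quote"));
        for (Map.Entry<String, List<String>> t : group(arrivals).entrySet()) {
            System.out.println(t.getKey() + " -> " + t.getValue());
        }
    }
}
```

In this sketch the element stamped two days earlier ends up under the 2016_03_08 table even though it arrives between two elements stamped 2016-03-10, which mirrors the late-data behaviour described above.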
The code above no longer worked for me. However, there is an updated example in the Google docs in which DaysWindow is replaced by IntervalWindow, and that version worked for me:
    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
          .apply(BigQueryIO.Write
            .named("Write")
            .withSchema(schema)
            .to(new SerializableFunction<BoundedWindow, String>() {
              public String apply(BoundedWindow window) {
                // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
                String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                  .withZone(DateTimeZone.UTC)
                  .print(((IntervalWindow) window).start());
                return "my-project:output.output_table_" + dayString;
              }
            }));
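As a side note on the .withZone(DateTimeZone.UTC) call, the following sketch illustrates why pinning the formatter's zone matters when turning a window start into a table suffix. It uses java.time rather than Joda-Time so that it runs without extra dependencies; the class DaySuffix and the sample instant are hypothetical, and it assumes the window starts at midnight UTC:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only.
final class DaySuffix {
    // Formats a window start as yyyy_MM_dd in the given time zone.
    static String suffix(Instant windowStart, ZoneId zone) {
        return DateTimeFormatter.ofPattern("yyyy_MM_dd").withZone(zone).format(windowStart);
    }

    public static void main(String[] args) {
        // Assumed window start: midnight UTC on 2016-03-10.
        Instant start = Instant.parse("2016-03-10T00:00:00Z");
        System.out.println(suffix(start, ZoneOffset.UTC));         // 2016_03_10
        System.out.println(suffix(start, ZoneOffset.ofHours(-8))); // 2016_03_09
    }
}
```

Formatting the same instant in a zone west of UTC yields the previous calendar day, so a formatter left on a machine's default zone could name the table after the wrong date.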