
Is it possible to dynamically generate BigQuery table names based on the timestamps of the elements of a window?

For example, if I have a Dataflow streaming job with a 5-minute window that reads from PubSub, I understand that if I assign a timestamp two days in the past to an element, there will be a window containing that element. If I use the example from BigQueryIO.java that outputs daily tables to BigQuery, the job will write that two-day-old element to the BigQuery table named for the current date.

I would like to write past elements to BigQuery tables named after the timestamps of the elements in the window rather than the time of the current window. Is that possible?

I'm currently following the example described in DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java:

    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
       .apply(BigQueryIO.Write
         .named("Write")
         .withSchema(schema)
         .to(new SerializableFunction<BoundedWindow, String>() {
               public String apply(BoundedWindow window) {
                 String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
                   ((DaysWindow) window).getStartDate());
                 return "my-project:output.output_table_" + dayString;
               }
             }));
bsmarcosj asked Aug 24 '15 08:08


2 Answers

If I understand correctly, you would like to make sure that BigQuery tables are created according to inherent timestamps of the elements (quotes), rather than wall-clock time when your pipeline runs.

TL;DR: the code should already do what you want; if it doesn't, please post more details.

Longer explanation: One of the key innovations in processing in Dataflow is event-time processing. This means that data processing in Dataflow is almost completely decoupled from when the processing happens - what matters is when the events being processed happened. This is a key element of enabling exactly the same code to run on batch or streaming data sources (e.g. processing real-time user click events using the same code that processes historical click logs). It also enables flexible handling of late-arriving data.

Please see The world beyond batch, the section "Event time vs. processing time" for a description of this aspect of Dataflow's processing model (the whole article is very much worth a read). For a deeper description, see the VLDB paper. This is also described in a more user-facing way in the official documentation on windowing and triggers.

Accordingly, there is no such thing as a "current window" because the pipeline may be concurrently processing many different events that happened at different times and belong to different windows. In fact, as the VLDB paper points out, one of the important parts of the execution of a Dataflow pipeline is "group elements by window".

In the pipeline you showed, we will group the records you want to write to BigQuery into windows using provided timestamps on the records, and write each window to its own table, creating the table for newly encountered windows if necessary. If late data arrives into the window (see documentation on windowing and triggers for a discussion of late data), we will append to the already existing table.
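
To make the event-time behaviour concrete, here is a minimal sketch in plain Java (`java.time` only, no Dataflow SDK) of how a daily window, and therefore a table name, follows from an element's own timestamp. The class `EventTimeTables` and its methods are hypothetical names chosen for illustration, and the sketch assumes one-day windows aligned to UTC; it imitates the idea of grouping by window and is not how the SDK implements it.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of the Dataflow SDK.
public class EventTimeTables {
    // Same pattern as the table suffix in the question, formatted in UTC (an assumption).
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy_MM_dd").withZone(ZoneOffset.UTC);

    /** Start of the one-day UTC window that contains the given event time. */
    static Instant windowStart(Instant eventTime) {
        return eventTime.truncatedTo(ChronoUnit.DAYS);
    }

    /** Table name derived from the element's window, never from the wall clock. */
    static String tableFor(Instant eventTime) {
        return "my-project:output.output_table_" + DAY.format(windowStart(eventTime));
    }

    /** Groups event timestamps by the table their window maps to. */
    static Map<String, List<Instant>> groupByTable(List<Instant> eventTimes) {
        Map<String, List<Instant>> tables = new TreeMap<>();
        for (Instant t : eventTimes) {
            tables.computeIfAbsent(tableFor(t), k -> new ArrayList<>()).add(t);
        }
        return tables;
    }

    public static void main(String[] args) {
        Instant recent = Instant.parse("2015-08-24T08:08:00Z");
        Instant twoDaysOld = recent.minus(2, ChronoUnit.DAYS);
        // Elements from different days are processed together but land in different tables.
        groupByTable(Arrays.asList(recent, twoDaysOld, recent.plusSeconds(60)))
            .forEach((table, rows) ->
                System.out.println(table + " <- " + rows.size() + " element(s)"));
    }
}
```

In this sketch the element stamped two days earlier maps to the `2015_08_22` table while the other two map to `2015_08_24`, regardless of when `main` happens to run.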

jkff answered Nov 04 '22 00:11


The code above no longer worked for me. There is an updated example in the Google docs, though, in which DaysWindow is replaced by IntervalWindow; that version worked for me:

    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
      .apply(BigQueryIO.Write
        .named("Write")
        .withSchema(schema)
        .to(new SerializableFunction<BoundedWindow, String>() {
          public String apply(BoundedWindow window) {
            // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
            String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                 .withZone(DateTimeZone.UTC)
                 .print(((IntervalWindow) window).start());
            return "my-project:output.output_table_" + dayString;
          }
        }));
Matthias Baetens answered Nov 04 '22 02:11