Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Inputs to Flatten had incompatible window windowFns when CoGroupByKey with CalendarWindows

TL;DR:

How can I CoGroupByKey a group of PCollections with the same windowing strategy set with CalendarWindows?

LONG VERSION

I'm writing a dataflow pipeline that reads from two different pub/subs, one of the PCollections is split into a PCollectionTuple and finally I try to join them in a CoGroupByKey before saving it in BigQuery.

During testing of the Pipeline my windowing strategy for my PCollections was:

private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
    return summary
            .apply("Apply Windows " + OperationName, Window
                    .<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1))) 
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.standardDays(1)))
            .apply("Count " + OperationName, Count.perKey());
}

I set them with FixedWindow of 1 minute length in order to get results fast.

My grouping was like:

private static PCollection<KV<String, CoGbkResult>> MergeSummary(PCollection<KV<String, Long>> Avail, PCollection<KV<String, Long>> ValuationOK, PCollection<KV<String, Long>> ValuationKO){
    return KeyedPCollectionTuple.of(Util.AVAIL, Avail)
                                .and(Util.VALUATION_OK, ValuationOK)
                                .and(Util.VALUATION_KO, ValuationKO)
                                .apply("Merge Summary", CoGroupByKey.create());
}

When I tested both on local and on cloud it runs smoothly, however, as I set the windowing with the real production values , CalendarWindows of 1 day length as below:

private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
        return summary
                .apply("Apply Windows " + OperationName, Window
                                .<KV<String, Long>>into(CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)) //Per day windowing.                                    
                                .discardingFiredPanes()
                                .withAllowedLateness(Duration.standardDays(1))) //Accepts X days late data.
                .apply("Count " + OperationName, Count.perKey());
    }

Then I can't even compile the code, as I get a message like:

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns: com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6af9fcb2, com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6cce16f4
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:121)
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:105)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollectionList.apply(PCollectionList.java:175)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:124)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:74)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:116)

Reading on the documentation I found this:

When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all the collections you're merging must use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.

Is it clear that dataflow considers that my PCollections have incompatible windows, however, all of them are applied using the function I copied before. So, how can I CoGroupByKey a group of PCollections with the same windowing strategy set with CalendarWindows?

like image 775
Manuel Alvarez Avatar asked Nov 09 '22 08:11

Manuel Alvarez


1 Answers

Looks like this a bug in CalendarWindows; to work around it you can create a single CalendarWindows object, and use that as the WindowFn for each PCollection instead of creating separate CalendarWindows objects for each one.

like image 196
danielm Avatar answered Dec 20 '22 21:12

danielm