I would like to join multiple streams on a common key and trigger a result either as soon as all of the streams have contributed at least one element or at the end of the window. CoGroupByKey seems to be the appropriate building block, but I don't see a way to express the early trigger condition, since a count trigger applies per input collection. Is there one?
I believe CoGroupByKey is implemented as Flatten + GroupByKey under the hood. Once multiple streams are flattened into one, a data-driven trigger (or any other trigger) won't have enough control to achieve what you want.
Instead of using CoGroupByKey, you can use Flatten and a StatefulDoFn that fills an object backed by State for each key. In this case, the StatefulDoFn also gets the chance to decide what to do when two elements have arrived from stream A but none yet from stream B.
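To make the per-key logic concrete, here is a minimal plain-Python model of what such a stateful step would have to track. It is not Beam API code: the class name EarlyJoinBuffer, the tag strings, and both method names are hypothetical, and the in-memory dictionaries only stand in for Beam's per-key state. In a real pipeline I would expect on_window_end to be driven by an event-time timer, but that mapping is my assumption, not something tested here.

```python
from collections import defaultdict


class EarlyJoinBuffer:
    """Plain-Python model of per-key join state (hypothetical, not Beam API)."""

    def __init__(self, expected_tags):
        # Tags of the streams that must all contribute before the early firing.
        self.expected_tags = frozenset(expected_tags)
        # key -> tag -> buffered values; stands in for per-key state.
        self.buffers = defaultdict(lambda: defaultdict(list))
        # Keys that have already produced their early result.
        self.emitted = set()

    def on_element(self, tag, key, value):
        """Buffer one element; return a result the first time the key is complete."""
        per_tag = self.buffers[key]
        per_tag[tag].append(value)
        if key not in self.emitted and self.expected_tags.issubset(per_tag):
            self.emitted.add(key)
            return key, {t: list(v) for t, v in per_tag.items()}
        return None

    def on_window_end(self):
        """Flush keys that never became complete, then reset for the next window."""
        results = [
            (key, {t: list(v) for t, v in per_tag.items()})
            for key, per_tag in self.buffers.items()
            if key not in self.emitted
        ]
        self.buffers.clear()
        self.emitted.clear()
        return results
```

With tags "A" and "B", two elements from A for a key return None, and the first element from B for that key returns both buffered A values together with the B value. Elements that arrive for a key after its early result are buffered but never emitted in this sketch; whether to drop them or fire again is exactly the kind of decision the stateful step gets to make.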
Another potential solution that comes to mind is a (stateless) DoFn that filters the CoGBK results to remove those that don't have at least one occurrence for each joined stream. For the end-of-window result (which does not have the same restriction), it would then be necessary to have a parallel CoGBK whose result does not go through the filter. I don't think there is a way to tag results with the trigger that emitted them?
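A rough sketch of the filter predicate, assuming the Python SDK's CoGroupByKey output shape of (key, {tag: iterable_of_values}) when the inputs are passed as a dict. The function name has_all_streams and the tags "A" and "B" are made up for illustration.

```python
def has_all_streams(element, expected_tags):
    """Return True if every expected tag has at least one value for this key."""
    _key, grouped = element
    return all(
        # any(...) checks non-emptiness without assuming the iterable has len().
        any(True for _ in grouped.get(tag, ()))
        for tag in expected_tags
    )


# Hypothetical use on the early-firing branch only (not executed here):
#   early = joined | beam.Filter(has_all_streams, expected_tags=("A", "B"))
```

The parallel end-of-window branch would simply skip this step, so incomplete keys still come out there.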