We are currently working on a streaming pipeline on Apache Beam with the DataflowRunner. We read messages from Pub/Sub, do some processing on them, and afterwards window them in sliding windows (currently the window size is 3 seconds and the interval is 3 seconds as well). Once a window is fired, we do some post-processing on the elements inside it. This post-processing step is significantly larger than the window size; it takes about 15 seconds.
The Apache Beam code of the pipeline:
from apache_beam.transforms.trigger import AfterCount, AccumulationMode

input = (pipeline
         | beam.io.ReadFromPubSub(subscription=<subscription_path>)
         | beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
                                  trigger=AfterCount(30),
                                  accumulation_mode=AccumulationMode.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
As you know, Dataflow tries to perform some optimizations on your pipeline steps. In our case it fuses everything together from the windowing onwards (clustered operations: 1/ processing, 2/ windowing + post-processing), which causes slow, sequential post-processing of all the windows by just one worker. We see logs every 15 seconds that the pipeline is processing the next window. However, we would like multiple workers to pick up separate windows instead of the whole workload going to a single worker.
Therefore we were looking for ways to prevent this fusion from happening, so that Dataflow separates the windowing from the post-processing of the windows. That way, we would expect Dataflow to be able to assign multiple workers again to the post-processing of fired windows.
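A standard fusion-breaking technique in the Beam Python SDK is to insert a beam.Reshuffle() as a barrier between the stages; a minimal sketch of where such a barrier would sit in our pipeline:

# Sketch: a Reshuffle between the GroupByKey and the post-processing
# acts as a fusion barrier, so Dataflow can schedule the post-processing
# step independently of the windowing/grouping step.
group = windows | beam.GroupByKey()
(group
 | beam.Reshuffle()
 | beam.Map(post_processing_fn))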
What we have tried so far:
The last two actions indeed created a third clustered operation (1/ processing, 2/ windowing, 3/ post-processing), but we noticed that the same worker is still executing everything after the windowing.
Is there any solution that can resolve this problem?
The workaround we are currently considering is to build another streaming pipeline that receives the fired windows, so that its workers can process the windows in parallel, but it is cumbersome.
You have done the right thing to break fusion. I suspect a different issue may be getting you into trouble.
For streaming, a single key always gets processed on the same worker. By any chance, are all or most of your records assigned to a single key? If so, your processing will be done by a single worker.
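If that is the case, one way to spread the load (independent of the window trick below) is to salt the key with a shard number. The NUM_SHARDS constant and the lambda here are illustrative, not something from your pipeline:

import random

NUM_SHARDS = 10  # illustrative: number of sub-keys per original key

# Spread each key over NUM_SHARDS sub-keys so different workers can
# pick up different shards of the same hot key.
sharded = input | beam.Map(
    lambda kv: ((kv[0], random.randint(0, NUM_SHARDS - 1)), kv[1]))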
Something you can do to prevent this is to make the window a part of the key, so that the elements for multiple windows can be processed on different workers even though they have the same key:
class KeyIntoKeyPlusWindow(beam.DoFn):
    # Re-key each element by (key, window) so that elements from
    # different windows can be distributed across workers.
    def process(self, element, window=beam.DoFn.WindowParam):
        key, values = element
        yield ((key, window), values)

group = windows | beam.ParDo(KeyIntoKeyPlusWindow()) | beam.GroupByKey()
And once you've done that, you can apply your post-processing:
group | beam.Map(post_processing_fn)
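Note that after this GroupByKey, each element has the shape ((key, window), values); if post_processing_fn still expects plain (key, values) pairs, strip the window from the key first. A minimal sketch, assuming that shape:

# Drop the window component of the composite key before post-processing,
# assuming post_processing_fn expects (key, values) pairs.
(group
 | beam.Map(lambda kv: (kv[0][0], kv[1]))
 | beam.Map(post_processing_fn))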