From: https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
You can insert a GroupByKey and ungroup after your first ParDo. The Dataflow service never fuses ParDo operations across an aggregation.
This is what I came up with in Python. Is this reasonable, or is there a simpler way?
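import apache_beam as beam

def prevent_fuse(collection):
    return (
        collection
        # Key each element by itself so GroupByKey has something to group on.
        | beam.Map(lambda x: (x, 1))
        # Dataflow never fuses ParDo steps across a GroupByKey.
        | beam.GroupByKey()
        # Ungroup: re-emit the key once per grouped value, so duplicates survive.
        | beam.FlatMap(lambda kv: (kv[0] for _ in kv[1]))
    )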
EDIT, in response to Ben Chambers' question
We want to prevent fusion because we have a collection that generates a much larger collection, and we need parallelization across the larger collection. If the steps are fused, I get only one worker across the larger collection.
Apache Beam SDK 2.3.0 adds the experimental Reshuffle transform, which is the Python alternative to the Reshuffle.viaRandomKey operation mentioned by @BenChambers. You can use it in place of your custom prevent_fuse code.
That should work. There are other ways, but they partly depend on what you are trying to do and why you want to prevent fusion. Keep in mind that fusion is an important optimization to improve the performance of your pipeline.
Could you elaborate on why you want to prevent fusion?