I am reading a file from Google Storage in Beam using a process that looks something like this:
data = pipeline | beam.Create(['gs://my/file.pkl']) | beam.ParDo(LoadFileDoFn)
where LoadFileDoFn loads the file and creates a Python list of objects from it, which ParDo then returns as a PCollection.
I know I could probably implement a custom source to achieve something similar, but this answer and Beam's own documentation indicate that this approach of reading via a pseudo-dataset and ParDo is not uncommon, and that custom sources may be overkill.
It also works - I get a PCollection with the correct number of elements, which I can process as I like! However...
The resulting PCollection does not autoscale at all on Cloud Dataflow. I first have to transform it via:
shuffled_data = data | beam.Shuffle()
I know the answer I linked above explains pretty much this process, but it doesn't give any insight as to why this is necessary. As far as I can see at Beam's very high level of abstraction, I have a PCollection with N elements before the shuffle and a similar PCollection after the shuffle. Why does one scale, but the other not?
The documentation is not very helpful in this case (or in general, but that's another matter). What hidden attribute does the first PCollection have, preventing it from being distributed to multiple workers, that the other doesn't?
When you read via Create, you are creating a PCollection that is bound to a single worker. Since there are no keys associated with the items, there is no mechanism to distribute the work. Shuffle() creates a key-value pair underneath the covers and then shuffles, which enables the PCollection's items to be distributed to new workers as they spin up. You can verify this behavior by turning off autoscaling and fixing the worker count, say to 25: without the Shuffle you will only see one worker doing work.
Another way to distribute this work when creating/reading would be to build your own custom I/O for reading PKL files. You'd create the appropriate splitter; however, not knowing what you have pickled, it may not be splittable. IMO Shuffle() is a safe bet, unless you have an optimization to gain by writing a splittable reader.