Why do I need to shuffle my PCollection for it to autoscale on Cloud Dataflow?

Context

I am reading a file from Google Cloud Storage in Beam using a process that looks something like this:

data = pipeline | beam.Create(['gs://my/file.pkl']) | beam.ParDo(LoadFileDoFn())

where LoadFileDoFn loads the file, builds a Python list of objects from it, and yields them, so the ParDo returns them as a PCollection.
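
A minimal sketch of such a DoFn, for reference (simplified; the pickle format is just my use case, and FileSystems.open is one way to read from GCS):

import pickle

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

class LoadFileDoFn(beam.DoFn):
    # Reads a pickled list of objects from the given path and
    # emits the objects one at a time.
    def process(self, path):
        with FileSystems.open(path) as f:
            objects = pickle.load(f)
        # Yielding each element individually produces a PCollection
        # of objects rather than a single-element PCollection of lists.
        for obj in objects:
            yield obj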

I know I could probably implement a custom source to achieve something similar, but this answer and Beam's own documentation indicate that this approach of reading via a ParDo over a pseudo-dataset of file names is not uncommon, and that a custom source may be overkill.

It also works - I get a PCollection with the correct number of elements, which I can process as I like! However...

Autoscaling problems

The resulting PCollection does not autoscale at all on Cloud Dataflow. I first have to transform it via:

shuffled_data = data | beam.Reshuffle()

The answer I linked above explains pretty much this process, but it doesn't give any insight into why it is necessary. As far as I can see at Beam's very high level of abstraction, I have a PCollection with N elements before the shuffle and a similar PCollection after the shuffle. Why does one scale while the other doesn't?

The documentation is not very helpful in this case (or in general, but that's another matter). What hidden attribute does the first PCollection have, which the second lacks, that prevents it from being distributed to multiple workers?

asked Dec 21 '18 by bstr


1 Answer

When you read via Create, you create a PCollection that is bound to a single worker. Since there are no keys associated with the items, there is no mechanism to distribute the work. Reshuffle() will create key/value pairs underneath the covers and then shuffle them, which enables the elements of the PCollection to be distributed to new workers as they spin up. You can verify this behavior by turning off autoscaling and fixing the worker count, say to 25: without the Reshuffle you will only see one worker doing work.
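
Roughly, Reshuffle() is equivalent to the following key/group/strip sequence (a simplified sketch; the real transform also handles windowing and triggering):

import random

import apache_beam as beam

def reshuffle_sketch(pcoll):
    return (pcoll
            # Attach a random key so the runner has something to partition on.
            | 'AddRandomKey' >> beam.Map(lambda x: (random.randint(0, 255), x))
            # GroupByKey forces a shuffle: elements are materialized and
            # redistributed across workers by key.
            | 'GroupByKey' >> beam.GroupByKey()
            # Drop the keys and re-emit the individual elements.
            | 'DropKeys' >> beam.FlatMap(lambda kv: kv[1]))

It's the GroupByKey in the middle that introduces the fusion break, allowing the steps downstream of it to be scheduled across however many workers are available.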

Another way to distribute this work when creating/reading would be to build your own custom I/O for reading PKL files. You'd create the appropriate splitter; however, not knowing what you have pickled, it may not be splittable. IMO, Reshuffle() is a safe bet, unless you have an optimization to gain by writing a splittable reader.
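
Putting it together, a sketch of the pipeline with the reshuffle in place (reusing the LoadFileDoFn sketch from the question; process_element is a placeholder for whatever you do downstream):

import apache_beam as beam

def process_element(obj):
    # Placeholder for the real per-element processing.
    return obj

with beam.Pipeline() as pipeline:
    data = (pipeline
            | 'CreatePath' >> beam.Create(['gs://my/file.pkl'])
            # Everything up to here runs fused on one worker.
            | 'LoadFile' >> beam.ParDo(LoadFileDoFn())
            # Force a redistribution so the per-element work can scale out.
            | 'Reshuffle' >> beam.Reshuffle()
            | 'Process' >> beam.Map(process_element))

Without the Reshuffle step, Dataflow fuses Create, LoadFile, and Process into one stage and runs it on the single worker that owns the one input element.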

answered Oct 07 '22 by Eric Schmidt