We want to improve the costs of running a specific Apache Beam pipeline (Python SDK) in GCP Dataflow.
We have built a memory-intensive Apache Beam pipeline, which requires approximately 8.5 GB of RAM to be run on each executor. A large machine learning model is currently loaded in a transformation DoFn.setup
method so we can precompute recommendations for a few millions of users.
The existing GCP Compute Engine machine types either have a lower memory/vCPU ratio than we require (up to 8GB RAM per vCPU) or a much higher proportion (24GB RAM per vCPU): https://cloud.google.com/compute/docs/machine-types#machine_type_comparison
We have successfully run this pipeline by using the GCP m1-ultramem-40
machine type. However, the hardware usage - and therefore, the costs - were sub-optimal. This machine type has a ratio of 24 GB RAM per vCPU. When using it to run the said pipeline, the VMs used less than 36% of the memory available - but, as expected, we paid for it all.
When attempting to run the same pipeline using a custom-2-13312
machine type (2 vCPU and 13 GB RAM), Dataflow crashed, with the error:
Root cause: The worker lost contact with the service.
While monitoring the Compute Engine instances running the Dataflow job, it was clear that they were running out of memory. Dataflow tried to load the model in memory twice - once per vCPU - but the available memory was only enough for one.
If we were able to inform Apache Beam/Dataflow that a particular transformation requires a specific amount of memory, the problem would be solved. But we didn't manage to find a way of achieving this.
The other solution we could think of was to try to change the ratio of Dataflow executors per Compute Engine VM. This would allow us to find a ratio in which we would waste as little vCPU as possible while respecting the pipeline memory requirements. While using the previously mentioned custom-2-13312
machine type, we attempted to run the pipeline using the following configurations:
--number_of_worker_harness_threads=1 --experiments=use_runner_v2
--experiments=no_use_multiple_sdk_containers --experiments=beam_fn_api
--sdk_worker_parallelism=1
When using (1), we managed to have a single thread, but Dataflow spawned two Python executor processes per VM. It resulted in the pipeline crashing as there was an attempt of loading the model to memory twice when there was enough space for only one.
When using (2), a single Python process was spawn per VM, but it ran using two threads. Each of those threads tried to load the model, and the VM runs out of memory. Approach (3) had a very similar outcome to (1) and (2).
It was not possible to combine multiple of these configurations.
Would there be a (set of) configuration(s) which would allow us to have control on the number of executors of Dataflow per VM?
Are there any other alternatives to reducing the costs which we might not have though of?
Keeping GCP Services within the same Region This is a very common mistake we all make while creating other GCP services. Try to keep them in the same region to avoid ingress/egress costs. An eg. can be the source files are in a bucket which is in a different region where the dataflow job is running.
Dataproc is a Google Cloud product with Data Science/ML service for Spark and Hadoop. In comparison, Dataflow follows a batch and stream processing of data. It creates a new pipeline for data processing and resources produced or removed on-demand.
Dataflow enables fast, simplified streaming data pipeline development with lower data latency.
Dataflow has two data pipeline types: streaming and batch. Both types of pipelines run jobs that are defined in Dataflow templates. A streaming data pipeline runs a Dataflow streaming job immediately after it is created. A batch data pipeline runs a Dataflow batch job on a user-defined schedule.
We are working on long-term solutions to these problems, but here is a tactical fix that should prevent the model duplication that you saw in approaches 1 and 2:
Share the model in a VM across workers, to avoid it being duplicated in each worker. Use the following utility (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py), which is available out of the box in Beam 2.24 If you are using an earlier version of Beam, copy just the shared.py to your project and use it as user code.
I don't think that at this moment there's an option to control the number of executors per VM, it seems that the closest that you will get there is by using the option (1) and assume a Python executor per core.
Option (1)
--number_of_worker_harness_threads=1 --experiments=use_runner_v2
To compensate on the cpu-mem ratio you need, I'd suggest using custom machines with extended memory. This approach should be more cost-effective.
For example, the cost of a running a single executor and a single thread on a n1-standard-4
machine (4 CPUs - 15GB) will be roughly around 30% more expensive than running the same workload using a custom-1-15360-ext
(1 CPU - 15GB) custom machine.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With