 

Datastore poor performance with Apache Beam & Dataflow

I'm having huge performance issues with Datastore write speed. Most of the time it stays under 100 elements/s.

I was able to achieve speeds of around 2600 elements/s when benchmarking the write speed on my local machine using the Datastore client (com.google.cloud:google-cloud-datastore) and running the batched writes in parallel, roughly along the lines of the sketch below.
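
For reference, the benchmark was approximately the following (a minimal sketch, not the exact benchmark code; the kind name, payload, thread count, and batch count are all illustrative):

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DatastoreWriteBenchmark {
    public static void main(String[] args) throws InterruptedException {
        Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
        ExecutorService pool = Executors.newFixedThreadPool(8); // illustrative thread count

        for (int batch = 0; batch < 100; batch++) {
            pool.submit(() -> {
                // A single Datastore commit accepts at most 500 entities.
                List<Entity> entities = new ArrayList<>();
                for (int i = 0; i < 500; i++) {
                    Key key = datastore.newKeyFactory()
                            .setKind("someKind")
                            .newKey(UUID.randomUUID().toString());
                    entities.add(Entity.newBuilder(key).set("payload", "foo").build());
                }
                datastore.put(entities.toArray(new Entity[0]));
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
    }
}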

I've set up a simple Apache Beam pipeline using the Java API. Here's its graph:

[Image: full pipeline graph]
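
In code, the pipeline is essentially read → build entities → write. A stripped-down sketch of the wiring (the file path, project id, and the entity-building CreateEntityFn are placeholders, not my actual code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

public class DatastorePipeline {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(TextIO.read().from("gs://my-bucket/input.gz")
                    .withCompression(Compression.GZIP))
            .apply(ParDo.of(new CreateEntityFn())) // maps a line to a Datastore Entity
            .apply(DatastoreIO.v1().write().withProjectId("my-project"));
        pipeline.run();
    }
}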

Here's the speed when running without the Datastore node:

[Image: pipeline throughput without the Datastore write node]

It is much faster this way. Everything points to DatastoreV1.Write being the bottleneck in this pipeline, judging both by the speed of the pipeline without the write node and by the wall time of DatastoreV1.Write compared to the wall time of the other nodes.


Approaches I've tried to solve this:

• Increasing the number of initial workers (tried 1 and 10, with no noticeable difference). Dataflow scales the number of workers down to 1 after some time (probably after the first two nodes finish processing). Based on that, I suspect that DatastoreIO.v1().write() does not run its workers in parallel. Why, though?

[Image: worker scaling log]

• Making sure everything runs in the same location: the GCP project, the Dataflow pipeline workers and metadata, and the storage are all set to us-central, as suggested here.

• Trying different entity key generation strategies (per this post). Currently using this approach: Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString()); (see the sketch after this list). I'm not certain this generates keys that are distributed evenly enough, but even if it doesn't, the performance should not be this low.
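
For context, this is roughly how that key ends up on the entity inside the entity-building DoFn (a sketch using the com.google.datastore.v1 protobuf classes that DatastoreIO expects; the CreateEntityFn name and the payload property are illustrative):

import java.util.UUID;

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.client.DatastoreHelper;
import org.apache.beam.sdk.transforms.DoFn;

public class CreateEntityFn extends DoFn<String, Entity> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Random UUID key names should spread writes evenly across the
        // key space, avoiding hotspots from monotonically increasing keys.
        Key.Builder keyBuilder =
                DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());
        c.output(Entity.newBuilder()
                .setKey(keyBuilder)
                .putProperties("payload", DatastoreHelper.makeValue(c.element()).build())
                .build());
    }
}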


Please note that I was unable to use the provided Apache Beam & Google libraries without workarounds: I had to force the google-api-client version to 1.22.0 and Guava to 23.0 due to their dependency conflicts (see for example https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/607).

Looking at the DatastoreV1.Write node log:

[Image: DatastoreV1.Write node log]

It pushes batches of 500 entities roughly every 5 seconds, which works out to about the 100 elements/s observed overall, and is not very fast.

Overall, it looks like DatastoreIO.v1().write() is slow and its workers are not run in parallel. Any idea how to fix this, or what the cause could be?

asked Jan 22 '18 by tpx


People also ask

What is PCollection and PTransform in dataflow?

A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create), and can be passed as the inputs of other PTransforms.

What is PCollection in Apache Beam?

Each pipeline represents a single, repeatable job. A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline's data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline.

What is ParDo in Apache Beam?

ParDo is the core element-wise transform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection.

How many times will the processElement method of a DoFn be called?

The processElement method is called once per element, and it can yield zero or more output elements.
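
For example, a DoFn that splits lines into words emits a variable number of outputs per input (a minimal generic sketch, unrelated to the question above):

import org.apache.beam.sdk.transforms.DoFn;

// processElement runs once per input line and may emit zero or more words.
class SplitWordsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        for (String word : c.element().split("\\s+")) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}
// Usage: PCollection<String> words = lines.apply(ParDo.of(new SplitWordsFn()));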


1 Answer

I should not leave this question unanswered.

After reaching out to GCP support, I was given the suggestion that the cause could be the TextIO.Read node reading from compressed (gzipped) files: a gzipped file cannot be split, so the read is not parallelizable and runs on a single worker. Indeed, after switching to uncompressed files as the source, performance improved.

Another suggested solution was to manually repartition the pipeline after reading from the source. This means adding an arbitrary key to each item in the pipeline, grouping by that arbitrary key, and then removing it again. This works as well. The approach comes down to the following code:

Pipeline code:

pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP))
        .apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()));
        /* further transforms */

The helper class:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class PipelineRepartitioner<T> {
    // Pairs each element with a random Integer key so that the following
    // GroupByKey shuffles the data evenly across workers.
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    // Drops the arbitrary key again, flattening the grouped values back
    // into a plain PCollection<T>.
    public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}
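
This works because Dataflow fuses adjacent steps into a single stage, so a read that cannot be parallelized gets fused together with the downstream write and effectively limits it to a single worker. The GroupByKey introduces a shuffle boundary that breaks the fusion, allowing the write step to scale out independently of the read; this add-key/group/remove-key trick is the standard "preventing fusion" pattern from the Dataflow documentation.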

I've seen tickets related to this issue on the Apache Beam Jira, so it could be fixed in the future.

answered Oct 10 '22 by tpx