I'm having huge performance issues with Datastore write speed. Most of the time it stays under 100 elements/s.
I was able to achieve speeds of around 2600 elements/s when benchmarking the write speed on my local machine, using the Datastore client (com.google.cloud:google-cloud-datastore) and running the batched writes in parallel.
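Roughly, the local benchmark looked like this. This is a minimal sketch, not the exact code; the single "payload" property, the batch count and the 8-thread pool are illustrative choices, and it assumes default application credentials:

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DatastoreWriteBenchmark {
    private static final int BATCH_SIZE = 500; // Datastore's per-commit entity limit
    private static final int THREADS = 8;      // illustrative parallelism
    private static final int BATCHES = 100;    // illustrative workload size

    public static void main(String[] args) throws InterruptedException {
        Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
        KeyFactory keyFactory = datastore.newKeyFactory().setKind("someKind");

        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        for (int b = 0; b < BATCHES; b++) {
            List<Entity> batch = new ArrayList<>(BATCH_SIZE);
            for (int i = 0; i < BATCH_SIZE; i++) {
                Key key = keyFactory.newKey(UUID.randomUUID().toString());
                batch.add(Entity.newBuilder(key).set("payload", "some value").build());
            }
            // Each task commits one batch of 500 entities; batches run concurrently.
            pool.submit(() -> datastore.put(batch.toArray(new Entity[0])));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
    }
}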
I've set up a simple Apache Beam pipeline using the Java API. Here's its graph:
Here's the speed when running without the Datastore node:
It is much faster this way. Everything points to DatastoreV1.Write being the bottleneck in this pipeline, judging both by the speed of the pipeline without the write node and by the wall time of DatastoreV1.Write compared to that of the other nodes.
Approaches I've tried to solve this:
• Increasing the number of initial workers (tried 1 and 10, with no noticeable difference; the worker options I set are sketched below this list). Dataflow scales the number of workers down to 1 after some time (probably after the first two nodes finish processing). Based on that I suspect that DatastoreIO.v1().write() does not run its workers in parallel. Why, though?
• Making sure everything runs in the same location: the GCP project, the Dataflow pipeline workers & metadata, and the storage are all set to us-central, as suggested here.
• Trying different entity key generation strategies (per this post). Currently using this approach: Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString()); (the DoFn that builds full entities around this key is sketched below the list as well). I'm not perfectly certain this generates keys that are distributed evenly enough, but I'd guess that even if it doesn't, the performance should not be this low?
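For completeness, the initial worker count is set through DataflowPipelineOptions, roughly like this. A minimal sketch assuming a reasonably recent Beam version; the project ID, region and temp location are placeholders, not my actual configuration:

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PipelineSetup {
    static Pipeline createPipeline() {
        DataflowPipelineOptions options =
                PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setProject("my-project-id");            // placeholder
        options.setRegion("us-central1");               // keep workers co-located with the data
        options.setNumWorkers(10);                      // initial worker count (tried 1 and 10)
        options.setTempLocation("gs://my-bucket/tmp");  // placeholder
        return Pipeline.create(options);
    }
}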
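The key above is used inside a DoFn that builds the entities handed to DatastoreIO.v1().write(), roughly like this. A minimal sketch; the "content" property name and treating the input as a plain string are illustrative assumptions:

import java.util.UUID;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.client.DatastoreHelper;
import org.apache.beam.sdk.transforms.DoFn;

/** Turns each input line into a Datastore Entity with a random UUID key. */
public class CreateEntityFn extends DoFn<String, Entity> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());
        Entity entity = Entity.newBuilder()
                .setKey(keyBuilder.build())
                .putProperties("content", DatastoreHelper.makeValue(c.element()).build())
                .build();
        c.output(entity);
    }
}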
Please note that I was unable to use the provided Apache Beam & Google libraries without workarounds: I had to force the google-api-client version to 1.22.0 and Guava to 23.0 because of their dependency conflicts (see for example https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/607).
Looking at the DatastoreV1.Write node log:
It pushes a batch of 500 entities roughly every 5 s, which is not very fast; 500 entities per ~5 s works out to about the 100 elements/s I'm seeing overall.
Overall it looks like DatastoreIO.v1().write() is slow and its workers are not run in parallel. Any idea how to fix this, or what the cause could be?
I should not leave this question unanswered.
After reaching out to GCP support, I was given the suggestion that the cause could be the TextIO.Read node reading from compressed (gzipped) files: a gzipped file cannot be split, so reading it is a non-parallelizable operation. Indeed, after switching to uncompressed source files the performance improved.
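With that change, the read boils down to a plain TextIO.read() without the GZIP compression hint (a minimal sketch; the GCS path is a placeholder):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

public class UncompressedRead {
    static PCollection<String> readLines(Pipeline pipeline) {
        // Uncompressed text files can be split, so multiple workers read in parallel;
        // with Compression.GZIP each file is consumed by a single reader.
        return pipeline.apply(TextIO.read().from("gs://my-bucket/input/*.txt")); // placeholder path
    }
}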
Another suggested solution was to manually repartition the pipeline after reading from the source. This means adding an arbitrary key to each item in the pipeline, grouping by that arbitrary key, and then removing the key again. That works as well. The approach comes down to this code:
Pipeline code:
pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP))
        .apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()))
        /* further transforms */
The helper class:
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class PipelineRepartitioner<T> {
    /** Pairs every element with a random integer key so a GroupByKey can redistribute the data. */
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    /** Drops the arbitrary key again, emitting the grouped elements one by one. */
    public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}
This works because the GroupByKey introduces a shuffle boundary that breaks fusion: without it, Dataflow fuses the downstream steps with the non-splittable gzip read, so the parallelism of everything after the read is capped by the number of gzip files. I've seen tickets related to this issue on the Apache Beam Jira, so it may be fixed in the future.