I am trying to execute a Dataflow pipeline job which would execute one function on N entries at a time from Datastore. In my case this function sends a batch of 100 entries to some REST service as the payload. This means I want to go through all entries from one Datastore entity and send 100 batched entries at a time to an outside REST service.
My current solution
The scenario described above in pseudo code (ignoring details):
final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
    // 2. Create keys to be used in group by so we get an iterator in the next step
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        String key = generateKey(c);
        EntryPOJO entry = processEntity(c);
        c.output(KV.of(key, entry));
      }
    }))
    // 3. Group by key
    .apply(GroupByKey.<String, EntryPOJO>create())
    // 4. Programmatically batch users
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>, Void>() {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        List<EntryPOJO> batchedEntries = new ArrayList<>();
        for (EntryPOJO entry : c.element().getValue()) {
          if (batchedEntries.size() >= BATCH_SIZE) {
            sendToRESTEndpoint(batchedEntries);
            batchedEntries = new ArrayList<>();
          }
          batchedEntries.add(entry);
        }
        sendToRESTEndpoint(batchedEntries);
      }
    }));
Main problem with my current solution
GroupByKey blocks the execution of the last ParDo (blocks step no. 4) until all entries are assigned to a key.
The solution generally works, but I would like to do everything in parallel (send a batch of 100 entries to the REST endpoint immediately after they are loaded from Datastore), which is not possible with my current solution, since GroupByKey doesn't output any data until every entry from the database is fetched and inserted into a key-value pair. So execution actually happens in 2 steps: 1. fetch all the data from Datastore and assign it a key, 2. process the entries as batches.
Question
So what I would like to know is whether there is some existing feature that makes this possible, or at least a way to get an Iterable without the GroupByKey step, so that the batching function doesn't need to wait for all the data to be dumped.
You can batch these elements up within your DoFn. For example:
final int BATCH_SIZE = 100;

pipeline
    // 1. Read input from datastore
    .apply(DatastoreIO.readFrom(datasetId, query))
    // 2. Programmatically batch users
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {
      private List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);

      @Override
      public void processElement(ProcessContext c) throws Exception {
        EntryPOJO entry = processEntity(c);
        accumulator.add(entry);
        if (accumulator.size() >= BATCH_SIZE) {
          c.output(accumulator);
          accumulator = new ArrayList<>(BATCH_SIZE);
        }
      }

      @Override
      public void finishBundle(Context c) throws Exception {
        if (accumulator.size() > 0) {
          c.output(accumulator);
        }
      }
    }))
    // 3. Consume those bundles
    .apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        sendToRESTEndpoint(c.element());
      }
    }));
You could also combine steps 2 and 3 in a single DoFn if you didn't want the separate "batching" step.
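A minimal sketch of that combined version, assuming the same EntryPOJO, processEntity and sendToRESTEndpoint helpers as in your question (the Void output type is just a placeholder, since nothing is emitted downstream):

final int BATCH_SIZE = 100;

pipeline
    // 1. Read input from datastore
    .apply(DatastoreIO.readFrom(datasetId, query))
    // 2. Batch and send to the REST endpoint in the same DoFn
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Void>() {
      private List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);

      @Override
      public void processElement(ProcessContext c) throws Exception {
        accumulator.add(processEntity(c));
        if (accumulator.size() >= BATCH_SIZE) {
          // Send a full batch as soon as it is available.
          sendToRESTEndpoint(accumulator);
          accumulator = new ArrayList<>(BATCH_SIZE);
        }
      }

      @Override
      public void finishBundle(Context c) throws Exception {
        // Flush whatever is left over in this bundle.
        if (!accumulator.isEmpty()) {
          sendToRESTEndpoint(accumulator);
          accumulator = new ArrayList<>(BATCH_SIZE);
        }
      }
    }));

This way each worker sends a batch as soon as it has accumulated BATCH_SIZE elements from its bundle, without waiting for a GroupByKey; the trade-off is that trailing batches flushed in finishBundle may be smaller than 100.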