 

Can datastore input in google dataflow pipeline be processed in a batch of N entries at a time?

I am trying to run a Dataflow pipeline job that executes a function on N entries at a time from Datastore. In my case this function sends a batch of 100 entries to a REST service as its payload. In other words, I want to go through all entries of one Datastore kind and send them to an external REST service in batches of 100.

My current solution

  1. Read input from datastore
  2. Create as many keys as there are workers specified in pipeline options (1 worker = 1 key).
  3. Group by key, so that we get an iterable per key as output (the iterable is the input for step 4).
  4. Programmatically batch the users in a temporary list and send each batch to the REST endpoint.

The scenario described above in pseudocode (details omitted):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. Create keys to be used in the group-by so we get an iterable in the next step
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // 4. Programmatically batch users
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>, Void>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            List<EntryPOJO> batchedEntries = new ArrayList<>();
            for (EntryPOJO entry : c.element().getValue()) {
                if (batchedEntries.size() >= BATCH_SIZE) {
                    sendToRESTEndpoint(batchedEntries);
                    batchedEntries = new ArrayList<>();
                }
                batchedEntries.add(entry);
            }
            sendToRESTEndpoint(batchedEntries);
        }
    }));

Main problem with my current solution

GroupByKey blocks the execution of last ParDo (blocks step no.4) until all entries are assigned to a key.

The solution generally works, but I would like everything to run in parallel: a batch of 100 entries should be sent to the REST endpoint as soon as those entries have been loaded from Datastore. That is not possible with my current solution, because GroupByKey does not output any data until every entry has been fetched from the database and turned into a key-value pair. So execution actually happens in two phases: 1. fetch all the data from Datastore and assign keys, 2. process the entries in batches.

Question

So what I would like to know is whether there is an existing feature that makes this possible, or at least a way to get an Iterable without the GroupByKey step, so that the batching function does not have to wait for all the data to be dumped.

asked Jan 28 '16 by Nejc Sever



1 Answer

You can batch these elements up within your DoFn. For example:

final int BATCH_SIZE = 100;

pipeline
  // 1. Read input from datastore  
  .apply(DatastoreIO.readFrom(datasetId, query))

  // 2. Programmatically batch users
  .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {

    private List<EntryPOJO> accumulator;

    @Override
    public void startBundle(Context c) throws Exception {
      // Start each bundle with a fresh accumulator.
      accumulator = new ArrayList<>(BATCH_SIZE);
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      EntryPOJO entry = processEntity(c);
      accumulator.add(entry);
      if (accumulator.size() >= BATCH_SIZE) {
        c.output(accumulator);
        accumulator = new ArrayList<>(BATCH_SIZE);
      }
    }

    @Override
    public void finishBundle(Context c) throws Exception {
      // Emit whatever is left over at the end of the bundle.
      if (!accumulator.isEmpty()) {
        c.output(accumulator);
      }
    }
  }))

  // 3. Consume those batches
  .apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Void>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      sendToRESTEndpoint(c.element());
    }
  }));

You could also combine steps 2 and 3 in a single DoFn if you didn't want the separate "batching" step.
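
For completeness, here is a rough sketch of that combined variant, reusing BATCH_SIZE, processEntity and sendToRESTEndpoint from the code above (their exact signatures are assumed, since they are not shown in the question):

pipeline
  // 1. Read input from datastore
  .apply(DatastoreIO.readFrom(datasetId, query))

  // 2. Batch entries and send each full batch straight to the REST endpoint
  .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Void>() {

    private List<EntryPOJO> accumulator;

    @Override
    public void startBundle(Context c) throws Exception {
      accumulator = new ArrayList<>(BATCH_SIZE);
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      accumulator.add(processEntity(c));
      if (accumulator.size() >= BATCH_SIZE) {
        // Flush a full batch as soon as it is assembled.
        sendToRESTEndpoint(accumulator);
        accumulator = new ArrayList<>(BATCH_SIZE);
      }
    }

    @Override
    public void finishBundle(Context c) throws Exception {
      // Flush whatever is left over at the end of the bundle.
      if (!accumulator.isEmpty()) {
        sendToRESTEndpoint(accumulator);
      }
    }
  }));

This keeps the REST call inside the batching DoFn, so batches go out as soon as they fill up. The trade-off is that the side effect happens directly in the DoFn, so if the runner retries a failed bundle, some batches may be sent more than once.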

answered Oct 16 '22 by Ben Chambers