
How to use transactional DatastoreIO

I’m using DatastoreIO from my streaming Dataflow pipeline, and I get the following error when the pipeline writes more than one entity with the same key:

2016-12-10T22:51:04.385Z: Error:   (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT

If I append a random suffix to the key, the write succeeds, but I need to update the same key. Is there a transactional way to do this using DatastoreIO?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
  private static final long serialVersionUID = 0;

  private final String namespace;
  private final String kind;

  CreateEntityFn(String namespace, String kind) {
    this.namespace = namespace;
    this.kind = kind;
  }

  public Entity makeEntity(String key, Tile tile) {
    Entity.Builder entityBuilder = Entity.newBuilder();
    Key.Builder keyBuilder = makeKey(kind, key );
    if (namespace != null) {
      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
    }
    entityBuilder.setKey(keyBuilder.build());
    entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build()); 
    return entityBuilder.build();   
  }

  @Override
  public void processElement(ProcessContext c) { 
    String key = c.element().getKey();
    // Appending a random suffix to the key makes the write succeed:
    // key = key.concat(":" + UUID.randomUUID().toString());
    c.output(makeEntity(key, c.element().getValue()));
  }
}

...

...
 inputData = pipeline
                .apply(PubsubIO.Read.topic(pubsubTopic));
 windowedDataStreaming = inputData
                .apply(Window.<String>into(
                      SlidingWindows.of(Duration.standardMinutes(15))
                                    .every(Duration.standardSeconds(31))));


                             ...
                             ...
                             ...
 //Create a Datastore entity 
 PCollection<Entity> siteTileEntities = tileSiteKeyed
      .apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));       

 // write site tiles to datastore
 siteTileEntities
      .apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));

 // Run the pipeline
 pipeline.run(); 

Melissa Stockman asked Dec 10 '16 23:12


2 Answers

Your code snippet doesn't show how tileSiteKeyed is created. Presumably it's a PCollection<KV<String, Tile>>; if it can contain duplicate String keys, that would explain the error.

Generally a PCollection<KV<K, V>> may contain multiple KV pairs with the same key. If you'd like to ensure unique keys per window, you can use a GroupByKey to do that. That will give you a PCollection<KV<K, Iterable<V>>> with unique keys per window. Then augment CreateEntityFn to take an Iterable<Tile> and create a single mutation with the changes you need to make.
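
To make the shape of that change concrete, here is a minimal plain-Java sketch of the grouping and merging logic. It does not use the Dataflow SDK: the class `TileGrouping`, its method names, the use of strings in place of `Tile`, and the "last value wins" merge rule are all assumptions for illustration. In the real pipeline the grouping would come from `GroupByKey` per window, and the merge rule depends on what the entity should end up containing.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the GroupByKey step; this is not Dataflow SDK code.
class TileGrouping {

  // Collects all values that share a key, mimicking KV<K, Iterable<V>>.
  static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> pairs) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, String> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return grouped;
  }

  // Hypothetical merge rule: keep the last tile seen for the key.
  static String mergeTiles(List<String> tiles) {
    return tiles.get(tiles.size() - 1);
  }

  // Produces exactly one (key, value) record per distinct key.
  static Map<String, String> oneRecordPerKey(List<Map.Entry<String, String>> pairs) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> group : groupByKey(pairs).entrySet()) {
      result.put(group.getKey(), mergeTiles(group.getValue()));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    pairs.add(new SimpleEntry<>("site-1", "tileA"));
    pairs.add(new SimpleEntry<>("site-2", "tileB"));
    pairs.add(new SimpleEntry<>("site-1", "tileC"));
    // Two records come out, one per distinct key.
    System.out.println(oneRecordPerKey(pairs));
  }
}
```

The point of the sketch is that the entity-building step runs once per key rather than once per input element, so each key yields a single mutation.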

Frances answered Nov 05 '22 20:11


This error indicates that Cloud Datastore received a Commit request containing two mutations for the same key (i.e., the request tries to insert or modify the same entity twice).

You can avoid the error by only including one mutation per key per Commit request.
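
As an illustration of that rule, the following plain-Java sketch splits an ordered list of mutation keys into batches, starting a new batch whenever a key is about to repeat. It does not call the Datastore client: `CommitBatcher`, `splitIntoBatches`, and the use of plain strings as keys are assumptions made for the example. In a Dataflow pipeline the commit batches are assembled by DatastoreIO rather than by user code, so there the rule is more likely to be met by removing duplicate keys upstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: models each mutation by the key it affects.
class CommitBatcher {

  // Splits the keys, in order, into batches so that no batch contains a key twice.
  static List<List<String>> splitIntoBatches(List<String> mutationKeys) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    for (String key : mutationKeys) {
      if (!seen.add(key)) {
        // The key is already in the current batch: close it and start a new one.
        batches.add(current);
        current = new ArrayList<>();
        seen.clear();
        seen.add(key);
      }
      current.add(key);
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    // Each inner list would correspond to one Commit request.
    System.out.println(splitIntoBatches(Arrays.asList("a", "b", "a", "c", "c")));
  }
}
```

Because the original order is preserved, a later mutation for a key always lands in a later batch than the earlier one.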

Ed Davisson answered Nov 05 '22 19:11