I’m using DatastoreIO from my streaming Dataflow pipeline and getting an error when writing multiple entities with the same key.
2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
If I add a random value to the key, the writes succeed, but I need to update the entity stored under the same key. Is there a transactional way to do this using DatastoreIO?
static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
    private static final long serialVersionUID = 0;
    private final String namespace;
    private final String kind;
    CreateEntityFn(String namespace, String kind) {
        this.namespace = namespace;
        this.kind = kind;
    }
    // Builds an entity keyed by (kind, key), optionally in a namespace,
    // with the tile stored as a string in the "tile" property.
    public Entity makeEntity(String key, Tile tile) {
        Entity.Builder entityBuilder = Entity.newBuilder();
        Key.Builder keyBuilder = makeKey(kind, key);
        if (namespace != null) {
            keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
        }
        entityBuilder.setKey(keyBuilder.build());
        entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build());
        return entityBuilder.build();
    }
    @Override
    public void processElement(ProcessContext c) {
        String key = c.element().getKey();
        // This works (unique key per element): key = key.concat(":" + UUID.randomUUID().toString());
        c.output(makeEntity(key, c.element().getValue()));
    }
}
...
...
inputData = pipeline
    .apply(PubsubIO.Read.topic(pubsubTopic));
windowedDataStreaming = inputData
    .apply(Window.<String>into(
        SlidingWindows.of(Duration.standardMinutes(15))
            .every(Duration.standardSeconds(31))));
...
...
...
// Create Datastore entities
PCollection<Entity> siteTileEntities = tileSiteKeyed
    .apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));
// Write site tiles to Datastore
siteTileEntities
    .apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));
// Run the pipeline
pipeline.run();
Your code snippet doesn't show how tileSiteKeyed is created. Presumably it's a PCollection<KV<String, Tile>>, but if it can contain duplicate String keys, that would explain the issue.
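One quick way to confirm this diagnosis is to look for keys that occur more than once in the data feeding CreateEntityFn. The sketch below is plain Java, not Dataflow code: it assumes the keys from one window have already been collected into a List<String> (for example in a unit test), and the class and method names (DuplicateKeyCheck, duplicateKeys) and the site-N keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DuplicateKeyCheck {
    // Returns every key that occurs more than once, in the order the repeats are first seen.
    public static List<String> duplicateKeys(List<String> keys) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String key : keys) {
            // Set.add returns false when the key was already present.
            if (!seen.add(key)) {
                duplicates.add(key);
            }
        }
        return new ArrayList<>(duplicates);
    }

    public static void main(String[] args) {
        // Hypothetical keys, standing in for the String keys of tileSiteKeyed in one window.
        List<String> keys = Arrays.asList("site-1", "site-2", "site-1", "site-3", "site-2");
        System.out.println(duplicateKeys(keys)); // prints [site-1, site-2]
    }
}
```

A non-empty result means at least two mutations for the same Datastore key can reach the same write.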
Generally, a PCollection<KV<K, V>> may contain multiple KV pairs with the same key. If you'd like to ensure unique keys per window, you can use a GroupByKey. That will give you a PCollection<KV<K, Iterable<V>>> with unique keys per window. Then augment CreateEntityFn to take an Iterable<Tile> and create a single mutation with the changes you need to make.
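The group-then-merge idea can be modeled in plain Java: collect all values for a key and fold them into one value, so each key yields exactly one entity and therefore one mutation. In the pipeline itself the grouping step would be GroupByKey.<String, Tile>create() and the folding would happen inside a DoFn<KV<String, Iterable<Tile>>, Entity>; in this sketch a Map stands in for both. The merge rule is an assumption: Integer::sum is only a placeholder for whatever combining logic Tile actually needs, and the names CollapseByKey, collapse and pair are hypothetical.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class CollapseByKey {
    // Hypothetical helper to build a key/value pair (stands in for KV<String, V>).
    public static <V> Map.Entry<String, V> pair(String key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Models GroupByKey followed by a merging DoFn: every key ends up with exactly
    // one value, so it would produce exactly one entity (one mutation).
    public static <V> Map<String, V> collapse(List<Map.Entry<String, V>> pairs, BinaryOperator<V> merge) {
        Map<String, V> merged = new LinkedHashMap<>();
        for (Map.Entry<String, V> p : pairs) {
            // Map.merge stores the value for a new key, or combines it with the existing one.
            merged.merge(p.getKey(), p.getValue(), merge);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
                pair("site-1", 3), pair("site-2", 5), pair("site-1", 4));
        // Placeholder merge rule (sum); a real Tile would need its own combining logic.
        System.out.println(collapse(pairs, Integer::sum)); // prints {site-1=7, site-2=5}
    }
}
```

Because GroupByKey does not guarantee the order of the grouped values, a merge rule that does not depend on order (such as a sum) is the safer choice when porting this to the Iterable<Tile> version of CreateEntityFn.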
This error indicates that Cloud Datastore received a Commit request with two mutations for the same key (i.e. it tried to insert or modify the same entity twice). You can avoid the error by including only one mutation per key in each Commit request.
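The constraint in the error message can be stated as a batching rule: within one non-transactional commit, each key may appear at most once. If you ever build commits yourself (outside DatastoreIO), a minimal sketch of that rule is to start a new batch whenever a key repeats. It works on plain key strings rather than real Mutation objects, and the names CommitBatcher and splitIntoCommits, as well as the batch limit of 500 passed in main, are illustrative assumptions, not part of any Datastore API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CommitBatcher {
    // Splits an ordered list of mutation keys into commit batches in which no key repeats.
    // A new batch starts when the next key is already in the current batch or the batch is full.
    public static List<List<String>> splitIntoCommits(List<String> mutationKeys, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Set<String> keysInCurrent = new HashSet<>();
        for (String key : mutationKeys) {
            if (keysInCurrent.contains(key) || current.size() == maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                keysInCurrent = new HashSet<>();
            }
            current.add(key);
            keysInCurrent.add(key);
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("site-1", "site-2", "site-1", "site-3");
        // 500 is an illustrative batch limit, not a value taken from DatastoreIO.
        System.out.println(splitIntoCommits(keys, 500)); // prints [[site-1, site-2], [site-1, site-3]]
    }
}
```

Keeping the original order means a later write to the same key lands in a later commit, so if the commits are sent one after another the last write wins. Inside the pipeline, DatastoreIO forms its own commit batches, so there the GroupByKey approach from the first answer is the practical way to get one mutation per key.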