I want to use the mapreduce library to update all entities that satisfy a query. There are a couple of complications.
I know how to upload the csv file to Blobstore and read each row using a Blobstore input reader. I am also aware of the Datastore input reader that gets entities using a query.
My question is how can I create a Mapper class that reads input data from the Blobstore, fetches the datastore entities and updates them as efficiently as possible?
Given that the list of possible values for property1 is long, filtering with a query doesn't seem like a good option: you would need an IN filter, which actually runs one query per value.
An alternative using MR would be to load your CSV into memory as a map (from property1 to property2), then fire an MR job that iterates over all entities; if an entity's property1 is one of the keys in the map, modify it using the mapped value.
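A minimal sketch of that approach, assuming a two-column CSV (property1,property2); the entity's property names, CSV_BLOB_KEY and the helper names are placeholders of mine, not part of the original answer:

import csv

from google.appengine.ext import blobstore
from mapreduce import operation as op

CSV_BLOB_KEY = 'your-blob-key-here'  # placeholder: in practice pass it via mapper params
_csv_mapping = None                  # parsed once per mapper process

def get_csv_mapping(blob_key):
    global _csv_mapping
    if _csv_mapping is None:
        reader = blobstore.BlobReader(blob_key)   # file-like view of the uploaded blob
        _csv_mapping = dict(csv.reader(reader))   # {property1: property2}
    return _csv_mapping

def in_memory_map(entity):
    mapping = get_csv_mapping(CSV_BLOB_KEY)
    new_value = mapping.get(entity.property1)
    if new_value is not None and entity.property2 != new_value:
        entity.property2 = new_value
        yield op.db.Put(entity)  # batched through the mutation pool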
As @Ryan B says, you don't need to use MR for this if you just want to take advantage of batch puts: you can pass an Iterable of entities to a single put call on the DatastoreService.
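For instance, a hedged Python sketch (the DatastoreService reference above is the Java API; in Python the equivalent batch call is ndb.put_multi; MyEntity and the property names are placeholders):

from google.appengine.ext import ndb

def apply_mapping_in_batches(mapping, batch_size=500):
    # mapping: property1 -> new property2 value, parsed from the CSV
    to_put = []
    for entity in MyEntity.query():
        new_value = mapping.get(entity.property1)
        if new_value is not None and entity.property2 != new_value:
            entity.property2 = new_value
            to_put.append(entity)
        if len(to_put) >= batch_size:
            ndb.put_multi(to_put)  # one batched RPC instead of a put per entity
            to_put = []
    if to_put:
        ndb.put_multi(to_put)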
You can use a DatastoreInputReader and, in the map function, find out whether property1 is actually in the CSV. Reading the CSV itself on every call would be very slow; instead, load it once into its own Datastore model and use memcache to serve the lookups after that first read. To populate that model, I would recommend using property1 as the custom id of each row, which makes the lookup straightforward. You would only update the Datastore for values that actually change, and use the mutation pool to make it performant (op.db.Put()). Pseudo-code for the different pieces follows (sorry... I only have it in Python). I also recommend reading this article on MapReduce on Google App Engine: http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/
# ndb for the lookup model, memcache for the cached lookups, mapreduce for the job
from google.appengine.api import memcache
from google.appengine.ext import ndb
from mapreduce import operation as op
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline
class TouchPipeline(pipeline.Pipeline):
    """
    Pipeline to update the field of entities that have certain condition
    """

    def run(self, *args, **kwargs):
        """ run """
        mapper_params = {
            "entity_kind": "yourDatastoreKind",
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Update entities that have certain condition",
            handler_spec="datastore_map",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=64)
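# Usage sketch (not part of the original pseudo-code): you would typically
# start the pipeline from a request handler and redirect to the built-in
# status UI; "handler" here stands for whatever webapp2 handler you use.
def start_touch_pipeline(handler):
    pipe = TouchPipeline()
    pipe.start()
    handler.redirect(pipe.base_path + "/status?root=" + pipe.pipeline_id)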
class csvrow(ndb.Model):
    # you don't store property1 because its value is used as the entity's key
    substitutefield = ndb.StringProperty()
def create_csv_datastore():
    # instead of running this dummy 10,000-row loop, read the csv from the
    # blobstore, iterate over it and store the values accordingly
    for i in range(10000):
        # property1 is used as the id of this row; we only store the other
        # column, which will eventually be the substitute if it matches
        csvrow.get_or_insert('property%s' % i, substitutefield='substitute%s' % i)
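# Hedged sketch of the "read it from the blobstore" alternative mentioned in
# the comment above: a handler for
# "mapreduce.input_readers.BlobstoreLineInputReader", which passes each call a
# (byte_offset, line) tuple. Assumes a two-column csv: property1,property2.
def load_csv_row(line_data):
    byte_offset, line = line_data
    property1, property2 = line.rstrip('\r\n').split(',')
    yield op.db.Put(csvrow(id=property1, substitutefield=property2))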
def queryfromcsv(property1):
    row = ndb.Key('csvrow', property1).get()
    if row:
        return row.substitutefield
    return property1
def property1InCSV(property1):
    data = memcache.get(property1)
    if data is not None:
        return data
    data = queryfromcsv(property1)
    memcache.add(property1, data, 60)
    return data
def datastore_map(entity):
    current_value = entity.property1
    new_value = property1InCSV(current_value)
    if new_value != current_value:
        entity.property1 = new_value
        # use the mutation pool
        yield op.db.Put(entity)
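If you do load the substitution values from the Blobstore instead of the dummy loop, a pipeline analogous to TouchPipeline could wire the load_csv_row handler sketched above to the line reader. This is my own sketch, not part of the original answer; the class name, shard count and the run order are assumptions:

class LoadCsvPipeline(pipeline.Pipeline):
    """Populate the csvrow lookup model from the uploaded CSV blob."""

    def run(self, blob_key):
        yield mapreduce_pipeline.MapperPipeline(
            "Load substitution values from the CSV",
            handler_spec="load_csv_row",
            input_reader_spec="mapreduce.input_readers.BlobstoreLineInputReader",
            params={"blob_keys": [blob_key]},
            shards=16)

Run this (and let it finish) before TouchPipeline so the memcache-backed lookups hit a fully populated csvrow model.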