How to use mapreduce to bulk update datastore entities that satisfy a query?

I want to use the mapreduce library to update all entities that satisfy a query. There are a couple of complications:

  1. The query that finds the entities to update checks if the value of a particular property "property1" is contained in a long list of values (~10000 entries) from a csv file
  2. For each entity satisfying the query, another property "property2" needs to be updated to be equal to the value in the second column and same row of the csv file

I know how to upload the csv file to Blobstore and read each row using a Blobstore input reader. I am also aware of the Datastore input reader that gets entities using a query.

My question is how can I create a Mapper class that reads input data from the Blobstore, fetches the datastore entities and updates them as efficiently as possible?

asked Jan 15 '15 by Price

2 Answers

Given that the list of possible values for property1 is long, using a query to filter doesn't seem like a good option, because you would need an IN filter, which actually runs one query per value.

An alternative using MR would be to load your CSV into memory as a map (from property1 to property2), then fire an MR job that iterates over all entities; if an entity's property1 is among the map's keys, modify it using the mapped value.
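A minimal sketch of that in-memory map, using only the standard library (the two-column CSV layout, the sample data, and the `map_entity_value` helper are assumptions; in the real job, the MR map function would do a lookup like this per entity):

```python
import csv
import io

def load_lookup(csv_text):
    """Build an in-memory map from property1 (column 0) to property2 (column 1)."""
    lookup = {}
    for row in csv.reader(io.StringIO(csv_text)):
        if len(row) >= 2:
            lookup[row[0]] = row[1]
    return lookup

# hypothetical two-column CSV contents
sample = "old-a,new-a\nold-b,new-b\n"
LOOKUP = load_lookup(sample)

def map_entity_value(property1):
    """What the MR map() would do per entity: return the replacement, or None
    if this entity's property1 is not in the CSV and should be left alone."""
    return LOOKUP.get(property1)
```

Since the map is built once (e.g. at module load), each mapper call is a plain dict lookup rather than a query.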

As @Ryan B says, you don't need to use MR for this if you just want to take advantage of batch puts, as you can pass an Iterable of entities to the DatastoreService's put method.
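If you go the plain batch-put route, the batching itself can be sketched in pure Python; this is only the chunking step, and the 500-entities-per-call figure is the documented Datastore batch-put limit (each batch would then be handed to a batch put such as `ndb.put_multi`):

```python
def chunked(items, size=500):
    """Yield successive batches of at most `size` items; 500 matches the
    Datastore limit on entities per batch put."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

# for batch in chunked(entities_to_update):
#     ndb.put_multi(batch)  # one RPC per batch instead of one per entity
```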

answered Nov 12 '22 by marianosimone


You can use a DatastoreInputReader and, in the map function, find out whether property1 is actually in the csv. Reading the csv on every call would be very slow; instead, use memcache to provide that info after it is read just once from its own Datastore model.

To populate that model, I would recommend using property1 as the custom id of each row; that way, querying it is straightforward. You would only update the Datastore for the values that actually change, and use the mutation pool to make it performant (op.db.Put()).

Below is pseudocode (sorry... I only have it in Python) of how the different pieces would look. I further recommend reading this article on MapReduce on Google App Engine: http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/

# ndb is imported to get the to_dict method
from google.appengine.api import memcache
from google.appengine.ext import ndb
from mapreduce import operation as op
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline

class TouchPipeline(pipeline.Pipeline):
    """
    Pipeline to update the field of entities that have certain condition
    """

    def run(self, *args, **kwargs):
        """ run """
        mapper_params = {
            "entity_kind": "yourDatastoreKind",
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Update entities that have certain condition",
            handler_spec="datastore_map",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=64)


class csvrow(ndb.Model):
  # property1 is not stored: its value is used as the entity's key id
  substitutefield = ndb.StringProperty()

def create_csv_datastore():
  # instead of running this, make a 10,000 row function with each csv value,
  # or read it from the blobstore, iterate and update the values accordingly
  for i in range(10000):
    # property1's value is this row's key id; only the other column (the
    # eventual substitute) is stored. get_or_insert already writes the
    # entity, so no extra put() is needed.
    csvrow.get_or_insert('property%s' % i, substitutefield='substitute%s' % i)


def queryfromcsv(property1):
  row = ndb.Key('csvrow', property1).get()
  if row:
    return row.substitutefield
  return property1

def property1InCSV(property1):
  data = memcache.get(property1)
  if data is not None:
    return data
  data = queryfromcsv(property1)
  memcache.add(property1, data, 60)
  return data

def datastore_map(entity):
  oldvalue = entity.property1
  newvalue = property1InCSV(oldvalue)
  if newvalue != oldvalue:
    entity.property2 = newvalue
    # use the mutation pool
    yield op.db.Put(entity)
answered Nov 12 '22 by Alejandro Santamaria Arza