 

Simple counter example using mapreduce in Google App Engine

I'm somewhat confused by the current state of mapreduce support in GAE. According to the docs (http://code.google.com/p/appengine-mapreduce/), the reduce phase isn't supported yet, but the description of the Google I/O 2011 session (http://www.youtube.com/watch?v=EIxelKcyCC0) says "It is now possible to run full Map Reduce jobs on App Engine". I wonder whether I can use mapreduce for this task:

What I want to do:

I have a model Car with a field color:

from google.appengine.ext import db

class Car(db.Model):
    color = db.StringProperty()

I want to run a mapreduce process from time to time (cron-defined) that computes how many cars there are of each color and stores the result in the datastore. This seems like a job well suited to mapreduce (but correct me if I'm wrong): the "map" phase would yield a pair (color_name, 1) for each Car entity, and the "reduce" phase would merge this data by color_name, giving me the expected results. The final result I want is entities with the computed data stored in the datastore, something like this:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()
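The map, shuffle and reduce flow described in the question can be simulated in plain Python, without App Engine, to check the logic. This is a minimal sketch that assumes a list of color strings stands in for the Car entities; map_car, shuffle, reduce_color and run_job are hypothetical names, not part of the appengine-mapreduce API. In a real MapReduce framework the shuffle (grouping by key) is done by the framework, not by user code.

```python
from collections import defaultdict

# Hypothetical stand-in for Car entities: only their color values.
cars = ["red", "blue", "red", "green", "blue", "red"]

def map_car(color):
    """Map phase: emit one (color, 1) pair per car."""
    yield (color, 1)

def shuffle(pairs):
    """Shuffle phase: group all emitted values by their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_color(color, counts):
    """Reduce phase: collapse the list of 1s for one color into a total."""
    return (color, sum(counts))

def run_job(colors):
    pairs = (pair for color in colors for pair in map_car(color))
    grouped = shuffle(pairs)
    # Each reduced (color, total) pair maps to one CarsByColor row
    # (color_name, cars_num).
    return dict(reduce_color(c, v) for c, v in grouped.items())

print(run_job(cars))
```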

Problem: I don't know how to implement this in App Engine. The video shows examples with defined map and reduce functions, but they seem to be very general examples unrelated to the datastore. All the other examples I found use a single function to process the data from DatastoreInputReader, but they seem to cover only the "map" phase; there is no example of how to do the "reduce" (or how to store the reduce results in the datastore).

asked May 19 '11 at 14:05 by Pawel Markowski



1 Answer

You don't really need a reduce phase. You can accomplish this with a linear task chain, more or less as follows:

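from google.appengine.ext import db, deferred

def count_colors(limit=100, totals=None, cursor=None):
  # Use None instead of a mutable {} default; a shared default dict would
  # carry counts over between runs that execute in the same instance.
  if totals is None:
    totals = {}
  query = Car.all()
  if cursor:
    query.with_cursor(cursor)
  cars = query.fetch(limit)
  for car in cars:
    totals[car.color] = totals.get(car.color, 0) + 1
  if len(cars) == limit:
    # A full batch means there may be more cars: chain another task that
    # resumes from this cursor and carries the running tally.
    cursor = query.cursor()
    return deferred.defer(count_colors, limit, totals, cursor)
  # Last batch: write one CarsByColor entity per color, keyed by color name.
  entities = []
  for color, num in totals.items():
    entities.append(CarsByColor(key_name=color, color_name=color, cars_num=num))
  db.put(entities)

deferred.defer(count_colors)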

This should iterate over all your cars, pass a query cursor and a running tally to a series of ad-hoc tasks, and store the totals at the end.
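The control flow of such a task chain can be exercised locally. This is a minimal sketch under loud assumptions: a Python list stands in for Car.all(), an integer offset stands in for the query cursor, and a deque stands in for the task queue; the defer and run_queue helpers here are hypothetical and are not google.appengine.ext.deferred. It also uses totals=None rather than a {} default so repeated runs start from zero.

```python
from collections import deque

# Hypothetical in-memory stand-ins: the list plays the datastore, an integer
# offset plays the query cursor, and the deque plays the task queue.
CARS = ["red", "blue", "red", "green", "blue", "red", "white"]
task_queue = deque()
results = {}

def defer(func, *args):
    """Stand-in for deferred.defer: enqueue the call instead of running it."""
    task_queue.append((func, args))

def count_colors(limit=3, totals=None, cursor=0):
    totals = dict(totals or {})  # fresh copy, so no state leaks between runs
    batch = CARS[cursor:cursor + limit]
    for color in batch:
        totals[color] = totals.get(color, 0) + 1
    if len(batch) == limit:
        # Full batch: more data may remain, so chain another task.
        defer(count_colors, limit, totals, cursor + limit)
        return
    # Short (or empty) batch: end of data, so "store" the final totals.
    results.clear()
    results.update(totals)

def run_queue():
    """Drain the fake queue one task at a time, like a queue worker."""
    while task_queue:
        func, args = task_queue.popleft()
        func(*args)

defer(count_colors)
run_queue()
print(results)
```

Each task touches at most `limit` records, which keeps the work per task bounded no matter how many cars exist; the running tally is the only state carried from one task to the next.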

A reduce phase might make sense if you had to merge data from multiple datastores, multiple models, or multiple indexes in a single model. As it stands, I don't think it would buy you anything.
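When there are several sources, the reduce step amounts to merging partial tallies by key. A minimal sketch, assuming each source (for example a separate model, or a shard of one) has already produced its own color-to-count dict; the partials data and reduce_partials name are hypothetical.

```python
from collections import Counter

# Hypothetical partial tallies, e.g. produced separately for two models
# or for different shards of the same model.
partials = [
    {"red": 3, "blue": 2},
    {"red": 1, "green": 4},
    {"blue": 5},
]

def reduce_partials(tallies):
    """Reduce step: merge per-source counts into one total per color."""
    merged = Counter()
    for tally in tallies:
        merged.update(tally)  # Counter.update adds counts instead of replacing
    return dict(merged)

print(reduce_partials(partials))
```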

Another option: use the task queue to maintain live counters for each color. When you create a car, kick off a task to increment the total for that color. When you update a car, kick off one task to decrement the old color and another to increment the new color. Update counters transactionally to avoid race conditions.
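The live-counter bookkeeping can be sketched in memory. This assumes a threading.Lock as a stand-in for the datastore transaction that makes each read-modify-write atomic (on App Engine the analogue would be something like db.run_in_transaction around the counter update), and calls the handlers inline rather than enqueuing tasks. ColorCounters, on_car_created and on_car_updated are hypothetical names.

```python
import threading

class ColorCounters:
    """In-memory stand-in for one counter entity per color.

    The lock plays the role of a transaction: each adjustment is a
    read-modify-write that must not interleave with another one.
    """

    def __init__(self):
        self._counts = {}
        self._lock = threading.Lock()

    def adjust(self, color, delta):
        with self._lock:
            self._counts[color] = self._counts.get(color, 0) + delta

    def get(self, color):
        with self._lock:
            return self._counts.get(color, 0)

counters = ColorCounters()

def on_car_created(color):
    # On App Engine this would run in a task rather than inline.
    counters.adjust(color, +1)

def on_car_updated(old_color, new_color):
    # Decrement the old color and increment the new one; skipped when the
    # color did not change (an assumption, not stated in the answer).
    if old_color != new_color:
        counters.adjust(old_color, -1)
        counters.adjust(new_color, +1)
```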

answered Oct 06 '22 at 23:10 by Drew Sears
