
MapReduce on more than one datastore kind in Google App Engine

I just watched the "Batch data processing with App Engine" session from Google I/O 2010 and read parts of the MapReduce paper from Google Research, and now I am thinking of using MapReduce on Google App Engine to implement a recommender system in Python.

I prefer appengine-mapreduce over the Task Queue API because it offers easy iteration over all instances of a kind, automatic batching, automatic task chaining, etc. The problem is that my recommender system needs to calculate the correlation between instances of two different Models, i.e., instances of two distinct kinds.

Example: I have these two Models: User and Item. Each one has a list of tags as an attribute. Below are the functions to calculate correlation between users and items. Note that calculateCorrelation should be called for every combination of users and items:

def calculateCorrelation(user, item):
    return calculateCorrelationAverage(user.tags, item.tags)

def calculateCorrelationAverage(tags1, tags2):
    correlationSum = 0.0
    for (tag1, tag2) in allCombinations(tags1, tags2):
        correlationSum += correlation(tag1, tag2)
    return correlationSum / (len(tags1) + len(tags2))

def allCombinations(list1, list2):
    combinations = []
    for x in list1:
        for y in list2:
            combinations.append((x, y))
    return combinations             
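
To make the functions above concrete, here is a minimal self-contained sketch. The `correlation` function is not defined in this question, so the exact-match version below is purely an assumption for illustration, as is the guard for empty tag lists. `itertools.product` stands in for `allCombinations`.

```python
from itertools import product


def correlation(tag1, tag2):
    # Hypothetical stand-in: the question does not define correlation().
    return 1.0 if tag1 == tag2 else 0.0


def calculate_correlation_average(tags1, tags2):
    # Same arithmetic as calculateCorrelationAverage above, with
    # itertools.product replacing the hand-written allCombinations.
    if not tags1 and not tags2:
        return 0.0  # assumption: avoid dividing by zero on two empty lists
    total = sum(correlation(t1, t2) for t1, t2 in product(tags1, tags2))
    return total / (len(tags1) + len(tags2))


user_tags = ["python", "appengine", "mapreduce"]
item_tags = ["python", "mapreduce"]
score = calculate_correlation_average(user_tags, item_tags)
```

With these sample tags, two of the six tag pairs match and the divisor is 3 + 2, so `score` comes out as 0.4.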

But calculateCorrelation is not a valid Mapper in appengine-mapreduce, and maybe this function is not even compatible with the MapReduce model of computation. Still, I need to be sure, because I would really like to keep appengine-mapreduce advantages such as automatic batching and task chaining.

Is there any solution for that?

Should I define my own InputReader? Would a new InputReader that reads all instances of two different kinds be compatible with the current appengine-mapreduce implementation?

Or should I try the following?

  • Combine all keys of all entities of these two kinds, two by two, into instances of a new Model (possibly using MapReduce)
  • Iterate using mappers over instances of this new Model
  • For each instance, use keys inside it to get the two entities of different kinds and calculate the correlation between them.
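
The three steps above can be sketched in plain Python, with dictionaries standing in for the datastore. Every name here (`build_pairs`, `map_pair`, `shared_tags`, the key strings) is hypothetical and only illustrates the shape of the idea, not the appengine-mapreduce API.

```python
from itertools import product

# In-memory stand-ins for the two datastore kinds (key -> list of tags).
users = {"user:1": ["python", "git"], "user:2": ["mysql"]}
items = {"item:1": ["python"], "item:2": ["mysql", "git"]}


def build_pairs(user_keys, item_keys):
    # Step 1: one record per (user key, item key) combination.
    return [{"user_key": u, "item_key": i}
            for u, i in product(user_keys, item_keys)]


def map_pair(pair, score):
    # Steps 2 and 3: the mapper sees one pair, loads both entities by key,
    # and yields the two keys together with their score.
    user_tags = users[pair["user_key"]]
    item_tags = items[pair["item_key"]]
    yield pair["user_key"], pair["item_key"], score(user_tags, item_tags)


def shared_tags(tags1, tags2):
    # Hypothetical scoring function, used only to make the sketch runnable.
    return len(set(tags1) & set(tags2))


pairs = build_pairs(sorted(users), sorted(items))
results = [row for pair in pairs for row in map_pair(pair, shared_tags)]
```

Note that the intermediate Model holds len(users) * len(items) records, so this approach trades storage and an extra pass for the ability to use a stock single-kind mapper.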
fjsj asked Sep 22 '10 03:09


2 Answers

Following Nick Johnson's suggestion, I wrote my own InputReader. This reader fetches entities of two different kinds and yields tuples covering all combinations of these entities. Here it is:

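import math

# Assumes the appengine-mapreduce package layout.
from mapreduce.input_readers import DatastoreInputReader, InputReader

class TwoKindsInputReader(InputReader):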
    _APP_PARAM = "_app"
    _KIND1_PARAM = "kind1"
    _KIND2_PARAM = "kind2"
    MAPPER_PARAMS = "mapper_params"

    def __init__(self, reader1, reader2):
        self._reader1 = reader1
        self._reader2 = reader2

    def __iter__(self):
        for u in self._reader1:
            for e in self._reader2:
                yield (u, e)

    @classmethod
    def from_json(cls, input_shard_state):
        reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
        reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])

        return cls(reader1, reader2)

    def to_json(self):
        json_dict = {}
        json_dict[self._KIND1_PARAM] = self._reader1.to_json()
        json_dict[self._KIND2_PARAM] = self._reader2.to_json()
        return json_dict

    @classmethod
    def split_input(cls, mapper_spec):
        params = mapper_spec.params
        app = params.get(cls._APP_PARAM)
        kind1 = params.get(cls._KIND1_PARAM)
        kind2 = params.get(cls._KIND2_PARAM)
        shard_count = mapper_spec.shard_count
        shard_count_sqrt = int(math.sqrt(shard_count))

        splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
        splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
        inputs = []

        for u in splitted1:
            for e in splitted2:
                inputs.append(TwoKindsInputReader(u, e))

        #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
        return inputs

    @classmethod
    def validate(cls, mapper_spec):
        return True #TODO

This code should be used when you need to process all combinations of entities of two kinds. You can also generalize this for more than two kinds.
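
The shard arithmetic in split_input is worth a closer look. The sketch below is a plain-Python model of it, assuming each kind really is split into exactly int(sqrt(shard_count)) ranges (the real splitter may return fewer). The function name `plan_shards` and the range labels are made up for illustration.

```python
import math
from itertools import product


def plan_shards(shard_count):
    # Mirrors split_input: each kind is split into int(sqrt(shard_count))
    # ranges, and every (range1, range2) pair becomes one reader.
    per_kind = int(math.sqrt(shard_count))
    ranges1 = ["kind1-range-%d" % n for n in range(per_kind)]
    ranges2 = ["kind2-range-%d" % n for n in range(per_kind)]
    return list(product(ranges1, ranges2))


perfect = plan_shards(16)  # 4 x 4 readers
uneven = plan_shards(10)   # 3 x 3 readers, fewer than requested
```

When shard_count is a perfect square the number of readers matches it. Otherwise fewer readers are produced than requested, which may be the situation the commented-out `mapper_spec.shard_count = len(inputs)` line is meant to handle.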

Here is a valid mapreduce.yaml for TwoKindsInputReader:

mapreduce:
- name: recommendationMapReduce
  mapper:
    input_reader: customInputReaders.TwoKindsInputReader
    handler: recommendation.calculateCorrelationHandler
    params:
    - name: kind1
      default: kinds.User
    - name: kind2
      default: kinds.Item
    - name: shard_count
      default: 16
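
For completeness, a handler for this configuration might look roughly like the sketch below. It assumes the handler is called once per value yielded by the reader, i.e. once per (user, item) tuple. The namedtuples and the tag-overlap scoring are stand-ins so the example runs without the datastore. They are not part of the original code.

```python
from collections import namedtuple

# Stand-ins for the datastore entities; only the tags attribute matters here.
User = namedtuple("User", ["name", "tags"])
Item = namedtuple("Item", ["name", "tags"])


def calculate_correlation(user, item):
    # Hypothetical placeholder for calculateCorrelation from the question.
    return float(len(set(user.tags) & set(item.tags)))


def calculate_correlation_handler(pair):
    # The reader yields (entity_of_kind1, entity_of_kind2) tuples,
    # so the handler unpacks one tuple per call.
    user, item = pair
    return (user.name, item.name, calculate_correlation(user, item))


result = calculate_correlation_handler(
    (User("ana", ["python", "git"]), Item("book", ["git"]))
)
```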
fjsj answered Nov 04 '22 01:11


It's difficult to know what to recommend without more details of what you're actually calculating. One simple option is to fetch the related entity inside the map call; nothing prevents you from doing datastore operations there.

This will result in a lot of small calls, though. Writing a custom InputReader, as you suggest, will allow you to fetch both sets of entities in parallel, which will significantly improve performance.
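
As a rough illustration of the first option, here is a sketch of a map function that runs over one kind and fetches the other kind inside the call. The datastore is simulated with a list, and `fetch_all_items` is a hypothetical helper standing in for a real query, so this shows the shape of the approach rather than actual App Engine code.

```python
# In-memory stand-in for the Item kind; a real mapper would query the datastore.
ITEMS = [
    {"key": "item:1", "tags": ["python"]},
    {"key": "item:2", "tags": ["mysql", "git"]},
]


def fetch_all_items():
    # Hypothetical helper standing in for a datastore query over Item.
    return list(ITEMS)


def map_user(user):
    # Called once per User entity by the mapper; the second kind is
    # fetched inside the call, which costs one fetch per user.
    for item in fetch_all_items():
        shared = set(user["tags"]) & set(item["tags"])
        yield user["key"], item["key"], len(shared)


rows = list(map_user({"key": "user:1", "tags": ["python", "git"]}))
```

The per-user fetch inside `map_user` is exactly the source of the many small calls mentioned above.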

If you give more details as to how you need to join these entities, we may be able to provide more concrete suggestions.

Nick Johnson answered Nov 04 '22 01:11