I need to run a mapreduce job that is dynamic in the sense that parameters need to be passed to the map and reduce functions each time the mapreduce job is run (e.g., in response to a user request).
How do I accomplish this? I could not see anywhere in the documentation how to do dynamic processing at runtime for map and reduce.
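import webapp2
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline


class MatchProcessing(webapp2.RequestHandler):
    def get(self):
        # Per-request values that the map and reduce functions need to see
        requestKeyID = int(self.request.get('riderbeeRequestID'))
        userKey = self.request.get('userKey')
        pipeline = MatchingPipeline(requestKeyID, userKey)
        pipeline.start()
        self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)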

class MatchingPipeline(base_handler.PipelineBase):
    def run(self, requestKeyID, userKey):
        yield mapreduce_pipeline.MapreducePipeline(
            "riderbee_matching",
            "tasks.matchingMR.riderbee_map",
            "tasks.matchingMR.riderbee_reduce",
            "mapreduce.input_readers.DatastoreInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "entity_kind": "models.rides.RiderbeeRequest",
                "requestKeyID": requestKeyID,
                "userKey": userKey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=16)

def riderbee_map(riderbeeRequest):
    # would like to access the requestKeyID and userKey parameters that were passed in mapper_params
    # so that we can do some processing based on that
    yield (riderbeeRequest.user.email, riderbeeRequest.key().id())


def riderbee_reduce(key, values):
    # would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params
    # so that we can do some processing based on that
    yield "%s: %s\n" % (key, len(values))

Help please?
I'm pretty sure you can just specify the parameters in mapper_params and read them back from the context module. See http://code.google.com/p/appengine-mapreduce/wiki/UserGuidePython#Mapper_parameters for more details.
This is how to access the mapper parameters from the mapper function, using the context module:

from mapreduce import context

def riderbee_map(riderbeeRequest):
    ctx = context.get()
    params = ctx.mapreduce_spec.mapper.params
    requestKeyID = params["requestKeyID"]
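
If both riderbee_map and riderbee_reduce need the same values, the lookup can be pulled into a small helper. The following is only a sketch: get_job_params and the namedtuple stand-ins are hypothetical names, not part of the mapreduce library, and the stand-ins mimic only the ctx.mapreduce_spec.mapper.params attribute chain shown above so the snippet runs outside App Engine. Whether the reduce stage exposes reducer_params through the same attribute chain is an assumption to verify against your library version; if it does, requestKeyID and userKey would also have to be added to reducer_params.

```python
from collections import namedtuple

# Hypothetical stand-ins that mirror only the attributes used in the answer
# (ctx.mapreduce_spec.mapper.params). In a real job, ctx comes from context.get().
_Mapper = namedtuple("_Mapper", ["params"])
_Spec = namedtuple("_Spec", ["mapper"])
_Context = namedtuple("_Context", ["mapreduce_spec"])


def get_job_params(ctx, *names):
    """Return the named user parameters from a context-like object as a tuple."""
    params = ctx.mapreduce_spec.mapper.params
    missing = [name for name in names if name not in params]
    if missing:
        # Fail loudly instead of silently processing with missing values
        raise KeyError("missing job params: %s" % ", ".join(missing))
    return tuple(params[name] for name in names)


# Simulated context carrying the same keys as mapper_params in the question
fake_ctx = _Context(_Spec(_Mapper({
    "entity_kind": "models.rides.RiderbeeRequest",
    "requestKeyID": 42,
    "userKey": "abc",
})))

request_key_id, user_key = get_job_params(fake_ctx, "requestKeyID", "userKey")
```

Inside the real map function the call would look like get_job_params(context.get(), "requestKeyID", "userKey").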