
How to get AppEngine map reduce to scale out?

I have written a simple MapReduce flow to read lines from a CSV file on Google Cloud Storage and create an Entity from each line. However, I can't seem to get it to run on more than one shard.
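For context, backend.line_processor (not shown in the question) would be a mapper that turns one CSV line into a datastore entity. A minimal sketch, with a hypothetical model and column layout:

# Hypothetical sketch only: MyEntity and the column layout are
# illustrative, not the actual backend.line_processor.
import csv

from google.appengine.ext import db
from mapreduce import operation as op


class MyEntity(db.Model):
    # Hypothetical model built from each CSV row.
    name = db.StringProperty()
    value = db.StringProperty()


def line_processor(line):
    # FileInputReader with format='lines' passes one line per call.
    row = next(csv.reader([line]))
    # Yielding a Put lets the framework batch the datastore writes.
    yield op.db.Put(MyEntity(name=row[0], value=row[1]))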

The code makes use of mapreduce.control.start_map and looks something like this.

class LoadEntitiesPipeline(webapp2.RequestHandler):

    def get(self, gsfile):
        id = control.start_map(map_name,
                               handler_spec="backend.line_processor",
                               reader_spec="mapreduce.input_readers.FileInputReader",
                               queue_name=get_queue_name("q-1"),
                               shard_count=shard_count,
                               mapper_parameters={
                                   'shard_count': shard_count,
                                   'batch_size': 50,
                                   'processing_rate': 1000000,
                                   'files': [gsfile],
                                   'format': 'lines'})

I have shard_count in both places because I'm not sure which calls actually need it. Setting shard_count anywhere from 8 to 32 doesn't change anything: the status page always says 1/1 shards running. To separate things out, I've made everything run on a backend queue with a large number of instances. I've tried adjusting the queue parameters per this wiki. In the end, it just seems to run serially.
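(The queue tuning referred to would look something like this in queue.yaml; the rate, bucket size, and target below are placeholders, not the actual configuration.)

queue:
- name: q-1
  rate: 500/s                     # placeholder rate
  bucket_size: 100                # placeholder burst size
  max_concurrent_requests: 1000   # placeholder concurrency cap
  target: my-backend              # hypothetical backend running the mapper tasks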

Any ideas? Thanks!

Update (Still no success):

In an attempt to isolate the problem, I tried calling the pipeline directly, like so:

class ImportHandler(webapp2.RequestHandler):

    def get(self, gsfile):
        pipeline = LoadEntitiesPipeline2(gsfile)
        pipeline.start(queue_name=get_queue_name("q-1"))

        self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)


class LoadEntitiesPipeline2(base_handler.PipelineBase):

    def run(self, gsfile):
        yield mapreduce_pipeline.MapperPipeline(
            'loadentities2_' + gsfile,
            'backend.line_processor',
            'mapreduce.input_readers.FileInputReader',
            params={'files': [gsfile], 'format': 'lines'},
            shards=32)

With this new code, it still only runs on one shard. I'm starting to wonder if mapreduce.input_readers.FileInputReader is capable of parallelizing input by line.

asked Oct 05 '13 by Lee Becker

1 Answer

It looks like FileInputReader can only shard by file. The format parameter only changes the way the mapper function gets called. If you pass more than one file to the mapper, it will start to run on more than one shard; otherwise it will use only one shard to process the data.
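For example, pre-splitting the input into several files on Cloud Storage before starting the job should fan the work out across shards. A sketch based on the question's start_map call, with hypothetical file names:

# Sketch only: the bucket and file names are hypothetical.
gsfiles = ['/gs/mybucket/input-0.csv',
           '/gs/mybucket/input-1.csv',
           '/gs/mybucket/input-2.csv',
           '/gs/mybucket/input-3.csv']

id = control.start_map(map_name,
                       handler_spec="backend.line_processor",
                       reader_spec="mapreduce.input_readers.FileInputReader",
                       queue_name=get_queue_name("q-1"),
                       shard_count=4,  # at most one shard per input file is used
                       mapper_parameters={
                           'batch_size': 50,
                           'files': gsfiles,  # more files -> more shards
                           'format': 'lines'})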

EDIT #1:

After digging deeper into the mapreduce library: MapReduce decides whether or not to split a file into pieces based on what the can_split method returns for each file type it defines. Currently, the only format that implements the split method is ZipFormat. So, if your file format is not zip, the file won't be split to run on more than one shard.

@classmethod
def can_split(cls):
    """Indicates whether this format supports splitting within a file boundary.

    Returns:
      True if a FileFormat allows its inputs to be split into
      different shards.
    """

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py

But it looks like it is possible to write your own file format's split method. You could try hacking a split method onto _TextFormat first and see if more than one shard runs.

@classmethod
def split(cls, desired_size, start_index, opened_file, cache):
    pass

EDIT #2:

An easy workaround would be to leave the FileInputReader running serially but move the time-consuming work to the parallel reduce stage.

import random

from google.appengine.ext import db


def line_processor(line):
    # Serial map stage: scatter lines across reducer keys at random.
    yield (random.randrange(1000), line)


def reducer(key, values):
    # Parallel reduce stage: CREATE_ENTITY_FROM_VALUE is the asker's
    # entity-construction logic; batch the datastore writes.
    entities = []
    for v in values:
        entities.append(CREATE_ENTITY_FROM_VALUE(v))
    db.put(entities)
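A sketch of wiring these two handlers into a full map-shuffle-reduce job via mapreduce_pipeline.MapreducePipeline (the pipeline class name here is hypothetical; the handler paths follow the question):

# Sketch only: LoadEntitiesMapReduce is a hypothetical name; the
# handler paths follow the question's module layout.
from mapreduce import base_handler, mapreduce_pipeline


class LoadEntitiesMapReduce(base_handler.PipelineBase):

    def run(self, gsfile):
        yield mapreduce_pipeline.MapreducePipeline(
            'loadentities_mr_' + gsfile,
            mapper_spec='backend.line_processor',
            reducer_spec='backend.reducer',
            input_reader_spec='mapreduce.input_readers.FileInputReader',
            mapper_params={'files': [gsfile], 'format': 'lines'},
            shards=32)  # the reduce stage fans out even though the map is serial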

EDIT #3:

If you want to try modifying the FileFormat, here is an example (not tested yet):

from file_formats import _TextFormat, FORMATS


class _LinesSplitFormat(_TextFormat):
  """Read file line by line."""

  NAME = 'split_lines'

  def get_next(self):
    """Inherited."""
    index = self.get_index()
    cache = self.get_cache()
    offset = sum(cache['infolist'][:index])

    self.get_current_file().seek(offset)
    result = self.get_current_file().readline()
    if not result:
      raise EOFError()
    if 'encoding' in self._kwargs:
      result = result.encode(self._kwargs['encoding'])
    return result

  @classmethod
  def can_split(cls):
    """Inherited."""
    return True

  @classmethod
  def split(cls, desired_size, start_index, opened_file, cache):
    """Inherited."""
    if 'infolist' in cache:
      infolist = cache['infolist']
    else:
      infolist = []
      for i in opened_file:
        infolist.append(len(i))
      cache['infolist'] = infolist

    index = start_index
    while desired_size > 0 and index < len(infolist):
      desired_size -= infolist[index]
      index += 1
    return desired_size, index


FORMATS['split_lines'] = _LinesSplitFormat

Then the new file format can be used by changing 'format' in mapper_parameters from 'lines' to 'split_lines':

class LoadEntitiesPipeline(webapp2.RequestHandler):

    def get(self, gsfile):
        id = control.start_map(map_name,
                               handler_spec="backend.line_processor",
                               reader_spec="mapreduce.input_readers.FileInputReader",
                               queue_name=get_queue_name("q-1"),
                               shard_count=shard_count,
                               mapper_parameters={
                                   'shard_count': shard_count,
                                   'batch_size': 50,
                                   'processing_rate': 1000000,
                                   'files': [gsfile],
                                   'format': 'split_lines'})
answered Sep 27 '22 by lucemia