Google AppEngine Pipelines API

I would like to rewrite some of my tasks as pipelines, mainly because I need a way of detecting when a task has finished, or of starting tasks in a specific order. My problem is that I'm not sure how to rewrite my recursive tasks as pipelines. By recursive I mean tasks that call themselves, like this:

class MyTask(webapp.RequestHandler):
    def post(self):
        cursor = self.request.get('cursor', None)

        [set cursor if not null]
        [fetch 100 entities from datastore]

        if len(result) >= 100:
            [ create the same task in the queue and pass the cursor ]

        [do actual work the task was created for]

Now I would really like to write it as a pipeline and do something similar to:

class DoSomeJob(pipeline.Pipeline):

    def run(self):
        with pipeline.InOrder():
            yield MyTask()
            yield MyOtherTask()
            yield DoSomeMoreWork(message2)

Any help with this one will be greatly appreciated. Thank you!

asked by rzajac

2 Answers

A basic pipeline just returns a value:

class MyFirstPipeline(pipeline.Pipeline):
    def run(self):
        return "Hello World"  

The value has to be JSON serializable.
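
For instance, any plain JSON type works as a return value (MyDictPipeline here is just an illustrative name, not something from the library):

    class MyDictPipeline(pipeline.Pipeline):
        def run(self):
            # dicts, lists, strings, numbers, booleans and None are all
            # JSON serializable; model instances and datetime objects are not
            return {"status": "done", "count": 3}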

If you need to coordinate several pipelines you will need to use a generator pipeline and the yield statement.

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        yield MyFirstPipeline()

You can treat the yielding of a pipeline as if it returns a 'future'.

You can pass this future as the input arg to another pipeline:

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        result = yield MyFirstPipeline()
        yield MyOtherPipeline(result)

The Pipeline API will ensure that the run method of MyOtherPipeline is only called once the result future from MyFirstPipeline has been resolved to a real value.
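
Concretely, MyOtherPipeline is just another pipeline whose run method receives the already-resolved value (its body below is only an illustrative sketch):

    class MyOtherPipeline(pipeline.Pipeline):
        def run(self, greeting):
            # by the time this runs, `greeting` is the plain string
            # "Hello World" returned by MyFirstPipeline, not a future
            return greeting.upper()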

You can't mix yield and return in the same method. If you are using yield the value has to be a Pipeline instance. This can lead to a problem if you want to do this:

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield results

In this case the Pipeline API just sees a plain list in your final yield results line; it doesn't know that it needs to resolve the futures inside it, so you will get an error.

It isn't documented, but there is a library of utility pipelines included which can help here:
https://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py

So a version of the above which actually works would look like:

import pipeline
from pipeline import common

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield common.List(*results)

Now we're OK: we're yielding a pipeline instance, and the Pipeline API knows to resolve its future value properly. The source of the common.List pipeline is very simple:

class List(pipeline.Pipeline):
    """Returns a list with the supplied positional arguments."""

    def run(self, *args):
        return list(args)

...at the point that this pipeline's run method is called the Pipeline API has resolved all of the items in the list to actual values, which can be passed in as *args.

Anyway, back to your original example, you could do something like this:

class FetchEntities(pipeline.Pipeline):
    def run(self, cursor=None):
        # Cursor here is google.appengine.datastore.datastore_query.Cursor
        if cursor is not None:
            cursor = Cursor(urlsafe=cursor)

        # I think it's ok to pass None as the cursor here, haven't confirmed
        results, next_curs, more = MyModel.query().fetch_page(100,
                                                              start_cursor=cursor)

        # queue up a task for the next page of results immediately
        future_results = []
        if more:
            future_results = yield FetchEntities(next_curs.urlsafe())

        current_results = [ do some work on `results` ]

        # (assumes current_results and future_results are both lists)
        # this will have to wait for all of the recursive calls in
        # future_results to resolve before it can resolve itself:
        yield common.Extend(current_results, future_results)
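
To kick off the whole chain you instantiate the root pipeline and call start() on it, typically from a request handler (a minimal sketch):

    stage = FetchEntities()
    stage.start()
    # stage.pipeline_id can be saved if you want to check on it later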

Further explanation

At the start I said we can treat result = yield MyPipeline() as if it returns a 'future'. This is not strictly true; obviously we are actually just yielding the instantiated pipeline. (Needless to say, our run method is now a generator function.)

The weird part of how Python's yield expressions work is that, despite what it looks like, the value that you yield goes somewhere outside the function (to the Pipeline API apparatus) rather than into your result var. The value of the result var on the left side of the expression is also pushed in from outside the function, by calling send on the generator (the generator being the run method you defined).
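
The same mechanics can be seen in a plain Python generator, independent of the Pipeline API (this snippet is only an illustration of yield expressions, not pipeline code):

    def my_generator():
        # the yielded value goes out to whoever drives the generator;
        # the value of the yield expression is whatever is sent back in
        received = yield "something yielded"
        print("received: %s" % received)

    gen = my_generator()
    print(next(gen))                        # prints "something yielded"
    try:
        gen.send("pushed in from outside")  # prints "received: pushed in from outside"
    except StopIteration:
        pass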

So by yielding an instantiated Pipeline, you are letting the Pipeline API take that instance and call its run method somewhere else at some other time (in fact it will be passed into a task queue as a class name and a set of args and kwargs and re-instantiated there... this is why your args and kwargs need to be JSON serializable too).

Meanwhile the Pipeline API sends a PipelineFuture object into your run generator and this is what appears in your result var. It seems a bit magical and counter-intuitive but this is how generators with yield expressions work.

It's taken quite a bit of head-scratching for me to work it out to this level and I welcome any clarifications or corrections on anything I got wrong.

answered by Anentropic


When you create a pipeline, it hands back an object that represents a "stage". You can ask the stage for its id, then save it away. Later, you can reconstitute the stage from the saved id, then ask the stage if it's done.

See http://code.google.com/p/appengine-pipeline/wiki/GettingStarted and look for has_finalized. There's an example that does most of what you need.
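
The pattern looks roughly like this, using the question's DoSomeJob pipeline (a sketch of the start/from_id/has_finalized calls; the surrounding handler code is assumed):

    # when starting the pipeline, save its id somewhere
    stage = DoSomeJob()
    stage.start()
    saved_id = stage.pipeline_id

    # later, in another request, reconstitute the stage from the saved id
    stage = DoSomeJob.from_id(saved_id)
    if stage.has_finalized:
        # the pipeline and all of its child pipelines have completed
        result = stage.outputs.default.value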

answered by Dave W. Smith