I have a request handler that updates an entity, saves it to the datastore, then needs to perform some additional work before returning (like queuing a background task and json-serializing some results). I want to parallelize this code, so that the additional work is done while the entity is being saved.
Here's what my handler code boils down to:
import webapp2
from google.appengine.api import taskqueue
from google.appengine.ext import ndb

class FooHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def post(self):
        foo = yield Foo.get_by_id_async(some_id)
        # Do some work with foo

        # Don't yield, as I want to perform the code that follows
        # while foo is being saved to the datastore.
        # I'm in a toplevel, so the handler will not exit as long as
        # this async request is not finished.
        foo.put_async()
        taskqueue.add(...)
        json_result = generate_result()
        self.response.headers["Content-Type"] = "application/json; charset=UTF-8"
        self.response.write(json_result)
However, Appstats shows that the datastore.Put RPC is being done serially, after taskqueue.Add.
A little digging around in ndb/context.py shows that a put_async() call ends up being added to an AutoBatcher instead of the RPC being issued immediately. So I presume that the _put_batcher ends up being flushed when the toplevel waits for all async calls to complete.
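To make the presumed behaviour concrete, here is a toy model of it in plain Python. It is not ndb's actual code: ToyEventLoop, ToyBatcher and send_rpc are hypothetical names made up for illustration, and the only assumption is that the batcher queues items and registers its flush as an idle callback on the event loop.

```python
class ToyEventLoop(object):
    """Hypothetical stand-in for an event loop with an idle-callback queue."""

    def __init__(self):
        self.idlers = []

    def add_idle(self, callback):
        self.idlers.append(callback)

    def run_idle(self):
        # Run one idle callback; return True if there was one to run.
        if not self.idlers:
            return False
        self.idlers.pop(0)()
        return True


class ToyBatcher(object):
    """Hypothetical batcher: collects items, sends them only when flushed."""

    def __init__(self, loop, send_rpc):
        self.loop = loop
        self.send_rpc = send_rpc
        self.pending = []

    def add(self, item):
        # The first queued item schedules a flush for when the loop is idle.
        if not self.pending:
            self.loop.add_idle(self.flush)
        self.pending.append(item)

    def flush(self):
        batch, self.pending = self.pending, []
        if batch:
            self.send_rpc(batch)


sent = []  # records every "RPC" that actually goes out
loop = ToyEventLoop()
batcher = ToyBatcher(loop, sent.append)

batcher.add("foo")             # analogous to foo.put_async(): only queued
sent_before_idle = list(sent)  # nothing has been sent yet

while loop.run_idle():         # the loop finally gets a chance to run
    pass
sent_after_idle = list(sent)   # now the batch has gone out
```

In this model nothing is sent until something drives the loop's idle callbacks, which would match what Appstats shows if the loop only runs once the toplevel starts waiting.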
I understand that batching puts has real benefits in certain scenarios, but in my case here I really want the put RPC to be sent immediately, so I can perform other work while the entity is being saved.
If I do yield foo.put_async(), then I get the same waterfall in Appstats, but with datastore.Put being done before the rest. This is to be expected, as yield makes my handler wait for the put_async() call to complete before executing the rest of the code.
I have also tried adding a call to ndb.get_context().flush() right after foo.put_async(), but the datastore.Put and taskqueue.BulkAdd calls are still not being made in parallel according to Appstats.
So my question is: how can I force the call to put_async() to bypass the auto batcher and issue the RPC immediately?
There's no supported way to do it. Maybe there should be. Can you check whether this works?
loop = ndb.eventloop.get_event_loop()
while loop.run_idle():
    pass
You may have to look at the source code of ndb/eventloop.py to see what else you could try -- basically you want to try most of what run0() does except waiting for RPCs. In particular, it's possible that you would have to do this:
while loop.current:
    loop.run0()
while loop.run_idle():
    pass
(This still isn't supported, because there are other conditions you may have to handle too, but those don't seem to occur in your example.)
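If it helps, the two loops can be wrapped in a small helper. This is only a sketch: drain_without_waiting is a hypothetical name, and FakeLoop is a made-up stand-in that exposes just current, run0() and run_idle() so the snippet runs without App Engine. In a real handler you would presumably pass ndb.eventloop.get_event_loop() right after foo.put_async(), with the same caveat that this is unsupported.

```python
def drain_without_waiting(loop):
    """Run ready and idle callbacks on `loop` without blocking on RPCs."""
    # First run the callbacks that are ready right now.
    while loop.current:
        loop.run0()
    # Then run idle callbacks (such as a batcher flush) until none remain.
    while loop.run_idle():
        pass


class FakeLoop(object):
    """Hypothetical stand-in for the event loop, for demonstration only."""

    def __init__(self):
        self.current = []  # callbacks ready to run now
        self.idlers = []   # callbacks to run when the loop is idle
        self.log = []      # order in which callbacks were "run"

    def run0(self):
        self.log.append(self.current.pop(0))

    def run_idle(self):
        if not self.idlers:
            return False
        self.log.append(self.idlers.pop(0))
        return True


fake = FakeLoop()
fake.current.extend(["callback-1", "callback-2"])
fake.idlers.append("flush-put-batcher")
drain_without_waiting(fake)
```

After the call, fake.log holds the ready callbacks followed by the idle one, and both queues are empty.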