I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard: it is just a histogram of the usage samples that pass a specific filter. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.
I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below), the RPCs always seem to fall back to a waterfall of sequential next calls.
Note: the query and analysis code does work, it just runs too slowly because I can't get data back quickly enough from the datastore.
I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:
from google.appengine.ext import ndb

class Session(ndb.Model):
    """ A tracked user session. (customer account (company), version, OS, etc) """
    data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
    name      = ndb.StringProperty  (required = True, indexed = True)
    session   = ndb.KeyProperty     (required = True, kind = Session)
    timestamp = ndb.DateTimeProperty(required = True, indexed = True)
    tags      = ndb.StringProperty  (repeated = True, indexed = True)
You can think of the samples as times when a user makes use of a capability of a given name (ex: 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature (ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). So the tags form a denormalized set of tokens that could be used to find samples of interest.
The analysis I am trying to do consists of taking a date range and asking how many times a feature or set of features (perhaps all features) was used per day (or per hour) per customer account (company, not per user).
So the input to the handler is essentially a date range plus the set of tags to filter on, and the output would be something like:
[{
    'company_account': <string>,
    'counts': [
        {'timeperiod': <iso8601 date>, 'count': <int>}, ...
    ]
}, ...
]
Here is some code in common for all of the queries. The general structure of the handler is a simple webapp2 get handler that sets up the query parameters, runs the query, processes the results, and builds the data to return.
# --- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500  # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
    session_obj = sample.session.get()  # Usually found in local or memcache thanks to ndb
    count_key = session_obj.data['customer']
    addCountForPeriod(count_key, sample.timestamp)
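addCountForPeriod and getPeriodTimestamp are not shown; they just bucket a sample's timestamp into the reporting period and bump an in-memory counter per customer. Roughly something like this (a sketch of what is assumed, not the exact production code):

from collections import defaultdict

# In-memory accumulator: {customer: {period_start: count}}
period_counts = defaultdict(lambda: defaultdict(int))

def getPeriodTimestamp(ts):
    # Bucket a sample timestamp into its reporting period (per-day here)
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

def addCountForPeriod(count_key, ts):
    # Increment the histogram bucket for this customer and period
    period_counts[count_key][getPeriodTimestamp(ts)] += 1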
I have tried a variety of methods to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:
Method A: simple iteration. This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items, letting ndb do what it does to pull them one after the other.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
    handle_sample(sample)
Method B: large fetch. The idea here was to see if I could do a single very large fetch.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
    handle_sample(sample)
Method C: async fetches across the time range. The idea here is to recognize that the samples are fairly well spaced across time, so I can create a set of independent queries that split the overall time range into chunks and try to run each of these in parallel using fetch_async:
# Split the timestamp range into 20 equal intervals and async query each of them
ts_intervals = 20
ts_delta = (end_time - start_time) / ts_intervals
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
    cur_end_time = (cur_start_time + ts_delta)
    if x == (ts_intervals - 1):  # Last one has to cover the full range
        cur_end_time = end_time

    f = q.filter(Sample.timestamp >= cur_start_time,
                 Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
    q_futures.append(f)
    cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
    samples = f.get_result()
    for sample in samples:
        handle_sample(sample)
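(As a side note, the per-sample session.get() inside handle_sample could in principle be batched as well, e.g. by collecting all the samples first and doing one ndb.get_multi over the unique session keys. A rough sketch only, not something measured in the traces below:)

# Sketch only: batch the Session lookups instead of one .get() per sample
all_samples = []
for f in q_futures:
    all_samples.extend(f.get_result())

unique_session_keys = list({s.session for s in all_samples})
sessions = ndb.get_multi(unique_session_keys)  # One batched lookup
session_by_key = dict(zip(unique_session_keys, sessions))

for sample in all_samples:
    count_key = session_by_key[sample.session].data['customer']
    addCountForPeriod(count_key, sample.timestamp)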
Method D: query mapping via a tasklet. I tried this method because the documentation made it sound like ndb might exploit some parallelism automatically when using the Query.map_async method.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
    period_ts = getPeriodTimestamp(sample.timestamp)
    session_obj = yield sample.session.get_async()  # Lookup the session object from cache
    count_key = session_obj.data['customer']
    addCountForPeriod(count_key, sample.timestamp)
    raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
I tested out one example query to collect overall response time and appstats traces. The results are:
A. Simple iteration: real = 15.645s
This one goes sequentially, fetching batches one after the other, and then retrieves every session from memcache.
B. Large fetch: real = 12.12s
Effectively the same as option A, but a bit faster for some reason.
C. Async fetches across the time range: real = 15.251s
Appears to provide more parallelism at the start, but seems to get slowed down by a sequence of calls to next during iteration of the results. It also doesn't seem to be able to overlap the session memcache lookups with the pending queries.
D. Query mapping via a tasklet: real = 13.752s
This one is the hardest for me to understand. It looks like it has a good deal of overlapping, but everything seems to stretch out in a waterfall instead of running in parallel.
Based upon all this, what am I missing? Am I just hitting a limit on App Engine, or is there a better way to pull down large numbers of entities in parallel?
I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to App Engine in parallel, but this seems pretty brute force. I would really expect that App Engine should be able to handle this use case, so I am guessing there is something I am missing.
Update: in the end I found that option C (async fetches across the time range) was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.
After getting advice from several people, I found a few key things to understand and keep in mind, and used them to make option C faster.
I am still not getting the performance I would expect or like, but it is workable for now. I just wish there were a better way to pull large numbers of sequential entities into memory quickly in handlers.
Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests up to 10 minutes, with (I believe) the normal memory constraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.
Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls another URL every ~5s, which responds with true/false depending on whether the task queue task has completed yet. The task queue processes the data, which can take some tens of seconds, and saves the result to the datastore either as the computed data or a rendered web page. Once the initial page detects that the task has completed, the user is redirected to the page that fetches the now-computed results from the datastore.
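A minimal sketch of that pattern, assuming webapp2 and the task queue API; the Report model, the handler names, and the render_polling_page / run_analysis helpers are illustrative placeholders rather than a specific implementation:

import webapp2
from google.appengine.api import taskqueue
from google.appengine.ext import ndb

class Report(ndb.Model):
    # Illustrative model that holds the computed result
    payload = ndb.JsonProperty()
    done = ndb.BooleanProperty(default=False)

class StartHandler(webapp2.RequestHandler):
    def get(self):
        report_key = Report().put()
        # Kick off the long-running analysis on the task queue (10 minute limit)
        taskqueue.add(url='/worker', params={'report': report_key.urlsafe()})
        # Return a page that polls /status?report=... every ~5s (placeholder helper)
        self.response.write(render_polling_page(report_key.urlsafe()))

class StatusHandler(webapp2.RequestHandler):
    def get(self):
        report = ndb.Key(urlsafe=self.request.get('report')).get()
        self.response.write('true' if report and report.done else 'false')

class WorkerHandler(webapp2.RequestHandler):
    def post(self):
        report = ndb.Key(urlsafe=self.request.get('report')).get()
        report.payload = run_analysis()  # Placeholder for the 10s+ query/summarize work
        report.done = True
        report.put()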
The new experimental Data Processing feature (an AppEngine API for MapReduce) looks very suitable for solving this problem. It does automatic sharding to execute multiple parallel worker processes.
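For example, with the appengine-mapreduce library the per-entity work becomes a mapper function that the framework runs across many shards in parallel (configured via mapreduce.yaml with a DatastoreInputReader over the Sample kind); matches_filter here is an illustrative stand-in for the tag/date filtering:

from mapreduce import operation as op

def count_sample(sample):
    # Called once per Sample entity; the shards run in parallel automatically
    if matches_filter(sample):  # Illustrative stand-in for the tag/date filter
        yield op.counters.Increment('count-%s' % getPeriodTimestamp(sample.timestamp))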