Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Best practice to query large number of ndb entities from datastore

I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard, it is just a histogram of items that pass a specific filter of the usage samples. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below) it always seems that the RPC's fall back to a waterfall of sequential next queries.

Note: the query and analysis code does work, it just runs to slowly because I can't get data quickly enough from the datastore.

Background

I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

You can think of the samples as times when a user makes use of a capability of a given name. (ex: 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature. ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). So the tags form a denormalized set of tokens that could be used to find samples of interest.

The analysis I am trying to do consists of taking a date range and asking how many times was a feature of set of features (perhaps all features) used per day (or per hour) per customer account (company, not per user).

So the input to the handler be something like:

  • Start Date
  • End Date
  • Tag(s)

Output would be:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Common Code for Queries

Here is some code in common for all queries. The general structure of the handler is a simple get handler using webapp2 that sets up the query parameters, runs the query, processes the results, creates data to return.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Methods Tried

I have tried a variety of methods to try to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

A. Single Iteration

This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items letting ndb do what it does to pull them one after the other.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Large Fetch

The idea here was to see if I could do a single very large fetch.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async fetches across time range

The idea here is to recognize that the samples are fairly well spaced across time so I can create a set of independent queries that split the overall time region into chunks and try to run each of these in parallel using async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Async mapping

I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Outcome

I tested out one example query to collect overall response time and appstats traces. The results are:

A. Single Iteration

real: 15.645s

This one goes sequentially through fetching batches one after the other and then retrieves every session from memcache.

Method A appstats

B. Large Fetch

real: 12.12s

Effectively the same as option A but a bit faster for some reason.

Method B appstats

C. Async fetches across time range

real: 15.251s

Appears to provide more parallelism at the start but seems to get slowed down by a sequence of calls to next during iteration of the results. Also doesn't seem to be able to overlap the session memcache lookups with the pending queries.

Method C appstats

D. Async mapping

real: 13.752s

This one is the hardest for me to understand. It looks like it has q good deal of overlapping, but everything seems to stretch out in a waterfall instead of in parallel.

Method D appstats

Recommendations

Based upon all this, what am I missing? Am I just hitting a limit on App Engine or is there a better way to pull down large number of entities in parallel?

I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to app engine in parallel but this seems pretty brute force. I would really expect that app engine should be able to handle this use case so I am guessing there is something I am missing.

Update

In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

After getting advice from several people, I found that the following items were key to understand and keep in mind:

  • Multiple queries can run in parallel
  • Only 10 RPC's can be in flight at once
  • Try to denormalize to the point that there are no secondary queries
  • This type of task is better left to map reduce and task queues, not real-time queries

So what I did to make it faster:

  • I partitioned the query space from the beginning based upon time. (note: the more equal the partitions are in terms of entities returned, the better)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing

I am still not getting the performance I would expect or like, but it is workable for now. I just wish their was a better way to pull large numbers of sequential entities into memory quickly in handlers.

like image 686
Allen Avatar asked Jul 16 '12 17:07

Allen


2 Answers

Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests up to 10 minutes, and (I believe) normal memory restraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls every ~5s to another URL that responds with true/false if the task queue task has been completed yet. The task queue processes the data, which can take some 10s of seconds, and saves the result to the datastore either as the computed data or a rendered web page. Once the initial page detects that it has completed, the user is redirected to the page, which fetches the now computed results from the datastore.

like image 119
mjibson Avatar answered Nov 14 '22 06:11

mjibson


The new experimental Data Processing feature (an AppEngine API for MapReduce) looks very suitable for solving this problem. It does automatic sharding to execute multiple parallel worker processes.

like image 2
Martin Berends Avatar answered Nov 14 '22 08:11

Martin Berends