I am hitting a bottleneck in my application and having a tough time finding a way around it. A little background:
Current implementation:
There are additional checks and such in there to ensure all pull queue tasks are properly processed and all items are downloaded.
The Problem:
We want to download and store all the items and aggregates as fast as possible. I have 20 instances enabled for each backend configuration described above (I'll refer to them as the "aggregator" backend and the "downloader" backend). The downloader backend gets through the API calls fairly quickly; I make heavy use of the NDB library and asynchronous URL fetches/datastore calls to achieve this. I've also enabled threadsafe: true so that no instance waits for RPC calls to finish before starting the next task (all tasks can operate independently of one another and are idempotent).
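To illustrate the pattern on the downloader side (a simplified sketch, not my actual code; Item and the URL are placeholders), each task fires the fetch and the datastore write asynchronously:
from google.appengine.ext import ndb

class Item(ndb.Model):
    payload = ndb.BlobProperty()

@ndb.tasklet
def download_item_async(item_id):
    ctx = ndb.get_context()
    # Non-blocking URL fetch; other tasklets keep running while it is in flight.
    result = yield ctx.urlfetch('http://example.com/items/%s' % item_id)
    item = Item(id=item_id, payload=result.content)
    yield item.put_async()
    raise ndb.Return(item)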
The aggregator backend is where the big time sink comes into play. Storing 500-1500 of these aggregates asynchronously through transactions takes 40 seconds or more (and I don't even think all of the transactions are being properly committed). I keep this backend at threadsafe: false because I use a pull-queue lease deadline of 300 seconds; if I allow more than one task to execute on a single instance, the work can cascade and push some tasks past the 300-second mark, allowing another instance to lease the same task a second time and possibly double-count.
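For context, each aggregator task looks roughly like this (a simplified sketch, not my actual code; the queue name and JSON payload format are placeholders), leasing work for 300 seconds and only deleting the tasks once everything is stored:
import json
from google.appengine.api import taskqueue

def process_aggregate_task():
    queue = taskqueue.Queue('aggregate-pull-queue')
    tasks = queue.lease_tasks(300, 100)  # 300-second lease deadline
    data_set = {}
    for task in tasks:
        for name, value in json.loads(task.payload).iteritems():
            data_set[name] = data_set.get(name, 0.00) + value
    increment_batch(data_set).get_result()  # the tasklet shown under "Relevant Code"
    queue.delete_tasks(tasks)  # only after the datastore writes finish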
The logs show BadRequestError: Nested transactions are not supported.
with a previous error (in the stack trace) of TransactionFailedError: too much contention on these datastore entities. please try again.
Another error I commonly see is BadRequestError(The referenced transaction has expired or is no longer valid.)
From my understanding, sometimes these errors mean that a transaction can still be committed without further interaction. How do I know if this has been properly committed? Am I doing this in a logical/efficient manner or is there more room for concurrency without risk of messing everything up?
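One idea I've been toying with for the "was it committed?" question (just a sketch; TxnMarker, task_id, and apply_updates_async are hypothetical names, not code I'm running): write a marker entity inside the same transaction, then re-read the marker after an ambiguous error:
class TxnMarker(ndb.Model):
    """Exists only if the transaction for a given task id committed."""
    pass

@ndb.tasklet
def apply_once(task_id, apply_updates_async):
    marker_key = ndb.Key(TxnMarker, task_id)

    @ndb.tasklet
    def txn():
        if (yield marker_key.get_async()):
            raise ndb.Return(False)  # already applied earlier, skip
        yield apply_updates_async()  # the real aggregate writes go here
        yield TxnMarker(key=marker_key).put_async()
        raise ndb.Return(True)

    try:
        applied = yield ndb.transaction_async(txn, xg=True)
    except Exception:
        # Ambiguous failure: the marker tells us whether the commit landed.
        applied = (yield marker_key.get_async()) is not None
    raise ndb.Return(applied)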
Relevant Code:
import random

from google.appengine.ext import ndb

class GeneralShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    name = ndb.StringProperty(required=True)
    num_shards = ndb.IntegerProperty(default=4)

class GeneralAggregateShard(ndb.Model):
    """Shards for each named counter"""
    name = ndb.StringProperty(name='n', required=True)
    count = ndb.FloatProperty(name='c', default=0.00)  # acts as a total now

@ndb.tasklet
def increment_batch(data_set):
    def run_txn(name, value):
        @ndb.tasklet
        def txn():
            to_put = []
            dbkey = ndb.Key(GeneralShardConfig, name)
            config = yield dbkey.get_async(use_memcache=False)
            if not config:
                config = GeneralShardConfig(key=dbkey, name=name)
                to_put.append(config)
            index = random.randint(0, config.num_shards-1)
            shard_name = name + str(index)
            dbkey = ndb.Key(GeneralAggregateShard, shard_name)
            counter = yield dbkey.get_async()
            if not counter:
                counter = GeneralAggregateShard(key=dbkey, name=name)
            counter.count += value
            to_put.append(counter)
            yield ndb.put_multi_async(to_put)
        return ndb.transaction_async(txn, use_memcache=False, xg=True)

    res = yield [run_txn(key, value)
                 for key, value in data_set.iteritems() if value != 0.00]
    raise ndb.Return(res)
Given the implementation, the only room for "contention" I see is if 2 or more aggregate tasks need to update the same aggregate name, which shouldn't happen too frequently, and with the sharded counters I would expect such overlap to rarely, if ever, occur. I assume the
BadRequestError(The referenced transaction has expired or is no longer valid.)
error appears when the event loop is checking the status of all the tasklets and hits a reference to a transaction that has already finished. The problem is that it errors out, so does that mean all the transactions were prematurely cut off, or can I assume they all went through? I further assume the line res = yield [run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
needs to be broken into a try/except around each tasklet to detect these errors.
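Something like the following is what I have in mind (a sketch only, replacing the last two lines of increment_batch; it assumes datastore_errors has been imported from google.appengine.api):
# Fire off all the transactions first, then wait on each future individually
# so one failed transaction doesn't mask the others.
futures = dict((name, run_txn(name, value))
               for name, value in data_set.iteritems() if value != 0.00)
results, failed = {}, {}
for name, fut in futures.iteritems():
    try:
        results[name] = yield fut
    except (datastore_errors.BadRequestError,
            datastore_errors.TransactionFailedError) as e:
        failed[name] = e  # log these or re-queue them separately
raise ndb.Return((results, failed))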
Before I drive myself mad over this, I'd appreciate any guidance/help on how to optimize this process and do so in a reliable way.
EDIT 1: I modified the aggregator task behavior as follows:
This has helped reduce the contention errors I've been seeing, but it's still not very reliable. Most recently, I hit BadRequestError: Nested transactions are not supported.
with a stack trace indicating RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
My thinking is that this modification should optimize the process by allowing all possible overlaps in the aggregation step to be combined and attempted at once on a single instance, rather than multiple instances all performing transactions that may collide. I am still having trouble saving the results reliably.
By reducing the datastore I/O (leaving work to the autobatchers and disabling indexing) you can be more certain that the datastore writes complete (less contention) and it should be faster.
The config (renamed counter) gets are outside of the transaction(s) and can run concurrently whilst looping through the transactions.
Methods and a total property were added to Counter to (hopefully) make it easier to modify in future.
Created a new ndb Property for decimal support (assuming that is why you are specifying 0.00 instead of 0.0).
EDIT:
Removed the need for transactions and changed the sharding system for reliability.
import webapp2
import copy
import decimal
import logging
import random
import string

from google.appengine.api import datastore_errors
from google.appengine.datastore import entity_pb
from google.appengine.ext import deferred
from google.appengine.ext import ndb

TEST_BATCH_SIZE = 250
TEST_NAME_LEN = 12

class DecimalProperty(ndb.Property):
    """A Property whose value is a decimal.Decimal object."""

    def _datastore_type(self, value):
        return str(value)

    def _validate(self, value):
        if not isinstance(value, decimal.Decimal):
            raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r'
                                                 % (value,))
        return value

    def _db_set_value(self, v, p, value):
        value = str(value)
        v.set_stringvalue(value)
        if not self._indexed:
            p.set_meaning(entity_pb.Property.TEXT)

    def _db_get_value(self, v, _):
        if not v.has_stringvalue():
            return None
        value = v.stringvalue()
        return decimal.Decimal(value)

class BatchInProgress(ndb.Model):
    """Use a scheduler to delete batches in progress after a certain time"""
    started = ndb.DateTimeProperty(auto_now=True)

    def clean_up(self):
        qry = Shard.query().filter(Shard.batch_key == self.key)
        keys = qry.fetch(keys_only=True)
        while keys:
            ndb.delete_multi(keys)
            keys = qry.fetch(keys_only=True)

def cleanup_failed_batch(batch_key):
    batch = batch_key.get()
    if batch:
        batch.clean_up()
        batch.key.delete()  # ndb entities are deleted via their key
class Shard(ndb.Model):
    """Shards for each named counter"""
    counter_key = ndb.KeyProperty(name='c')
    batch_key = ndb.KeyProperty(name='b')
    count = DecimalProperty(name='v', default=decimal.Decimal('0.00'),
                            indexed=False)

class Counter(ndb.Model):
    """Tracks the number of shards for each named counter"""

    @property
    def shards(self):
        qry = Shard.query().filter(Shard.counter_key == self.key)
        results = qry.fetch(use_cache=False, use_memcache=False)
        return filter(None, results)

    @property
    def total(self):
        count = decimal.Decimal('0.00')  # Use initial value if no shards
        for shard in self.shards:
            count += shard.count
        return count

    @ndb.tasklet
    def incr_async(self, value, batch_key):
        index = batch_key.id()
        name = self.key.id() + str(index)
        shard = Shard(id=name, count=value,
                      counter_key=self.key, batch_key=batch_key)
        yield shard.put_async(use_cache=False, use_memcache=False)

    def incr(self, *args, **kwargs):
        return self.incr_async(*args, **kwargs).get_result()
@ndb.tasklet
def increment_batch(data_set):
    batch_key = yield BatchInProgress().put_async()
    deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600)
    # NOTE: mapping is modified in place, hence copying
    mapping = copy.copy(data_set)
    # (1/3) filter and fire off counter gets
    # so the futures can autobatch
    counters = {}
    ctr_futs = {}
    ctr_put_futs = []
    zero_values = set()
    for name, value in mapping.iteritems():
        if value != decimal.Decimal('0.00'):
            ctr_fut = Counter.get_by_id_async(name)  # Use cache(s)
            ctr_futs[name] = ctr_fut
        else:
            # Skip zero values because...
            zero_values.add(name)
            continue
    for name in zero_values:
        del mapping[name]  # Remove all zero values from the mapping
    del zero_values
    while mapping:  # Repeat until all transactions succeed
        # (2/3) wait on counter gets and fire off increment transactions
        # this way autobatchers should fill time
        incr_futs = {}
        for name, value in mapping.iteritems():
            counter = counters.get(name)
            if not counter:
                counter = counters[name] = yield ctr_futs.pop(name)
                if not counter:
                    logging.info('Creating new counter %s', name)
                    counter = counters[name] = Counter(id=name)
                    ctr_put_futs.append(counter.put_async())
                else:
                    logging.debug('Reusing counter %s', name)
            incr_fut = counter.incr_async(value, batch_key)
            incr_futs[(name, value)] = incr_fut
        # (3/3) wait on increments and handle errors
        # by using a tuple key for variable access
        for (name, value), incr_fut in incr_futs.iteritems():
            counter = counters[name]
            try:
                yield incr_fut
            except:
                pass
            else:
                del mapping[name]
        if mapping:
            logging.warning('%i increments failed this batch.' % len(mapping))
    yield batch_key.delete_async(), ctr_put_futs
    raise ndb.Return(counters.values())
class ShardTestHandler(webapp2.RequestHandler):

    @ndb.synctasklet
    def get(self):
        if self.request.GET.get('delete'):
            ndb.delete_multi_async(Shard.query().fetch(keys_only=True))
            ndb.delete_multi_async(Counter.query().fetch(keys_only=True))
            ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True))
        else:
            data_set_test = {}
            for _ in xrange(TEST_BATCH_SIZE):
                name = ''
                for _ in xrange(TEST_NAME_LEN):
                    name += random.choice(string.letters)
                value = decimal.Decimal('{0:.2f}'.format(random.random() * 100))
                data_set_test[name] = value
            yield increment_batch(data_set_test)
        self.response.out.write("Done!")

app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True)
app = ndb.toplevel(app.__call__)
Specifically on the topic of the "The referenced transaction has expired or is no longer valid" BadRequestError, it's a little-advertised fact that transactions will time out much sooner than a request will. From creation, you get 15s of life for free, and after that the transaction is killed if it spends 15 consecutive seconds idle (so effective minimum lifespan is 30 seconds), and is hard-killed no matter what after 60 seconds. This makes it difficult to run large numbers of transactions in parallel since CPU contention and an unfair tasklet scheduling algorithm can conspire to keep some transactions idle too long.
The following monkeypatch to ndb's transaction method helps a bit by retrying expired transactions, but ultimately you have to tune your batching to reduce contention to manageable levels.
import logging
import random
import time

from google.appengine.api import datastore_errors
from google.appengine.ext import ndb

# Assumed example value; the original constant's value isn't shown above.
_MAX_BAD_REQUEST_RECOVERY_ATTEMPTS = 3

_ndb_context_transaction = ndb.Context.transaction

@ndb.tasklet
def _patched_transaction(self, callback, **ctx_options):
    if (self.in_transaction() and
            ctx_options.get('propagation') != ndb.TransactionOptions.INDEPENDENT):
        raise ndb.Return((yield _ndb_context_transaction(self, callback, **ctx_options)))

    attempts = 1
    start_time = time.time()
    me = random.getrandbits(16)
    logging.debug('Transaction started <%04x>', me)
    while True:
        try:
            result = yield _ndb_context_transaction(self, callback, **ctx_options)
        except datastore_errors.BadRequestError as e:
            if not ('expired' in str(e) and
                    attempts < _MAX_BAD_REQUEST_RECOVERY_ATTEMPTS):
                raise
            logging.warning(
                'Transaction retrying <%04x> (attempt #%d, %.1f seconds) on BadRequestError: %s',
                me, attempts, time.time() - start_time, e)
            attempts += 1
        else:
            logging.debug(
                'Transaction finished <%04x> (attempt #%d, %.1f seconds)',
                me, attempts, time.time() - start_time)
            raise ndb.Return(result)

ndb.Context.transaction = _patched_transaction
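To illustrate what "tuning your batching" might look like (a sketch under assumptions: run_txn is the per-name transaction helper from the question, and MAX_IN_FLIGHT is an arbitrary example value, not a recommendation), you can cap how many transactions are in flight at once so none of them sits idle long enough to expire:
MAX_IN_FLIGHT = 25  # example cap on concurrent transactions

@ndb.tasklet
def increment_in_chunks(data_set):
    items = [(k, v) for k, v in data_set.iteritems() if v != 0.00]
    results = []
    for start in xrange(0, len(items), MAX_IN_FLIGHT):
        chunk = items[start:start + MAX_IN_FLIGHT]
        # Wait for this chunk's transactions before starting the next one,
        # so no transaction sits idle long enough to expire.
        chunk_results = yield [run_txn(name, value) for name, value in chunk]
        results.extend(chunk_results)
    raise ndb.Return(results)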