Reliable way to execute thousands of independent transactions?

I am hitting a bottleneck in my application and having a tough time finding a way around it. A little background:

  • My app pings an API to gather information on hundreds of thousands of items and stores them in the datastore
  • We need to perform simple aggregations on a mix of dimensions of these items, which we try to compute while we store the items

Current implementation:

  • We kick off a download of these items manually as needed, which creates tasks on a backend dedicated to downloading these items. Each task launches more tasks depending on the number of API calls required to paginate through and obtain every item.
  • Each task will download, parse, and bulk store the items, while keeping the aggregations we want in memory by use of a dictionary.
  • At the end of each task's execution, we write the dictionary of aggregates to a pull queue.
  • Once we detect we are nearing the end of the API calls, we kick off an aggregation task to a second backend configuration
  • This "aggregation task" pulls from the pull queue (20 at a time) and merges the dictionaries found in each task (further doing in-memory aggregation) before trying to store each aggregate; a sketch of this merge step appears below this list. This task will also launch other tasks to perform aggregations for the remaining tasks in the pull queue (hundreds)
  • We use the sharded counter approach to help alleviate any contention when storing to the datastore
  • Each aggregation task can try to store 500-1500 aggregations, which should all be independent of one another

There are additional checks and such in there to ensure all pull queue tasks are properly processed and all items are downloaded.
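
To make the merge step concrete, here is a minimal sketch of how the per-task aggregate dictionaries pulled from the queue might be combined before storage. The helper name merge_aggregates and the JSON payload format are assumptions for illustration, not the actual code:

import json
from collections import defaultdict

def merge_aggregates(payloads):
    """Combine per-task aggregate dictionaries into a single dictionary."""
    merged = defaultdict(float)
    for payload in payloads:
        partial = json.loads(payload)            # each pull-queue task carries one dict
        for name, value in partial.iteritems():
            merged[name] += value                # same aggregate name: sum the partials
    return dict(merged)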

The Problem:

We want to download and store all the items and aggregates as fast as possible. I have 20 instances enabled for each backend configuration described (I'll refer to them as the "aggregator" backend and "downloader" backend). The downloader backend seems to get through the API calls fairly fast. I make heavy use of the NDB library and asynchronous URL Fetches/Datastore calls to achieve this. I've also enabled threadsafe:true so that no instance will be waiting for RPC calls to finish before starting the next task (all tasks can operate independently of one another and are idempotent).

The aggregator backend is where the big time sink comes into play. Storing 500-1500 of these aggregates asynchronously through transactions takes 40 seconds or more (and I don't even think all transactions are being properly committed). I keep this backend at threadsafe:false since I use a pull queue expiration deadline of 300 seconds, but if I allow more than one task to execute on a single instance, they may cascade and push some tasks past the 300-second mark, allowing another worker to lease the same task a second time and possibly double-count.
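
For reference, this is roughly what the lease/process/delete cycle looks like with the pull-queue API; the queue name 'aggregates' and the JSON payloads are assumptions, not the actual setup:

from google.appengine.api import taskqueue

queue = taskqueue.Queue('aggregates')
tasks = queue.lease_tasks(300, 20)   # 300-second lease, at most 20 tasks
if tasks:
    payloads = [t.payload for t in tasks]
    # ... merge and store the aggregates here ...
    queue.delete_tasks(tasks)        # delete only after processing succeeds;
                                     # otherwise the tasks reappear when the
                                     # lease expires and may be double-counted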

The logs show BadRequestError: Nested transactions are not supported. with a previous error (in the stack trace) of TransactionFailedError: too much contention on these datastore entities. please try again. Another error I commonly see is BadRequestError(The referenced transaction has expired or is no longer valid.)

From my understanding, sometimes these errors mean that a transaction can still be committed without further interaction. How do I know if this has been properly committed? Am I doing this in a logical/efficient manner or is there more room for concurrency without risk of messing everything up?

Relevant Code:

import random

from google.appengine.ext import ndb


class GeneralShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    name = ndb.StringProperty(required=True)
    num_shards = ndb.IntegerProperty(default=4)

class GeneralAggregateShard(ndb.Model):
    """Shards for each named counter"""
    name = ndb.StringProperty(name='n', required=True)
    count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now

@ndb.tasklet
def increment_batch(data_set):
    def run_txn(name, value):
        @ndb.tasklet
        def txn():
            to_put = []
            dbkey = ndb.Key(GeneralShardConfig, name)
            config = yield dbkey.get_async(use_memcache=False)
            if not config:
                config = GeneralShardConfig(key=dbkey,name=name)
                to_put.append(config)
            index = random.randint(0, config.num_shards-1)
            shard_name = name + str(index)
            dbkey = ndb.Key(GeneralAggregateShard, shard_name)
            counter = yield dbkey.get_async()
            if not counter:
                counter = GeneralAggregateShard(key=dbkey, name=name)
            counter.count += value
            to_put.append(counter)
            yield ndb.put_multi_async(to_put)
        return ndb.transaction_async(txn, use_memcache=False, xg=True)
    res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
    raise ndb.Return(res)

Given the implementation, the only room for "contention" I see is if two or more aggregate tasks need to update the same aggregate name, which shouldn't happen too frequently, and with sharded counters I would expect this overlap to rarely, if ever, occur. I assume the BadRequestError(The referenced transaction has expired or is no longer valid.) error appears when the event loop is checking the status of all the tasklets and hits a reference to a transaction that has already finished. The problem is that it errors out, so does that mean all transactions were prematurely cut off, or can I assume they all went through? I further assume the line res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00] needs to be broken into a try/except for each tasklet to detect these errors.
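
For example, the gather line inside increment_batch could be replaced with something along these lines (a sketch, untested against this setup), so each transaction's outcome is inspected individually:

# Yield each transaction future individually so failures can be detected,
# instead of one bare yield over the whole list comprehension.
from google.appengine.api import datastore_errors  # at module level

futures = {}
for key, value in data_set.iteritems():
    if value != 0.00:
        futures[key] = run_txn(key, value)
failed = {}
for key, fut in futures.iteritems():
    try:
        yield fut                    # raises here if that transaction failed
    except (datastore_errors.TransactionFailedError,
            datastore_errors.BadRequestError) as e:
        failed[key] = e              # retry or log these keys separately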

Before I drive myself mad over this, I'd appreciate any guidance/help on how to optimize this process and do so in a reliable way.

EDIT 1: I modified the aggregator task behavior as follows:

  • If more than one task was leased from the queue, aggregate the tasks in memory, store the result as another task in the pull queue, and immediately launch another "aggregator task" (sketched below)
  • Otherwise, if only one task was leased, try to save the results

This has helped reduce the contention errors I've been seeing, but it's still not very reliable. Most recently, I hit BadRequestError: Nested transactions are not supported. with a stack trace indicating RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>

I believe this modification should optimize the process by allowing all possible overlaps in the aggregation process to be combined and tried all at once in a single instance, rather than multiple instances all performing transactions that may collide. I am still having issues saving the results in a reliable manner.
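
A rough sketch of that lease/merge/re-enqueue step follows. The queue name, JSON payloads, and launch_aggregator_task are assumptions for illustration only:

import json
from google.appengine.api import taskqueue

queue = taskqueue.Queue('aggregates')
tasks = queue.lease_tasks(300, 20)
if len(tasks) > 1:
    merged = {}
    for t in tasks:
        for name, value in json.loads(t.payload).iteritems():
            merged[name] = merged.get(name, 0.0) + value
    # Re-enqueue the combined result as a single pull task so that only one
    # worker ends up trying to store it.
    queue.add(taskqueue.Task(payload=json.dumps(merged), method='PULL'))
    queue.delete_tasks(tasks)
    # launch_aggregator_task()  # hypothetical helper that enqueues another worker
elif tasks:
    # Only one task left: attempt to store its aggregates directly.
    pass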

asked Jun 28 '12 by someone1


2 Answers

By reducing the datastore I/O (leaving work to the autobatchers and disabling indexing) you can be more certain that the datastore writes complete (less contention), and it should be faster.

The counter gets (config renamed to Counter) happen outside of the transaction(s) and can run concurrently while looping through the transactions.

Methods and a total property were added to Counter to (hopefully) make it easier to modify in the future.

Created a new ndb Property for decimal support (assuming that is why you are specifying 0.00 instead of 0.0).

EDIT:

Removed the need for transactions and changed the sharding system for reliability.

import webapp2

import copy
import decimal
import logging
import random
import string

from google.appengine.api import datastore_errors
from google.appengine.datastore import entity_pb
from google.appengine.ext import deferred
from google.appengine.ext import ndb


TEST_BATCH_SIZE = 250
TEST_NAME_LEN = 12


class DecimalProperty(ndb.Property):
    """A Property whose value is a decimal.Decimal object."""

    def _datastore_type(self, value):
      return str(value)

    def _validate(self, value):
      if not isinstance(value, decimal.Decimal):
        raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r'
                                             % (value,))
      return value

    def _db_set_value(self, v, p, value):
        value = str(value)
        v.set_stringvalue(value)
        if not self._indexed:
            p.set_meaning(entity_pb.Property.TEXT)

    def _db_get_value(self, v, _):
        if not v.has_stringvalue():
            return None
        value = v.stringvalue()
        return decimal.Decimal(value)

class BatchInProgress(ndb.Model):
    """Use a scheduler to delete batches in progress after a certain time"""

    started = ndb.DateTimeProperty(auto_now=True)

    def clean_up(self):
        qry = Shard.query().filter(Shard.batch_key == self.key)
        keys = qry.fetch(keys_only=True)
        while keys:
            ndb.delete_multi(keys)
            keys = qry.fetch(keys_only=True)

def cleanup_failed_batch(batch_key):
    batch = batch_key.get()

    if batch:
        batch.clean_up()
        batch.key.delete()  # ndb models are deleted via their key

class Shard(ndb.Model):
    """Shards for each named counter"""

    counter_key = ndb.KeyProperty(name='c')
    batch_key = ndb.KeyProperty(name='b')
    count = DecimalProperty(name='v', default=decimal.Decimal('0.00'),
                            indexed=False)

class Counter(ndb.Model):
    """Tracks the number of shards for each named counter"""

    @property
    def shards(self):
        qry = Shard.query().filter(Shard.counter_key == self.key)
        results = qry.fetch(use_cache=False, use_memcache=False)
        return filter(None, results)

    @property
    def total(self):
        count = decimal.Decimal('0.00') # Use initial value if no shards

        for shard in self.shards:
            count += shard.count

        return count

    @ndb.tasklet
    def incr_async(self, value, batch_key):
        index = batch_key.id()
        name = self.key.id() + str(index)

        shard = Shard(id=name, count=value,
                      counter_key=self.key, batch_key=batch_key)

        yield shard.put_async(use_cache=False, use_memcache=False)

    def incr(self, *args, **kwargs):
        return self.incr_async(*args, **kwargs).get_result()

@ndb.tasklet
def increment_batch(data_set):
    batch_key = yield BatchInProgress().put_async()
    deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600)

    # NOTE: mapping is modified in place, hence copying
    mapping = copy.copy(data_set)

    # (1/3) filter and fire off counter gets
    #       so the futures can autobatch
    counters = {}
    ctr_futs = {}
    ctr_put_futs = []
    zero_values = set()
    for name, value in mapping.iteritems():
        if value != decimal.Decimal('0.00'):
            ctr_fut = Counter.get_by_id_async(name) # Use cache(s)
            ctr_futs[name] = ctr_fut
        else:
            # Skip zero values because...
            zero_values.add(name)
            continue

    for name in zero_values:
        del mapping[name] # Remove all zero values from the mapping
    del zero_values

    while mapping: # Repeat until all transactions succeed

        # (2/3) wait on counter gets and fire off increment transactions
        #       this way autobatchers should fill time
        incr_futs = {}
        for name, value in mapping.iteritems():
            counter = counters.get(name)
            if not counter:
                counter = counters[name] = yield ctr_futs.pop(name)
            if not counter:
                logging.info('Creating new counter %s', name)
                counter = counters[name] = Counter(id=name)
                ctr_put_futs.append(counter.put_async())
            else:
                logging.debug('Reusing counter %s', name)
            incr_fut = counter.incr_async(value, batch_key)
            incr_futs[(name, value)] = incr_fut

        # (3/3) wait on increments and handle errors
        #       by using a tuple key for variable access
        for (name, value), incr_fut in incr_futs.iteritems():
            counter = counters[name]
            try:
                yield incr_fut
            except:
                # Swallow the failure; the entry stays in mapping and is
                # retried on the next pass of the while loop.
                pass
            else:
                del mapping[name]

        if mapping:
            logging.warning('%i increments failed this batch.' % len(mapping))

    yield batch_key.delete_async(), ctr_put_futs

    raise ndb.Return(counters.values())

class ShardTestHandler(webapp2.RequestHandler):

    @ndb.synctasklet
    def get(self):
        if self.request.GET.get('delete'):
            ndb.delete_multi_async(Shard.query().fetch(keys_only=True))
            ndb.delete_multi_async(Counter.query().fetch(keys_only=True))
            ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True))
        else:
            data_set_test = {}
            for _ in xrange(TEST_BATCH_SIZE):
                name = ''
                for _ in xrange(TEST_NAME_LEN):
                    name += random.choice(string.letters)
                value = decimal.Decimal('{0:.2f}'.format(random.random() * 100))
                data_set_test[name] = value
            yield increment_batch(data_set_test)
        self.response.out.write("Done!")

app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True)
app = ndb.toplevel(app.__call__)
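
To exercise it, request /shard_test/ to run a batch of TEST_BATCH_SIZE random increments, or /shard_test/?delete=1 to queue deletion of the test counters, shards, and batch markers.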
answered Nov 05 '22 by 33 revs


Specifically on the topic of the "The referenced transaction has expired or is no longer valid" BadRequestError, it's a little-advertised fact that transactions will time out much sooner than a request will. From creation, you get 15 seconds of life for free; after that the transaction is killed if it spends 15 consecutive seconds idle (so the effective minimum lifespan is 30 seconds), and it is hard-killed no matter what after 60 seconds. This makes it difficult to run large numbers of transactions in parallel, since CPU contention and an unfair tasklet scheduling algorithm can conspire to keep some transactions idle too long.

The following monkeypatch to ndb's transaction method helps a bit by retrying expired transactions, but ultimately you have to tune your batching to reduce contention to manageable levels.

# The imports and retry cap are not shown in the original snippet; the value
# of 3 below is an assumption -- tune it to taste.
import logging
import random
import time

from google.appengine.api import datastore_errors
from google.appengine.ext import ndb

_MAX_BAD_REQUEST_RECOVERY_ATTEMPTS = 3

_ndb_context_transaction = ndb.Context.transaction

@ndb.tasklet
def _patched_transaction(self, callback, **ctx_options):
  if (self.in_transaction() and
      ctx_options.get('propagation') != ndb.TransactionOptions.INDEPENDENT):
    raise ndb.Return((yield _ndb_context_transaction(self, callback, **ctx_options)))

  attempts = 1
  start_time = time.time()
  me = random.getrandbits(16)
  logging.debug('Transaction started <%04x>', me)
  while True:
    try:
      result = yield _ndb_context_transaction(self, callback, **ctx_options)
    except datastore_errors.BadRequestError as e:
      if not ('expired' in str(e) and
              attempts < _MAX_BAD_REQUEST_RECOVERY_ATTEMPTS):
        raise
      logging.warning(
          'Transaction retrying <%04x> (attempt #%d, %.1f seconds) on BadRequestError: %s',
          me, attempts, time.time() - start_time, e)
      attempts += 1
    else:
      logging.debug(
          'Transaction finished <%04x> (attempt #%d, %.1f seconds)',
           me, attempts, time.time() - start_time)
      raise ndb.Return(result)

ndb.Context.transaction = _patched_transaction
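
On the batching point, one simple adjustment is to commit the transactions in smaller waves so none of them sits idle long enough to hit those timeouts. A sketch, assuming a run_txn helper like the one in the question is available at module level and using an arbitrary chunk size:

from google.appengine.ext import ndb

CHUNK = 50  # arbitrary; tune against observed contention and latency

@ndb.tasklet
def increment_batch_chunked(data_set):
    items = [(k, v) for k, v in data_set.iteritems() if v != 0.00]
    results = []
    for i in xrange(0, len(items), CHUNK):
        futs = [run_txn(name, value) for name, value in items[i:i + CHUNK]]
        results.extend((yield futs))  # wait for this wave before starting the next
    raise ndb.Return(results)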
answered Nov 05 '22 by Piotr