 

limited number of user-initiated background processes

I need to allow users to submit requests for very, very large jobs. We are talking 100 gigabytes of memory and 20 hours of computing time. This costs our company a lot of money, so it was stipulated that only 2 jobs could be running at any time, and requests for new jobs when 2 are already running would be rejected (and the user notified that the server is busy).

My current solution uses an Executor from concurrent.futures, and requires setting the Apache server to run only one process, reducing responsiveness (current user count is very low, so it's okay for now).
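The reject-when-busy behavior described here can be sketched without pinning Apache to a single process, by putting a non-blocking semaphore in front of the executor. This is a minimal sketch, not the asker's actual code; `BoundedExecutor` and `max_jobs` are illustrative names:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

class BoundedExecutor:
    """Run at most `max_jobs` jobs at once and reject
    (rather than queue) anything beyond that."""

    def __init__(self, max_jobs=2):
        self._slots = threading.Semaphore(max_jobs)
        self._pool = ThreadPoolExecutor(max_workers=max_jobs)

    def submit(self, fn, *args, **kwargs):
        # Non-blocking acquire: fail fast instead of silently queueing the job
        if not self._slots.acquire(blocking=False):
            return None  # caller notifies the user that the server is busy
        future = self._pool.submit(fn, *args, **kwargs)
        # Free the slot as soon as the job finishes, success or failure
        future.add_done_callback(lambda f: self._slots.release())
        return future
```

A view would call `submit()` and return a "server busy" response whenever it gets `None` back. Note this only works while everything lives in one process, which is exactly the limitation the question is trying to escape.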

If possible I would like to use Celery for this, but I did not see in the documentation any way to accomplish this particular setting.

How can I run up to a limited number of jobs in the background in a Django application, and notify users when jobs are rejected because the server is busy?

Nate Glenn asked Sep 09 '16 17:09


2 Answers

I have two solutions for this particular case: an out-of-the-box solution using Celery, and another one that you implement yourself.

  1. You can do something like this with Celery workers. In particular, you create only two worker processes with concurrency=1 (or one worker with concurrency=2, but then the two slots are threads rather than separate processes); this way, only two jobs can run asynchronously at once. You then need a way to raise an exception when both slots are occupied: use inspect to count the number of active tasks and throw an exception if required. For an implementation, you can check out this SO post.

You might also be interested in rate limits.

  2. You can do it all yourself, using a locking solution of your choice. In particular, a nice implementation that makes sure only two processes are running, using Redis (and redis-py), is as simple as the following. (Assuming you know Redis, since you know Celery.)

    from redis import StrictRedis
    
    redis = StrictRedis('localhost', 6379)  # port as an int
    locks = ['compute:lock1', 'compute:lock2']
    acquired = False
    for key in locks:
        # Wait up to 5 seconds for this slot before trying the next one
        lock = redis.lock(key, blocking_timeout=5)
        acquired = lock.acquire()
        if acquired:
            try:
                do_huge_computation()
            finally:
                # Release the slot even if the computation raises
                lock.release()
            break
        print("Gonna try next possible slot")
    
    if not acquired:
        raise SystemLimitsReached("Already at max capacity!")
    

This way you make sure only two running processes can exist in the system. A third process will block in the line lock.acquire() for up to blocking_timeout seconds per lock; if locking succeeds, acquired is True, otherwise it's False and you'd tell your user to wait!

I had the same requirement some time in the past, and what I ended up coding was something like the solution above. In particular:

  1. It has the fewest possible race conditions.
  2. It's easy to read.
  3. It doesn't depend on worker configuration, so a sysadmin suddenly doubling the concurrency of workers under load can't blow up the whole system.
  4. You can also implement the limit per user, meaning each user can have 2 simultaneously running jobs, just by changing the lock keys from compute:lock1 to compute:userId:lock1 and lock2 accordingly. You can't do this with vanilla Celery.
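The per-user variant in point 4 can be sketched as follows. This is a sketch built on the same redis-py locking idea as the snippet above; `lock_keys` and `try_start_job` are illustrative names, and `redis_client` is assumed to be a `redis.StrictRedis` instance passed in by the caller:

```python
MAX_JOBS_PER_USER = 2

def lock_keys(user_id, max_jobs=MAX_JOBS_PER_USER):
    # Build the per-user key names: compute:<userId>:lock1, compute:<userId>:lock2, ...
    return ['compute:%s:lock%d' % (user_id, i) for i in range(1, max_jobs + 1)]

def try_start_job(redis_client, user_id, work):
    """Run work() if this user still has a free slot; return True on success."""
    for key in lock_keys(user_id):
        lock = redis_client.lock(key, blocking_timeout=5)
        if lock.acquire():
            try:
                work()
            finally:
                lock.release()  # free the slot even if work() raises
            return True
    return False  # every slot for this user is taken
```

Each user gets their own set of lock keys, so one user saturating their two slots doesn't block anyone else.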
SpiXel answered Nov 01 '22 15:11


First of all you need to limit concurrency on your worker (docs):

celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>

This will help make sure that you do not have more than 2 active tasks even if you have errors in your code.

Now you have 2 ways to implement task number validation:

  1. You can use inspect to get the number of active and scheduled tasks:

     from celery import current_app
     
     def start_job():
         inspect = current_app.control.inspect()
         active_tasks = inspect.active() or {}
         scheduled_tasks = inspect.scheduled() or {}
         worker_key = 'celery@%s' % <worker_name>
         worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, [])
         if len(worker_tasks) >= 2:
             raise MyCustomException('It is impossible to start more than 2 tasks.')
         else:
             my_task.delay()
     
  2. You can store the number of currently executing tasks in the DB and validate task execution based on it.

The second approach could be better if you want to extend the functionality later, e.g. to introduce premium users or to enforce the 2-job limit per user.
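A minimal sketch of that counter idea, using SQLite to stand in for whatever database you use (table and function names are illustrative). The key point is the atomic conditional UPDATE, which prevents two concurrent requests from both grabbing the last slot:

```python
import sqlite3

MAX_JOBS = 2

def init_db(conn):
    # Single-row table holding the count of currently running jobs
    conn.execute(
        "CREATE TABLE IF NOT EXISTS job_slots "
        "(id INTEGER PRIMARY KEY CHECK (id = 1), running INTEGER NOT NULL)"
    )
    conn.execute("INSERT OR IGNORE INTO job_slots (id, running) VALUES (1, 0)")
    conn.commit()

def try_reserve_slot(conn):
    # Atomic conditional increment: only succeeds while running < MAX_JOBS,
    # so the check and the increment cannot be interleaved by another request
    cur = conn.execute(
        "UPDATE job_slots SET running = running + 1 WHERE id = 1 AND running < ?",
        (MAX_JOBS,),
    )
    conn.commit()
    return cur.rowcount == 1  # True if we got a slot

def release_slot(conn):
    # Call when the job finishes, success or failure
    conn.execute("UPDATE job_slots SET running = running - 1 WHERE id = 1 AND running > 0")
    conn.commit()
```

The per-user variant is the same table with a row per user instead of one global row.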

zymud answered Nov 01 '22 13:11