
Tweaking celery for high performance

I'm trying to send ~400 HTTP GET requests and collect the results. I'm running from Django. My solution was to use celery with gevent.

To start the celery tasks I call get_reports:

def get_reports(self, clients, *args, **kw):
    sub_tasks = []
    for client in clients:
        s = self.get_report_task.s(self, client, *args, **kw).set(queue='io_bound')
        sub_tasks.append(s)
    res = celery.group(*sub_tasks)()
    reports = res.get(timeout=30, interval=0.001)
    return reports

@celery.task
def get_report_task(self, client, *args, **kw):
    report = send_http_request(...)
    return report

I use 4 workers:

python manage.py celery worker -P gevent --concurrency=100 -n a0 -Q io_bound
python manage.py celery worker -P gevent --concurrency=100 -n a1 -Q io_bound
python manage.py celery worker -P gevent --concurrency=100 -n a2 -Q io_bound
python manage.py celery worker -P gevent --concurrency=100 -n a3 -Q io_bound

And I use RabbitMQ as the broker.
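
For reference, the broker is wired up with the standard Celery setting in settings.py (a sketch; the URL below is a placeholder for my local RabbitMQ instance):

    # settings.py - standard Celery broker setting (placeholder credentials)
    BROKER_URL = 'amqp://guest:guest@localhost:5672//'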

Although it works much faster than running the requests sequentially (400 requests took ~23 seconds), I noticed that most of that time was overhead from celery itself, i.e. if I change get_report_task like this:

@celery.task
def get_report_task(self, client, *args, **kw):
    return []

the whole operation still took ~19 seconds. That means I spend 19 seconds just sending all the tasks to celery and getting the results back.

The queuing rate of messages to RabbitMQ seems to be capped at about 28 messages/sec, and I think this is my bottleneck: at that rate, just publishing 400 tasks takes 400 / 28 ≈ 14 seconds, which accounts for most of the 19 seconds of overhead.

I'm running on a Windows 8 machine, if that matters.

Some of the things I've tried:

  • using Redis as the broker (configured as sketched after this list)
  • using Redis as the results backend
  • tweaking these settings:

    BROKER_POOL_LIMIT = 500
    CELERYD_PREFETCH_MULTIPLIER = 0
    CELERYD_MAX_TASKS_PER_CHILD = 100
    CELERY_ACKS_LATE = False
    CELERY_DISABLE_RATE_LIMITS = True
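
For the Redis experiments, I switched the broker and the backend with the usual URL settings (a sketch; the URLs are placeholders for a local Redis instance):

    # settings.py - the Redis variants (placeholder URLs)
    BROKER_URL = 'redis://localhost:6379/0'             # Redis as the broker
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'  # Redis as the results backend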

I'm looking for any suggestions that will help speed things up.

asked Sep 17 '13 by lev

3 Answers

Are you really running on Windows 8 without a virtual machine? I did the following simple test on a 2-core MacBook with 8 GB RAM running OS X 10.7:

import celery
from time import time

@celery.task
def test_task(i):
    return i

grp = celery.group(test_task.s(i) for i in range(400))
tic1 = time(); res = grp(); tac1 = time()
print 'queued in', tac1 - tic1
tic2 = time(); vals = res.get(); tac2 = time()
print 'executed in', tac2 - tic2

I'm using Redis as the broker, Postgres as the result backend, and the default worker with --concurrency=4. Guess what the output was? Here it is:

queued in 3.5009469986

executed in 2.99818301201

answered by Sergey Panfilov


Well, it turns out I had two separate issues.

First off, the task was a member method. After extracting it out of the class, the time went down to about 12 seconds. I can only assume it had something to do with the pickling of self.
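
Roughly, the refactor just moves the task to module level so celery no longer has to pickle the instance (a sketch; send_http_request stands in for the real call, as above):

    # tasks.py - module-level task; only `client` is pickled now, not `self`
    @celery.task
    def get_report_task(client, *args, **kw):
        report = send_http_request(...)
        return report

    # and in get_reports the signature no longer carries self:
    # s = get_report_task.s(client, *args, **kw).set(queue='io_bound')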

The second issue was the fact that it ran on Windows. After running it on my Linux machine, the run time was less than 2 seconds. Guess Windows just isn't cut out for high performance.

answered by lev


How about using Twisted instead? You end up with a much simpler application structure. You can send all 400 requests from the Django process at once and wait for all of them to finish. This works concurrently because Twisted sets the sockets to non-blocking mode and only reads the data when it is available.

I had a similar problem a while ago and developed a nice bridge between Twisted and Django. I've been running it in a production environment for almost a year now. You can find it here: https://github.com/kowalski/featdjango/. In short, the main application thread runs the Twisted reactor loop, and rendering Django views is delegated to a thread. It uses a special thread pool, which exposes methods for interacting with the reactor and using its asynchronous capabilities.

If you use it, your code would look like this:

from twisted.internet import defer
from twisted.web.client import getPage

import threading


def get_reports(self, urls, *args, **kw):
    ct = threading.current_thread()

    defers = list()
    for url in urls:
        # here the Deferred is created which will fire when
        # the call is complete
        d = ct.call_async(getPage, args=[url] + list(args), kwargs=kw)
        # here we keep it for reference
        defers.append(d)

    # here we create a Deferred which will fire when all the
    # constituent Deferreds have completed
    deferred_list = defer.DeferredList(defers, consumeErrors=True)
    # here we tell the current thread to wait until we are done
    results = ct.wait_for_defer(deferred_list)

    # results is a list of (success flag, result) pairs;
    # below we unpack it
    reports = list()
    for success, result in results:
        if success:
            reports.append(result)
        else:
            # here handle the failure, or just ignore
            pass

    return reports

There is still a lot you can optimize here. Every call to getPage() creates a separate TCP connection and closes it when it's done. That is as good as it gets if each of your 400 requests goes to a different host. If that's not the case, you can use an HTTP connection pool, which uses persistent connections and HTTP pipelining. You instantiate it like this:

from feat.web import httpclient

pool = httpclient.ConnectionPool(host, port, maximum_connections=3)

Then a single request is performed like this (this replaces the getPage() call):

d = ct.call_async(pool.request, args=(method, path, headers, body))
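
So a pool-backed version of the fan-out above would look roughly like this (a sketch, reusing the thread helper ct from the earlier example; method, path, headers and body follow the pool.request signature shown above):

    # same fan-out as before, but the requests share the pool's
    # persistent connections instead of opening one each
    defers = list()
    for path in paths:
        d = ct.call_async(pool.request, args=('GET', path, headers, body))
        defers.append(d)

    deferred_list = defer.DeferredList(defers, consumeErrors=True)
    results = ct.wait_for_defer(deferred_list)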
answered by Marek Kowalski