I'm looking into using celery (3.1.8) to process huge text files (~30GB) each. These files are in fastq format and contain about 118M sequencing "reads", which are essentially each a combination of header, DNA sequence, and quality string). Also, these sequences are from a paired-end sequencing run, so I'm iterating two files simultaneously (via itertools.izip). What I'd like to be able to do is take each pair of reads, send them to a queue, and have them be processed on one of the machines in our cluster (don't care which) to return a cleaned-up version of the read, if cleaning needs to happen (e.g., based on quality).
I've set up celery and rabbitmq, and my workers are launched as follows:
celery worker -A tasks --autoreload -Q transient
and configured like:
from kombu import Queue
BROKER_URL = 'amqp://guest@godel97'
CELERY_RESULT_BACKEND = 'rpc'
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT=['pickle', 'json']
CELERY_TIMEZONE = 'America/New York'
CELERY_ENABLE_UTC = True
CELERYD_PREFETCH_MULTIPLIER = 500
CELERY_QUEUES = (
Queue('celery', routing_key='celery'),
Queue('transient', routing_key='transient',delivery_mode=1),
)
I've chosen to use an rpc backend and pickle serialization for performance, as well as not writing anything to disk in the 'transient' queue (via delivery_mode).
To set up the celery framework, I first launch the rabbitmq server (3.2.3, Erlang R16B03-1) on a 64-way box, writing log files to a fast /tmp disk. Worker processes (as above) are launched on each node on the cluster (about 34 of them) ranging anywhere from 8-way to 64-way SMP for a total of 688 cores. So, I have a ton of available CPUs for the workers to use to process of the queue.
Once celery is up and running, I submit the jobs via an ipython notebook as below:
files = [foo, bar]
f1 = open(files[0])
f2 = open(files[1])
res = []
count = 0
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)):
count += 1
res.append(tasks.process_read_pair.s(r1, r2))
if count == 10000:
break
t.stop()
g = group(res)
for task in g.tasks:
task.set(queue="transient")
This takes about a 1.5s for 10000 pairs of reads. Then, I call delay on the group to submit to the workers, which takes about 20s, as below:
result = g.delay()
Monitoring with rabbitmq console, I see that I'm doing OK, but not nearly fast enough.
So, is there any way to speed this up? I mean, I'd like to see at least 50,000 read pairs processed every second rather than 500. Is there anything obvious that I'm missing in my celery configuration? My worker and rabbit logs are essentially empty. Would love some advice on how to get my performance up. Each individual read pair processes pretty quickly, too:
[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec
So up to this point, I've googled all I can think of with celery, performance, routing, rabbitmq, etc. I've been through the celery website and docs. If I can't get the performance higher, I'll have to abandon this method in favor of another solution (basically dividing up the work into many smaller physical files and processing them directly on each compute node with multiprocessing or something). It would be a shame to not be able to spread this load out over the cluster, though. Plus, this seems like an exquisitely elegant solution.
Thanks in advance for any help!
Not an answer but too long for a comment.
Let's narrow the problem down a little...
Firstly, try skipping all your normal logic/message preparation and just do the tightest possible publishing loop with your current library. See what rate you get. This will identify if it's a problem with your non-queue-related code.
If it's still slow, set up a new python script but use amqplib instead of celery. I've managed to get it publishing at over 6000/s while doing useful work (and json encoding) on a mid-range desktop, so I know that it's performant. This will identify if the problem is with the celery library. (To save you time, I've snipped the following from a project of mine and hopefully not broken it when simplifying...)
from amqplib import client_0_8 as amqp
try:
lConnection = amqp.Connection(
host=###,
userid=###,
password=###,
virtual_host=###,
insist=False)
lChannel = lConnection.channel()
Exchange = ###
for i in range(100000):
lMessage = amqp.Message("~130 bytes of test data..........................................................................................................")
lMessage.properties["delivery_mode"] = 2
lChannel.basic_publish(lMessage, exchange=Exchange)
lChannel.close()
lConnection.close()
except Exception as e:
#Fail
Between the two approaches above you should be able to track down the problem to one of the Queue, the Library or your code.
Reusing the producer instance should give you some performance improvement:
with app.producer_or_acquire() as producer:
task.apply_async(producer=producer)
Also the task may be a proxy object and if so must be evaluated for every invocation:
task = task._get_current_object()
Using group
will automatically reuse the producer and is usually what you would
do in a loop like this:
process_read_pair = tasks.process_read_pair.s
g = group(
process_read_pair(r1, r2)
for r1, r2 in islice(
izip(FastGeneralIterator(f1), FastGeneralIterator(f2)), 0, 1000)
)
result = g.delay()
You can also consider installing the librabbitmq
module which is written in C.
The amqp://
transport will automatically use it if available (or can be specified manually using librabbitmq://
:
pip install librabbitmq
Publishing messages directly using the underlying library may be faster since it will bypass the celery routing helpers and so on, but I would not think it was that much slower. If so there is definitely room for optimization in Celery, as I have mostly focused on optimizing the consumer side so far.
Note also that you may want to process multiple DNA pairs in the same task, as using coarser task granularity may be beneficial for CPU/memory caches and so on, and it will often saturate parallelization anyway since that is a finite resource.
NOTE: The transient queue should be durable=False
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With