I want to convert my homegrown task queue system into a Celery-based task queue, but one feature I currently have is causing me some distress.
Right now, my task queue operates very coarsely; I run the job (which generates data and uploads it to another server), collect the logging using a variant on Nose's log capture library, and then I store the logging for the task as a detailed result record in the application database.
I would like to break this down as three tasks:

1. data generation
2. data upload
3. result reporting (storing the captured logging for the run as a detailed result record)
The real kicker here is the logging collection. Right now, using the log capture, I have a series of log records for each log call made during the data generation and upload process. These are required for diagnostic purposes. Given that the tasks are not even guaranteed to run in the same process, it's not clear how I would accomplish this in a Celery task queue.
My ideal solution to this problem would be a trivial, minimally invasive method of capturing all logging during the predecessor tasks (1, 2) and making it available to the reporter task (3).

Am I best off remaining fairly coarse-grained with my task definition and putting all of this work in one task? Or is there a way to pass the existing captured logging around so it can be collected at the end?
I assume you are using the logging module. You can use a separate named logger per task set to do the job. It will inherit all configuration from the upper levels of the logger hierarchy.
In task.py:
import logging

# assuming `task` is your Celery task decorator (e.g. `shared_task`)

@task
def step1(key, *args, **kwargs):
    # `key` is some unique identifier common for a piece of data
    # in all steps of processing
    logger = logging.getLogger("myapp.tasks.processing.%s" % key)
    # ...
    logger.info(...)  # log something

@task
def step2(key, *args, **kwargs):
    logger = logging.getLogger("myapp.tasks.processing.%s" % key)
    # ...
    logger.info(...)  # log something
Here, all records for one piece of data are sent to the same named logger. Now, you can use two approaches to fetch those records:
Configure a file handler whose filename depends on the logger name. After the last step, just read all the info back from that file. Make sure output buffering is disabled (or flushed) for this handler, or you risk losing records.
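A minimal sketch of the file-handler approach. The helper names (`get_task_logger`, `collect_logs`), the `key` value, and the per-key filename scheme are assumptions for illustration, not part of your existing code:

```python
import logging
import os
import tempfile

def get_task_logger(key, log_dir):
    """Return the per-key logger with a file handler attached (once)."""
    logger = logging.getLogger("myapp.tasks.processing.%s" % key)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid attaching duplicate handlers on re-use
        path = os.path.join(log_dir, "task-%s.log" % key)
        handler = logging.FileHandler(path, delay=True)
        handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger

def collect_logs(key, log_dir):
    """Reporter step: flush handlers, then read back everything logged."""
    logger = logging.getLogger("myapp.tasks.processing.%s" % key)
    for h in logger.handlers:
        h.flush()
    with open(os.path.join(log_dir, "task-%s.log" % key)) as f:
        return f.read()

log_dir = tempfile.mkdtemp()
log = get_task_logger("job42", log_dir)
log.info("generated 10 records")   # would happen in step1
log.info("upload complete")        # would happen in step2
print(collect_logs("job42", log_dir))
```

Note that this only works across processes if all workers can see the same filesystem; otherwise the file must live on shared storage.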
Create a custom handler that accumulates records in memory and returns them all when asked. I'd use memcached for the storage here; it's simpler than creating your own cross-process storage.
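A sketch of the accumulating-handler idea, in-process only. `AccumulatingHandler` and its `pop` method are hypothetical names; for Celery workers running in separate processes you would replace the plain dict with shared storage such as memcached or Redis (appending each formatted record under the `key`):

```python
import logging

class AccumulatingHandler(logging.Handler):
    """Collect formatted records in memory, keyed by logger name.

    In-process sketch; swap the dict for memcached/Redis for real
    cross-process collection.
    """
    def __init__(self):
        super().__init__()
        self.records = {}

    def emit(self, record):
        # record.name is the full logger name, e.g. "myapp.tasks.processing.job42"
        self.records.setdefault(record.name, []).append(self.format(record))

    def pop(self, name):
        """Return and clear everything accumulated for one logger name."""
        return self.records.pop(name, [])

handler = AccumulatingHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("myapp.tasks.processing.job42")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("step1 done")
logger.warning("retrying upload")
print(handler.pop("myapp.tasks.processing.job42"))
# prints: ['INFO step1 done', 'WARNING retrying upload']
```

The reporter task would then call `pop` (or read the shared key) and store the result in the application database.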