
Stream results in celery

Tags:

python

celery

I'm trying to use celery to schedule and run tasks on a fleet of servers. Each task is somewhat long running (few hours), and involves using subprocess to call a certain program with the given inputs. This program produces a lot of output both in stdout and stderr.

Is there some way to show the output produced by the program to the client in near real time? Stream the output, so that the client can watch the output spewed by the task running on the server without logging into the server?

Anand asked May 04 '13

3 Answers

You did not specify many requirements and constraints. I'm going to assume you already have a redis instance somewhere.

What you can do is read the output from the other process line by line and publish it through redis:

Here's an example where you can echo data into a file /tmp/foo for testing:

import shlex
import subprocess

import redis

redis_instance = redis.Redis()
p = subprocess.Popen(shlex.split("tail -f /tmp/foo"), stdout=subprocess.PIPE)

while True:
    line = p.stdout.readline()
    if line:
        redis_instance.publish('process log', line)
    else:
        break  # readline() returns an empty bytes object at EOF

In a separate process:

import redis

redis_instance = redis.Redis()
pubsub = redis_instance.pubsub()
pubsub.subscribe('process log')

for message in pubsub.listen():  # listen() blocks and yields messages as they arrive
    print(message)  # or use websockets to communicate with a browser

If you want the process to end, you can e.g. send a "quit" after the celery task is done.
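
That teardown can be sketched with a sentinel message; the `b"quit"` value and the `consume_log` helper name are illustrative, not part of the original answer:

```python
SENTINEL = b"quit"  # hypothetical end-of-task marker published by the producer

def consume_log(pubsub, sentinel=SENTINEL):
    """Yield published log lines (from e.g. redis_instance.pubsub())
    until the sentinel arrives, then stop."""
    for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe/unsubscribe confirmations
        if message["data"] == sentinel:
            break
        yield message["data"]
```

The producer side would then call `redis_instance.publish('process log', 'quit')` once the celery task finishes.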

You can use different channels (the string in subscribe) to separate the output from different processes.

You can also store your log output in redis, if you want to,

redis_instance.rpush('process log', message)

and later retrieve it in full.
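
A minimal sketch of that list-based variant, assuming a Redis-compatible client is passed in (the helper names are mine, not from the answer):

```python
def append_log(client, key, line):
    # RPUSH appends to the end of a list, so lines stay in arrival order
    client.rpush(key, line)

def full_log(client, key):
    # LRANGE key 0 -1 returns every element of the list
    return client.lrange(key, 0, -1)
```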

Thomas Fenzl answered Oct 24 '22


One way I see to do it is to write a custom logger to which stdout and stderr are redirected (see the docs):

from celery.app.log import Logger
Logger.redirect_stdouts_to_logger(MyLogger())

Your logger can save the data into the database, Memcached, Redis or whatever shared storage you'll use to get the data.

I'm not sure about the structure of the logger, but I guess something like this will work:

from logging import Logger

class MyLogger(Logger):
    def log(self, level, msg, *args, **kwargs):
        # Do something with the message, e.g. push it to shared storage
        super().log(level, msg, *args, **kwargs)
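
If subclassing `Logger` proves awkward, the standard library's `logging.Handler` is the usual extension point for routing records to shared storage. This sketch assumes a client object exposing a `publish` method, as redis-py does; the class name is mine:

```python
import logging

class RedisLogHandler(logging.Handler):
    """Publish each formatted log record to a channel on a Redis-like client."""

    def __init__(self, client, channel):
        super().__init__()
        self.client = client
        self.channel = channel

    def emit(self, record):
        try:
            self.client.publish(self.channel, self.format(record))
        except Exception:
            self.handleError(record)  # standard logging error path
```
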

Denis Malinovsky answered Oct 24 '22


This is an old question but it's still pretty much the only result about this specific topic.

Here's how I went about it: I created a simple file-like object that publishes to a specific channel over Redis:

import redis

class RedisFileObject(object):
    def __init__(self, _key):
        self.connection = redis.Redis()
        self.key = _key
        self.connection.publish('debug', 'Created channel %s' % self.key)

    def write(self, data):
        self.connection.publish(self.key, data)

    def flush(self):
        pass  # callers may invoke flush() on stdout/stderr

    def close(self):
        pass

I have a BaseTask from which all of my tasks inherit various functions, including this one, which replaces stdout and stderr with the Redis file-like object:

def capture_output(self):
    sys.stdout = RedisFileObject(self.request.id)
    sys.stderr = RedisFileObject(self.request.id)

From there on, anything written to stdout/stderr is forwarded to a Redis channel named after the task id.
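
One caveat: `capture_output` as shown never restores the real streams once the task finishes. A hedged sketch of a restoring wrapper (the `OutputCapture` name is mine, not from the original answer):

```python
import sys

class OutputCapture:
    """Context manager: swap stdout/stderr for a file-like object
    (such as RedisFileObject) and restore the real streams afterwards."""

    def __init__(self, fileobj):
        self.fileobj = fileobj

    def __enter__(self):
        self._stdout, self._stderr = sys.stdout, sys.stderr
        sys.stdout = sys.stderr = self.fileobj
        return self.fileobj

    def __exit__(self, *exc):
        sys.stdout, sys.stderr = self._stdout, self._stderr
        return False  # never swallow exceptions from the task body
```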

Leo answered Oct 24 '22