I'm trying to use Celery to schedule and run tasks on a fleet of servers. Each task is fairly long-running (a few hours) and involves using subprocess to call a certain program with the given inputs. This program produces a lot of output on both stdout and stderr.
Is there some way to stream the output produced by the program to the client in near real time, so that the client can watch the output of the task running on the server without logging into the server?
You did not specify many requirements or constraints, so I'm going to assume you already have a Redis instance somewhere.
What you can do is read the output from the other process line by line and publish it through Redis.
Here's an example where you can echo data into a file /tmp/foo for testing:
import shlex
import subprocess

import redis

redis_instance = redis.Redis()
# Follow the file and publish each new line to the 'process log' channel.
p = subprocess.Popen(shlex.split("tail -f /tmp/foo"), stdout=subprocess.PIPE)
while True:
    line = p.stdout.readline()
    if line:
        redis_instance.publish('process log', line)
    else:
        # readline() returns b'' once the process exits and the pipe closes.
        break
In a separate process:
import redis

redis_instance = redis.Redis()
pubsub = redis_instance.pubsub()
pubsub.subscribe('process log')
# listen() blocks and yields each published message as a dict.
for message in pubsub.listen():
    print(message)  # or use websockets to communicate with a browser
If you want the listening process to end, you can e.g. publish a "quit" message after the Celery task is done.
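A minimal sketch of that convention, reusing the names from the snippets above ("quit" is just an arbitrary sentinel, nothing built into redis-py):

# Publisher side: signal the end of the stream once the task is done.
redis_instance.publish('process log', 'quit')

# Consumer side: stop listening when the sentinel arrives.
# (redis-py delivers message data as bytes by default.)
for message in pubsub.listen():
    if message['type'] == 'message' and message['data'] == b'quit':
        break
    print(message['data'])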
You can use different channels (the string passed to subscribe) to separate the output from different processes.
You can also store your log output in Redis, if you want to:

redis_instance.rpush('process log', message)

and later retrieve it in full.
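Reading it back later is then just a list lookup (a sketch, using the same 'process log' key as above):

# Retrieve every entry pushed with rpush, as a list of bytes.
full_log = redis_instance.lrange('process log', 0, -1)
print(b''.join(full_log).decode())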
One way I see to do it is to write a custom logger that will be used for stderr and stdout (see the docs):
from celery.app.log import Logger  # exact import path/API may vary by Celery version
Logger.redirect_stdouts_to_logger(MyLogger())
Your logger can save the data into a database, Memcached, Redis, or whatever shared storage you'll use to get the data.
I'm not sure about the structure of the logger, but I guess something like this will work:
from logging import Logger

class MyLogger(Logger):
    def log(self, lvl, msg, *args, **kwargs):
        # Do something with the message
        pass
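To make that concrete, here is a minimal, untested sketch of a logger that pushes each message to a Redis list; 'task log' is an arbitrary key chosen for illustration:

import logging
import redis

class RedisLogger(logging.Logger):
    def __init__(self, name):
        super(RedisLogger, self).__init__(name)
        self.connection = redis.Redis()

    def log(self, lvl, msg, *args, **kwargs):
        # Append the message to a shared list other processes can read.
        self.connection.rpush('task log', msg)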
This is an old question but it's still pretty much the only result about this specific topic.
Here's how I went about it: I created a simple file-like object that publishes to a specific channel over Redis:
import redis

class RedisFileObject(object):
    """File-like object that publishes everything written to it to a Redis channel."""

    def __init__(self, _key):
        self.connection = redis.Redis()
        self.key = _key
        self.connection.publish('debug', 'Created channel %s' % self.key)

    def write(self, data):
        self.connection.publish(self.key, data)

    def flush(self):
        # Nothing to buffer; publish() sends each write immediately.
        pass

    def close(self):
        pass
I have a BaseTask from which all of my tasks inherit various functions, including this one, which replaces stdout and stderr with the Redis file-like object.
import sys

def capture_output(self):
    # Redirect everything written to stdout/stderr to this task's channel.
    sys.stdout = RedisFileObject(self.request.id)
    sys.stderr = RedisFileObject(self.request.id)
From there on, anything written to stdout/stderr will be forwarded to a Redis channel named after the task id.
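To watch a task from another machine, subscribe to that channel by task id (a sketch; task_id is a hypothetical placeholder for the id you get back when you launch the task, e.g. from apply_async):

import redis

task_id = '...'  # hypothetical: fill in the id of the task to watch
pubsub = redis.Redis().pubsub()
pubsub.subscribe(task_id)
for message in pubsub.listen():
    if message['type'] == 'message':
        print(message['data'].decode(), end='')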