 

Using Celery as a control channel for Twisted applications

I am trying to use Celery as the control channel for a Twisted application. My Twisted application is an abstraction layer that provides a standard interface to various locally running processes (via ProcessProtocol). I would like to use Celery to control this remotely - AMQP seems like the ideal method of controlling many Twisted apps from a central location, and I would like to take advantage of Celery's task-based features, e.g. task retries, subtasks etc.

This is not working as I had planned, and I am hoping someone can help point me in the right direction to get this working.

The behaviour I am trying to achieve when I run my script is:

  • Start a slightly-modified celeryd (see below)
  • Wait for Celery tasks
  • When the 'start process' task is received, spawn a ProcessProtocol
  • When other tasks are received, run a function on the Twisted protocol and return the result using Deferreds

The 'slightly-modified celeryd' is celeryd with a small modification that allows tasks to access the Twisted reactor via self.app.twisted, and the spawned process via self.app.proc. To keep things simple I am using Celery's 'solo' process pool implementation, which does not fork a new process for task workers.

My problem occurs when I try to use a Celery task to initialise the ProcessProtocol (i.e. starting the external process). The process starts correctly, but the ProcessProtocol's childDataReceived never gets called. I think this is something to do with file descriptors not being inherited/set correctly.

Below is some sample code, based on the 'wc' example in the ProcessProtocol documentation. It includes two Celery tasks - one to start the wc process, and another to count the words in some text (using the previously-started wc process).

This example is rather contrived, but if I can get it working it will serve as a good starting point for implementing my real ProcessProtocols, which wrap long-running processes that respond to commands written to stdin.

I am testing this by running the Celery daemon first:

python2.6 myceleryd.py -l info -P solo

Then, in another window, running a script which sends two tasks:

python2.6 command_test.py

The expected behaviour of command_test.py is for two commands to execute - one starts the wc process, and the other sends some text to CountWordsTask. What actually happens is:

  • The StartProcTask spawns the process, and receives 'process started' as a response via a Deferred
  • The CountWordsTask never receives a result, because childDataReceived is never called

Can anyone shed some light on this, or offer some advice on how best to use Celery as a control channel for Twisted ProcessProtocols?

Would it be better to write a Twisted-backed ProcessPool implementation for Celery? Is my method of calling WorkerCommand.execute_from_commandline via reactor.callLater the right approach to ensure everything happens within the Twisted thread?

I have read about AMPoule, which I think could provide some of this functionality, but would like to stick with Celery if possible as I use it in other parts of my application.

Any help or assistance will be gratefully appreciated!

myceleryd.py

from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


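class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        # Keep a reference to the reactor so tasks can reach it as self.app.twisted
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
    # Bind the reactor as MyCeleryApp's first constructor argument
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    # Start the Celery worker one second after the reactor starts running
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()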

if __name__ == '__main__':
    main()

protocol.py

import sys

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class WCProcessProtocol(protocol.ProcessProtocol):

    def __init__(self, text):
        self.text = text
        self._waiting = {} # Dict to contain deferreds, keyed by command name

    def connectionMade(self):
        if 'startup' in self._waiting:
            self._waiting['startup'].callback('process started')

    def outReceived(self, data):
        fieldLength = len(data) / 3
        lines = int(data[:fieldLength])
        words = int(data[fieldLength:fieldLength*2])
        chars = int(data[fieldLength*2:])
        self.transport.loseConnection()
        self.receiveCounts(lines, words, chars)

        if 'countWords' in self._waiting:
            self._waiting['countWords'].callback(words)

    def processExited(self, status):
        print 'exiting'


    def receiveCounts(self, lines, words, chars):
        print >> sys.stderr, 'Received counts from wc.'
        print >> sys.stderr, 'Lines:', lines
        print >> sys.stderr, 'Words:', words
        print >> sys.stderr, 'Characters:', chars

    def countWords(self, text):
        self._waiting['countWords'] = Deferred()
        # Note: wc only prints its counts once it reaches end-of-file on stdin
        self.transport.write(text)
        return self._waiting['countWords']

tasks.py

from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor

class StartProcTask(Task):
    def run(self):
        self.app.proc = WCProcessProtocol('testing')
        self.app.proc._waiting['startup'] = Deferred()
        self.app.twisted.spawnProcess(self.app.proc,
                                      'wc',
                                      ['wc'],
                                      usePTY=True)
        return self.app.proc._waiting['startup']

class CountWordsTask(Task):
    def run(self):
        return self.app.proc.countWords('test test')
Asked by Mike Ryan, Nov 15 '11 13:11



1 Answer

Celery probably blocks while waiting for new messages from the network. Since you're running it in one single-threaded process along with the Twisted reactor, it blocks the reactor from running. This will disable most of Twisted, which requires the reactor to actually run (you called reactor.run, but with Celery blocking it, it is effectively not running).

reactor.callLater only delays the startup of Celery. Once Celery starts, it's still blocking the reactor.
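The effect can be reproduced without Twisted or Celery. The minimal sketch below (Python 3) uses asyncio's event loop as a stand-in for the reactor, since both run every callback on a single thread, and uses time.sleep as a stand-in for Celery's blocking wait on the broker; both substitutions are assumptions for illustration, not the real Celery code path. The function name measure_handler_delay is hypothetical.

```python
import asyncio
import time


def measure_handler_delay(block_seconds=0.2):
    """Return how late a scheduled callback runs when another callback
    on the same event loop blocks for ``block_seconds``."""
    loop = asyncio.new_event_loop()
    timings = {}

    def blocking_consumer():
        # Stands in for Celery waiting on the broker: a plain blocking call
        # that never yields control back to the event loop.
        time.sleep(block_seconds)

    def child_data_received():
        # Stands in for ProcessProtocol.childDataReceived.
        timings["handled_at"] = loop.time()
        loop.stop()

    timings["scheduled_at"] = loop.time()
    # Like reactor.callLater: the consumer starts slightly later, but once
    # it starts it still occupies the only thread the loop has.
    loop.call_later(0.01, blocking_consumer)
    loop.call_later(0.02, child_data_received)  # due only 20 ms from now
    loop.run_forever()
    loop.close()
    return timings["handled_at"] - timings["scheduled_at"]


if __name__ == "__main__":
    print("handler ran %.2f s after scheduling" % measure_handler_delay())
```

The handler that was due after 20 ms only runs once the blocking call returns. A consumer that never returns, as a worker's main loop does, means the handler never runs at all.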

The problem you need to avoid is blocking the reactor.

One solution would be to run Celery in one thread and the reactor in another thread. Use reactor.callFromThread to send messages to Twisted ("call functions in the reactor thread") from the Celery thread. Use the Celery equivalent if you need to send messages back to Celery from the Twisted thread.
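The thread-based approach can be sketched the same way. In this Python 3 sketch, asyncio again stands in for the reactor, a queue.Queue stands in for the broker connection, and asyncio.run_coroutine_threadsafe plays roughly the role that reactor.callFromThread (or twisted.internet.threads.blockingCallFromThread, when the calling thread needs the result) plays in Twisted. The function run_bridge and its message format are hypothetical.

```python
import asyncio
import queue
import threading


def run_bridge(messages):
    """Feed ``messages`` through a blocking consumer thread that hands
    each one to the event-loop thread and waits for the reply."""
    loop = asyncio.new_event_loop()
    loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
    loop_thread.start()

    inbox = queue.Queue()  # hypothetical stand-in for the broker connection
    replies = []

    async def handle_in_loop(message):
        # Runs on the event-loop thread, where all protocol/transport work
        # must happen (Twisted has the same rule for reactor code).
        await asyncio.sleep(0)  # yield once, as if waiting on a Deferred
        return message.upper(), threading.get_ident()

    def consumer():
        # A blocking loop is fine here: this is not the event-loop thread.
        while True:
            message = inbox.get()
            if message is None:  # sentinel: no more messages
                break
            future = asyncio.run_coroutine_threadsafe(handle_in_loop(message), loop)
            replies.append(future.result(timeout=5))

    consumer_thread = threading.Thread(target=consumer)
    consumer_thread.start()
    for message in messages:
        inbox.put(message)
    inbox.put(None)
    consumer_thread.join()

    # Stop the loop from outside its thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join()
    loop.close()
    return replies, loop_thread.ident, consumer_thread.ident
```

Every reply is computed on the event-loop thread, while the consumer thread is free to block on its queue, which is the division of labour the answer describes.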

Another solution would be to implement the Celery protocol (AMQP? - see txAMQP) as a native Twisted library and use that to process Celery messages without blocking.
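The property that makes this second option work is that waiting for the next message becomes just another event the loop multiplexes alongside process output, so no extra thread is needed. The Python 3 sketch below shows only that scheduling property: asyncio replaces Twisted, and an asyncio.Queue is a hypothetical stand-in for an asynchronous AMQP channel. It does not implement AMQP and does not use txAMQP's API.

```python
import asyncio


async def main():
    events = []
    inbox = asyncio.Queue()  # hypothetical stand-in for a non-blocking AMQP channel

    async def consumer():
        # Awaiting a message yields control, so other callbacks keep running.
        while True:
            message = await inbox.get()
            if message is None:  # sentinel: stop consuming
                break
            events.append("task: " + message)

    async def child_output():
        # Stands in for process output arriving while the consumer is idle.
        await asyncio.sleep(0.01)
        events.append("child data received")
        await inbox.put("count words")
        await inbox.put(None)

    await asyncio.gather(consumer(), child_output())
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the consumer waits with await instead of a blocking call, the simulated child output is handled while the consumer is idle, and the task message is processed afterwards on the same thread.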

Answered by Jean-Paul Calderone, Nov 07 '22 23:11
