Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Interoperating with Django/Celery From Java

Our company has a Python based web site and some Python based worker nodes which communicate via Django/Celery and RabbitMQ. I have a Java based application which needs to submit tasks to the Celery based workers. I can send jobs to RabbitMQ from Java just fine, but the Celery based workers are never picking up the jobs. From looking at the packet captures of both types of job submissions, there are differences, but I cannot fathom how to account for them because a lot of it is binary that I cannot find documentation about decoding. Does anyone here have any reference or experience with having Java/RabbitMQ and Celery working together?

like image 953
Deven Phillips Avatar asked Aug 03 '11 22:08

Deven Phillips


People also ask

Can we use Celery without django?

Yes you can. Celery is a generic asynchronous task queue. In place of "django_project" you would point to your module.

How does Celery Redis work?

Specifically, Redis is used to store messages produced by the application code describing the work to be done in the Celery task queue. Redis also serves as storage of results coming off the celery queues which are then retrieved by consumers of the queue.

How does Celery work in Python?

Celery is an open-source Python library which is used to run the tasks asynchronously. It is a task queue that holds the tasks and distributes them to the workers in a proper manner. It is primarily focused on real-time operation but also supports scheduling (run regular interval tasks).


1 Answers

I found the solution. The Java library for RabbitMQ refers to exchanges/queues/routekeys. In Celery, the queue name is actually mapping to the exchange referred to in the Java library. By default, the queue for Celery is simply "celery". If your Django settings define a queue called "myqueue" using the following syntax:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

Then the Java based code needs to do something like the following:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

It turns out that it is just a difference in terminology.

like image 195
Deven Phillips Avatar answered Oct 13 '22 20:10

Deven Phillips