
Celery doesn't work with a global variable

Tags: python, celery

from celery import Celery

app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')

a_num = 0

@app.task
def addone():
    global a_num
    a_num = a_num + 1
    return a_num

This is the code I used to test Celery. I expected the return value to increase every time I call addone(), but it is always 1. Why?

Results

python
>>> from tasks import addone
>>> r = addone.delay()
>>> r.get()
1
>>> r = addone.delay()
>>> r.get()
1
>>> r = addone.delay()
>>> r.get()
1
xren asked Sep 02 '15 10:09


1 Answer

By default, Celery starts a worker with a concurrency equal to the number of CPU cores on the machine. With a concurrency of 4, as here, it has 4 processes started to handle task requests (plus a process that controls the other processes). I don't know what algorithm is used to assign task requests to those processes, but if you execute addone.delay().get() enough times, you will eventually see the number get greater than 1. What happens is that each process (not each task) gets its own copy of a_num. When I try it here, my fifth execution of addone.delay().get() returns 2.
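To see the "one copy per process" effect without Celery, here is a minimal sketch that uses os.fork() directly (POSIX only). It is not how Celery manages its pool; call_in_child is a hypothetical helper that stands in for "a worker process handling some tasks".

```python
import os

a_num = 0


def addone():
    global a_num
    a_num = a_num + 1
    return a_num


def call_in_child(times):
    """Fork a child, call addone() `times` times there, return the last value.

    Hypothetical helper for illustration; Celery's pool is more involved.
    """
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: it works on its own copy of a_num.
        os.close(read_fd)
        value = 0
        for _ in range(times):
            value = addone()
        os.write(write_fd, str(value).encode())
        os._exit(0)
    # Parent process: read what the child reported, then reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        data = reader.read()
    os.waitpid(pid, 0)
    return int(data)


if __name__ == "__main__":
    print(call_in_child(1))  # first "worker" handles one task
    print(call_in_child(1))  # a different "worker" also starts from 0
    print(call_in_child(2))  # one "worker" handling two tasks
    print(a_num)             # the parent's copy was never touched
```

Each child starts from the parent's value of a_num and increments only its own copy, which is why separate worker processes can each return 1.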

You could force the number to increment each time by starting your worker with a single process to handle requests (e.g. celery -A tasks worker -c1). However, if you ever restart your worker, the numbering will be reset to 0. Moreover, I would not design code that works only if the number of processes handling requests is forced to be 1. One day down the road, a colleague will decide that multiple processes should handle the requests for the tasks, and then things will break. (Big fat warnings in comments in the code can only do so much.)

At the end of the day, such state should be shared in a cache, like Redis, or a database used as a cache, which would work for the code in your question.
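As a rough sketch of keeping the counter outside the worker processes, the example below uses SQLite from the standard library as a stand-in for "a database used as a cache", so it runs without a server. The table name, the key 'a_num', and the db_path parameter are assumptions made for illustration. With Redis, the INCR command would play the same role as the UPDATE here.

```python
import os
import sqlite3
import tempfile


def addone(db_path):
    """Increment a counter stored in a SQLite file and return the new value.

    In a Celery project this function would carry the @app.task decorator.
    """
    # isolation_level=None: autocommit mode, transactions are managed by hand.
    conn = sqlite3.connect(db_path, timeout=10, isolation_level=None)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS counter "
            "(name TEXT PRIMARY KEY, value INTEGER NOT NULL)"
        )
        # Take the write lock up front so that concurrent processes
        # cannot interleave their read-modify-write steps.
        conn.execute("BEGIN IMMEDIATE")
        try:
            conn.execute("INSERT OR IGNORE INTO counter VALUES ('a_num', 0)")
            conn.execute(
                "UPDATE counter SET value = value + 1 WHERE name = 'a_num'"
            )
            (value,) = conn.execute(
                "SELECT value FROM counter WHERE name = 'a_num'"
            ).fetchone()
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        return value
    finally:
        conn.close()


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "counter.sqlite3")
    print([addone(path) for _ in range(3)])  # [1, 2, 3]
```

Because the value lives in the file rather than in a module global, it no longer matters which process serves a request, and the count survives a worker restart.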

However, in a comment you wrote:

Let's say I want to use a task to send something. Instead of connecting every time in the task, I want to share a global connection.

Storing the connection in a cache won't work. I would strongly advocate having each process that Celery starts use its own connection rather than try to share it among processes. The connection does not need to be reopened with each new task request. It is opened once per process, and then each task request served by this process reuses the connection.
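One way to get "opened once per process, reused by every task in that process" is a lazily created module-level connection. This is only a sketch: FakeConnection is a hypothetical stand-in for whatever client you actually use, and get_connection and send_something are illustrative names.

```python
import os


class FakeConnection:
    """Hypothetical stand-in for a real client connection."""

    def __init__(self):
        self.opened_in = os.getpid()  # remember which process opened it
        self.sent = []

    def send(self, payload):
        self.sent.append(payload)


_connection = None
_owner_pid = None


def get_connection():
    """Return this process's connection, opening it on first use.

    The pid check guards against reusing a connection object that was
    inherited from a parent process through a fork.
    """
    global _connection, _owner_pid
    if _connection is None or _owner_pid != os.getpid():
        _connection = FakeConnection()
        _owner_pid = os.getpid()
    return _connection


def send_something(payload):
    # In a Celery project this function would carry the @app.task decorator.
    conn = get_connection()
    conn.send(payload)
    return conn.opened_in
```

Every call to send_something in a given worker process reuses the same connection object, and each worker process ends up with its own. Celery also provides a worker_process_init signal (in celery.signals) that can be used to open the connection eagerly when each pool process starts, instead of on first use.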

In many cases, trying to share the same connection among processes (through sharing virtual memory through a fork, for instance) would flat out not work anyway. Connections often carry state with them (e.g. whether a database connection is in autocommit mode). If two parts of the code expect the connection to be in different states, the code will operate inconsistently.

Louis answered Oct 26 '22 06:10