Celery chaining tasks sequentially

I need to download a file over FTP, modify it, and upload it back. I am using Celery for this, but when I try to chain the tasks I get:

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

Also, can I use chains and be assured that the steps will run sequentially? If not, what is the alternative?

res = chain(
    download_ftp_image.s(server, username, password, "/test_app_2/model.dae", "tmp/test_app_2/"),
    upload_ftp_image.s(server, username, password, "tmp/test_app_2/model.dae", "tmp/test_app_2/"),
).apply_async()
print res.get()

Tasks:

@task()
def download_ftp_image(ftp_server, username, password, filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
        ftp.retrbinary("RETR /default_app/model.dae", open(directory + 'model.dae', 'wb').write)
        ftp.quit()
    except error_perm, resp:
        raise download_ftp_image.retry(countdown=15)

    return "SUCCESS: "

@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file= file.replace(directory, "")
        directory = directory.replace("tmp","")
        try:
            ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
        except:
            ftp.mkd(directory)
            ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
        ftp.quit()
    except error_perm, resp:
        raise upload_ftp_image.retry(countdown=15)

    return "SUCCESS: "

Also, is the following good or bad practice for my specific case?

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()
psychok7 asked Mar 05 '13 12:03


2 Answers

Each task in a chain after the first is called with the previous task's result as its first argument. From the chains documentation:

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

Your upload_ftp_image task takes five arguments, so the call fails once the chain prepends the download result as a sixth.
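To make the argument count concrete, here is a minimal plain-Python sketch of that behaviour. It is a toy model, not Celery: `run_chain` is a hypothetical helper that only mimics how a result is prepended to the next call, the task bodies are stubs, and the server and credentials are placeholders.

```python
def run_chain(*steps):
    """Toy stand-in for a chain: run (func, args) pairs in order and prepend
    each step's return value to the arguments of the next step."""
    result = None
    for index, (func, args) in enumerate(steps):
        call_args = args if index == 0 else (result,) + args
        result = func(*call_args)
    return result


# Same five-parameter signatures as in the question; the FTP work is stubbed out.
def download_ftp_image(ftp_server, username, password, filename, directory):
    return "SUCCESS: "


def upload_ftp_image(ftp_server, username, password, file, directory):
    return "SUCCESS: "


creds = ("ftp.example.com", "user", "secret")  # placeholder values
chain_error = None
try:
    run_chain(
        (download_ftp_image, creds + ("/test_app_2/model.dae", "tmp/test_app_2/")),
        (upload_ftp_image, creds + ("tmp/test_app_2/model.dae", "tmp/test_app_2/")),
    )
except TypeError as exc:
    # upload_ftp_image was called with 6 positional arguments:
    # the download result plus its own 5.
    chain_error = exc

print(type(chain_error).__name__)
```

The download step runs fine because nothing precedes it; only the second step sees the extra leading argument.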

You have a fine use case for chaining; the second task is guaranteed to be called after the first task is completed (otherwise the result could not be passed on anyway).

Simply add an argument for the result from the previous task:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory):

You could make use of that result value; for example, have the download task return the path of the downloaded file so the upload task knows what to upload.
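One possible arrangement of that idea is sketched below in plain Python, with the FTP transfers left out so it runs anywhere. The parameter names `local_path` and `remote_directory`, and the choice to drop the separate `file` argument, are assumptions made for illustration rather than part of the original tasks.

```python
import os


def download_ftp_image(ftp_server, username, password, filename, directory):
    # FTP transfer omitted in this sketch; only the return value matters here.
    local_path = os.path.join(directory, os.path.basename(filename))
    return local_path


def upload_ftp_image(local_path, ftp_server, username, password, remote_directory):
    # In a chain, the first parameter receives the download task's return value.
    remote_path = remote_directory.rstrip("/") + "/" + os.path.basename(local_path)
    # FTP transfer omitted; report what would be uploaded and where.
    return local_path, remote_path


creds = ("ftp.example.com", "user", "secret")  # placeholder values

# Calling the functions by hand in the same order a chain would.
downloaded = download_ftp_image(*creds, "/test_app_2/model.dae", "tmp/test_app_2/")
uploaded = upload_ftp_image(downloaded, *creds, "/test_app_2/")
print(uploaded)
```

With tasks shaped like this, the upload signature in the chain would carry only the four remaining arguments, for example upload_ftp_image.s(server, username, password, "/test_app_2/"), and the chain would supply the path.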

Martijn Pieters answered Oct 30 '22 03:10


Another option, if you don't want the return value of the previous task passed as an argument, is to use immutable signatures ("immutability"):

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

Instead of defining your subtasks as:

download_ftp_image.s(...) and upload_ftp_image.s(...)

define them as:

download_ftp_image.si(...) and upload_ftp_image.si(...)

An immutable signature ignores the parent task's result, so each task in the chain is called with only the arguments you gave it.
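The difference between the two kinds of signature can be modelled in a few lines of plain Python. This is a toy illustration only: the `Signature` class and the `s`, `si` and `run_chain` helpers are hypothetical stand-ins named after the Celery shortcuts, and the task bodies are stubs.

```python
class Signature:
    """Toy signature: a function, its arguments, and an immutable flag."""

    def __init__(self, func, args, immutable=False):
        self.func = func
        self.args = args
        self.immutable = immutable


def s(func, *args):
    return Signature(func, args)


def si(func, *args):
    return Signature(func, args, immutable=True)


def run_chain(*signatures):
    """Run signatures in order; only mutable ones receive the previous result."""
    result = None
    for index, sig in enumerate(signatures):
        if index == 0 or sig.immutable:
            call_args = sig.args
        else:
            call_args = (result,) + sig.args
        result = sig.func(*call_args)
    return result


def download(path):
    return "downloaded " + path


def upload(path):
    return "uploaded " + path


# Immutable signatures: upload is called with its own single argument.
immutable_result = run_chain(si(download, "model.dae"), si(upload, "model.dae"))

# Mutable signatures: upload also receives download's result and rejects it.
try:
    run_chain(s(download, "model.dae"), s(upload, "model.dae"))
    mutable_failed = False
except TypeError:
    mutable_failed = True

print(immutable_result, mutable_failed)
```

The steps still run one after the other in both cases; immutability changes only whether the previous result is forwarded.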

mpaf answered Oct 30 '22 02:10