I need to download a file over FTP, change it, and upload it back. I am using Celery to do this, but I run into problems when I try to use chaining. I get:
TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)
Also, can I use chains and be sure that the steps will run sequentially? If not, what is the alternative?
res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()
Tasks:
@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        else:
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        ftp.quit()
    except error_perm, resp:
        raise download_ftp_image.retry(countdown=15)
    return "SUCCESS: "
@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file= file.replace(directory, "")
        directory = directory.replace("tmp","")
        try:
            ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
        except:
            ftp.mkd(directory)
            ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
        ftp.quit()
    except error_perm, resp:
        raise upload_ftp_image.retry(countdown=15)
    return "SUCCESS: "
And is this a good or a bad practice for my specific case?
result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()
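Celery tasks run asynchronously: calling a task in the client process returns as soon as the message asking for the task to be run has been sent to the broker. You can get the result back either by blocking on the returned AsyncResult with .get(), as in the second snippet in the question, or by having Celery hand the result to a follow-up task, which is what a chain does.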
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
By default a task is retried at most 3 times (max_retries=3); once that limit is exceeded, retry() raises an exception instead of scheduling another attempt.
In a chain, each task is always passed the previous task's result as its first argument. From the chains documentation, where add(2, 2) is linked to mul.s(16):
The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.
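Your upload_ftp_image task doesn't accept this extra argument: it receives the download result plus the five arguments you passed, six in total, which is exactly what "takes exactly 5 arguments (6 given)" is complaining about.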
You have a fine use case for chaining; the second task is guaranteed to be called after the first task is completed (otherwise the result could not be passed on anyway).
Simply add an argument for the result from the previous task:
def upload_ftp_image(download_result, ftp_server, username , password , file , directory):
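Both the error and the fix can be reproduced without a broker. The sketch below is a toy model, not Celery code: run_chain and the (func, args) step tuples are hypothetical helpers that mimic how a chain of .s() signatures prepends each parent's result. The task bodies are stubbed out; only their signatures matter here.

```python
# Toy model of how a Celery chain of .s() signatures passes results along.
# run_chain and the (func, args) step tuples are hypothetical, not Celery APIs.


def run_chain(*steps):
    """Run steps in order. Every step after the first gets the previous
    step's return value prepended to its own arguments, like a .s() link."""
    result = None
    for index, (func, args) in enumerate(steps):
        if index == 0:
            result = func(*args)
        else:
            result = func(result, *args)
    return result


# Stand-ins with the same signatures as the tasks in the question.
def download_ftp_image(ftp_server, username, password, filename, directory):
    return "SUCCESS: "


def upload_ftp_image(ftp_server, username, password, file, directory):
    return "SUCCESS: "


# Fixed variant: one extra leading parameter absorbs the parent's result.
def upload_ftp_image_fixed(download_result, ftp_server, username, password, file, directory):
    return "uploaded after " + download_result


creds = ("ftp.example.com", "user", "secret")  # placeholder values
down_args = creds + ("/test_app_2/model.dae", "tmp/test_app_2/")
up_args = creds + ("tmp/test_app_2/model.dae", "tmp/test_app_2/")

try:
    run_chain((download_ftp_image, down_args), (upload_ftp_image, up_args))
    error_message = None
except TypeError as exc:
    # The upload stub declares 5 parameters but is called with 6 arguments.
    error_message = str(exc)

fixed_result = run_chain((download_ftp_image, down_args), (upload_ftp_image_fixed, up_args))
```

Under Python 3 the TypeError wording differs slightly from the Python 2 message in the question, but the cause is the same: one argument more than the function declares.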
You could also make use of that result value; for example, have the download task return the path of the downloaded file so the upload task knows what to upload.
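A minimal sketch of that idea, written in Python 3 as plain functions so it can be run without a broker. The names download_ftp_file and upload_ftp_file and the ftp_factory parameter are hypothetical; the factory only exists so the ftplib.FTP connection can be swapped for a fake in tests. Decorated as Celery tasks and chained with .s(), the upload's first parameter would receive the path returned by the download.

```python
import os
from ftplib import FTP


def download_ftp_file(ftp_server, username, password, remote_path, local_dir, ftp_factory=FTP):
    """Download remote_path into local_dir and return the local file path,
    so that a chained upload task receives it as its first argument."""
    os.makedirs(local_dir, exist_ok=True)
    local_path = os.path.join(local_dir, os.path.basename(remote_path))
    ftp = ftp_factory(ftp_server)
    try:
        ftp.login(username, password)
        with open(local_path, "wb") as fh:
            ftp.retrbinary("RETR " + remote_path, fh.write)
    finally:
        ftp.quit()
    return local_path


def upload_ftp_file(local_path, ftp_server, username, password, remote_dir, ftp_factory=FTP):
    """Upload the file produced by the previous step into remote_dir
    and return the remote path it was stored under."""
    remote_path = remote_dir.rstrip("/") + "/" + os.path.basename(local_path)
    ftp = ftp_factory(ftp_server)
    try:
        ftp.login(username, password)
        with open(local_path, "rb") as fh:
            ftp.storbinary("STOR " + remote_path, fh)
    finally:
        ftp.quit()
    return remote_path


# Hypothetical Celery wiring, once both functions are registered as tasks:
# chain(download_ftp_file.s(server, username, password,
#                           "/test_app_2/model.dae", "tmp/test_app_2/"),
#       upload_ftp_file.s(server, username, password, "/test_app_2/")).apply_async()
```

Passing a local path between tasks only works if both tasks run on the same worker host (or the workers share a filesystem), because the path refers to a file on the machine that did the download.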
Another option, if you don't want the return value of the previous task to be used as an argument, is to use immutability.
http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability
Instead of defining your subtasks as:
download_ftp_image.s(...) and upload_ftp_image.s(...)
define them as:
download_ftp_image.si(...) and upload_ftp_image.si(...)
Immutable signatures ignore the parent's result, so the tasks can now be used in a chain with their usual number of arguments.
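The effect of .si() can be shown with a toy model as well. Again, run_chain and the (func, args, immutable) step tuples are hypothetical helpers, not Celery code: an immutable step is called with only the arguments it was given, but it still runs after the previous step has returned.

```python
# Toy model of .s() vs .si() steps in a chain.
# run_chain and the (func, args, immutable) tuples are hypothetical, not Celery APIs.


def run_chain(*steps):
    """Run steps in order. A mutable step after the first gets the previous
    result prepended to its arguments; an immutable step (like .si())
    is called with its own arguments only."""
    result = None
    for index, (func, args, immutable) in enumerate(steps):
        if index == 0 or immutable:
            result = func(*args)
        else:
            result = func(result, *args)
    return result


calls = []  # records the order in which the stubs run


def download_ftp_image(ftp_server, username, password, filename, directory):
    calls.append(("download", filename))
    return "SUCCESS: "


def upload_ftp_image(ftp_server, username, password, file, directory):
    calls.append(("upload", file))
    return "SUCCESS: "


creds = ("ftp.example.com", "user", "secret")  # placeholder values
result = run_chain(
    (download_ftp_image, creds + ("/test_app_2/model.dae", "tmp/test_app_2/"), True),
    (upload_ftp_image, creds + ("tmp/test_app_2/model.dae", "tmp/test_app_2/"), True),
)
```

The original five-argument signatures work unchanged, and calls shows the download stub running before the upload stub.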