I'm building an internal tool for work, just to make some people's lives easier. The task involves a CSV as input, which gets processed, and a new file is generated. That part's taken care of: I made a command-line script that works. But I want to make it as accessible as possible, so a web UI is a natural choice. I've played with Flask a little and built another simple internal tool with it, and I'd like to do the same with this one.
This script can take a while to process a file, sometimes several minutes, so I'd like to make it non-blocking in the web UI. Ideally the user selects the file, chooses a few options, hits submit, and the job starts in the background. There should be a progress bar to show the user when it's done. I don't plan to have user management here, so it would be fine for anyone to see a list of all jobs anyone has triggered. Once a job finishes there should be a link to download the processed file.
From what I've read, this is the kind of situation where a task queueing system is used together with a message broker, like Celery and Redis. I've read some tutorials about this but I'm still unsure how exactly to go about it. Where do I start? What's the best architecture for this kind of problem? Also, what's the best way to handle the upload part, maybe using AJAX?
I'm looking for some advice, references and code samples if possible. Any example that relates to what I want to do is appreciated.
From what you have posted, you seem to be on the right track. What you're trying to accomplish is to offload resource-demanding tasks to background workers and have your web tier concentrate on presenting results to your users.
The pattern you are looking to work with is commonly referred to as a "Publish/Subscribe" pattern:
Your application is going to "publish" Celery tasks into a message broker (e.g. RabbitMQ). Your Celery task workers will "subscribe" to the message broker and look for tasks to perform. It's a very straightforward pattern.
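To make that concrete, here is a minimal sketch of the wiring, assuming a RabbitMQ broker running locally with default credentials; the route, task name, and file path are placeholders, not anything from your project:

    from celery import Celery
    from flask import Flask, jsonify

    app = Flask(__name__)
    celery = Celery(app.name, broker='amqp://guest:guest@localhost:5672//')

    @celery.task
    def process_csv(path):
        ...  # your CSV-processing logic; the worker ("subscriber") runs this

    @app.route('/start', methods=['POST'])
    def start():
        # "publish": drop the task onto the broker and return to the user immediately
        task = process_csv.delay('/tmp/input.csv')
        return jsonify(task_id=task.id), 202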
Given the tags you used, I'm going to assume you are familiar with Flask (a great microframework), and I'd suggest you take a look at "Flask-Uploads" for your uploading needs.
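As a rough sketch of what that could look like (the upload-set name and destination directory below are placeholders):

    from flask import Flask, request
    from flask_uploads import UploadSet, configure_uploads, DATA

    app = Flask(__name__)
    csvs = UploadSet('csvs', DATA)                      # the DATA extension group includes .csv
    app.config['UPLOADED_CSVS_DEST'] = '/var/uploads'   # placeholder destination folder
    configure_uploads(app, csvs)

    @app.route('/upload', methods=['POST'])
    def upload():
        # Flask-Uploads checks the extension and saves the file under the DEST folder
        filename = csvs.save(request.files['file'])
        return filename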
For processing your CSV file, it would be a matter of creating the function itself and wrapping it with the @task decorator to have Celery recognize it as a task. Using a bit of "Google-fu", I came across this particular file that might be of use to you:
https://github.com/captricity/captricity-cloud-io/blob/master/captricity_cloud_io/captricity_cloud_io/tasks.py
Take a look at the _upload_to_google function to get an idea of how to approach this.
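Since I haven't seen your script, here is only a rough sketch of what the wrapped task could look like; load_rows, convert_row, and write_output are placeholders for whatever your command-line script already does. The update_state call is what can later feed a progress bar:

    from celery import shared_task

    @shared_task(bind=True)
    def process_csv(self, input_path, output_path, options):
        rows = load_rows(input_path)                      # placeholder helpers
        for i, row in enumerate(rows):
            convert_row(row, options)
            # report progress so the web tier can draw a progress bar
            self.update_state(state='PROGRESS',
                              meta={'current': i + 1, 'total': len(rows)})
        write_output(output_path, rows)
        return output_path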
Finally, one of my favorite authors, Miguel Grinberg, has a great write-up on Celery and Flask, including using progress bars to track your tasks:
http://blog.miguelgrinberg.com/post/using-celery-with-flask
Mind you, the sample uses JavaScript to check on a given Celery task every 2 seconds. If you don't expect a lot of traffic, you can certainly get away with that, but if capacity is going to grow, I would suggest you look into WebSockets.
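For reference, a status endpoint for such a poll might look roughly like this (the route, the 'PROGRESS' state name, and the fact that the task returns the output path are assumptions carried over from the sketches above):

    from flask import jsonify

    @app.route('/status/<task_id>')
    def task_status(task_id):
        result = process_csv.AsyncResult(task_id)
        payload = {'state': result.state}
        if result.state == 'PROGRESS':
            payload.update(result.info)              # current/total set via update_state
        elif result.state == 'SUCCESS':
            payload['output_path'] = result.result   # the path returned by the task
        return jsonify(payload)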
Hopefully this gives you a good starting point. Good luck!
It seems you are on the way to solving your problem.
You need a background worker to process large data/files when processing takes significant time. Since you don't have direct control over a background task once it starts, you need the database's help to track the job's status.
Follow these steps to achieve your task:
Step 1: In your main route, do the primary steps, like reading the request (e.g. the .csv file) and doing validations.
Step 2: Create a lookup table to keep track of job status; this table is the place where you track your background job's status.
Step 3: Redis enqueue: call the processing helper function through a Redis (RQ) enqueue and pass all required args; set the timeout value as per your need.
Step 4: Do your processing and update the status record with success/failure.
Step 5: Create another route that returns the current status of the job, and make continuous AJAX calls to this route until the status changes to completed.
Go through the example below for more details. I'm assuming you have already taken care of all the Redis/RQ setup (installation, worker.py, establishing the Redis connection, etc.); a minimal sketch of that setup follows for reference.
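Here is a minimal sketch of that setup, defining the redis_queue object used in views.py below; the host, port, and queue name are assumptions you should adjust to your environment:

    # queues.py -- shared RQ setup; start a worker with: rq worker csv_jobs
    from redis import Redis
    from rq import Queue

    redis_conn = Redis(host='localhost', port=6379)
    redis_queue = Queue('csv_jobs', connection=redis_conn)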
models.py (I'm using MongoDB via MongoEngine for this example)
import datetime
from datetime import datetime as dt
from mongoengine import Document, SequenceField, StringField, DateTimeField

class TempCsvLookup(Base, Document):  # Base is an app-specific mixin
    id = SequenceField(unique=True)
    job_id = StringField()
    file_url = StringField()
    status = StringField()
    # pass callables so the defaults are evaluated at save time, not at import time
    created_at = DateTimeField(default=dt.utcnow, required=True)
    finished_at = DateTimeField(default=lambda: dt.utcnow() + datetime.timedelta(hours=24))
views.py
from flask import request, jsonify

@route('/upload_csv_file', methods=['POST'])
@require_login
def upload_csv_file():
    # Step 1: read the uploaded file and validate it
    csv_file = request.files.get('file')   # uploads arrive in request.files, not request.form
    '''
    do validations
    '''

    # Step 2: create the initial DB record for tracking
    look_up = TempCsvLookup(
        status="Queued"
    ).save()

    # Step 3: enqueue the background task via Redis (RQ)
    job = redis_queue.enqueue(
        process_csv_file,
        args=(
            look_up, csv_file, other_args
        ),
        timeout=1200
    )

    # store the RQ job id on the lookup record so the status route can find it
    look_up.update(
        set__job_id=job.get_id(),
    )

    return jsonify(job_id=str(job.get_id()))
def process_csv_file(look_up, csv_file, other_args):
    try:
        # Step 4: process the file and update the status record
        look_up.update(
            set__status="Processing",
        )
        """
        1. do all processing with the input csv file
        2. create the new csv file
        """
        look_up.update(
            set__status="Completed",
            set__file_url=new_file_path,
            set__finished_at=dt.utcnow()
        )
    except Exception:
        look_up.update(
            set__status="Failed",
            set__finished_at=dt.utcnow()
        )
@route('/csv_file_lookup/<string:lookup_job_id>', methods=['GET'])
@require_login
def csv_file_lookup(lookup_job_id):
    # Step 5: the browser polls this route via AJAX until status is "Completed"
    report = TempCsvLookup.objects(job_id=lookup_job_id).first()
    resp = {
        'status': report.status,
        'file_url': report.file_url
    }
    return jsonify(data=resp)
Hope this helps you.