
How to upload CSV, process it in the background and make new file available to download in Flask

Tags: python, flask

I'm building an internal tool for work, just to make some people's lives easier. The task involves a CSV as input, which gets processed so that a new file is generated. That part is taken care of: I made a command-line script that works. But I want to make it as accessible as possible, so a web UI is a natural choice. I played with Flask a little and built another simple internal tool with it, and I'd like to do the same with this one.

The script I have can take a while to process a file, sometimes several minutes, so I'd like to make it non-blocking in the web UI. Ideally the user selects the file, chooses a few options, hits submit, and the job starts in the background. There should be a progress bar to show the user when it's done. I don't plan to have user management here, and it would be fine for anyone to see a list of all jobs anyone has triggered. Once the job is finished there should be a link to download the processed file.

From what I've read, this is the kind of situation where a task queue is used together with a message broker, like Celery and Redis. I've read some tutorials about this but I'm still unsure how exactly to go about it. Where do I start? What's the best architecture for this kind of problem? Also, how should I handle the upload itself, maybe using AJAX?

I'm looking for some advice, references and code samples if possible. Any example that relates to what I want to do is appreciated.

asked Feb 11 '23 by bergonzzi

2 Answers

From what you have posted, you seem to be on the right track. What you're trying to accomplish is to offload resource-demanding tasks to background workers and have your web tier concentrate on presenting results to your users.

The pattern you are looking to work with is commonly referred to as a "Publish/Subscribe" pattern:

Pub/Sub

Your application is going to "publish" Celery tasks to a message broker (RabbitMQ). Your Celery workers will "subscribe" to the message broker and look for tasks to perform. It's a very straightforward pattern.

Given the tags you used, I'm going to assume you are familiar with Flask (a great microframework), and I suggest you take a look at Flask-Uploads for your uploading needs.
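
If you'd rather not pull in an extension, the upload route itself is small enough to do with Flask's built-ins. A minimal sketch, assuming an UPLOAD_DIR location and a form field named "file" (both my inventions, not part of this answer):

# app.py -- bare-bones upload route using only Flask built-ins (sketch)
import os
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename

UPLOAD_DIR = '/tmp/csv_uploads'  # assumed location; configure as needed
os.makedirs(UPLOAD_DIR, exist_ok=True)

app = Flask(__name__)

@app.route('/upload', methods=['POST'])
def upload():
    file = request.files['file']  # field name used by the form / AJAX call
    path = os.path.join(UPLOAD_DIR, secure_filename(file.filename))
    file.save(path)
    # hand the saved file off to a Celery task here (see the task sketch below)
    return jsonify(uploaded=file.filename)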

For processing your CSV file, it's a matter of writing the function itself and wrapping it with Celery's task decorator so Celery recognizes it as a task. Using a bit of "Google-fu", I came across this particular file that might be of use to you:

https://github.com/captricity/captricity-cloud-io/blob/master/captricity_cloud_io/captricity_cloud_io/tasks.py

Take a look at the _upload_to_google function to get an idea of how to approach this.
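
To make the idea concrete, here is a minimal sketch of such a task. The app name, broker URL, and the body of the function are my assumptions; the pattern (a plain function wrapped with Celery's task decorator) is the point:

# tasks.py -- minimal Celery task sketch (assumed names throughout)
from celery import Celery

# assumes a RabbitMQ broker running locally; adjust the URL to your setup
celery_app = Celery('csv_tool', broker='amqp://localhost//')

@celery_app.task
def process_csv(input_path, output_path):
    # placeholder for the existing command-line logic: read the uploaded
    # CSV, transform it, and write the result where the web tier can serve it
    with open(input_path) as src, open(output_path, 'w') as dst:
        for line in src:
            dst.write(line)  # real transformation goes here
    return output_path

The upload route would then call process_csv.delay(input_path, output_path), which returns immediately with an AsyncResult whose id the browser can use to poll for progress.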

Finally, one of my favorite authors, Miguel Grinberg, has a great write up on Celery and Flask along with using progress bars to track your progress:

http://blog.miguelgrinberg.com/post/using-celery-with-flask

Mind you, the sample uses JavaScript to check on a given Celery task every 2 seconds. If you don't expect a lot of traffic you can certainly get away with that, but if capacity is to grow, I would suggest you look into WebSockets.
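
Continuing the sketch above, the endpoint that the JavaScript polls could look like this (process_csv.AsyncResult is standard Celery API; the route path is my invention):

# status endpoint the 2-second JavaScript poll would hit (sketch)
@app.route('/status/<task_id>')
def task_status(task_id):
    result = process_csv.AsyncResult(task_id)
    payload = {'state': result.state}  # PENDING, STARTED, SUCCESS, FAILURE...
    if result.state == 'SUCCESS':
        payload['output_path'] = result.result  # the task's return value
    return jsonify(payload)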

Hopefully this gives you a good starting point. Good luck!

answered Feb 13 '23 by Carlos

It seems you are on the way to solving your problem.

You need a background worker to process large data/files whenever the processing takes a significant amount of time. Since we don't have direct control over a background task while it runs, we need the database's help to track the status of each job.

Follow these steps to achieve your task:

Step 1: In your main route, do the primary steps, like reading the request (e.g. the .csv file) and doing validations.

Step 2: Create a lookup table to keep track of job status. This table is a placeholder for tracking your background job:

  • Initially it saves the status as "Queued"
  • Step 3 then enqueues the job for processing
  • The job id is returned to the client, which polls the status route (step 5)

Step 3 (Redis enqueue): call the processing helper function through a Redis (RQ) enqueue, pass all required args, and set the timeout value as per your need.

Step 4: do your processing and update the status record with success/failure.

Step 5: create another route that returns the current status of the job, and make continuous AJAX calls to this route until the status changes to "Completed".

Go through the example below for more details. I'm assuming you have taken care of all the Redis setup (installation, worker.py, establishing the Redis connection, etc.).
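
For reference, that assumed setup might look something like the sketch below; the queue name and connection details are my guesses, but redis_queue matches the name used in the views code that follows:

# worker_setup.py -- one possible shape of the assumed Redis/RQ setup (sketch)
from redis import Redis
from rq import Queue

redis_conn = Redis(host='localhost', port=6379)  # assumed local Redis instance
redis_queue = Queue('csv_jobs', connection=redis_conn)  # imported by views.py

A worker process would then be started separately, e.g. with: rq worker csv_jobs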

models.py (I'm using MongoDB via MongoEngine for this example)

import datetime
from datetime import datetime as dt

from mongoengine import Document, SequenceField, StringField, DateTimeField

class TempCsvLookup(Base, Document):  # Base is the author's own mixin
    id = SequenceField(unique=True)
    job_id = StringField()
    file_url = StringField()
    status = StringField()
    # pass callables so the defaults are evaluated per document, not at import time
    created_at = DateTimeField(default=dt.utcnow, required=True)
    finished_at = DateTimeField(default=lambda: dt.utcnow() + datetime.timedelta(hours=24))

views.py

from flask import request, jsonify

from models import TempCsvLookup        # the lookup model defined above
from worker_setup import redis_queue    # the RQ queue (see the setup sketch above)

@route('/upload_csv_file', methods=['POST'])
@require_login
def upload_csv_file():

    # Step 1: read the uploaded file from the request and validate it
    csv_file = request.files.get('file')
    '''
        do validations
    '''

    # Step 2: create the initial DB record with a "Queued" status
    look_up = TempCsvLookup(
        status="Queued"
    ).save()

    # Step 3: call the background helper through the Redis queue (RQ)
    job = redis_queue.enqueue(
        process_csv_file,
        args=(
            look_up, csv_file, other_args  # other_args: whatever your task needs
        ),
        timeout=1200  # seconds; set as per your need
    )

    # store the RQ job id so the status route can find this record
    look_up.update(
        set__job_id=job.get_id(),
    )

    return jsonify(job_id=str(job.get_id()))

def process_csv_file(look_up, csv_file, other_args):
    try:
        # Step 4: do the processing and update the status record
        look_up.update(
            set__status="Processing",
        )

        """
           1. do all processing with the input csv file
           2. create the new csv file
        """

        look_up.update(
            set__status="Completed",
            set__file_url=new_file_path,  # path of the newly generated file
            set__finished_at=dt.utcnow()
        )
    except Exception:
        look_up.update(
            set__status="Failed",
            set__finished_at=dt.utcnow()
        )

@route('/csv_file_lookup/<string:lookup_job_id>', methods=['GET'])
@require_login
def csv_file_lookup(lookup_job_id):
    # Step 5: return the current status (and the file URL once completed)
    report = TempCsvLookup.objects(job_id=lookup_job_id).first()

    resp = {
        'status': report.status,
        'file_url': report.file_url
    }

    return response.success(data=resp)  # response.success: the author's own JSON helper
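
For completeness, the step 5 polling loop the client performs could be sketched in Python too, assuming the route above returns the resp dict as JSON (a browser would do the same thing with AJAX every few seconds):

# poll_job.py -- illustrative client-side polling loop (sketch)
import time
import requests  # any HTTP client works

def wait_for_job(base_url, job_id, interval=2):
    while True:
        resp = requests.get('%s/csv_file_lookup/%s' % (base_url, job_id)).json()
        if resp['status'] in ('Completed', 'Failed'):
            return resp  # contains file_url when status is "Completed"
        time.sleep(interval)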

Hope it helps you.

answered Feb 13 '23 by kartheek