Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to perform periodic task with Flask in Python

Tags:

I've been using Flask to provide a simple web API for my k8055 USB interface board; fairly standard getters and putters, and Flask really made my life a lot easier.

But I want to be able to register changes of state as / near when whey happen.

For instance, if I have a button connected to the board, I can poll the api for that particular port. But if I wanted to have the outputs directly reflect the outputs, whether or not someone was talking to the api, I would have something like this.

while True:     board.read()     board.digital_outputs = board.digital_inputs     board.read()     time.sleep(1) 

And every second, the outputs would be updated to match the inputs.

Is there any way to do this kind of thing under Flask? I've done similar things in Twisted before but Flask is too handy for this particular application to give up on it just yet...

Thanks.

like image 645
Bolster Avatar asked Aug 04 '12 17:08

Bolster


People also ask

How do I schedule a task in flask?

To begin, let's first set up the Flask framework with Celery. Save this python script as app.py in the main project directory, and in your terminal, run: python ~/celery-scheduler/app.py . Now go to http://localhost:5000/. If everything is running fine, you should see “Hello, Flask is up and running!”

How is Python used with flask?

Flask is a lightweight Python web framework that provides useful tools and features for creating web applications in the Python Language. It gives developers flexibility and is an accessible framework for new developers because you can build a web application quickly using only a single Python file.


2 Answers

For my Flask application, I contemplated using the cron approach described by Pashka in his answer, the schedule library, and APScheduler.

I found APScheduler to be simple and serving the periodic task run purpose, so went ahead with APScheduler.

Example code:

from flask import Flask  from apscheduler.schedulers.background import BackgroundScheduler   app = Flask(__name__)  def test_job():     print('I am working...')  scheduler = BackgroundScheduler() job = scheduler.add_job(test_job, 'interval', minutes=1) scheduler.start() 
like image 70
Antony Avatar answered Nov 03 '22 03:11

Antony


You could use cron for simple tasks.

Create a flask view for your task.

# a separate view for periodic task @app.route('/task') def task():     board.read()     board.digital_outputs = board.digital_inputs 

Then using cron, download from that url periodically

# cron task to run each minute 0-59 * * * * run_task.sh 

Where run_task.sh contents are

wget http://localhost/task 

Cron is unable to run more frequently than once a minute. If you need higher frequency, (say, each 5 seconds = 12 times per minute), you must do it in tun_task.sh in the following way

# loop 12 times with a delay for i in 1 2 3 4 5 6 7 8 9 10 11 12 do     # download url in background for not to affect delay interval much     wget -b http://localhost/task     sleep 5s done 
like image 35
Pashka Avatar answered Nov 03 '22 04:11

Pashka