I need to continuously get data from a MySQL
database which gets data with an update frequency of around 200 ms. I need to continuously update the data value on the dashboard text field.My dashboard is built on Django.
I have read a lot about Channels
but all the tutorials are about chat applications. I know that I need to implement WebSockets
which will basically have an open connection and get the data. With the chat application, it makes sense but I haven't come across anything which talks about MySQL
database.
I also read about mysql-events
. Since the data which is getting in the table is from an external sensor, I don't understand how I can monitor a table inside Django i.e whenever a new row is added in the table, I need to get that new inserted based on a column value.
Any ideas on how to go about it? I have gone through a lot of articles and I couldnt find something specific to this requirement.
Django Channels facilitates support of WebSockets in Django in a manner similar to traditional HTTP views. It wraps Django's native asynchronous view support, allowing Django projects to handle not only HTTP, but also protocols that require long-running connections, such as WebSockets, MQTT, chatbots, etc.
Process to get MySQL data So, we need to create the model of the data and implement that model into the db of django. Open 'models.py' and put in the following code. The database table 'students' will be created by the following code. Suppose we have the following records in the 'students' table.
I'm fairly new to Django, and now i'm focusing on how to update my data in real time, without having to reload the whole page. Some clarification: the real time data should be update regularly, not only through a user input.
Django Channels has everything you need to establish WebSocket connections in Django. First, create a project in Django and then an application by any name of your choice and create a virtual environment (Basic level Django stuff) Go to your settings.py file and add “channels” to your list of “installed applications”
For real-time update of data, polling the REST API at regular intervals is the best option you have. Something like: Now add Django Rest Framework to your project. They have a simple tutorial here. Create an API endpoint which will return the data as JSON, and use that URL in the AJAX call.
Real-time applications (GPS tracking, reading instant data from sensors and taking action) by connecting with IOT devices (such as rasberrypie …) Django channels: It is a package that provides long-running connections for Django projects such as WebSockets, MQTT, chatbots, amateur radio and more … .
Thanks to Timothee Legros answer, it kinda helped me move along in the right direction.
Everywhere on the internet, it says that Django channels is/can be used for real-time applications, but nowhere it talks about the exact implementation(other than chat applications).
I used Celery, Django Channels and Celery's Beat to accomplish the task and it works as expected.
There are three parts to it. Setting up channel's, then creating a celery task, calling it periodically (with the help of Celery Beat) and then sending that task's output to channel's so that it can send that data to the websocket.
Channels
I followed the original tutorial on Channel's website and build up on that.
routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
re_path(r'ws/realtimeupdate/$', consumers.RealTimeConsumer),
]
consumers.py
class RealTimeConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.channel_group_name = 'core-realtime-data'
# Join room group
await self.channel_layer.group_add(
self.channel_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.channel_group_name,
self.channel_name
)
# Receive message from WebSocket
async def receive(self, text_data):
print(text_data)
pass
async def loc_message(self, event):
# print(event)
message_trans = event['message_trans']
message_tag = event['message_tag']
# print("sending data to websocket")
await self.send(text_data=json.dumps({
'message_trans': message_trans,
'message_tag': message_tag
}))
This class will basically send data to the websocket
once it receives it. Above two will be specific to the app.
Now we will setup Celery.
In the project's base directory, where the setting file resides, we need to make three files.
celery.py
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings')
app = Celery('proj_name', backend='redis://localhost', broker='redis://localhost/')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from app_name import routing
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
routing.websocket_urlpatterns
)
),
})
tasks.py
@shared_task(name='realtime_task')
def RealTimeTask():
time_s = time.time()
result_trans = CustomModel_1.objects.all()
result_tag = CustomModel_2.objects.all()
result_trans_json = serializers.serialize('json', result_trans)
result_tag_json = serializers.serialize('json', result_tag)
# output = {"ktr": result_transmitter_json, "ktag": result_tag_json}
# print(output)
channel_layer = get_channel_layer()
message = {'type': 'loc_message',
'message_transmitter': result_trans_json,
'message_tag': result_tag_json}
async_to_sync(channel_layer.group_send)('core-realtime-data', message)
print(time.time()-time_s)
The task, after completing the task, sends the result back to the Channels, which in turn will relay it to the websocket.
Settings.py
# Channels
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}
CELERY_BEAT_SCHEDULE = {
'task-real': {
'task': 'realtime_task',
'schedule': 1 # this means, the task will run itself every second
},
}
Now the only thing left is to create a websocket in the javascript
file and start listening to it.
//Create web socket to receive data
const chatSocket = new WebSocket(
'ws://'
+ window.location.host
+ '/ws/realtimeupdate'
+ '/'
);
chatSocket.onmessage = function(e) {
const data = JSON.parse(e.data);
console.log(e.data + '\n');
// For trans
var arrayOfObjects = JSON.parse(data.message_trans);
//Do your thing
//For tags
var arrayOfObjects_tag = JSON.parse(data.message_tag);
//Do your thing
}
};
chatSocket.onclose = function(e) {
console.error('Chat socket closed unexpectedly');
};
To answer the MySQL usage, I am inserting data into the MySQL
database from external sensor and in the tasks.py
, am querying the table using Django ORM.
Overall, it does the intended work, populate a real-time dashboard with real-time data from MySQL . Am sure, there might be different and better approach to it, please let me know about it.
Your best bet if you need to constantly query your sql database would be to use Celery or dramatiq which is simpler/easier but less battle tested in combination with Django Channels.
Celery allows you to create workers (kind of like background processes) that you can send tasks (functions) to. When a worker receives a task it will execute. All this is done in the background. From the task that the worker is executing you can actually send data back through a websocket directly from the worker. This only works if you have django channels + channel layers enabled because when you enable channel layers, each consumer instance created when you open a channel/websocket will have a name that you can pass to the worker so that it knows which websocket to send the query data back to.
Here is what the flow of this process would look like:
I suggest reading django channels documentation on consumers and channel layers as well as celery or dramatiq tutorials to understand how those work. For all this to work you will also have to learn about Redis and a message queue service such as RabbitMQ. There is just too much to put in a simple answer but I can provide more information if you have specific questions.
Edit:
Get Redis Server Setup on your machine. If you are on Windows like me then you have to download WSL 2 and install Ubuntu from the Windows Store (free). This link can walk you through it.
Get RabbitMQ server setup. Follow their tutorial
Enable Django Channels and Django-Channel-layers and then setup Redis as your default Django-channels backend.
Setup Dramatiq or Celery. I prefer Dramatiq as it is basically a new and improved version of Celery albeit being less popular. It is much easier to setup and use. This is the github repo for Django-dramatiq and it will walk you through how to set it up. Note that just like when you launch your django server with python manage.py runserver
you have to launch dramatiq workers with python manage.py rundramatiq
before testing you website.
Create a tasks.py file in your django app and inside of that task implement your code to check MySQL database for new entries. If you haven't figured that out already here is the link to get started with that. In your tasks file you should have a function with the dramatiq.actor
decorator on top so that dramatiq knows that the function is a task.
Build a django-channels consumer to handle WebSocket connections as well as allow you to send data through the WebSocket connection. This is what the standard consumer would look like:
class AsyncDashboardConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
async def disconnect(self, code):
await self.close()
async def receive_json(self, text_data=None, bytes_data=None, **kwargs):
someData = text_data['someData']
someOtherData = text_data['someOtherData']
if 'execute_getMySQLdata' in text_data['function']:
await self.getData(someData, someOtherData)
async def sendDataToClient(self, event):
await self.send(text_data=event['text'])
async def getData(self, someData, someOtherData):
sync_to_async(SQLData.send(self.channel_name, someData, someOtherData))
connect
function is called when the client attempts to connect to the WebSocket URL that your routing file (in step 2) points to this consumer.
recieve_json
function is called whenever the client sends data to your django server.
getData
function is called from the recieve_json
function and sends a message to start your dramatiq task that you created earlier to check SQL db. Note that when you send the message you must pass in self.channel_name as you use that channel_name to send data back through the WebSocket directly from the dramatiq worker/task.
sendDataToClient
function is used when you send data back to the client. So when you send data from your task this is the function you must pass in as a callable.
To send data from the task you created earlier use this: async_to_sync(channel_layer.send)(channelName, {'type': 'sendData', 'text': jsonPayload})
. Notice how you pass the channelName as well as the sendData
function from your consumer.
Finally, this is what the javascript on the client side would look like:
let socket = new WebSocket("wss://javascript.info/article/websocket/demo/hello");
socket.onopen = function(e) {
alert("[open] Connection established");
alert("Sending to server");
socket.send("My name is John");
};
socket.onmessage = function(event) {
alert(`[message] Data received from server: ${event.data}`);
};
socket.onclose = function(event) {
if (event.wasClean) {
alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
} else {
// e.g. server process killed or network down
// event.code is usually 1006 in this case
alert('[close] Connection died');
}
};
socket.onerror = function(error) {
alert(`[error] ${error.message}`);
};
This code came directly from this JavaScript WebSocket walkthrough.
This is how a basic web application with background workers would continually update information in real-time. There are probably other ways of doing this without background workers but since you want to get information as fast as possible as soon as it arrives it is better to have a background process that is continually checking for updates. On another note, the code above means that separate connections to the database are opened for each new client that connects but you can easily take advantage of django-channels groups and have one connection to your database that then just sends to all clients in certain groups.
Another way to implement such a feature - is to build a standalone WebSocket microservice.
Monolyth architecture isn't what you need here. Every WebSocket will open a connection to the Django (which will be behind reverse proxy and server: NGINX
and Gunicorn
ex.). If your client opens two tabs in the browser you will get 2 connections etc...
My recommendation is to modify the tech stack (yes, I'm a huge fan of Django, but there are many cool solutions in building WS):
Starlette
ready for production framework with build-in WebSockets: https://www.starlette.io/websockets/
uvicorn.workers.UvicornWorker
for Gunicorn
to manage your ASGI application: this is only 1 line of code, like gunicorn -w 4 -k uvicorn.workers.UvicornWorker --log-level warning example:app
So your models, templates, the view will be managed by Django. Your WebSocket connections will be managed by Starlette in a native async way. If you're interested in such an option I can make detailed instructions.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With