Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Real-time update on Django application using MySQL <> WebSocket

I need to continuously get data from a MySQL database which gets data with an update frequency of around 200 ms. I need to continuously update the data value on the dashboard text field.My dashboard is built on Django.

I have read a lot about Channels but all the tutorials are about chat applications. I know that I need to implement WebSockets which will basically have an open connection and get the data. With the chat application, it makes sense but I haven't come across anything which talks about MySQL database.

I also read about mysql-events. Since the data which is getting in the table is from an external sensor, I don't understand how I can monitor a table inside Django i.e whenever a new row is added in the table, I need to get that new inserted based on a column value.

Any ideas on how to go about it? I have gone through a lot of articles and I couldnt find something specific to this requirement.

like image 728
driftking9987 Avatar asked Aug 07 '20 12:08

driftking9987


People also ask

Can I use WebSockets in Django?

Django Channels facilitates support of WebSockets in Django in a manner similar to traditional HTTP views. It wraps Django's native asynchronous view support, allowing Django projects to handle not only HTTP, but also protocols that require long-running connections, such as WebSockets, MQTT, chatbots, etc.

How fetch data from MySQL to Django?

Process to get MySQL data So, we need to create the model of the data and implement that model into the db of django. Open 'models.py' and put in the following code. The database table 'students' will be created by the following code. Suppose we have the following records in the 'students' table.

Is it possible to update data in real time in Django?

I'm fairly new to Django, and now i'm focusing on how to update my data in real time, without having to reload the whole page. Some clarification: the real time data should be update regularly, not only through a user input.

How do I connect to a WebSocket socket in Django?

Django Channels has everything you need to establish WebSocket connections in Django. First, create a project in Django and then an application by any name of your choice and create a virtual environment (Basic level Django stuff) Go to your settings.py file and add “channels” to your list of “installed applications”

How to update data from REST API in Django?

For real-time update of data, polling the REST API at regular intervals is the best option you have. Something like: Now add Django Rest Framework to your project. They have a simple tutorial here. Create an API endpoint which will return the data as JSON, and use that URL in the AJAX call.

What can you do with Django channels?

Real-time applications (GPS tracking, reading instant data from sensors and taking action) by connecting with IOT devices (such as rasberrypie …) Django channels: It is a package that provides long-running connections for Django projects such as WebSockets, MQTT, chatbots, amateur radio and more … .


3 Answers

Thanks to Timothee Legros answer, it kinda helped me move along in the right direction.

Everywhere on the internet, it says that Django channels is/can be used for real-time applications, but nowhere it talks about the exact implementation(other than chat applications).

I used Celery, Django Channels and Celery's Beat to accomplish the task and it works as expected.

There are three parts to it. Setting up channel's, then creating a celery task, calling it periodically (with the help of Celery Beat) and then sending that task's output to channel's so that it can send that data to the websocket.

Channels

I followed the original tutorial on Channel's website and build up on that.

routing.py

from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
    re_path(r'ws/realtimeupdate/$', consumers.RealTimeConsumer),
]

consumers.py

class RealTimeConsumer(AsyncWebsocketConsumer):
    async def connect(self):

        self.channel_group_name = 'core-realtime-data'

        # Join room group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data):
        print(text_data)
        pass

    async def loc_message(self, event):
        # print(event)
        message_trans = event['message_trans']
        message_tag = event['message_tag']
        # print("sending data to websocket")
        await self.send(text_data=json.dumps({
            'message_trans': message_trans,
            'message_tag': message_tag
        }))

This class will basically send data to the websocket once it receives it. Above two will be specific to the app.

Now we will setup Celery.

In the project's base directory, where the setting file resides, we need to make three files.

  • celery.py This will init the celery.
  • routing.py This will be used to route the channel's websocket addresses.
  • task.py This is where we will setup the task

celery.py

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings')

app = Celery('proj_name', backend='redis://localhost', broker='redis://localhost/')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}') 

routing.py

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from app_name import routing

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
            URLRouter(
                routing.websocket_urlpatterns
            )
        ),
})

tasks.py

@shared_task(name='realtime_task')
def RealTimeTask():
    time_s = time.time()
    result_trans = CustomModel_1.objects.all()
    result_tag = CustomModel_2.objects.all()
    result_trans_json = serializers.serialize('json', result_trans)
    result_tag_json = serializers.serialize('json', result_tag)
    # output = {"ktr": result_transmitter_json, "ktag": result_tag_json}
    # print(output)
    channel_layer = get_channel_layer()
    message = {'type': 'loc_message',
               'message_transmitter': result_trans_json,
               'message_tag': result_tag_json}
    async_to_sync(channel_layer.group_send)('core-realtime-data', message)
    print(time.time()-time_s)

The task, after completing the task, sends the result back to the Channels, which in turn will relay it to the websocket.

Settings.py

# Channels
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

CELERY_BEAT_SCHEDULE = {
    'task-real': {
        'task': 'realtime_task',
        'schedule': 1 # this means, the task will run itself every second
    },
}

Now the only thing left is to create a websocket in the javascript file and start listening to it.

//Create web socket to receive data
const chatSocket = new WebSocket(
    'ws://'
    + window.location.host
    + '/ws/realtimeupdate'
    + '/'
);

chatSocket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    console.log(e.data + '\n');
    // For trans
    var arrayOfObjects = JSON.parse(data.message_trans);
    //Do your thing

    //For tags
    var arrayOfObjects_tag = JSON.parse(data.message_tag);
    //Do your thing
    }

};

chatSocket.onclose = function(e) {
    console.error('Chat socket closed unexpectedly');
};

To answer the MySQL usage, I am inserting data into the MySQL database from external sensor and in the tasks.py, am querying the table using Django ORM.

Overall, it does the intended work, populate a real-time dashboard with real-time data from MySQL . Am sure, there might be different and better approach to it, please let me know about it.

like image 63
driftking9987 Avatar answered Nov 15 '22 08:11

driftking9987


Your best bet if you need to constantly query your sql database would be to use Celery or dramatiq which is simpler/easier but less battle tested in combination with Django Channels.

Celery allows you to create workers (kind of like background processes) that you can send tasks (functions) to. When a worker receives a task it will execute. All this is done in the background. From the task that the worker is executing you can actually send data back through a websocket directly from the worker. This only works if you have django channels + channel layers enabled because when you enable channel layers, each consumer instance created when you open a channel/websocket will have a name that you can pass to the worker so that it knows which websocket to send the query data back to.

Here is what the flow of this process would look like:

  1. Client requests to connect to your websocket
  2. Consumer instance is created and with it a specific name for it
  3. Consumer instance accepts connection
  4. Consumer triggers celery task and passes the name
  5. Worker begins polling your SQL databases every X seconds
  6. When worker finds new entry use the name it was given and send the new entry back through the websocket.

I suggest reading django channels documentation on consumers and channel layers as well as celery or dramatiq tutorials to understand how those work. For all this to work you will also have to learn about Redis and a message queue service such as RabbitMQ. There is just too much to put in a simple answer but I can provide more information if you have specific questions.

Edit:

  • Get Redis Server Setup on your machine. If you are on Windows like me then you have to download WSL 2 and install Ubuntu from the Windows Store (free). This link can walk you through it.

  • Get RabbitMQ server setup. Follow their tutorial

  • Enable Django Channels and Django-Channel-layers and then setup Redis as your default Django-channels backend.

  • Setup Dramatiq or Celery. I prefer Dramatiq as it is basically a new and improved version of Celery albeit being less popular. It is much easier to setup and use. This is the github repo for Django-dramatiq and it will walk you through how to set it up. Note that just like when you launch your django server with python manage.py runserver you have to launch dramatiq workers with python manage.py rundramatiq before testing you website.

  • Create a tasks.py file in your django app and inside of that task implement your code to check MySQL database for new entries. If you haven't figured that out already here is the link to get started with that. In your tasks file you should have a function with the dramatiq.actor decorator on top so that dramatiq knows that the function is a task.

  • Build a django-channels consumer to handle WebSocket connections as well as allow you to send data through the WebSocket connection. This is what the standard consumer would look like:

class AsyncDashboardConsumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()

    async def disconnect(self, code):
        await self.close()

    async def receive_json(self, text_data=None, bytes_data=None, **kwargs):
        someData = text_data['someData']
        someOtherData = text_data['someOtherData']

        if 'execute_getMySQLdata' in text_data['function']:
            await self.getData(someData, someOtherData)

    async def sendDataToClient(self, event):
        await self.send(text_data=event['text'])

    async def getData(self, someData, someOtherData):
        sync_to_async(SQLData.send(self.channel_name, someData, someOtherData))
  • connect function is called when the client attempts to connect to the WebSocket URL that your routing file (in step 2) points to this consumer.

  • recieve_json function is called whenever the client sends data to your django server.

  • getData function is called from the recieve_json function and sends a message to start your dramatiq task that you created earlier to check SQL db. Note that when you send the message you must pass in self.channel_name as you use that channel_name to send data back through the WebSocket directly from the dramatiq worker/task.

  • sendDataToClient function is used when you send data back to the client. So when you send data from your task this is the function you must pass in as a callable.

To send data from the task you created earlier use this: async_to_sync(channel_layer.send)(channelName, {'type': 'sendData', 'text': jsonPayload}). Notice how you pass the channelName as well as the sendData function from your consumer.

Finally, this is what the javascript on the client side would look like:

    let socket = new WebSocket("wss://javascript.info/article/websocket/demo/hello");
    
    socket.onopen = function(e) {
      alert("[open] Connection established");
      alert("Sending to server");
      socket.send("My name is John");
    };
    
    socket.onmessage = function(event) {
      alert(`[message] Data received from server: ${event.data}`);
    };
    
    socket.onclose = function(event) {
      if (event.wasClean) {
        alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
      } else {
        // e.g. server process killed or network down
        // event.code is usually 1006 in this case
        alert('[close] Connection died');
      }
    };
    
    socket.onerror = function(error) {
      alert(`[error] ${error.message}`);
    };

This code came directly from this JavaScript WebSocket walkthrough.

This is how a basic web application with background workers would continually update information in real-time. There are probably other ways of doing this without background workers but since you want to get information as fast as possible as soon as it arrives it is better to have a background process that is continually checking for updates. On another note, the code above means that separate connections to the database are opened for each new client that connects but you can easily take advantage of django-channels groups and have one connection to your database that then just sends to all clients in certain groups.

like image 35
Timaayy Avatar answered Nov 15 '22 09:11

Timaayy


Build a microservice for Websockets connections

Another way to implement such a feature - is to build a standalone WebSocket microservice.

Monolyth architecture isn't what you need here. Every WebSocket will open a connection to the Django (which will be behind reverse proxy and server: NGINX and Gunicorn ex.). If your client opens two tabs in the browser you will get 2 connections etc...

My recommendation is to modify the tech stack (yes, I'm a huge fan of Django, but there are many cool solutions in building WS):

  1. Use Starlette ready for production framework with build-in WebSockets: https://www.starlette.io/websockets/
  2. Use uvicorn.workers.UvicornWorker for Gunicorn to manage your ASGI application: this is only 1 line of code, like gunicorn -w 4 -k uvicorn.workers.UvicornWorker --log-level warning example:app
  3. handle your WebSocket connections and use examples to request updates from the database: https://www.starlette.io/database/
  4. Use super simple Javascript code to open the connection of the client-side and listen for updates.

So your models, templates, the view will be managed by Django. Your WebSocket connections will be managed by Starlette in a native async way. If you're interested in such an option I can make detailed instructions.

like image 37
fanni Avatar answered Nov 15 '22 09:11

fanni