I'm building a Websocket server application in python 3. I'm using this implementation: https://websockets.readthedocs.io/
Basically I want to manage multiple client. Also I want to send data from 2 different thread (one for GPS + one for IMU) GPS thread is refreshed 1Hz, while IMU thread is refresh at 25Hz
My problem is in MSGWorker.sendData method: as soon as I uncomment the line @asyncio.coroutine and yield from websocket.send('{"GPS": "%s"}' % data) the whole method does nothing (no print("Send data: foo") in terminal)
However with these two line commented my code works as I expect except that I send nothing through the websocket.
But, of course, my goal is to send data through the websocket, I just don't understand why it doesn't work ? Any idea ?
server.py
#!/usr/bin/env python3
import signal, sys
sys.path.append('.')
import time
import websockets
import asyncio
import threading
connected = set()
stopFlag = False
class GPSWorker (threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.data = 0
    self.lastData = 0
    self.inc = 0
  # Simulate GPS data
  def run(self):
    while not stopFlag:
      self.data = self.inc
      self.inc += 1
      time.sleep(1)
  def get(self):
    if self.lastData is not self.data:
      self.lastData = self.data
      return self.data
class IMUWorker (threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.data = 0
    self.lastData = 0
    self.inc = 0
  # Simulate IMU data
  def run(self):
    while not stopFlag:
      self.data = self.inc
      self.inc += 1
      time.sleep(0.04)
  def get(self):
    if self.lastData is not self.data:
      self.lastData = self.data
      return self.data
class MSGWorker (threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
  def run(self):
    while not stopFlag:
      data = gpsWorker.get()
      if data:
        self.sendData('{"GPS": "%s"}' % data)          
      data = imuWorker.get()
      if data:
        self.sendData('{"IMU": "%s"}' % data)
      time.sleep(0.04)
  #@asyncio.coroutine
  def sendData(self, data):
    for websocket in connected.copy():
      print("Sending data: %s" % data)
      #yield from websocket.send('{"GPS": "%s"}' % data)
@asyncio.coroutine
def handler(websocket, path):
  global connected
  connected.add(websocket)
  #TODO: handle client disconnection
  # i.e connected.remove(websocket)
if __name__ == "__main__":
  print('aeroPi server')
  gpsWorker = GPSWorker()
  imuWorker = IMUWorker()
  msgWorker = MSGWorker()
  try:
    gpsWorker.start()
    imuWorker.start()
    msgWorker.start()
    start_server = websockets.serve(handler, 'localhost', 7700)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(start_server)
    loop.run_forever()
  except KeyboardInterrupt:
    stopFlag = True
    loop.close()
    print("Exiting program...")
client.html
<!doctype html>
<html>
<head>
  <meta charset="UTF-8" />
</head>
<body>
</body>
</html>
<script type="text/javascript">
  var ws = new WebSocket("ws://localhost:7700", 'json');
  ws.onmessage = function (e) {
    var data = JSON.parse(e.data);
    console.log(data);
  };
</script>
Thanks you for your help
Finally I got it ! It required Python 3.5.1 (while my distro provide only 3.4.3) and some help from Aymeric, the author of the websockets library (thanks to him).
#!/usr/bin/env python3
import signal, sys
sys.path.append('.')
import time
import websockets
import asyncio
import threading
stopFlag = False
class GPSWorker (threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.data = 0
    self.lastData = 0
    self.inc = 0
  # Simulate GPS data
  def run(self):
    while not stopFlag:
      self.data = self.inc
      self.inc += 1
      time.sleep(1)
  def get(self):
    if self.lastData is not self.data:
      self.lastData = self.data
      return self.data
class IMUWorker (threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.data = 0
    self.lastData = 0
    self.inc = 0
  # Simulate IMU data
  def run(self):
    while not stopFlag:
      self.data = self.inc
      self.inc += 1
      time.sleep(0.04)
  def get(self):
    if self.lastData is not self.data:
      self.lastData = self.data
      return self.data
class MSGWorker (threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.connected = set()
  def run(self):
    while not stopFlag:
      data = gpsWorker.get()
      if data:
        self.sendData('{"GPS": "%s"}' % data)
      data = imuWorker.get()
      if data:
        self.sendData('{"IMU": "%s"}' % data)
      time.sleep(0.04)
  async def handler(self, websocket, path):
    self.connected.add(websocket)
    try:
      await websocket.recv()
    except websockets.exceptions.ConnectionClosed:
      pass
    finally:
      self.connected.remove(websocket)
  def sendData(self, data):
    for websocket in self.connected.copy():
      print("Sending data: %s" % data)
      coro = websocket.send(data)
      future = asyncio.run_coroutine_threadsafe(coro, loop)
if __name__ == "__main__":
  print('aeroPi server')
  gpsWorker = GPSWorker()
  imuWorker = IMUWorker()
  msgWorker = MSGWorker()
  try:
    gpsWorker.start()
    imuWorker.start()
    msgWorker.start()
    ws_server = websockets.serve(msgWorker.handler, '0.0.0.0', 7700)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(ws_server)
    loop.run_forever()
  except KeyboardInterrupt:
    stopFlag = True
    #TODO: close ws server and loop correctely
    print("Exiting program...")
Regards, Clément
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With