I am developing a Linux application in Python that connects to my BLE device and gets data through a notify characteristic. I am using pygatt for the BLE communication. I can successfully connect and bond to the device and read from and write to its characteristics, and I can even subscribe to the notify characteristic. The problem is this: my BLE device is a custom machine with 4 counters inside it. Every time one of the counters changes, the device sets the corresponding notification flag, so in an onDataChanged-like callback I can read the counter's data from the read characteristic. In Python, using pygatt, I subscribe to the notify characteristic with:
class_name.device.subscribe(uuid.UUID(notify_characteristic),callback=notifyBle)
and notifyBle is:
def notifyBle(self,handle,data):
    read_data = class_name.device.char_read(uuid.UUID(read_characteristic))
    print(read_data)
When I run the program, I first scan for devices, connect to my device and bond with it, then discover its characteristics and list them. All of this succeeds. After listing the characteristics, I write to the write characteristic to clear the notification flags, which also succeeds. Finally, I subscribe to the notify characteristic, and that succeeds as well.
After all these steps, I increase the device's counters physically (there are buttons on the device for this). When I press a button, the program enters the notifyBle method and fails with this error:
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 137, in run
    event["callback"](event)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 479, in _handle_notification_string
    self._connected_device.receive_notification(handle, values)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/device.py", line 226, in receive_notification
    callback(handle, value)
  File "/home/acd/Masaüstü/python_workspace/ble.py", line 54, in notifyBle
    read_data = bleFunctions.dev.char_read(uuid.UUID(bleFunctions.read_characteristic))
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/device.py", line 17, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/device.py", line 40, in char_read
    return self._backend.char_read(self, uuid, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 53, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 519, in char_read
    self.sendline('char-read-uuid %s' % uuid)
  File "/usr/lib/python3.5/contextlib.py", line 66, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 180, in event
    self.wait(event, timeout)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 154, in wait
    raise NotificationTimeout()
pygatt.exceptions.NotificationTimeout
Any help would be appreciated.
PS: I wrote the exact same program for Android and for Windows UWP. With Python, I am aiming to run it on a Raspberry Pi 3.
PPS: I am developing this Python program on a Raspberry Pi 3 with Ubuntu MATE installed.
First, create an event class like the one below:
class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except KeyError:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        # Call every registered handler with the given arguments.
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    # Operator aliases: event += handler, event -= handler, event(...), len(event)
    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__ = getHandlerCount
Then create a BLE class (the import assumes the event class is saved as eventclass.py):
import binascii
import uuid

import pygatt
from eventclass import Event


class myBle:
    ADDRESS_TYPE = pygatt.BLEAddressType.random
    read_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
    write_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
    notify_characteristic = "0000xxxxx-0000-1000-8000-00805f9b34fb"

    def __init__(self, device):
        self.device = device
        self.valueChanged = Event()
        self.checkdata = False
        self.read_data = None  # last value read; None until the first read

    def alert(self):
        # Fire the event so that listeners see the current flag.
        self.valueChanged(self.checkdata)

    def write(self, data):
        # data is a hex string such as "1000"; the pygatt method is char_write.
        self.device.char_write(self.write_characteristic, bytearray(binascii.unhexlify(data)))

    def notify(self, handle, data):
        # Notification callback: only set the flag, do not read here.
        self.checkdata = True

    def read(self):
        if self.checkdata:
            self.read_data = self.device.char_read(uuid.UUID(self.read_characteristic))
            self.write("1000")  # hex for the bytes 0x10 0x00
            self.checkdata = False
        return self.read_data

    def discover(self):
        return self.device.discover_characteristics().keys()
When a notification arrives, the notify method sets the boolean to True, and the alert method fires the event to tell listeners that the value has changed. You listen for it with:
def triggerEvent(checkdata):
    print(str(checkdata))
ble = myBle(device)
ble.valueChanged += triggerEvent
ble.alert()
You can then call the read method from the triggerEvent method to get the characteristic's value.
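As the traceback suggests, the timeout is raised when char_read is called from inside the notification callback, which runs on the gatttool backend's own thread. The flag above works around that by deferring the read. The following is only a rough sketch of the same idea using a thread-safe queue instead of a boolean. FakeDevice, pending, on_notify and drain are hypothetical names made up for this illustration, and FakeDevice merely stands in for a connected pygatt device so the example runs without hardware.

```python
import queue
import threading


class FakeDevice:
    """Hypothetical stand-in for a connected pygatt device (no hardware)."""

    def __init__(self):
        self.counter = 0

    def char_read(self, uuid):
        # A real pygatt device would query the peripheral here.
        return bytearray([self.counter])


pending = queue.Queue()  # handles of notifications waiting to be processed


def on_notify(handle, value):
    # Runs on the backend's receiver thread: record the event and return
    # immediately, without calling char_read here.
    pending.put(handle)


def drain(device, read_uuid, timeout=1.0):
    # Runs on the main thread: wait for one notification, then read.
    try:
        pending.get(timeout=timeout)
    except queue.Empty:
        return None
    return device.char_read(read_uuid)


device = FakeDevice()
device.counter = 3

# Simulate the backend thread delivering one notification.
worker = threading.Thread(target=on_notify, args=(0x0010, bytearray(b"\x01")))
worker.start()
worker.join()

result = drain(device, "0000xxxx-0000-1000-8000-00805f9b34fb")
print(result)
```

With a real device, on_notify would be the callback passed to subscribe, and drain would be called in a loop from the main thread.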