I'm trying to get a bluetoothe device working in Python on Bluez5. Currently I have the following:
#set up a bluez profile to advertise device capabilities from a loaded service record
def init_bluez_profile(self):
print("Configuring Bluez Profile")
#setup profile options
service_record=self.read_sdp_service_record()
opts = {
"ServiceRecord":service_record,
"Role":"server",
"RequireAuthentication":False,
"RequireAuthorization":False,
"Name":BTKbDevice.MY_DEV_NAME,
"AutoConnect":True
}
#retrieve a proxy for the bluez profile interface
bus = dbus.SystemBus()
self.manager = dbus.Interface(bus.get_object("org.bluez","/org/bluez"), "org.bluez.ProfileManager1")
self.profile = BTKbBluezProfile(bus, BTKbDevice.PROFILE_DBUS_PATH)
self.manager.RegisterProfile(BTKbDevice.PROFILE_DBUS_PATH, BTKbDevice.UUID, opts)
print("Profile registered ")
This code executes properly and the profile code is the standard one from teh bluez test cases:
class BTKbBluezProfile(dbus.service.Object):
fd = -1
@dbus.service.method("org.bluez.Profile1",
in_signature="", out_signature="")
def Release(self):
print("Release")
mainloop.quit()
@dbus.service.method("org.bluez.Profile1",
in_signature="", out_signature="")
def Cancel(self):
print("Cancel")
@dbus.service.method("org.bluez.Profile1", in_signature="oha{sv}", out_signature="")
def NewConnection(self, path, fd, properties):
self.fd = fd.take()
print("NewConnection(%s, %d)" % (path, self.fd))
for key in properties.keys():
print ('key ' + key + ' value ' + properties[key])
if key == "Version" or key == "Features":
print(" %s = 0x%04x" % (key, properties[key]))
else:
print(" %s = %s" % (key, properties[key]))
@dbus.service.method("org.bluez.Profile1", in_signature="o", out_signature="")
def RequestDisconnection(self, path):
print("RequestDisconnection(%s)" % (path))
if (self.fd > 0):
os.close(self.fd)
self.fd = -1
def __init__(self, bus, path):
dbus.service.Object.__init__(self, bus, path)
However when I get connections/disconnections nothing works. I've tried playing around with the various options but I just can't get anything to register. The documentation is light and there seems to be little debugging info I can get on the dbus communication. Has anybody succeeded in regsitering a profile and/or obtaining more debugging information on the bluez interaction?
Thanks.
This is a successful HID Keyboard implementation that is documented at: https://gist.github.com/ukBaz/a47e71e7b87fbc851b27cde7d1c0fcf0
""" Bluetooth HID keyboard emulator DBUS Service Original idea taken from: http://yetanotherpointlesstechblog.blogspot.com/2016/04/emulating-bluetooth-keyboard-with.html Moved to Python 3 and tested with BlueZ 5.43 Updates documented at: https://gist.github.com/ukBaz/a47e71e7b87fbc851b27cde7d1c0fcf0 """ import os import sys import dbus import dbus.service import socket from gi.repository import GLib from dbus.mainloop.glib import DBusGMainLoop class HumanInterfaceDeviceProfile(dbus.service.Object): """ BlueZ D-Bus Profile for HID """ fd = -1 @dbus.service.method('org.bluez.Profile1', in_signature='', out_signature='') def Release(self): print('Release') mainloop.quit() @dbus.service.method('org.bluez.Profile1', in_signature='oha{sv}', out_signature='') def NewConnection(self, path, fd, properties): self.fd = fd.take() print('NewConnection({}, {})'.format(path, self.fd)) for key in properties.keys(): if key == 'Version' or key == 'Features': print(' {} = 0x{:04x}'.format(key, properties[key])) else: print(' {} = {}'.format(key, properties[key])) @dbus.service.method('org.bluez.Profile1', in_signature='o', out_signature='') def RequestDisconnection(self, path): print('RequestDisconnection {}'.format(path)) if self.fd > 0: os.close(self.fd) self.fd = -1 class BTKbDevice: """ create a bluetooth device to emulate a HID keyboard """ MY_DEV_NAME = 'BT_HID_Keyboard' # Service port - must match port configured in SDP record P_CTRL = 17 # Service port - must match port configured in SDP record#Interrrupt port P_INTR = 19 # BlueZ dbus PROFILE_DBUS_PATH = '/bluez/yaptb/btkb_profile' ADAPTER_IFACE = 'org.bluez.Adapter1' DEVICE_INTERFACE = 'org.bluez.Device1' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' # file path of the sdp record to laod install_dir = os.path.dirname(os.path.realpath(__file__)) SDP_RECORD_PATH = os.path.join(install_dir, 'sdp_record.xml') # UUID for HID service (1124) # https://www.bluetooth.com/specifications/assigned-numbers/service-discovery UUID = '00001124-0000-1000-8000-00805f9b34fb' def __init__(self, hci=0): self.scontrol = None self.ccontrol = None # Socket object for control self.sinterrupt = None self.cinterrupt = None # Socket object for interrupt self.dev_path = '/org/bluez/hci{}'.format(hci) print('Setting up BT device') self.bus = dbus.SystemBus() self.adapter_methods = dbus.Interface( self.bus.get_object('org.bluez', self.dev_path), self.ADAPTER_IFACE) self.adapter_property = dbus.Interface( self.bus.get_object('org.bluez', self.dev_path), self.DBUS_PROP_IFACE) self.bus.add_signal_receiver(self.interfaces_added, dbus_interface=self.DBUS_OM_IFACE, signal_name='InterfacesAdded') self.bus.add_signal_receiver(self._properties_changed, dbus_interface=self.DBUS_PROP_IFACE, signal_name='PropertiesChanged', arg0=self.DEVICE_INTERFACE, path_keyword='path') print('Configuring for name {}'.format(BTKbDevice.MY_DEV_NAME)) self.config_hid_profile() # set the Bluetooth device configuration self.alias = BTKbDevice.MY_DEV_NAME self.discoverabletimeout = 0 self.discoverable = True def interfaces_added(self): pass def _properties_changed(self, interface, changed, invalidated, path): if self.on_disconnect is not None: if 'Connected' in changed: if not changed['Connected']: self.on_disconnect() def on_disconnect(self): print('The client has been disconnect') self.listen() @property def address(self): """Return the adapter MAC address.""" return self.adapter_property.Get(self.ADAPTER_IFACE, 'Address') @property def powered(self): """ power state of the Adapter. """ return self.adapter_property.Get(self.ADAPTER_IFACE, 'Powered') @powered.setter def powered(self, new_state): self.adapter_property.Set(self.ADAPTER_IFACE, 'Powered', new_state) @property def alias(self): return self.adapter_property.Get(self.ADAPTER_IFACE, 'Alias') @alias.setter def alias(self, new_alias): self.adapter_property.Set(self.ADAPTER_IFACE, 'Alias', new_alias) @property def discoverabletimeout(self): """Discoverable timeout of the Adapter.""" return self.adapter_props.Get(self.ADAPTER_IFACE, 'DiscoverableTimeout') @discoverabletimeout.setter def discoverabletimeout(self, new_timeout): self.adapter_property.Set(self.ADAPTER_IFACE, 'DiscoverableTimeout', dbus.UInt32(new_timeout)) @property def discoverable(self): """Discoverable state of the Adapter.""" return self.adapter_props.Get( self.ADAPTER_INTERFACE, 'Discoverable') @discoverable.setter def discoverable(self, new_state): self.adapter_property.Set(self.ADAPTER_IFACE, 'Discoverable', new_state) def config_hid_profile(self): """ Setup and register HID Profile """ print('Configuring Bluez Profile') service_record = self.read_sdp_service_record() opts = { 'Role': 'server', 'RequireAuthentication': False, 'RequireAuthorization': False, 'AutoConnect': True, 'ServiceRecord': service_record, } manager = dbus.Interface(self.bus.get_object('org.bluez', '/org/bluez'), 'org.bluez.ProfileManager1') HumanInterfaceDeviceProfile(self.bus, BTKbDevice.PROFILE_DBUS_PATH) manager.RegisterProfile(BTKbDevice.PROFILE_DBUS_PATH, BTKbDevice.UUID, opts) print('Profile registered ') @staticmethod def read_sdp_service_record(): """ Read and return SDP record from a file :return: (string) SDP record """ print('Reading service record') try: fh = open(BTKbDevice.SDP_RECORD_PATH, 'r') except OSError: sys.exit('Could not open the sdp record. Exiting...') return fh.read() def listen(self): """ Listen for connections coming from HID client """ print('Waiting for connections') self.scontrol = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) self.scontrol.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sinterrupt = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) self.sinterrupt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.scontrol.bind((self.address, self.P_CTRL)) self.sinterrupt.bind((self.address, self.P_INTR)) # Start listening on the server sockets self.scontrol.listen(1) # Limit of 1 connection self.sinterrupt.listen(1) self.ccontrol, cinfo = self.scontrol.accept() print('{} connected on the control socket'.format(cinfo[0])) self.cinterrupt, cinfo = self.sinterrupt.accept() print('{} connected on the interrupt channel'.format(cinfo[0])) def send(self, msg): """ Send HID message :param msg: (bytes) HID packet to send """ self.cinterrupt.send(bytes(bytearray(msg))) class BTKbService(dbus.service.Object): """ Setup of a D-Bus service to recieve HID messages from other processes. Send the recieved HID messages to the Bluetooth HID server to send """ def __init__(self): print('Setting up service') bus_name = dbus.service.BusName('org.yaptb.btkbservice', bus=dbus.SystemBus()) dbus.service.Object.__init__(self, bus_name, '/org/yaptb/btkbservice') # create and setup our device self.device = BTKbDevice() # start listening for socket connections self.device.listen() @dbus.service.method('org.yaptb.btkbservice', in_signature='ay') def send_keys(self, cmd): self.device.send(cmd) if __name__ == '__main__': # The sockets require root permission if not os.geteuid() == 0: sys.exit('Only root can run this script') DBusGMainLoop(set_as_default=True) myservice = BTKbService() mainloop = GLib.MainLoop() mainloop.run()
'sdp_record.xml'
<?xml version="1.0" encoding="UTF-8" ?> <record> <attribute id="0x0001"> <sequence> <uuid value="0x1124" /> </sequence> </attribute> <attribute id="0x0004"> <sequence> <sequence> <uuid value="0x0100" /> <uint16 value="0x0011" /> </sequence> <sequence> <uuid value="0x0011" /> </sequence> </sequence> </attribute> <attribute id="0x0005"> <sequence> <uuid value="0x1002" /> </sequence> </attribute> <attribute id="0x0006"> <sequence> <uint16 value="0x656e" /> <uint16 value="0x006a" /> <uint16 value="0x0100" /> </sequence> </attribute> <attribute id="0x0009"> <sequence> <sequence> <uuid value="0x1124" /> <uint16 value="0x0100" /> </sequence> </sequence> </attribute> <attribute id="0x000d"> <sequence> <sequence> <sequence> <uuid value="0x0100" /> <uint16 value="0x0013" /> </sequence> <sequence> <uuid value="0x0011" /> </sequence> </sequence> </sequence> </attribute> <attribute id="0x0100"> <text value="Raspberry Pi Virtual Keyboard" /> </attribute> <attribute id="0x0101"> <text value="USB > BT Keyboard" /> </attribute> <attribute id="0x0102"> <text value="Raspberry Pi" /> </attribute> <attribute id="0x0200"> <uint16 value="0x0100" /> </attribute> <attribute id="0x0201"> <uint16 value="0x0111" /> </attribute> <attribute id="0x0202"> <uint8 value="0x40" /> </attribute> <attribute id="0x0203"> <uint8 value="0x00" /> </attribute> <attribute id="0x0204"> <boolean value="false" /> </attribute> <attribute id="0x0205"> <boolean value="false" /> </attribute> <attribute id="0x0206"> <sequence> <sequence> <uint8 value="0x22" /> <text encoding="hex" value="05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050c0901a1018503150025017501950b0a23020a21020ab10109b809b609cd09b509e209ea09e9093081029501750d8103c0" /> </sequence> </sequence> </attribute> <attribute id="0x0207"> <sequence> <sequence> <uint16 value="0x0409" /> <uint16 value="0x0100" /> </sequence> </sequence> </attribute> <attribute id="0x020b"> <uint16 value="0x0100" /> </attribute> <attribute id="0x020c"> <uint16 value="0x0c80" /> </attribute> <attribute id="0x020d"> <boolean value="true" /> </attribute> <attribute id="0x020e"> <boolean value="false" /> </attribute> <attribute id="0x020f"> <uint16 value="0x0640" /> </attribute> <attribute id="0x0210"> <uint16 value="0x0320" /> </attribute> </record>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With