Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Bluez Profile Registration

Tags:

python

bluez

I'm trying to get a Bluetooth device working in Python on BlueZ 5. Currently I have the following:

    #set up a bluez profile to advertise device capabilities from a loaded service record
def init_bluez_profile(self):
    """Export the profile object and register it with BlueZ.

    Builds the profile options from the SDP service record returned by
    ``self.read_sdp_service_record()``, exports a ``BTKbBluezProfile`` at
    ``BTKbDevice.PROFILE_DBUS_PATH`` on the system bus, and then calls
    ``RegisterProfile`` on the ``org.bluez.ProfileManager1`` interface.

    Side effects: sets ``self.manager`` (ProfileManager1 proxy) and
    ``self.profile`` (the exported profile object).
    """
    print("Configuring Bluez Profile")

    # Options passed to RegisterProfile, in the same key order as before.
    profile_options = dict(
        ServiceRecord=self.read_sdp_service_record(),
        Role="server",
        RequireAuthentication=False,
        RequireAuthorization=False,
        Name=BTKbDevice.MY_DEV_NAME,
        AutoConnect=True,
    )

    # Proxy for the BlueZ profile manager on the system bus.
    system_bus = dbus.SystemBus()
    bluez_root = system_bus.get_object("org.bluez", "/org/bluez")
    self.manager = dbus.Interface(bluez_root, "org.bluez.ProfileManager1")

    # The profile object is exported before it is registered.
    profile_path = BTKbDevice.PROFILE_DBUS_PATH
    self.profile = BTKbBluezProfile(system_bus, profile_path)
    self.manager.RegisterProfile(profile_path, BTKbDevice.UUID,
                                 profile_options)
    print("Profile registered ")

This code executes without errors, and the profile class is the standard one from the BlueZ test cases:

class BTKbBluezProfile(dbus.service.Object):
    """D-Bus object implementing the ``org.bluez.Profile1`` methods.

    An instance is exported on a bus at a given object path; the methods
    below print what they were called with. ``NewConnection`` takes
    ownership of the file descriptor it is handed, and
    ``RequestDisconnection`` closes it again.
    """

    # File descriptor taken in NewConnection; -1 means none is held.
    fd = -1

    def __init__(self, bus, path):
        """Export this object on ``bus`` at object path ``path``."""
        dbus.service.Object.__init__(self, bus, path)

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Release(self):
        """Print a trace line and quit the main loop."""
        print("Release")
        # NOTE(review): `mainloop` is not defined in this class; it must
        # exist as a module-level name wherever this class is used.
        mainloop.quit()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Cancel(self):
        """Print a trace line; nothing else is done."""
        print("Cancel")

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="oha{sv}", out_signature="")
    def NewConnection(self, path, fd, properties):
        """Take ownership of ``fd`` and print the connection properties.

        :param path: object path passed by the caller
        :param fd: file-descriptor wrapper; ``fd.take()`` yields the int fd
        :param properties: mapping of property name to value
        """
        self.fd = fd.take()
        print("NewConnection(%s, %d)" % (path, self.fd))
        for key, value in properties.items():
            # Use %-formatting rather than string concatenation: values
            # are not always strings (Version/Features are formatted as
            # integers below), and 'str' + int raises TypeError, which
            # would abort this method.
            print("key %s value %s" % (key, value))
            if key in ("Version", "Features"):
                print("  %s = 0x%04x" % (key, value))
            else:
                print("  %s = %s" % (key, value))

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="o", out_signature="")
    def RequestDisconnection(self, path):
        """Print a trace line and close the held file descriptor, if any.

        :param path: object path passed by the caller
        """
        print("RequestDisconnection(%s)" % (path))

        # -1 is the "no fd" sentinel, so 0 is a valid descriptor too.
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

However, when connections and disconnections occur, none of the profile callbacks fire. I've tried playing around with the various options, but I just can't get anything to register. The documentation is light, and there seems to be little debugging information I can get on the D-Bus communication. Has anybody succeeded in registering a profile and/or obtaining more debugging information on the BlueZ interaction?

Thanks.

like image 395
Neil Benn Avatar asked Sep 28 '18 15:09

Neil Benn


1 Answers

This is a successful HID Keyboard implementation that is documented at: https://gist.github.com/ukBaz/a47e71e7b87fbc851b27cde7d1c0fcf0

""" Bluetooth HID keyboard emulator DBUS Service Original idea taken from: http://yetanotherpointlesstechblog.blogspot.com/2016/04/emulating-bluetooth-keyboard-with.html Moved to Python 3 and tested with BlueZ 5.43 Updates documented at: https://gist.github.com/ukBaz/a47e71e7b87fbc851b27cde7d1c0fcf0 """ import os import sys import dbus import dbus.service import socket   from gi.repository import GLib from dbus.mainloop.glib import DBusGMainLoop   class HumanInterfaceDeviceProfile(dbus.service.Object):     """     BlueZ D-Bus Profile for HID     """     fd = -1      @dbus.service.method('org.bluez.Profile1',                          in_signature='', out_signature='')     def Release(self):             print('Release')             mainloop.quit()      @dbus.service.method('org.bluez.Profile1',                          in_signature='oha{sv}', out_signature='')     def NewConnection(self, path, fd, properties):             self.fd = fd.take()             print('NewConnection({}, {})'.format(path, self.fd))             for key in properties.keys():                     if key == 'Version' or key == 'Features':                             print('  {} = 0x{:04x}'.format(key,                                                            properties[key]))                     else:                             print('  {} = {}'.format(key, properties[key]))      @dbus.service.method('org.bluez.Profile1',                          in_signature='o', out_signature='')     def RequestDisconnection(self, path):             print('RequestDisconnection {}'.format(path))              if self.fd > 0:                     os.close(self.fd)                     self.fd = -1   class BTKbDevice:     """     create a bluetooth device to emulate a HID keyboard     """     MY_DEV_NAME = 'BT_HID_Keyboard'     # Service port - must match port configured in SDP record     P_CTRL = 17     # Service port - must match port configured in SDP record#Interrrupt port     P_INTR = 19     # BlueZ dbus     
PROFILE_DBUS_PATH = '/bluez/yaptb/btkb_profile'     ADAPTER_IFACE = 'org.bluez.Adapter1'     DEVICE_INTERFACE = 'org.bluez.Device1'     DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'     DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'      # file path of the sdp record to laod     install_dir  = os.path.dirname(os.path.realpath(__file__))     SDP_RECORD_PATH = os.path.join(install_dir,                                    'sdp_record.xml')     # UUID for HID service (1124)     # https://www.bluetooth.com/specifications/assigned-numbers/service-discovery     UUID = '00001124-0000-1000-8000-00805f9b34fb'      def __init__(self, hci=0):         self.scontrol = None         self.ccontrol = None  # Socket object for control         self.sinterrupt = None         self.cinterrupt = None  # Socket object for interrupt         self.dev_path = '/org/bluez/hci{}'.format(hci)         print('Setting up BT device')         self.bus = dbus.SystemBus()         self.adapter_methods = dbus.Interface(             self.bus.get_object('org.bluez',                                 self.dev_path),             self.ADAPTER_IFACE)         self.adapter_property = dbus.Interface(             self.bus.get_object('org.bluez',                                 self.dev_path),             self.DBUS_PROP_IFACE)          self.bus.add_signal_receiver(self.interfaces_added,                                      dbus_interface=self.DBUS_OM_IFACE,                                      signal_name='InterfacesAdded')          self.bus.add_signal_receiver(self._properties_changed,                                      dbus_interface=self.DBUS_PROP_IFACE,                                      signal_name='PropertiesChanged',                                      arg0=self.DEVICE_INTERFACE,                                      path_keyword='path')          print('Configuring for name {}'.format(BTKbDevice.MY_DEV_NAME))          self.config_hid_profile()          # set the Bluetooth device configuration     
    self.alias = BTKbDevice.MY_DEV_NAME         self.discoverabletimeout = 0         self.discoverable = True      def interfaces_added(self):         pass      def _properties_changed(self, interface, changed, invalidated, path):         if self.on_disconnect is not None:             if 'Connected' in changed:                 if not changed['Connected']:                     self.on_disconnect()      def on_disconnect(self):         print('The client has been disconnect')         self.listen()      @property     def address(self):         """Return the adapter MAC address."""         return self.adapter_property.Get(self.ADAPTER_IFACE,                                          'Address')      @property     def powered(self):         """         power state of the Adapter.         """         return self.adapter_property.Get(self.ADAPTER_IFACE, 'Powered')      @powered.setter     def powered(self, new_state):         self.adapter_property.Set(self.ADAPTER_IFACE, 'Powered', new_state)      @property     def alias(self):         return self.adapter_property.Get(self.ADAPTER_IFACE,                                          'Alias')      @alias.setter     def alias(self, new_alias):         self.adapter_property.Set(self.ADAPTER_IFACE,                                   'Alias',                                   new_alias)      @property     def discoverabletimeout(self):         """Discoverable timeout of the Adapter."""         return self.adapter_props.Get(self.ADAPTER_IFACE,                                       'DiscoverableTimeout')      @discoverabletimeout.setter     def discoverabletimeout(self, new_timeout):         self.adapter_property.Set(self.ADAPTER_IFACE,                                   'DiscoverableTimeout',                                   dbus.UInt32(new_timeout))      @property     def discoverable(self):         """Discoverable state of the Adapter."""         return self.adapter_props.Get(             self.ADAPTER_INTERFACE, 'Discoverable')      
@discoverable.setter     def discoverable(self, new_state):         self.adapter_property.Set(self.ADAPTER_IFACE,                                   'Discoverable',                                   new_state)      def config_hid_profile(self):         """         Setup and register HID Profile         """          print('Configuring Bluez Profile')         service_record = self.read_sdp_service_record()          opts = {             'Role': 'server',             'RequireAuthentication': False,             'RequireAuthorization': False,             'AutoConnect': True,             'ServiceRecord': service_record,         }          manager = dbus.Interface(self.bus.get_object('org.bluez',                                                      '/org/bluez'),                                  'org.bluez.ProfileManager1')          HumanInterfaceDeviceProfile(self.bus,                                     BTKbDevice.PROFILE_DBUS_PATH)          manager.RegisterProfile(BTKbDevice.PROFILE_DBUS_PATH,                                 BTKbDevice.UUID,                                 opts)          print('Profile registered ')      @staticmethod     def read_sdp_service_record():         """         Read and return SDP record from a file         :return: (string) SDP record         """         print('Reading service record')         try:             fh = open(BTKbDevice.SDP_RECORD_PATH, 'r')         except OSError:             sys.exit('Could not open the sdp record. 
Exiting...')          return fh.read()         def listen(self):         """         Listen for connections coming from HID client         """          print('Waiting for connections')         self.scontrol = socket.socket(socket.AF_BLUETOOTH,                                       socket.SOCK_SEQPACKET,                                       socket.BTPROTO_L2CAP)         self.scontrol.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)         self.sinterrupt = socket.socket(socket.AF_BLUETOOTH,                                         socket.SOCK_SEQPACKET,                                         socket.BTPROTO_L2CAP)         self.sinterrupt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)         self.scontrol.bind((self.address, self.P_CTRL))         self.sinterrupt.bind((self.address, self.P_INTR))          # Start listening on the server sockets         self.scontrol.listen(1)  # Limit of 1 connection         self.sinterrupt.listen(1)          self.ccontrol, cinfo = self.scontrol.accept()         print('{} connected on the control socket'.format(cinfo[0]))          self.cinterrupt, cinfo = self.sinterrupt.accept()         print('{} connected on the interrupt channel'.format(cinfo[0]))      def send(self, msg):         """         Send HID message         :param msg: (bytes) HID packet to send         """         self.cinterrupt.send(bytes(bytearray(msg)))   class BTKbService(dbus.service.Object):     """     Setup of a D-Bus service to recieve HID messages from other     processes.     
Send the recieved HID messages to the Bluetooth HID server to send     """     def __init__(self):         print('Setting up service')          bus_name = dbus.service.BusName('org.yaptb.btkbservice',                                         bus=dbus.SystemBus())         dbus.service.Object.__init__(self, bus_name, '/org/yaptb/btkbservice')          # create and setup our device         self.device = BTKbDevice()          # start listening for socket connections         self.device.listen()      @dbus.service.method('org.yaptb.btkbservice',                          in_signature='ay')     def send_keys(self, cmd):         self.device.send(cmd)   if __name__ == '__main__':     # The sockets require root permission     if not os.geteuid() == 0:         sys.exit('Only root can run this script')      DBusGMainLoop(set_as_default=True)     myservice = BTKbService()     mainloop = GLib.MainLoop()     mainloop.run() 

'sdp_record.xml'

<?xml version="1.0" encoding="UTF-8" ?>  <record>     <attribute id="0x0001">         <sequence>             <uuid value="0x1124" />         </sequence>     </attribute>     <attribute id="0x0004">         <sequence>             <sequence>                 <uuid value="0x0100" />                 <uint16 value="0x0011" />             </sequence>             <sequence>                 <uuid value="0x0011" />             </sequence>         </sequence>     </attribute>     <attribute id="0x0005">         <sequence>             <uuid value="0x1002" />         </sequence>     </attribute>     <attribute id="0x0006">         <sequence>             <uint16 value="0x656e" />             <uint16 value="0x006a" />             <uint16 value="0x0100" />         </sequence>     </attribute>     <attribute id="0x0009">         <sequence>             <sequence>                 <uuid value="0x1124" />                 <uint16 value="0x0100" />             </sequence>         </sequence>     </attribute>     <attribute id="0x000d">         <sequence>             <sequence>                 <sequence>                     <uuid value="0x0100" />                     <uint16 value="0x0013" />                 </sequence>                 <sequence>                     <uuid value="0x0011" />                 </sequence>             </sequence>         </sequence>     </attribute>     <attribute id="0x0100">         <text value="Raspberry Pi Virtual Keyboard" />     </attribute>     <attribute id="0x0101">         <text value="USB > BT Keyboard" />     </attribute>     <attribute id="0x0102">         <text value="Raspberry Pi" />     </attribute>     <attribute id="0x0200">         <uint16 value="0x0100" />     </attribute>     <attribute id="0x0201">         <uint16 value="0x0111" />     </attribute>     <attribute id="0x0202">         <uint8 value="0x40" />     </attribute>     <attribute id="0x0203">         <uint8 value="0x00" />     </attribute>     <attribute id="0x0204">         
<boolean value="false" />     </attribute>     <attribute id="0x0205">         <boolean value="false" />     </attribute>     <attribute id="0x0206">         <sequence>             <sequence>                 <uint8 value="0x22" />                 <text encoding="hex" value="05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050c0901a1018503150025017501950b0a23020a21020ab10109b809b609cd09b509e209ea09e9093081029501750d8103c0" />             </sequence>         </sequence>     </attribute>     <attribute id="0x0207">         <sequence>             <sequence>                 <uint16 value="0x0409" />                 <uint16 value="0x0100" />             </sequence>         </sequence>     </attribute>     <attribute id="0x020b">         <uint16 value="0x0100" />     </attribute>     <attribute id="0x020c">         <uint16 value="0x0c80" />     </attribute>     <attribute id="0x020d">         <boolean value="true" />     </attribute>     <attribute id="0x020e">         <boolean value="false" />     </attribute>     <attribute id="0x020f">         <uint16 value="0x0640" />     </attribute>     <attribute id="0x0210">         <uint16 value="0x0320" />     </attribute> </record> 
like image 90
ukBaz Avatar answered Sep 26 '22 16:09

ukBaz