Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python Bleak scan for advertisements and exit event loop

I've inherited some code that utilizes Python Bleak to scan for advertisements emitted from a certain device. Whenever an advertisement from the Bluetooth mac address and service id we're looking for is detected and a certain condition from the extracted payload information is true, we want to terminate and return. In the attached code, I've masked the Bluetooth and service ID:s.

Not being too familiar with the event loop, is there a way to exit before the timer runs out? I suppose there's probably a better way to approach this problem.

Sample code:

import asyncio
import struct

from bleak import BleakScanner

timeout_seconds = 10
address_to_look_for = 'masked'
service_id_to_look_for = 'masked'

def detection_callback(device, advertisement_data):
    if device.address == address_to_look_for:
        byte_data = advertisement_data.service_data.get(service_id_to_look_for)
        num_to_test = struct.unpack_from('<I', byte_data, 0)
        if num_to_test == 1:
            print('here we want to terminate')
        

async def run():
    scanner = BleakScanner()
    scanner.register_detection_callback(detection_callback)
    await scanner.start()
    await asyncio.sleep(timeout_seconds)
    await scanner.stop()

if __name__=='__main__':    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
like image 988
koffertfisk Avatar asked Mar 05 '26 16:03

koffertfisk


1 Answers

I'm sure there are many ways this can be done. A small mod to your code would be rather than having the asyncio.sleep for the full period before you stop the scan, you could could have a while loop that ends on time elapsed or device found event.

For example:

import asyncio
import struct

from bleak import BleakScanner

timeout_seconds = 20
address_to_look_for = 'F1:D9:3B:39:4D:A2'
service_id_to_look_for = '0000feaa-0000-1000-8000-00805f9b34fb'


class MyScanner:
    def __init__(self):
        self._scanner = BleakScanner()
        self._scanner.register_detection_callback(self.detection_callback)
        self.scanning = asyncio.Event()

    def detection_callback(self, device, advertisement_data):
        # Looking for:
        # AdvertisementData(service_data={
        # '0000feaa-0000-1000-8000-00805f9b34fb': b'\x00\xf6\x00\x00\x00Jupiter\x00\x00\x00\x00\x00\x0b'},
        # service_uuids=['0000feaa-0000-1000-8000-00805f9b34fb'])
        if device.address == address_to_look_for:
            byte_data = advertisement_data.service_data.get(service_id_to_look_for)
            num_to_test, = struct.unpack_from('<I', byte_data, 0)
            if num_to_test == 62976:
                print('\t\tDevice found so we terminate')
                self.scanning.clear()

    async def run(self):
        await self._scanner.start()
        self.scanning.set()
        end_time = loop.time() + timeout_seconds
        while self.scanning.is_set():
            if loop.time() > end_time:
                self.scanning.clear()
                print('\t\tScan has timed out so we terminate')
            await asyncio.sleep(0.1)
        await self._scanner.stop()


if __name__ == '__main__':
    my_scanner = MyScanner()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(my_scanner.run())
like image 149
ukBaz Avatar answered Mar 08 '26 04:03

ukBaz