Asyncio、Arduino BLE,并且不读取特性更新

发布于 2025-01-09 16:59:50 字数 8643 浏览 2 评论 0原文

我有一个 Arduino 33 BLE,它使用 BNO055 传感器校准和四元数数据的字符串表示来更新一些蓝牙特性。在 Arduino 方面,我看到校准和四元数数据按照预期以良好的有序顺序进行更新。

我有一个在 Windows 10 上运行的 Python (3.9) 程序,它使用 asyncio 订阅 Arduino 上的特性来读取更新。当 Arduino 的更新率为 1/秒时,一切工作正常。我所说的“工作正常”是指我看到了有序的更新顺序:四元数、校准、四元数、校准……我遇到的问题是我将更新速率更改为 10/秒(Arduino 中的延迟为 100 毫秒)并且例如,现在我得到了四元数数据的 100 次更新,但当更新次数应该相等时,校准数据只有 50 次更新。不知何故,我没有在 python 端正确处理更新。

python 代码如下所示:

import asyncio
import pandas as pd

from bleak import BleakClient
from bleak import BleakScanner

ardAddress = ''
found = ''
exit_flag = False

temperaturedata = []
timedata = []
calibrationdata=[]
quaterniondata=[]

# loop: asyncio.AbstractEventLoop

tempServiceUUID = '0000290c-0000-1000-8000-00805f9b34fb'  # Temperature Service UUID on Arduino 33 BLE

stringUUID = '00002a56-0000-1000-8000-00805f9b34fb'  # Characteristic of type String [Write to Arduino]
inttempUUID = '00002a1c-0000-1000-8000-00805f9b34fb'  # Characteristic of type Int [Temperature]
longdateUUID = '00002a08-0000-1000-8000-00805f9b34fb'  # Characteristic of type Long [datetime millis]

strCalibrationUUID = '00002a57-0000-1000-8000-00805f9b34fb'  # Characteristic of type String [BNO055 Calibration]
strQuaternionUUID = '9e6c967a-5a87-49a1-a13f-5a0f96188552'  # Characteristic of type Long [BNO055 Quaternion]


async def scanfordevices():
    devices = await BleakScanner.discover()
    for d in devices:
        print(d)
        if (d.name == 'TemperatureMonitor'):
            global found, ardAddress
            found = True
            print(f'{d.name=}')
            print(f'{d.address=}')
            ardAddress = d.address
            print(f'{d.rssi=}')
            return d.address


async def readtemperaturecharacteristic(client, uuid: str):
    val = await client.read_gatt_char(uuid)
    intval = int.from_bytes(val, byteorder='little')
    print(f'readtemperaturecharacteristic:  Value read from: {uuid} is:  {val} | as int={intval}')


async def readdatetimecharacteristic(client, uuid: str):
    val = await client.read_gatt_char(uuid)
    intval = int.from_bytes(val, byteorder='little')
    print(f'readdatetimecharacteristic:  Value read from: {uuid} is:  {val} | as int={intval}')


async def readcalibrationcharacteristic(client, uuid: str):
    # Calibration characteristic is a string
    val = await client.read_gatt_char(uuid)
    strval = val.decode('UTF-8')
    print(f'readcalibrationcharacteristic:  Value read from: {uuid} is:  {val} | as string={strval}')


async def getservices(client):
    svcs = await client.get_services()
    print("Services:")
    for service in svcs:
        print(service)

        ch = service.characteristics
        for c in ch:
            print(f'\tCharacteristic Desc:{c.description} | UUID:{c.uuid}')


def notification_temperature_handler(sender, data):
    """Simple notification handler which prints the data received."""
    intval = int.from_bytes(data, byteorder='little')
    # TODO:  review speed of append vs extend.  Extend using iterable but is faster
    temperaturedata.append(intval)
    #print(f'Temperature:  Sender: {sender}, and byte data= {data} as an Int={intval}')


def notification_datetime_handler(sender, data):
    """Simple notification handler which prints the data received."""
    intval = int.from_bytes(data, byteorder='little')
    timedata.append(intval)
    #print(f'Datetime: Sender: {sender}, and byte data= {data} as an Int={intval}')


def notification_calibration_handler(sender, data):
    """Simple notification handler which prints the data received."""
    strval = data.decode('UTF-8')
    numlist=extractvaluesaslist(strval,':')
    #Save to list for processing later
    calibrationdata.append(numlist)

    print(f'Calibration Data: {sender}, and byte data= {data} as a List={numlist}')


def notification_quaternion_handler(sender, data):
    """Simple notification handler which prints the data received."""
    strval = data.decode('UTF-8')
    numlist=extractvaluesaslist(strval,':')

    #Save to list for processing later
    quaterniondata.append(numlist)

    print(f'Quaternion Data: {sender}, and byte data= {data} as a List={numlist}')


def extractvaluesaslist(raw, separator=':'):
    # Get everything after separator
    s1 = raw.split(sep=separator)[1]
    s2 = s1.split(sep=',')
    return list(map(float, s2))


async def runmain():
    # Based on code from: https://github.com/hbldh/bleak/issues/254
    global exit_flag

    print('runmain: Starting Main Device Scan')

    await scanfordevices()

    print('runmain: Scan is done, checking if found Arduino')

    if found:
        async with BleakClient(ardAddress) as client:

            print('runmain: Getting Service Info')
            await getservices(client)

            # print('runmain: Reading from Characteristics Arduino')
            # await readdatetimecharacteristic(client, uuid=inttempUUID)
            # await readcalibrationcharacteristic(client, uuid=strCalibrationUUID)

            print('runmain: Assign notification callbacks')
            await client.start_notify(inttempUUID, notification_temperature_handler)
            await client.start_notify(longdateUUID, notification_datetime_handler)
            await client.start_notify(strCalibrationUUID, notification_calibration_handler)
            await client.start_notify(strQuaternionUUID, notification_quaternion_handler)

            while not exit_flag:
                await asyncio.sleep(1)
            # TODO:  This does nothing.  Understand why?
            print('runmain: Stopping notifications.')
            await client.stop_notify(inttempUUID)
            print('runmain: Write to characteristic to let it know we plan to quit.')
            await client.write_gatt_char(stringUUID, 'Stopping'.encode('ascii'))
    else:
        print('runmain: Arduino not found.  Check that its on')

    print('runmain: Done.')


def main():
    # get main event loop
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(runmain())
    except KeyboardInterrupt:
        global exit_flag
        print('\tmain: Caught keyboard interrupt in main')
        exit_flag = True
    finally:
        pass

    print('main: Getting all pending tasks')

    # From book Pg 26.
    pending = asyncio.all_tasks(loop=loop)
    print(f'\tmain: number of tasks={len(pending)}')
    for task in pending:
        task.cancel()
    group = asyncio.gather(*pending, return_exceptions=True)
    print('main: Waiting for tasks to complete')
    loop.run_until_complete(group)
    loop.close()

    # Display data recorded in Dataframe
    if len(temperaturedata)==len(timedata):
        print(f'Temperature data len={len(temperaturedata)}, and len of timedata={len(timedata)}')

        df = pd.DataFrame({'datetime': timedata,
                           'temperature': temperaturedata})
        #print(f'dataframe shape={df.shape}')
        #print(df)
        df.to_csv('temperaturedata.csv')
    else:
        print(f'No data or lengths different: temp={len(temperaturedata)}, time={len(timedata)}')

    if len(quaterniondata)==len(calibrationdata):
        print('Processing Quaternion and Calibration Data')
        #Load quaternion data
        dfq=pd.DataFrame(quaterniondata,columns=['time','qw','qx','qy','qz'])
        print(f'Quaternion dataframe shape={dfq.shape}')
        #Add datetime millis data
        #dfq.insert(0,'Time',timedata)
        #Load calibration data
        dfcal=pd.DataFrame(calibrationdata,columns=['time','syscal','gyrocal','accelcal','magcal'])
        print(f'Calibration dataframe shape={dfcal.shape}')
        #Merge two dataframes together
        dffinal=pd.concat([dfq,dfcal],axis=1)
        dffinal.to_csv('quaternion_and_cal_data.csv')
    else:
        print(f'No data or lengths different. Quat={len(quaterniondata)}, Cal={len(calibrationdata)}')
        if len(quaterniondata)>0:
            dfq = pd.DataFrame(quaterniondata, columns=['time', 'qw', 'qx', 'qy', 'qz'])
            dfq.to_csv('quaterniononly.csv')
        if len(calibrationdata)>0:
            dfcal = pd.DataFrame(calibrationdata, columns=['time','syscal', 'gyrocal', 'accelcal', 'magcal'])
            dfcal.to_csv('calibrationonly.csv')

    print("main: Done.")


if __name__ == "__main__":
    '''Starting Point of Program'''
    main()

那么,我的第一个问题是任何人都可以帮助我理解为什么我似乎没有在我的 Python 程序中获得所有更新吗?我应该看到 notification_quaternion_handler() 和 notification_calibration_handler() 被调用的次数相同,但我没有。我假设我没有正确使用 asyncio,但此时我无法调试它?

我的第二个问题是,是否有尝试从蓝牙接收相对高频更新(例如每 10-20 毫秒)的最佳实践?我正在尝试读取 IMU 传感器数据,并且需要以相当高的速率完成。

这是我第一次尝试蓝牙和异步,所以显然我还有很多东西要学习。

谢谢你的帮助

I have an Arduino 33 BLE that is updating a few bluetooth characteristics with a string representation of BNO055 sensor calibration and quaternion data. On the Arduino side, I see the calibration and quaternion data getting updated in a nice orderly sequence as expected.

I have a Python (3.9) program running on Windows 10 that uses asyncio to subscribe to the characteristics on the Arduino to read the updates. Everything works fine when I have an update rate on the Arduino of 1/second. By "works fine" I mean I see the orderly sequence of updates: quaternion, calibration, quaternion, calibration,.... The problem I have is that I changed the update rate to the 10/second (100ms delay in Arduino) and now I am getting, for example, 100 updates for quaternion data but only 50 updates for calibration data when the number of updates should be equal. Somehow I'm not handling the updates properly on the python side.

The python code is listed below:

import asyncio
import pandas as pd

from bleak import BleakClient
from bleak import BleakScanner

ardAddress = ''
found = ''
exit_flag = False

temperaturedata = []
timedata = []
calibrationdata=[]
quaterniondata=[]

# loop: asyncio.AbstractEventLoop

tempServiceUUID = '0000290c-0000-1000-8000-00805f9b34fb'  # Temperature Service UUID on Arduino 33 BLE

stringUUID = '00002a56-0000-1000-8000-00805f9b34fb'  # Characteristic of type String [Write to Arduino]
inttempUUID = '00002a1c-0000-1000-8000-00805f9b34fb'  # Characteristic of type Int [Temperature]
longdateUUID = '00002a08-0000-1000-8000-00805f9b34fb'  # Characteristic of type Long [datetime millis]

strCalibrationUUID = '00002a57-0000-1000-8000-00805f9b34fb'  # Characteristic of type String [BNO055 Calibration]
strQuaternionUUID = '9e6c967a-5a87-49a1-a13f-5a0f96188552'  # Characteristic of type Long [BNO055 Quaternion]


async def scanfordevices():
    devices = await BleakScanner.discover()
    for d in devices:
        print(d)
        if (d.name == 'TemperatureMonitor'):
            global found, ardAddress
            found = True
            print(f'{d.name=}')
            print(f'{d.address=}')
            ardAddress = d.address
            print(f'{d.rssi=}')
            return d.address


async def readtemperaturecharacteristic(client, uuid: str):
    val = await client.read_gatt_char(uuid)
    intval = int.from_bytes(val, byteorder='little')
    print(f'readtemperaturecharacteristic:  Value read from: {uuid} is:  {val} | as int={intval}')


async def readdatetimecharacteristic(client, uuid: str):
    val = await client.read_gatt_char(uuid)
    intval = int.from_bytes(val, byteorder='little')
    print(f'readdatetimecharacteristic:  Value read from: {uuid} is:  {val} | as int={intval}')


async def readcalibrationcharacteristic(client, uuid: str):
    # Calibration characteristic is a string
    val = await client.read_gatt_char(uuid)
    strval = val.decode('UTF-8')
    print(f'readcalibrationcharacteristic:  Value read from: {uuid} is:  {val} | as string={strval}')


async def getservices(client):
    svcs = await client.get_services()
    print("Services:")
    for service in svcs:
        print(service)

        ch = service.characteristics
        for c in ch:
            print(f'\tCharacteristic Desc:{c.description} | UUID:{c.uuid}')


def notification_temperature_handler(sender, data):
    """Simple notification handler which prints the data received."""
    intval = int.from_bytes(data, byteorder='little')
    # TODO:  review speed of append vs extend.  Extend using iterable but is faster
    temperaturedata.append(intval)
    #print(f'Temperature:  Sender: {sender}, and byte data= {data} as an Int={intval}')


def notification_datetime_handler(sender, data):
    """Simple notification handler which prints the data received."""
    intval = int.from_bytes(data, byteorder='little')
    timedata.append(intval)
    #print(f'Datetime: Sender: {sender}, and byte data= {data} as an Int={intval}')


def notification_calibration_handler(sender, data):
    """Simple notification handler which prints the data received."""
    strval = data.decode('UTF-8')
    numlist=extractvaluesaslist(strval,':')
    #Save to list for processing later
    calibrationdata.append(numlist)

    print(f'Calibration Data: {sender}, and byte data= {data} as a List={numlist}')


def notification_quaternion_handler(sender, data):
    """Simple notification handler which prints the data received."""
    strval = data.decode('UTF-8')
    numlist=extractvaluesaslist(strval,':')

    #Save to list for processing later
    quaterniondata.append(numlist)

    print(f'Quaternion Data: {sender}, and byte data= {data} as a List={numlist}')


def extractvaluesaslist(raw, separator=':'):
    # Get everything after separator
    s1 = raw.split(sep=separator)[1]
    s2 = s1.split(sep=',')
    return list(map(float, s2))


async def runmain():
    # Based on code from: https://github.com/hbldh/bleak/issues/254
    global exit_flag

    print('runmain: Starting Main Device Scan')

    await scanfordevices()

    print('runmain: Scan is done, checking if found Arduino')

    if found:
        async with BleakClient(ardAddress) as client:

            print('runmain: Getting Service Info')
            await getservices(client)

            # print('runmain: Reading from Characteristics Arduino')
            # await readdatetimecharacteristic(client, uuid=inttempUUID)
            # await readcalibrationcharacteristic(client, uuid=strCalibrationUUID)

            print('runmain: Assign notification callbacks')
            await client.start_notify(inttempUUID, notification_temperature_handler)
            await client.start_notify(longdateUUID, notification_datetime_handler)
            await client.start_notify(strCalibrationUUID, notification_calibration_handler)
            await client.start_notify(strQuaternionUUID, notification_quaternion_handler)

            while not exit_flag:
                await asyncio.sleep(1)
            # TODO:  This does nothing.  Understand why?
            print('runmain: Stopping notifications.')
            await client.stop_notify(inttempUUID)
            print('runmain: Write to characteristic to let it know we plan to quit.')
            await client.write_gatt_char(stringUUID, 'Stopping'.encode('ascii'))
    else:
        print('runmain: Arduino not found.  Check that its on')

    print('runmain: Done.')


def main():
    # get main event loop
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(runmain())
    except KeyboardInterrupt:
        global exit_flag
        print('\tmain: Caught keyboard interrupt in main')
        exit_flag = True
    finally:
        pass

    print('main: Getting all pending tasks')

    # From book Pg 26.
    pending = asyncio.all_tasks(loop=loop)
    print(f'\tmain: number of tasks={len(pending)}')
    for task in pending:
        task.cancel()
    group = asyncio.gather(*pending, return_exceptions=True)
    print('main: Waiting for tasks to complete')
    loop.run_until_complete(group)
    loop.close()

    # Display data recorded in Dataframe
    if len(temperaturedata)==len(timedata):
        print(f'Temperature data len={len(temperaturedata)}, and len of timedata={len(timedata)}')

        df = pd.DataFrame({'datetime': timedata,
                           'temperature': temperaturedata})
        #print(f'dataframe shape={df.shape}')
        #print(df)
        df.to_csv('temperaturedata.csv')
    else:
        print(f'No data or lengths different: temp={len(temperaturedata)}, time={len(timedata)}')

    if len(quaterniondata)==len(calibrationdata):
        print('Processing Quaternion and Calibration Data')
        #Load quaternion data
        dfq=pd.DataFrame(quaterniondata,columns=['time','qw','qx','qy','qz'])
        print(f'Quaternion dataframe shape={dfq.shape}')
        #Add datetime millis data
        #dfq.insert(0,'Time',timedata)
        #Load calibration data
        dfcal=pd.DataFrame(calibrationdata,columns=['time','syscal','gyrocal','accelcal','magcal'])
        print(f'Calibration dataframe shape={dfcal.shape}')
        #Merge two dataframes together
        dffinal=pd.concat([dfq,dfcal],axis=1)
        dffinal.to_csv('quaternion_and_cal_data.csv')
    else:
        print(f'No data or lengths different. Quat={len(quaterniondata)}, Cal={len(calibrationdata)}')
        if len(quaterniondata)>0:
            dfq = pd.DataFrame(quaterniondata, columns=['time', 'qw', 'qx', 'qy', 'qz'])
            dfq.to_csv('quaterniononly.csv')
        if len(calibrationdata)>0:
            dfcal = pd.DataFrame(calibrationdata, columns=['time','syscal', 'gyrocal', 'accelcal', 'magcal'])
            dfcal.to_csv('calibrationonly.csv')

    print("main: Done.")


if __name__ == "__main__":
    '''Starting Point of Program'''
    main()

So, my first question is can anyone help me understand why I do not seem to be getting all the updates in my Python program? I should be seeing notification_quaternion_handler() and notification_calibration_handler() called the same number of times but I am not. I assume I am not using asyncio properly but I am at a loss to debug it at this point?

My second question is, are there best practices for trying to receive relatively high frequency updates from bluetooth, for example every 10-20 ms? I am trying to read IMU sensor data and it needs to be done at a fairly high rate.

This is my first attempt at bluetooth and asyncio so clearly I have a lot to learn.

Thank You for the help

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

知足的幸福 2025-01-16 16:59:50

@ukBaz 的回答非常棒。

总结一下,供其他可能有类似问题的人参考。

在 Arduino 方面,我最终得到了这样的结果(仅显示了重要部分):

typedef struct  __attribute__ ((packed)) {
  unsigned long timeread;
  int qw; //float Quaternion values will be scaled to int by multiplying by constant
  int qx;
  int qy;
  int qz;
  uint8_t cal_system;
  uint8_t cal_gyro;
  uint8_t cal_accel;
  uint8_t cal_mag;
}sensordata ;

//Declare struct and populate
  sensordata datareading;

  datareading.timeread=tnow;
  datareading.qw=(int) (quat.w()*10000);
  datareading.qx=(int) (quat.x()*10000);
  datareading.qy=(int) (quat.y()*10000);
  datareading.qz=(int) (quat.z()*10000);
  datareading.cal_system=system;
  datareading.cal_gyro=gyro;
  datareading.cal_accel=accel;
  datareading.cal_mag=mag;

  //Write values to Characteristics.
  
  structDataChar.writeValue((uint8_t *)&datareading, sizeof(datareading));

然后在 Python(Windows 桌面)方面,我用它来解压发送的数据:

def notification_structdata_handler(sender, data):
    """Simple notification handler which prints the data received."""
    
    # NOTE:  IT IS CRITICAL THAT THE UNPACK BYTE STRUCTURE MATCHES THE STRUCT
    #        CONFIGURATION SHOWN IN THE ARDUINO C PROGRAM.
    
    # <hh meaning:  <=little endian, h=short (2 bytes), b=1 byte, i=int 4 bytes, unsigned long = 4 bytes

    #Scale factor used in Arduino to convert floats to ints.
    scale=10000

    # Main Sensor struct
    t,qw,qx,qy,qz,cs,cg,ca,cm= struct.unpack('<5i4b', data)

    sensorstructdata.append([t,qw/scale,qx/scale,qy/scale,qz/scale,cs,cg,ca,cm])

    print(f'--->Struct Decoded. time={t}, qw={qw/scale}, qx={qx/scale}, qy={qy/scale}, qz={qz/scale},'
          f'cal_s={cs}, cal_g={cg}, cal_a={ca}, cal_m={cm}')

感谢所有帮助,并且正如所承诺的,性能要好得多比我开始的时候!

Fantastic answer by @ukBaz.

In summary for other who may have a similar issue.

On the Arduino side I ended up with something like this (important parts only shown):

typedef struct  __attribute__ ((packed)) {
  unsigned long timeread;
  int qw; //float Quaternion values will be scaled to int by multiplying by constant
  int qx;
  int qy;
  int qz;
  uint8_t cal_system;
  uint8_t cal_gyro;
  uint8_t cal_accel;
  uint8_t cal_mag;
}sensordata ;

//Declare struct and populate
  sensordata datareading;

  datareading.timeread=tnow;
  datareading.qw=(int) (quat.w()*10000);
  datareading.qx=(int) (quat.x()*10000);
  datareading.qy=(int) (quat.y()*10000);
  datareading.qz=(int) (quat.z()*10000);
  datareading.cal_system=system;
  datareading.cal_gyro=gyro;
  datareading.cal_accel=accel;
  datareading.cal_mag=mag;

  //Write values to Characteristics.
  
  structDataChar.writeValue((uint8_t *)&datareading, sizeof(datareading));

Then on the Python (Windows Desktop) side I have this to unpack the data being sent:

def notification_structdata_handler(sender, data):
    """Simple notification handler which prints the data received."""
    
    # NOTE:  IT IS CRITICAL THAT THE UNPACK BYTE STRUCTURE MATCHES THE STRUCT
    #        CONFIGURATION SHOWN IN THE ARDUINO C PROGRAM.
    
    # <hh meaning:  <=little endian, h=short (2 bytes), b=1 byte, i=int 4 bytes, unsigned long = 4 bytes

    #Scale factor used in Arduino to convert floats to ints.
    scale=10000

    # Main Sensor struct
    t,qw,qx,qy,qz,cs,cg,ca,cm= struct.unpack('<5i4b', data)

    sensorstructdata.append([t,qw/scale,qx/scale,qy/scale,qz/scale,cs,cg,ca,cm])

    print(f'--->Struct Decoded. time={t}, qw={qw/scale}, qx={qx/scale}, qy={qy/scale}, qz={qz/scale},'
          f'cal_s={cs}, cal_g={cg}, cal_a={ca}, cal_m={cm}')

Thanks for all the help and as promised the performance is MUCH better than what I started with!

染年凉城似染瑾 2025-01-16 16:59:50

您有多个特征以相同的频率更新。在低功耗蓝牙 (BLE) 中,以相同特性传输这些值的效率更高。我注意到的另一件事是您似乎将值作为字符串发送。从字符串中提取信息的方式来看,字符串格式可能是“键:值”。这也是通过 BLE 发送数据的低效方式。

通过 BLE 传输的数据始终是字节列表,因此如果需要浮点型,则需要将其更改为整数以作为字节发送。举个例子,如果我们想发送一个带有两位小数的值,将其乘以 100 总是会删除小数位。换言之,它会除以 100。例如:

>>> value = 12.34
>>> send = int(value * 100)
>>> print(send)
1234
>>> send / 100
12.34

struct 库允许将整数轻松打包成一系列要发送的字节。举个例子:

>>> import struct
>>> value1 = 12.34
>>> value2 = 67.89
>>> send_bytes = struct.pack('<hh', int(value1 * 100), int(value2 * 100))
>>> print(send_bytes)
b'\xd2\x04\x85\x1a'

然后解压:

>>> r_val1, r_val2 = struct.unpack('<hh', send_bytes)
>>> print(f'Value1={r_val1/100} : Value2={r_val2/100}')
Value1=12.34 : Value2=67.89

使用具有最小传输字节数的单一特征应该允许更快的通知。

要了解其他特性如何做到这一点,请查看蓝牙 SIG 的以下文档:
https://www.bluetooth.com/specations/specs/gatt -specation-supplement-5/

血压测量特性就是一个很好的例子。

You have multiple characteristics that are being updated at the same frequency. It is more efficient in Bluetooth Low Energy (BLE) to transmit those values in the same characteristic. The other thing I noticed is that you appear to be sending the value as a string. It looks like the string format might "key:value" by the way you are extracting information from the string. This is also inefficient way to send data via BLE.

The data that is transmitted over BLE is always a list of bytes so if a float is required, it needs to be changed into an integer to be sent as bytes. As an example, if we wanted to send a value with two decimal places, multiplying it by 100 would always remove the decimal places. To go the other way it would be divide by 100. e.g:

>>> value = 12.34
>>> send = int(value * 100)
>>> print(send)
1234
>>> send / 100
12.34

The struct library allows integers to be easily packed that into a series of byes to send. As an example:

>>> import struct
>>> value1 = 12.34
>>> value2 = 67.89
>>> send_bytes = struct.pack('<hh', int(value1 * 100), int(value2 * 100))
>>> print(send_bytes)
b'\xd2\x04\x85\x1a'

To then unpack that:

>>> r_val1, r_val2 = struct.unpack('<hh', send_bytes)
>>> print(f'Value1={r_val1/100} : Value2={r_val2/100}')
Value1=12.34 : Value2=67.89

Using a single characteristic with the minimum number of bytes being transmitted should allow for the faster notifications.

To look at how other characteristics do this then look at the following document from the Bluetooth SIG:
https://www.bluetooth.com/specifications/specs/gatt-specification-supplement-5/

A good example might be the Blood Pressure Measurement characteristic.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文