Managing asynchronous Python with a user interface

Posted on 2025-02-11 00:31:53

I am building a Raspberry Pi project involving a touchscreen user interface, and reading temperature sensors via Bluetooth Low Energy. I'm using Python, the BLEAK Bluetooth library, and the graphics.py simple graphics package.

Graphics.py kind of wants to own the event loop. BLEAK is deeply involved with asyncio and wants to live in its own asyncio event loop. So I forked a thread to take care of the Bluetooth stuff, which simply deposits its readings in global variables. All well and good until the user presses a Quit button or some error terminates the program. If I don't do a try-finally and clean up the Bluetooth stuff properly, it won't even find the device the next time around; something deeper in the OS is jammed when it comes to connecting with that device, until I reboot. But it's in a separate thread, so the flow of control that is going to get that Bluetooth cleaned up is not obvious.

Is there a well-known method for dealing with this sort of thing? A well-known paradigm for mixing asynchronous Python with a graphical user interface?

Comments (1)

倾城泪 2025-02-18 00:31:53

Running a single event loop for both the GUI and the Bluetooth access is the way to go.

This can be done using the BlueZ D-Bus API that is documented for the device:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/device-api.txt

And the GATT characteristics:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/gatt-api.txt

There is some reasonable documentation around the GTK library:
https://pygobject.readthedocs.io/en/latest/getting_started.html

And other tutorials:
https://python-gtk-3-tutorial.readthedocs.io/en/latest/install.html

Below is a small example for a device I have that gives notifications over BLE of its temperature:

from time import sleep

import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gio, Gtk

ADAPTER_PATH = '/org/bluez/hci0'
TEMP_DATA = 'E95D9250-251D-470A-A062-FA1922DFA9A8'

BLUEZ_BUS_NAME = 'org.bluez'
MNGR_IFACE = 'org.freedesktop.DBus.ObjectManager'
PROP_IFACE = 'org.freedesktop.DBus.Properties'
DEVICE_IFACE = 'org.bluez.Device1'
BLE_CHRC_IFACE = 'org.bluez.GattCharacteristic1'


class MainApp(Gtk.Window):

    def __init__(self, remote_device):
        super().__init__(title="Thermometer")
        self.ble_dev = remote_device
        # Vertical layout box
        self.box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
        self.add(self.box)

        # horizontal box for buttons
        hbox = Gtk.Box(spacing=6)
        # Connect Button
        button = Gtk.Button.new_with_label("Connect")
        button.connect("clicked", self.connect)
        hbox.pack_start(button, True, True, 0)
        # Disconnect Button
        button = Gtk.Button.new_with_label("Disconnect")
        button.connect("clicked", self.disconnect)
        hbox.pack_start(button, True, True, 0)
        # Add labels to vertical layout
        self.box.pack_start(hbox, True, True, 0)
        self.temp_label = Gtk.Label(label='Temperature')
        self.box.pack_start(self.temp_label, True, True, 0)
        self.temp_value = Gtk.Label(label='Value Here')
        self.box.pack_start(self.temp_value, True, True, 0)

    def update_label(self, proxy, changed_props, invalidated_props):
        """Callback to update with new temperature"""
        props = changed_props.unpack()
        # print(props)
        temperature = props.get('Value')
        if temperature:
            self.temp_value.set_text(str(temperature[0]))

    def connect(self, button):
        """Connect to BLE device and start notifications"""
        self.ble_dev.dev_mthd_proxy.Connect()
        while not self.ble_dev.dev_props_proxy.Get(
                '(ss)', DEVICE_IFACE, 'ServicesResolved'):
            sleep(0.5)
        temp_path = get_characteristic_path(
            self.ble_dev.mngr_proxy,
            self.ble_dev.device_path, TEMP_DATA)
        self.ble_dev.tmp_gatt_proxy = bluez_proxy(temp_path, BLE_CHRC_IFACE)
        self.ble_dev.tmp_gatt_proxy.StartNotify()
        self.ble_dev.tmp_gatt_proxy.connect(
            'g-properties-changed', self.update_label)

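    def disconnect(self, button):
        """Stop notifications and disconnect"""
        # The GATT proxy only exists after a successful Connect click
        if self.ble_dev.tmp_gatt_proxy is not None:
            self.ble_dev.tmp_gatt_proxy.StopNotify()
        self.ble_dev.dev_mthd_proxy.Disconnect()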


class TemperateDevice:
    """Collection of proxies for remote device"""
    def __init__(self, address):
        self.device_path = f"{ADAPTER_PATH}/dev_{address.replace(':', '_')}"
        self.mngr_proxy = bluez_proxy('/', MNGR_IFACE)
        self.dev_mthd_proxy = bluez_proxy(self.device_path, DEVICE_IFACE)
        self.dev_props_proxy = bluez_proxy(self.device_path, PROP_IFACE)
        self.tmp_gatt_proxy = None


def get_characteristic_path(mngr, dev_path, uuid):
    """Look up D-Bus path for characteristic UUID"""
    mng_objs = mngr.GetManagedObjects()
    for path in mng_objs:
        chr_uuid = mng_objs[path].get(BLE_CHRC_IFACE, {}).get('UUID')
        if path.startswith(dev_path) and chr_uuid == uuid.casefold():
            return path


def bluez_proxy(object_path, interface):
    """Function to assist with creating BlueZ proxies"""
    return Gio.DBusProxy.new_for_bus_sync(
        bus_type=Gio.BusType.SYSTEM,
        flags=Gio.DBusProxyFlags.NONE,
        info=None,
        name=BLUEZ_BUS_NAME,
        object_path=object_path,
        interface_name=interface,
        cancellable=None)


def main(address):
    print("running application")
    ble_dev = TemperateDevice(address)
    app = MainApp(ble_dev)
    app.show_all()
    Gtk.main()


if __name__ == "__main__":
    main("E1:4B:6C:22:56:F0")
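
If the threaded layout from the question is kept instead (GUI in the main thread, asyncio in a worker thread), one possible shape for the cleanup flow is sketched below. This is only an illustration under stated assumptions: FakeSensorLink, SensorWorker and run_with_cleanup are hypothetical names, and FakeSensorLink is a stand-in so the sketch runs without hardware. With Bleak, the `async with` line would presumably wrap a `BleakClient` instead. The idea is that the GUI thread never touches the Bluetooth objects; it only asks the worker loop to stop and then waits for the thread, so the disconnect always runs inside the loop that owns the connection.

```python
import asyncio
import threading


class FakeSensorLink:
    """Hypothetical stand-in for a BLE client; not part of any library."""

    def __init__(self):
        self.connected = False
        self.disconnect_calls = 0

    async def __aenter__(self):
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on a requested stop and also when the loop body raises.
        self.connected = False
        self.disconnect_calls += 1

    async def read_temperature(self):
        await asyncio.sleep(0.01)
        return 21.5


class SensorWorker:
    """Runs an asyncio loop in a background thread and stops it cleanly."""

    def __init__(self, link):
        self.link = link
        self.latest = None  # last reading, polled by the GUI thread
        self._loop = None
        self._stop = None
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def start(self):
        self._thread.start()
        self._ready.wait()  # the loop and the stop event exist after this

    def stop(self, timeout=5.0):
        """Call from the GUI thread, e.g. from a Quit handler or a finally."""
        if self._thread.is_alive():
            try:
                # asyncio.Event is not thread-safe: set it from inside the loop.
                self._loop.call_soon_threadsafe(self._stop.set)
            except RuntimeError:
                pass  # the loop has already closed
        self._thread.join(timeout)

    def _run(self):
        asyncio.run(self._main())

    async def _main(self):
        self._loop = asyncio.get_running_loop()
        self._stop = asyncio.Event()
        self._ready.set()
        async with self.link as link:  # disconnect runs on every exit path
            while not self._stop.is_set():
                self.latest = await link.read_temperature()


def run_with_cleanup(gui_main, link):
    """Run the blocking gui_main(worker) and always stop the worker after it."""
    worker = SensorWorker(link)
    worker.start()
    try:
        gui_main(worker)
    finally:
        worker.stop()
    return worker
```

Here `gui_main` would be whatever blocks until the window closes, and the `finally` covers both the Quit button and an exception escaping from the GUI code. Note that the thread is deliberately not a daemon thread, since a daemon thread can be killed at interpreter exit before the disconnect has run.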
