有用的发布-订阅语义

发布于 2024-10-12 15:58:51 字数 6791 浏览 3 评论 0原文

我正在寻找维基百科风格的参考资料,以了解实际有效的轻量级发布-订阅机制的设计和实现。我将根据答案和评论以及我自己的研究来更新问题。

我研究了我的书籍和网络,以了解用 Python 和 Delphi 完成的发布/订阅工作,但对结果并不满意。这些设计依赖于函数签名或位图或来过滤消息或决定应该将什么传递给谁,并且要么过于严格(绑定到消息传递服务器),要么过于混杂(每个人都可以订阅任何事物)。

我不想自己写。我想找到一些已经设计良好、经过讨论和经过现场验证的东西。

今天我在 Delphi Pascal 中实现了一个设计(因为 Delphi 是我首先需要的)。正如此 API 所做的那样,按参数类型进行分派并不是一个原始想法(它是用设计模式 Visitor 模式解释的),我想我以前见过类似的东西(但我不记得在哪里;Taligent?)。其核心是订阅、过滤和调度是通过类型系统进行的。

unit JalSignals;
//  A publish/subscribe mechanism.    
//  1. Signal payloads are objects, and their class is their signal type.
//  2. Free is called on the payloads after they have been delivered.    
//  3. Members subscribe by providing a callback method (of object).
//  4. Members may subscribe with the same method to different types of signals.
//  5. A member subscribes to a type, which means that all signals
//     with payloads of that class or any of its subclasses will be delivered
//     to the callback, with one important exception    
//  6. A forum breaks the general class hierarchy into independent branches.    
//     A signal will not be delivered to members subscribed to classes that    
//     are not in the branch.    
//  7. This is a GPL v3 design.
interface
uses
  SysUtils;
type
  TSignal = TObject;
  TSignalType = TClass;
  TSignalAction = (soGo, soStop);
  TCallback = function(signal :TSignal) :TSignalAction of object;

  procedure signal(payload: TSignal);

  procedure subscribe(  callback :TCallback; atype :TSignalType);
  procedure unsubscribe(callback :TCallback; atype :TSignalType = nil); overload;
  procedure unsubscribe(obj      :TObject;   atype :TSignalType = nil); overload;

  procedure openForum( atype :TSignalType);
  procedure closeForum(atype :TSignalType);

上面的“回调”就像Python中的绑定方法。

Delphi实现的完整源代码是这里

这是Python中的实现。我更改了键名,因为signalmessage 已经过载了。与 Delphi 实现不同,结果(包括异常)被收集并以列表形式返回给信号发送器。

"""
  A publish/subscribe mechanism.

  1. Signal payloads are objects, and their class is their signal type.
  2. Free is called on the payloads after they have been delivered.
  3. Members subscribe by providing a callback method (of object).
  4. Members may subscribe with the same method to different types of signals.
  5. A member subscribes to a type, which means that all signals
     with payloads of that class or any of its subclasses will be delivered
     to the callback, with one important exception:
  6. A forum breaks the general class hierarchy into independent branches.
     A signal will not be delivered to members subscribed to classes that
     are not in the branch.
"""

__all__ = ['open_forum', 'close_forum', 'announce',
           'subscribe', 'unsubscribe'
           ]

def _is_type(atype):
    return issubclass(atype, object)

class Sub(object):
    def __init__(self, callback, atype):
        assert callable(callback)
        assert issubclass(atype, object)
        self.atype = atype
        self.callback = callback

__forums = set()
__subscriptions = []

def open_forum(forum):
    assert issubclass(forum, object)
    __forums.add(forum)

def close_forum(forum):
    __forums.remove(forum)

def subscribe(callback, atype):
    __subscriptions.append(Sub(callback, atype))

def unsubscribe(callback, atype=None):
    for i, sub in enumerate(__subscriptions):
        if sub.callback is not callback:
            continue
        if atype is None or issubclass(sub.atype, atype):
            del __subscriptions[i]

def _boundary(atype):
    assert _is_type(atype)
    lower = object
    for f in __forums:
        if (issubclass(atype, f)
            and issubclass(f, lower)):
            lower = f
    return lower

def _receivers(news):
    bound = _boundary(type(news))
    for sub in __subscriptions:
        if not isinstance(news, sub.atype):
            continue
        if not issubclass(sub.atype, bound):
            continue
        yield sub

def announce(news):
    replies = []
    for sub in _receivers(news):
        try:
            reply = sub.callback(news)
            replies.append(reply)
        except Exception as e:
            replies.append(e)
    return replies

if __name__ == '__main__':
    i = 0
    class A(object):
        def __init__(self):
            global i
            self.msg = type(self).__name__ + str(i)
            i += 1

    class B(A): pass
    class C(B): pass

    assert _is_type(A)
    assert _is_type(B)
    assert _is_type(C)

    assert issubclass(B, A)
    assert issubclass(C, B)

    def makeHandler(atype):
        def handler(s):
            assert isinstance(s, atype)
            return 'handler' + atype.__name__ + ' got ' + s.msg
        return handler

    handleA = makeHandler(A)
    handleB = makeHandler(B)
    handleC = makeHandler(C)

    def failer(s):
        raise Exception, 'failed on' + s.msg

    assert callable(handleA) and callable(handleB) and callable(handleC)

    subscribe(handleA, A)
    subscribe(handleB, B)
    subscribe(handleC, C)
    subscribe(failer, A)

    assert _boundary(A) is object
    assert _boundary(B) is object
    assert _boundary(C) is object

    print announce(A())
    print announce(B())
    print announce(C())

    print
    open_forum(B)

    assert _boundary(A) is object
    assert _boundary(B) is B
    assert _boundary(C) is B
    assert issubclass(B, B)

    print announce(A())
    print announce(B())
    print announce(C())

    print
    close_forum(B)
    print announce(A())
    print announce(B())
    print announce(C())

以下是我进行搜索的原因:

  1. 我已经浏览了必须维护的数千行 Delphi 代码。他们使用观察者模式来进行MVC解耦,但是一切仍然是非常耦合的,因为观察者和主体之间的依赖关系过于明确。
  2. 我一直在学习 PyQt4,如果我必须在 Qt4Designer 中点击、点击、点击才能完成我想要到达有意义的目的地的每个事件,那我会丧命的。
  3. 然而,在另一个个人数据应用程序中,我需要抽象事件传递和处理,因为持久性和 UI 会因平台而异,并且必须完全独立。

参考文献

自己和其他人找到的

  • 应该放在这里 PybubSub 使用字符串作为主题,并且方法签名(第一个信号定义签名)。
  • FinalBuilder博客中的一篇文章报告他们已成功使用具有整数结构作为有效负载、消息和用于过滤的整数掩码的系统。
  • PyDispatcher 的文档最少。
  • D-Bus 已被 Gnome 和 KDE 项目等采用。 Python 绑定 可用。

I'm looking for Wikpedia-style references to designs and implementations of lightweight publish-subscribe mechanisms that actually work. I will update the question according to the answers and comments, and to my own research.

I researched my books and the Web for work done in Python and Delphi for publish/subscribe, and wasn't happy with the results. The designs relied on function signatures or bitmaps or slots for filtering messages or deciding what should be delivered to who, and were either too restrictive (bound to a messaging server), or too promiscuous (everyone can subscribe to anything).

I don't want to write my own. I want to find something that's already well-designed, debated, and field-proven.

Today I implemented a design in Delphi Pascal (because Delphi was what I needed first). Dispatching on argument type, as this API does, is not an original idea (it is explained the Design Patterns Visitor pattern), and I think I have seen something like this before (but I don't remember where; Taligent?). The core of it is that subscription, filtering, and dispatching is over the type system.

unit JalSignals;
//  A publish/subscribe mechanism.    
//  1. Signal payloads are objects, and their class is their signal type.
//  2. Free is called on the payloads after they have been delivered.    
//  3. Members subscribe by providing a callback method (of object).
//  4. Members may subscribe with the same method to different types of signals.
//  5. A member subscribes to a type, which means that all signals
//     with payloads of that class or any of its subclasses will be delivered
//     to the callback, with one important exception    
//  6. A forum breaks the general class hierarchy into independent branches.    
//     A signal will not be delivered to members subscribed to classes that    
//     are not in the branch.    
//  7. This is a GPL v3 design.
interface
uses
  SysUtils;
type
  TSignal = TObject;
  TSignalType = TClass;
  TSignalAction = (soGo, soStop);
  TCallback = function(signal :TSignal) :TSignalAction of object;

  procedure signal(payload: TSignal);

  procedure subscribe(  callback :TCallback; atype :TSignalType);
  procedure unsubscribe(callback :TCallback; atype :TSignalType = nil); overload;
  procedure unsubscribe(obj      :TObject;   atype :TSignalType = nil); overload;

  procedure openForum( atype :TSignalType);
  procedure closeForum(atype :TSignalType);

The "callbacks" in the above are like bound methods in Python.

The complete source code for the Delphi implementation is here:

This is the implementation in Python. I changed the key names because signal and message are already too overloaded. Unlike in the Delphi implementation, the results, including exceptions, are collected and returned to the signaler in a list.

"""
  A publish/subscribe mechanism.

  1. Signal payloads are objects, and their class is their signal type.
  2. Free is called on the payloads after they have been delivered.
  3. Members subscribe by providing a callback method (of object).
  4. Members may subscribe with the same method to different types of signals.
  5. A member subscribes to a type, which means that all signals
     with payloads of that class or any of its subclasses will be delivered
     to the callback, with one important exception:
  6. A forum breaks the general class hierarchy into independent branches.
     A signal will not be delivered to members subscribed to classes that
     are not in the branch.
"""

__all__ = ['open_forum', 'close_forum', 'announce',
           'subscribe', 'unsubscribe'
           ]

def _is_type(atype):
    return issubclass(atype, object)

class Sub(object):
    def __init__(self, callback, atype):
        assert callable(callback)
        assert issubclass(atype, object)
        self.atype = atype
        self.callback = callback

__forums = set()
__subscriptions = []

def open_forum(forum):
    assert issubclass(forum, object)
    __forums.add(forum)

def close_forum(forum):
    __forums.remove(forum)

def subscribe(callback, atype):
    __subscriptions.append(Sub(callback, atype))

def unsubscribe(callback, atype=None):
    for i, sub in enumerate(__subscriptions):
        if sub.callback is not callback:
            continue
        if atype is None or issubclass(sub.atype, atype):
            del __subscriptions[i]

def _boundary(atype):
    assert _is_type(atype)
    lower = object
    for f in __forums:
        if (issubclass(atype, f)
            and issubclass(f, lower)):
            lower = f
    return lower

def _receivers(news):
    bound = _boundary(type(news))
    for sub in __subscriptions:
        if not isinstance(news, sub.atype):
            continue
        if not issubclass(sub.atype, bound):
            continue
        yield sub

def announce(news):
    replies = []
    for sub in _receivers(news):
        try:
            reply = sub.callback(news)
            replies.append(reply)
        except Exception as e:
            replies.append(e)
    return replies

if __name__ == '__main__':
    i = 0
    class A(object):
        def __init__(self):
            global i
            self.msg = type(self).__name__ + str(i)
            i += 1

    class B(A): pass
    class C(B): pass

    assert _is_type(A)
    assert _is_type(B)
    assert _is_type(C)

    assert issubclass(B, A)
    assert issubclass(C, B)

    def makeHandler(atype):
        def handler(s):
            assert isinstance(s, atype)
            return 'handler' + atype.__name__ + ' got ' + s.msg
        return handler

    handleA = makeHandler(A)
    handleB = makeHandler(B)
    handleC = makeHandler(C)

    def failer(s):
        raise Exception, 'failed on' + s.msg

    assert callable(handleA) and callable(handleB) and callable(handleC)

    subscribe(handleA, A)
    subscribe(handleB, B)
    subscribe(handleC, C)
    subscribe(failer, A)

    assert _boundary(A) is object
    assert _boundary(B) is object
    assert _boundary(C) is object

    print announce(A())
    print announce(B())
    print announce(C())

    print
    open_forum(B)

    assert _boundary(A) is object
    assert _boundary(B) is B
    assert _boundary(C) is B
    assert issubclass(B, B)

    print announce(A())
    print announce(B())
    print announce(C())

    print
    close_forum(B)
    print announce(A())
    print announce(B())
    print announce(C())

These are the reasons for my search:

  1. I've been going through several thousand lines of Delphi code that I have to maintain. They use the Observer pattern for MVC decoupling, but everything is still very coupled because the dependencies between observers and subjects are too explicit.
  2. I have been learning PyQt4, and it would kill me if I have to click-click-click in Qt4Designer for every event I want arriving to a meaningful destination.
  3. Yet on another personal-data application, I need to abstract event passing and handling because persistence and UI will vary by platform, and must be totally independent.

References

Found by self and others should go here

  • PybubSub uses strings for topycs, and method signatures (the first signal defines the signature).
  • An article in the blog of FinalBuilder reports that they have successfully used a system with integers structures as payloads, as messages, and integer masks for filtering.
  • PyDispatcher has minimal documentation.
  • D-Bus has been adopted by the Gnome and KDE projects, among others. A Python binding is available.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

少女的英雄梦 2024-10-19 15:58:51

您应该尝试企业集成模式,它为发布-订阅提供了非常详细的处理,尽管它以交互为中心- 处理消息传递。

You should try Enterprise Integration Patterns it gives publish-subscribe a very detailed treatment though it is centered on inter-process message passing.

緦唸λ蓇 2024-10-19 15:58:51

您也可以尝试DDS。数据分发服务是使用发布/订阅语义的通信模式的完整标准。

You could also try DDS. Data Distribution Service is a full standard for communication patterns using Publish / Subscribe semantics.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文