Python Observer Pattern: Examples, Tips?

Posted on 2024-08-15 00:52:22

Comments (9)

您的好友蓝忘机已上羡 2024-08-22 00:52:22

However it does lack flexibility.

Well... actually, this looks like a good design to me if an asynchronous API is what you want. It usually is. Maybe all you need is to switch from stderr to Python's logging module, which has a sort of publish/subscribe model of its own, what with Logger.addHandler() and so on.

If you do want to support observers, my advice is to keep it simple. You really only need a few lines of code.

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.items():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

Your Job class can subclass Observable. When something of interest happens, call self.fire(type="progress", percent=50) or the like.
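
To make that last step concrete, here is a minimal sketch of a Job built on the Observable above. The `run` method, its `steps` parameter, and the `received` list are assumptions made for illustration and are not part of the original answer.

```python
class Event:
    """Plain attribute bag describing one notification."""


class Observable:
    def __init__(self):
        self.callbacks = []

    def subscribe(self, callback):
        self.callbacks.append(callback)

    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.items():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)


class Job(Observable):  # hypothetical producer
    def run(self, steps=4):
        # Report progress after each (pretend) unit of work.
        for i in range(1, steps + 1):
            self.fire(type="progress", percent=100 * i // steps)


received = []
job = Job()
job.subscribe(lambda e: received.append((e.type, e.percent)))
job.run()
print(received)
```

Because Job defines no `__init__` of its own, `Observable.__init__` still runs and creates the callback list. A subclass that adds its own `__init__` would need to call the parent's explicitly.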

橙味迷妹 2024-08-22 00:52:22

I think people in the other answers overdo it. You can easily achieve events in Python with fewer than 15 lines of code.

You simply have two classes: Event and Observer. Any class that wants to listen for an event needs to inherit from Observer and register to listen for (observe) a specific event. When an Event is instantiated and fired, all observers listening to that event will run their specified callback functions.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observables = {}
    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event():
    def __init__(self, name, data, autofire = True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()
    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)

Example:

class Room(Observer):

    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # Observer's init needs to be called
    def someone_arrived(self, who):
        print(who + " has arrived!")

room = Room()
room.observe('someone arrived',  room.someone_arrived)

Event('someone arrived', 'Lenard')

Output:

Room is ready.
Lenard has arrived!
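
Two details of the code above may be worth seeing in action: `_observers` is a class attribute, so every Observer instance shares one global registry, and `autofire=False` lets you build an event first and fire it later. The following is only a sketch of that behaviour; the names `first`, `second`, `pending` and `log` are made up for the demo.

```python
class Observer:
    _observers = []  # class attribute: shared by every Observer instance

    def __init__(self):
        self._observers.append(self)
        self._observables = {}

    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event:
    def __init__(self, name, data, autofire=True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()

    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)


log = []
first, second = Observer(), Observer()
first.observe("someone arrived", lambda who: log.append(("first", who)))
second.observe("someone arrived", lambda who: log.append(("second", who)))

pending = Event("someone arrived", "Lenard", autofire=False)  # nothing runs yet
calls_before_fire = list(log)
pending.fire()  # both observers are notified, in registration order
print(calls_before_fire, log)
```

Note that the shared list holds strong references, so observers registered this way stay alive until they are removed from `Observer._observers`.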
深居我梦 2024-08-22 00:52:22

A few more approaches...

Example: the logging module

Maybe all you need is to switch from stderr to Python's logging module, which has a powerful publish/subscribe model.

It's easy to get started producing log records.

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")

On the consumer side there's a bit more work. Unfortunately configuring logger output takes, like, 7 whole lines of code to do. ;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

On the other hand there's an amazing amount of stuff in the logging package. If you ever need to send log data to a rotating set of files, an email address, and the Windows Event Log, you're covered.
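
To show the publish/subscribe side more directly, here is a small sketch in which the subscriber is a custom `logging.Handler` subclass. `ListHandler` is a hypothetical name chosen for this example; `logging.Handler`, `emit()`, `addHandler()` and `removeHandler()` are the standard library's own.

```python
import logging


class ListHandler(logging.Handler):  # hypothetical subscriber
    """Collects formatted messages instead of writing them anywhere."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log = logging.getLogger("myjobs")
log.setLevel(logging.INFO)
handler = ListHandler()
log.addHandler(handler)       # subscribe

log.info("starting job")
log.info("%.1f%% done", 50.0)
log.debug("not delivered: below the INFO threshold")

log.removeHandler(handler)    # unsubscribe
log.info("not delivered: the handler was removed")
print(handler.messages)
```

Any number of handlers can be attached to the same logger, which is what makes it usable as a simple observer mechanism.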

Example: simplest possible observer

But you don't need to use any library at all. An extremely simple way to support observers is to call a method that does nothing.

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

Sometimes instead of writing a lambda, you can just say job.on_progress = progressBar.update, which is nice.

This is about as simple as it gets. One drawback is that it doesn't naturally support multiple listeners subscribing to the same events.
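
Both customization routes mentioned in the docstring (overriding the method, or assigning to it) can be sketched as follows, together with one possible workaround for the single-listener limit. `RecordingJob` and `fan_out` are hypothetical names introduced here, not part of the original answer.

```python
class MyJob:
    def on_progress(self, pct):
        """Called when progress is made. Does nothing by default."""

    def run(self):
        n = 4
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)


class RecordingJob(MyJob):  # route 1: override the hook in a subclass
    def __init__(self):
        self.seen = []

    def on_progress(self, pct):
        self.seen.append(pct)


def fan_out(*listeners):  # hypothetical helper: one hook, several listeners
    def call_all(pct):
        for listener in listeners:
            listener(pct)
    return call_all


recording = RecordingJob()
recording.run()

first, second = [], []
plain = MyJob()
plain.on_progress = fan_out(first.append, second.append)  # route 2: assign to it
plain.run()
print(recording.seen, first, second)
```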

Example: C#-like events

With a bit of support code, you can get C#-like events in Python. Here's the code:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError:  # Python 3 syntax; the exception object is not needed
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

The producer declares the event using a decorator:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

This works exactly like the "simple observer" code above, but you can add as many listeners as you like using +=. (Unlike C#, there are no event handler types, you don't have to new EventHandler(foo.bar) when subscribing to an event, and you don't have to check for null before firing the event. Like C#, events do not squelch exceptions.)
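
As a rough illustration of subscribing and unsubscribing several listeners, here is a self-contained sketch for Python 3. It repeats the glue code so that it runs on its own; the lists `seen_a` and `seen_b` and the name `listener_a` exist only for the demo.

```python
class boundevent:
    """Per-instance list of listeners; calling it calls each listener in turn."""

    def __init__(self):
        self._fns = []

    def __iadd__(self, fn):
        self._fns.append(fn)
        return self

    def __isub__(self, fn):
        self._fns.remove(fn)
        return self

    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:  # copy, so listeners may unsubscribe while running
            f(*args, **kwargs)


class event:
    """Descriptor that lazily creates one boundevent per instance."""

    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__

    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError:
            be = obj.__dict__[self._key] = boundevent()
            return be


class MyJob:
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 2
        for i in range(n + 1):
            self.progress(100.0 * i / n)


seen_a, seen_b = [], []
listener_a = seen_a.append  # keep a reference so the same object can be removed
job = MyJob()
job.progress += listener_a
job.progress += seen_b.append
job.run()

job.progress -= listener_a  # unsubscribe the first listener
job.run()
print(seen_a)
print(seen_b)
```

After the second run only `seen_b` keeps growing, since `listener_a` was removed with `-=`.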

How to choose

If logging does everything you need, use that. Otherwise do the simplest thing that works for you. The key thing to note is that you don't need to take on a big external dependency.

嘿看小鸭子会跑 2024-08-22 00:52:22

How about an implementation where objects aren't kept alive just because they're observing something? Below please find an implementation of the observer pattern with the following features:

  1. Usage is pythonic. To add an observer to a bound method .bar of instance foo, just do foo.bar.addObserver(observer).
  2. Observers are not kept alive by virtue of being observers. In other words, the observer code uses no strong references.
  3. No sub-classing necessary (descriptors ftw).
  4. Can be used with unhashable types.
  5. Can be used as many times as you want in a single class.
  6. (bonus) As of today the code exists in a proper downloadable, installable package on GitHub.

Here's the code (the GitHub and PyPI packages have the most up-to-date implementation):

import weakref
import functools

class ObservableMethod(object):
    """
    A proxy for a bound method which can be observed.

    I behave like a bound method, but other bound methods can subscribe to be
    called whenever I am called.
    """

    def __init__(self, obj, func):
        self.func = func
        functools.update_wrapper(self, func)
        self.objectWeakRef = weakref.ref(obj)
        self.callbacks = {}  #observing object ID -> weak ref, methodNames

    def addObserver(self, boundMethod):
        """
        Register a bound method to observe this ObservableMethod.

        The observing method will be called whenever this ObservableMethod is
        called, and with the same arguments and keyword arguments. If a
        boundMethod has already been registered to as a callback, trying to add
        it again does nothing. In other words, there is no way to sign up an
        observer to be called back multiple times.
        """
        obj = boundMethod.__self__
        ID = id(obj)
        if ID in self.callbacks:
            s = self.callbacks[ID][1]
        else:
            wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
            s = set()
            self.callbacks[ID] = (wr, s)
        s.add(boundMethod.__name__)

    def discardObserver(self, boundMethod):
        """
        Un-register a bound method.
        """
        obj = boundMethod.__self__
        if id(obj) in self.callbacks:
            self.callbacks[id(obj)][1].discard(boundMethod.__name__)

    def __call__(self, *arg, **kw):
        """
        Invoke the method which I proxy, and all of its callbacks.

        The callbacks are called with the same *args and **kw as the main
        method.
        """
        result = self.func(self.objectWeakRef(), *arg, **kw)
        for ID in self.callbacks:
            wr, methodNames = self.callbacks[ID]
            obj = wr()
            for methodName in methodNames:
                getattr(obj, methodName)(*arg, **kw)
        return result

    @property
    def __self__(self):
        """
        Get a strong reference to the object owning this ObservableMethod

        This is needed so that ObservableMethod instances can observe other
        ObservableMethod instances.
        """
        return self.objectWeakRef()


class ObservableMethodDescriptor(object):

    def __init__(self, func):
        """
        To each instance of the class using this descriptor, I associate an
        ObservableMethod.
        """
        self.instances = {}  # Instance id -> (weak ref, Observablemethod)
        self._func = func

    def __get__(self, inst, cls):
        if inst is None:
            return self
        ID = id(inst)
        if ID in self.instances:
            wr, om = self.instances[ID]
            if not wr():
                msg = "Object id %d should have been cleaned up"%(ID,)
                raise RuntimeError(msg)
        else:
            wr = weakref.ref(inst, Cleanup(ID, self.instances))
            om = ObservableMethod(inst, self._func)
            self.instances[ID] = (wr, om)
        return om

    def __set__(self, inst, val):
        raise RuntimeError("Assigning to ObservableMethod not supported")


def event(func):
    return ObservableMethodDescriptor(func)


class Cleanup(object):
    """
    I remove elements from a dict whenever I'm called.

    Use me as a weakref.ref callback to remove an object's id from a dict
    when that object is garbage collected.
    """
    def __init__(self, key, d):
        self.key = key
        self.d = d

    def __call__(self, wr):
        del self.d[self.key]

To use this, we just decorate the methods we want to make observable with @event. Here's an example:

class Foo(object):
    def __init__(self, name):
        self.name = name

    @event
    def bar(self):
        print("%s called bar"%(self.name,))

    def baz(self):
        print("%s called baz"%(self.name,))

a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()
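
For readers who only need feature 2 (observers are not kept alive by being observers), a much smaller sketch is possible with the standard library's `weakref.WeakMethod` (Python 3.4+). This is a different technique from the id-keyed dictionaries used above, and `Signal` and `Listener` are hypothetical names, not part of the package.

```python
import gc
import weakref


class Signal:  # hypothetical name
    """Holds only weak references to the bound methods observing it."""

    def __init__(self):
        self._refs = []

    def add_observer(self, bound_method):
        self._refs.append(weakref.WeakMethod(bound_method))

    def __call__(self, *args, **kwargs):
        live = []
        for ref in self._refs:
            method = ref()  # None once the observing object has been collected
            if method is not None:
                method(*args, **kwargs)
                live.append(ref)
        self._refs = live  # forget dead observers


class Listener:
    def __init__(self, name, sink):
        self.name = name
        self.sink = sink

    def on_event(self, value):
        self.sink.append((self.name, value))


calls = []
signal = Signal()
keep = Listener("keep", calls)
drop = Listener("drop", calls)
signal.add_observer(keep.on_event)
signal.add_observer(drop.on_event)

signal(1)
del drop      # the signal does not keep this listener alive
gc.collect()  # only needed on interpreters without reference counting
signal(2)
print(calls)
```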
行雁书 2024-08-22 00:52:22

From Wikipedia:

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())

The Observable is used like this.

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns (12, 3.0, 8, 4) on Python 3.7+, where dicts keep insertion order
未蓝澄海的烟 2024-08-22 00:52:22

Based on Jason's answer, I implemented the C#-like events example as a fully-fledged Python module including documentation and tests. I love fancy pythonic stuff :)

So, if you want a ready-to-use solution, you can just use the code on GitHub.

软糯酥胸 2024-08-22 00:52:22

OP asks "Are there any exemplary examples of the GoF Observer implemented in Python?"
This is an example in Python 3.7. This Observable class meets the requirement of creating a relationship between one observable and many observers while remaining independent of their structure.

from functools import partial
from dataclasses import dataclass, field
import sys
from typing import List, Callable


@dataclass
class Observable:
    observers: List[Callable] = field(default_factory=list)

    def register(self, observer: Callable):
        self.observers.append(observer)

    def deregister(self, observer: Callable):
        self.observers.remove(observer)

    def notify(self, *args, **kwargs):
        for observer in self.observers:
            observer(*args, **kwargs)


def usage_demo():
    observable = Observable()

    # Register two anonymous observers using lambda.
    observable.register(
        lambda *args, **kwargs: print(f'Observer 1 called with args={args}, kwargs={kwargs}'))
    observable.register(
        lambda *args, **kwargs: print(f'Observer 2 called with args={args}, kwargs={kwargs}'))

    # Create an observer function, register it, then deregister it.
    def callable_3():
        print('Observer 3 NOT called.')

    observable.register(callable_3)
    observable.deregister(callable_3)

    # Create a general purpose observer function and register four observers.
    def callable_x(*args, **kwargs):
        print(f'{args[0]} observer called with args={args}, kwargs={kwargs}')

    for gui_field in ['Form field 4', 'Form field 5', 'Form field 6', 'Form field 7']:
        observable.register(partial(callable_x, gui_field))

    observable.notify('test')


if __name__ == '__main__':
    sys.exit(usage_demo())
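
One edge case the class above does not cover is an observer that deregisters itself while it is being notified: because `notify` iterates over the live list, the observer that follows it would be skipped in that round. A possible variation, offered only as a sketch, iterates over a snapshot instead. The `once` and `always` functions are hypothetical observers made up for the demo.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Observable:
    observers: List[Callable] = field(default_factory=list)

    def register(self, observer: Callable):
        self.observers.append(observer)

    def deregister(self, observer: Callable):
        self.observers.remove(observer)

    def notify(self, *args, **kwargs):
        # Iterate over a snapshot so observers may deregister while being notified.
        for observer in tuple(self.observers):
            observer(*args, **kwargs)


events = []
observable = Observable()


def once(value):  # hypothetical one-shot observer
    events.append(("once", value))
    observable.deregister(once)


def always(value):
    events.append(("always", value))


observable.register(once)
observable.register(always)
observable.notify(1)
observable.notify(2)
print(events)
```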
夏至、离别 2024-08-22 00:52:22

Example: twisted log observers

To register an observer yourCallable() (a callable that accepts a dictionary) to receive all log events (in addition to any other observers):

twisted.python.log.addObserver(yourCallable)

Example: complete producer/consumer example

From the Twisted-Python mailing list:

#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""

import random

from zope.interface import implementer
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

@implementer(interfaces.IPushProducer)  # class decorator; the implements() class advice does not work on Python 3
class Producer:
    """Send back the requested number of random integers to the client."""
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write(b'%d\r\n' % (next_int,))  # transports expect bytes on Python 3
            self._produced += 1
        if self._produced == self._goal:
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()
    def stopProducing(self):
        pass

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write(b'how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()
∞梦里开花 2024-08-22 00:52:22

A functional approach to observer design:

from functools import wraps  # required by the @wraps decorator below


def add_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    # If this is the first listener, then set up the method wrapper
    if not listeners:

        listeners = [listener]
        setattr(obj, listener_attr, listeners)

        # Get the object's method
        method = getattr(obj, method_name)

        @wraps(method)
        def method_wrapper(*args, **kwags):
            method(*args, **kwags)
            for l in listeners:
                l(obj, *args, **kwags) # Listener also has object argument

        # Replace the original method with the wrapper
        setattr(obj, method_name, method_wrapper)

    else:
        # Event is already set up, so just add another listener
        listeners.append(listener)


def remove_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    if listeners:
        # Remove the listener
        next((listeners.pop(i)
              for i, l in enumerate(listeners)
              if l == listener),
             None)

        # If this was the last listener, then remove the method wrapper
        if not listeners:
            method = getattr(obj, method_name)
            delattr(obj, listener_attr)
            setattr(obj, method_name, method.__wrapped__)

These methods can then be used to add a listener to any class method. For example:

class MyClass(object):

    def __init__(self, prop):
        self.prop = prop

    def some_method(self, num, string):
        print('method:', num, string)

def listener_method(obj, num, string):
    print('listener:', num, string, obj.prop)

my = MyClass('my_prop')

add_listener(my, 'some_method', listener_method)
my.some_method(42, 'with listener')

remove_listener(my, 'some_method', listener_method)
my.some_method(42, 'without listener')

And the output is:

method: 42 with listener
listener: 42 with listener my_prop
method: 42 without listener