多线程观察者的设计模式
在数字信号采集系统中,通常通过一个线程将数据推送到系统中的观察者中。
来自 Wikipedia/Observer_pattern 的示例:
foreach (IObserver observer in observers)
observer.Update(message);
当来自 GUI 线程的用户操作需要数据时要停止流动,你需要断开主体与观察者的连接,甚至完全处理掉观察者。
有人可能会争辩:您应该停止数据源,并等待哨兵值来处理连接。 但这会导致系统出现更多延迟。
当然,如果数据泵线程刚刚请求观察者的地址,它可能会发现它正在向被破坏的对象发送消息。
有人创建了一个“官方”设计模式来应对这种情况吗? 他们不应该吗?
In a digital signal acquisition system, often data is pushed into an observer in the system by one thread.
example from Wikipedia/Observer_pattern:
foreach (IObserver observer in observers)
observer.Update(message);
When e.g. a user action from e.g. a GUI-thread requires the data to stop flowing, you want to break the subject-observer connection, and even dispose of the observer alltogether.
One may argue: you should just stop the data source, and wait for a sentinel value to dispose of the connection. But that would incur more latency in the system.
Of course, if the data pumping thread has just asked for the address of the observer, it might find it's sending a message to a destroyed object.
Has someone created an 'official' Design Pattern countering this situation? Shouldn't they?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您可以向所有观察者发送一条消息,通知他们数据源正在终止,并让观察者将自己从列表中删除。
作为对评论的回应,主体-观察者模式的实现应该允许动态添加/删除观察者。 在 C# 中,事件系统是主体/观察者模式,其中使用
event +=observer
添加观察者,并使用event -=observer
删除观察者。You could send a message to all observers informing them the data source is terminating and let the observers remove themselves from the list.
In response to the comment, the implementation of the subject-observer pattern should allow for dynamic addition / removal of observers. In C#, the event system is a subject/observer pattern where observers are added using
event += observer
and removed usingevent -= observer
.如果你想让数据源始终处于并发安全的一面,那么你应该至少有一个始终安全供他使用的指针。
因此,观察者对象的生命周期应该不会早于数据源的生命周期。
这可以通过仅添加观察者而不删除它们来完成。
您可以让每个观察者本身不执行核心实现,而是将此任务委托给 ObserverImpl 对象。
您锁定对此 impl 对象的访问。 这没什么大不了的,它只是意味着 GUI 取消订阅者会被阻塞一段时间,以防观察者忙于使用 ObserverImpl 对象。 如果 GUI 响应能力成为问题,您可以使用某种并发作业队列机制,并将取消订阅作业推送到其上。 (就像 Windows 中的 PostMessage )
取消订阅时,您只需用虚拟实现替换核心实现即可。 同样,此操作应该获取锁。 这确实会引入一些等待数据源的过程,但由于它只是一个[锁定 - 指针交换 - 解锁],你可以说这对于实时应用程序来说已经足够快了。
如果您想避免堆叠仅包含虚拟对象的观察者对象,则必须进行某种簿记,但这可能会归结为一些琐碎的事情,例如持有指向他需要的列表中观察者对象的指针的对象。
优化 :
如果您还保持实现(真实的 + 虚拟的)与观察者本身一样长,那么您可以在没有实际锁定的情况下执行此操作,并使用 InterlockedExchangePointer 之类的东西来交换指针。
最坏的情况:在指针交换时委托调用正在进行 --> 没什么大不了的,所有对象都保持活动状态并且委托可以继续。 下一个委托调用将是新的实现对象。 (当然,除非有任何新的交换)
If you want to have the data source to always be on the safe side of concurrency, you should have at least one pointer that is always safe for him to use.
So the Observer object should have a lifetime that isn't ended before that of the data source.
This can be done by only adding Observers, but never removing them.
You could have each observer not do the core implementation itself, but have it delegate this task to an ObserverImpl object.
You lock access to this impl object. This is no big deal, it just means the GUI unsubscriber would be blocked for a little while in case the observer is busy using the ObserverImpl object. If GUI responsiveness would be an issue, you can use some kind of concurrent job-queue mechanism with an unsubscription job pushed onto it. ( like PostMessage in Windows )
When unsubscribing, you just substitute the core implementation for a dummy implementation. Again this operation should grab the lock. This would indeed introduce some waiting for the data source, but since it's just a [ lock - pointer swap - unlock ] you could say that this is fast enough for real-time applications.
If you want to avoid stacking Observer objects that just contain a dummy, you have to do some kind of bookkeeping, but this could boil down to something trivial like an object holding a pointer to the Observer object he needs from the list.
Optimization :
If you also keep the implementations ( the real one + the dummy ) alive as long as the Observer itself, you can do this without an actual lock, and use something like InterlockedExchangePointer to swap the pointers.
Worst case scenario : delegating call is going on while pointer is swapped --> no big deal all objects stay alive and delegating can continue. Next delegating call will be to new implementation object. ( Barring any new swaps of course )