跨线程存储任意函数调用

发布于 2024-12-23 16:58:29 字数 1425 浏览 2 评论 0原文

我正在尝试编写一个旨在重现 Qt 线程语义的库:信号可以连接到插槽,并且所有插槽都在已知线程中执行,因此绑定到同一线程的插槽对于彼此而言是线程安全的。

我有以下 API:

data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())

mkSignal :: IO (Signal a)
mkSlot   :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)

connect :: Signal a -> Slot a -> IO ()

-- callable from any thread
emit :: Signal a -> a -> IO ()

-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg

问题是从 emitexecute。该参数需要在运行时以某种方式存储,然后执行 IO 操作,但我似乎无法通过类型检查器。

我需要的东西:

  1. 类型安全:信号不应连接到需要不同类型的插槽。
  2. 类型无关:任何给定类型可以有多个插槽(也许可以使用 newtype 和/或 TH 来放松这一点)。
  3. 易于使用:由于这是一个库,因此信号和槽应该易于创建。

我尝试过的事情:

我缺少什么?

I'm trying to write a library aiming to reproduce Qt's threading semantics: signals can be connected to slots, and all slots execute in a known thread, so that slots tied to the same thread are threadsafe with regards to each other.

I have the following API:

data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())

mkSignal :: IO (Signal a)
mkSlot   :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)

connect :: Signal a -> Slot a -> IO ()

-- callable from any thread
emit :: Signal a -> a -> IO ()

-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg

The problem is getting from emit to execute. The argument needs to be stored at runtime somehow, and then an IO action performed, but I can't seem to get past the type checker.

The things I need:

  1. Type safety: signals shouldn't be connected to slots expecting a different type.
  2. Type-independence: there can be more than one slots for any given type (Perhaps this can be relaxed with newtype and/or TH).
  3. Ease of use: since this is a library, signals and slots should be easy to create.

The things I've tried:

  • Data.Dynamic: makes the whole thing really fragile, and I haven't found a way to perform a correctly-typed IO action on a Dynamic. There's dynApply, but it's pure.
  • Existential types: I need to execute the function passed to mkSlot, as opposed to an arbitrary function based on the type.
  • Data.HList: I'm not smart enough to figure it out.

What am I missing?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

顾北清歌寒 2024-12-30 16:58:29

首先,您确定 Slots 确实想在特定线程中执行吗?在 Haskell 中编写线程安全代码很容易,并且 GHC 中的线程非常轻量,因此将所有事件处理程序执行绑定到特定的 Haskell 线程并不会获得太多好处。

另外,mkSlot 的回调不需要提供 Slot 本身:您可以使用 递归 do-notation 在回调中绑定槽,而不需要担心打结mkSlot

无论如何,您不需要像这些解决方案那么复杂的东西。我希望当您谈论存在类型时,您正在考虑通过 TChan 发送类似 (a -> IO (), a) 的内容(您提到过使用在注释中)并将其应用到另一端,但您希望 TChan 接受任何 a 的这种类型的值,而不仅仅是一个特定的 a 。这里的关键见解是,如果您有 (a -> IO (), a) 并且不知道 a 是什么,您唯一能做的就是将函数应用于该值,为您提供一个 IO () ,这样我们就可以通过通道发送它们!

下面是一个示例:

import Data.Unique
import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

newtype SlotGroup = SlotGroup (IO () -> IO ())

data Signal a = Signal Unique (TVar [Slot a])
data Slot a = Slot Unique SlotGroup (a -> IO ())

-- When executed, this produces a function taking an IO action and returning
-- an IO action that writes that action to the internal TChan. The advantage
-- of this approach is that it's impossible for clients of newSlotGroup to
-- misuse the internals by reading the TChan or similar, and the interface is
-- kept abstract.
newSlotGroup :: IO SlotGroup
newSlotGroup = do
  chan <- newTChanIO
  _ <- forkIO . forever . join . atomically . readTChan $ chan
  return $ SlotGroup (atomically . writeTChan chan)

mkSignal :: IO (Signal a)
mkSignal = Signal <
gt; newUnique <*> newTVarIO []

mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
mkSlot group f = Slot <
gt; newUnique <*> pure group <*> pure f

connect :: Signal a -> Slot a -> IO ()
connect (Signal _ v) slot = atomically $ do
  slots <- readTVar v
  writeTVar v (slot:slots)

emit :: Signal a -> a -> IO ()
emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)

execute :: Slot a -> a -> IO ()
execute (Slot _ (SlotGroup send) f) a = send (f a)

它使用 TChan 将操作发送到每个槽所绑定的工作线程。

请注意,我对 Qt 不太熟悉,所以我可能错过了该模型的一些微妙之处。您还可以通过以下方式断开插槽连接:

disconnect :: Signal a -> Slot a -> IO ()
disconnect (Signal _ v) (Slot u _ _) = atomically $ do
  slots <- readTVar v
  writeTVar v $ filter keep slots
  where keep (Slot u' _) = u' /= u

如果这可能成为瓶颈,您可能需要像 Map Unique (Slot a) 而不是 [Slot a] 之类的东西。

因此,这里的解决方案是 (a) 认识到您拥有基本上基于可变状态的东西,并使用可变变量来构造它; (b) 认识到函数和 IO 操作就像其他所有东西一样都是一流的,因此您不必做任何特殊的事情来在运行时构造它们:)

顺便说一句,我建议保留 Signal< 的实现/code> 和 Slot 通过不从定义它们的模块中导出它们的构造函数来进行抽象;毕竟,有很多方法可以在不改变 API 的情况下解决这个问题。

Firstly, are you sure Slots really want to execute in a specific thread? It's easy to write thread-safe code in Haskell, and threads are very lightweight in GHC, so you're not gaining much by tying all event-handler execution to a specific Haskell thread.

Also, mkSlot's callback doesn't need to be given the Slot itself: you can use recursive do-notation to bind the slot in its callback without adding the concern of tying the knot to mkSlot.

Anyway, you don't need anything as complicated as those solutions. I expect when you talk about existential types, you're thinking about sending something like (a -> IO (), a) through a TChan (which you mentioned using in the comments) and applying it on the other end, but you want the TChan to accept values of this type for any a, rather than just one specific a. The key insight here is that if you have (a -> IO (), a) and don't know what a is, the only thing you can do is apply the function to the value, giving you an IO () — so we can just send those through the channel instead!

Here's an example:

import Data.Unique
import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

newtype SlotGroup = SlotGroup (IO () -> IO ())

data Signal a = Signal Unique (TVar [Slot a])
data Slot a = Slot Unique SlotGroup (a -> IO ())

-- When executed, this produces a function taking an IO action and returning
-- an IO action that writes that action to the internal TChan. The advantage
-- of this approach is that it's impossible for clients of newSlotGroup to
-- misuse the internals by reading the TChan or similar, and the interface is
-- kept abstract.
newSlotGroup :: IO SlotGroup
newSlotGroup = do
  chan <- newTChanIO
  _ <- forkIO . forever . join . atomically . readTChan $ chan
  return $ SlotGroup (atomically . writeTChan chan)

mkSignal :: IO (Signal a)
mkSignal = Signal <
gt; newUnique <*> newTVarIO []

mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
mkSlot group f = Slot <
gt; newUnique <*> pure group <*> pure f

connect :: Signal a -> Slot a -> IO ()
connect (Signal _ v) slot = atomically $ do
  slots <- readTVar v
  writeTVar v (slot:slots)

emit :: Signal a -> a -> IO ()
emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)

execute :: Slot a -> a -> IO ()
execute (Slot _ (SlotGroup send) f) a = send (f a)

This uses a TChan to send actions to the worker thread each slot is tied to.

Note that I'm not very familiar with Qt, so I may have missed some subtlety of the model. You can also disconnect Slots with this:

disconnect :: Signal a -> Slot a -> IO ()
disconnect (Signal _ v) (Slot u _ _) = atomically $ do
  slots <- readTVar v
  writeTVar v $ filter keep slots
  where keep (Slot u' _) = u' /= u

You might want something like Map Unique (Slot a) instead of [Slot a] if this is likely to be a bottleneck.

So, the solution here is to (a) recognise that you have something that's fundamentally based upon mutable state, and use a mutable variable to structure it; (b) realise that functions and IO actions are first-class just like everything else, so you don't have to do anything special to construct them at runtime :)

By the way, I suggest keeping the implementations of Signal and Slot abstract by not exporting their constructors from the module defining them; there are many ways to tackle this approach without changing the API, after all.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文