如何在 TVar 上添加终结器

发布于 2024-10-26 20:37:26 字数 4666 浏览 0 评论 0原文

背景

响应问题,我构建并上传了一个有界的-tchan (我不适合上传 jnb 的版本) 。如果名称还不够,有界tchan (BTChan) 是一个具有最大容量的STM 通道(如果通道达到容量则写入块)。

最近,我收到了添加 dup 功能的请求,如 常规 TChan。问题就这样开始了。

BTChan 的外观

下面是 BTChan 的简化(实际上是非功能性)视图。

data BTChan a = BTChan
    { max :: Int
    , count :: TVar Int
    , channel :: TVar [(Int, a)]
    , nrDups  :: TVar Int
    }

每次写入通道时,您都会在元组中包含重复项的数量 (nrDups) - 这是一个“单个元素计数器”,指示有多少读者已获取该元素。

每个读取器都会递减它读取的元素的计数器,然后将其读取指针移动到列表中的下一个元素。如果读取器将计数器递减至零,则 count 的值也会递减,以正确反映通道上的可用容量。

为了清楚所需的语义:通道容量表示通道中排队的元素的最大数量。任何给定的元素都会排队,直到每个 dup 的读取器收到该元素。任何元素都不应保留在 GC 重复队列中(这是主要问题)。

例如,假设一个通道有 3 个重复项(c1、c2、c3),容量为 2,其中 2 个项目写入通道,然后从 c1中读取所有项目c2。该通道仍满(剩余容量为 0),因为 c3 尚未消耗其副本。在任何时间点,如果对 c3 的所有引用都被删除(因此 c3 被 GC),则应释放容量(在本例中恢复为 2)。

问题是这样的:假设我有以下代码,

c <- newBTChan 1
_ <- dupBTChan c  -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c

导致 BTChan 看起来像这样:

BTChan 1 (TVar 0) (TVar []) (TVar 1)             -->   -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2)             -->   -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) -->   -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2)       -- OH NO!

请注意,最后 "hello" 的读取计数仍然是 1< /代码>?这意味着该消息不会被视为已消失(即使它在实际实现中会被 GC),并且我们的 count 永远不会减少。由于通道已满(最多 1 个元素),写入者将始终阻塞。

我希望每次调用 dupBTChan 时都创建一个终结器。当收集复制(或原始)通道时,该通道上剩余要读取的所有元素将减少每个元素的计数,nrDups 变量也将减少。因此,未来的写入将具有正确的 count(该 count 不会为 GCed 通道未读取的变量保留空间)。

解决方案1 ​​- 手动资源管理(我想避免)

JNB 的bounded-tchan 实际上因此具有手动资源管理。请参阅cancelBTChan。我正在寻找用户更难出错的东西(并不是说手动管理在很多情况下不是正确的方法)。

解决方案 2 - 通过阻止 TVar 来使用异常(GHC 无法按照我想要的方式执行此操作)

编辑此解决方案,而解决方案 3 只是一个衍生产品,不起作用!由于 bug 5055 (WONTFIX),GHC 编译器向两个被阻止的线程发送异常,即使 1 就足够了(理论上可以确定,但对于 GHC GC 来说不切实际)。

如果获取 BTChan 的所有方式都是 IO,我们可以 forkIO 一个线程,该线程读取/重试给定 BTChan 独有的额外(虚拟)TVar 字段。当对 TVar 的所有其他引用都被删除时,新线程将捕获异常,因此它将知道何时递减 nrDups 和单个元素计数器。这应该可行,但会强制我的所有用户使用 IO 来获取他们的 BTChan:

data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }

dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
       ... as before ...
       d <- newTVarIO ()
       let chan = BTChan ... d
       forkIO $ watchChan chan
       return chan

watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
    catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
    case fromException e of
        BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
            ls <- readTVar (channel b)
            writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
            readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
        _ -> watchBTChan b

编辑:是的,这是一个穷人终结器,我没有任何特殊原因避免使用 addFinalizer 。这将是相同的解决方案,仍然强制使用 IO afaict。

解决方案 3:比解决方案 2 更干净的 API,但 GHC 仍然不支持

用户通过调用 initBTChanCollector 启动管理器线程,该线程将监视一组虚拟 TVar (来自解决方案 2)并进行所需的清理。基本上,它将 IO 推入另一个线程,该线程知道通过全局 (unsafePerformIOed) TVar 做什么。事情的工作原理基本上与解决方案 2 类似,但 BTChan 的创建仍然可以是 STM。运行initBTChanCollector失败将导致进程运行时任务列表不断增长(空间泄漏)。

解决方案 4:绝不允许丢弃 BTChan

这类似于忽略该问题。如果用户从不丢弃重复的 BTChan ,那么问题就会消失。

解决方案5 我看到 ezyang 的答案(完全有效且值得赞赏),但确实希望保留当前的 ​​API 仅具有“dup”功能。

** 解决方案 6** 请告诉我有更好的选择。

编辑: 我实现了解决方案3(完全未经测试alpha 版本)并通过将全局本身设置为 BTChan 来处理潜在的空间泄漏 - chan 的容量可能应该为 1,因此忘记运行 init 很快就会出现,但这只是一个微小的变化。这在 GHCi (7.0.3) 中有效,但这似乎是偶然的。 GHC 会向两个被阻止的线程(读取 BTChan 的有效线程和观察线程)抛出异常,所以如果你在另一个线程丢弃它的引用时被阻止读取 BTChan,那么你就会死掉。

Background

In response to a question, I built and uploaded a bounded-tchan (wouldn't have been right for me to upload jnb's version). If the name isn't enough, a bounded-tchan (BTChan) is an STM channel that has a maximum capacity (writes block if the channel is at capacity).

Recently, I've received a request to add a dup feature like in the regular TChan's. And thus begins the problem.

How the BTChan looks

A simplified (and actually non-functional) view of BTChan is below.

data BTChan a = BTChan
    { max :: Int
    , count :: TVar Int
    , channel :: TVar [(Int, a)]
    , nrDups  :: TVar Int
    }

Every time you write to the channel you include the number of dups (nrDups) in the tuple - this is an 'individual element counter' which indicates how many readers have gotten this element.

Every reader will decrement the counter for the element it reads then move it's read-pointer to then next element in the list. If the reader decrements the counter to zero then the value of count is decremented to properly reflect available capacity on the channel.

To be clear on the desired semantics: A channel capacity indicates the maximum number of elements queued in the channel. Any given element is queued until a reader of each dup has received the element. No elements should remain queued for a GCed dup (this is the main problem).

For example, let there be three dups of a channel (c1, c2, c3) with capacity of 2, where 2 items were written into the channel then all items were read out of c1 and c2. The channel is still full (0 remaining capacity) because c3 hasn't consumed its copies. At any point in time if all references toc3 are dropped (so c3 is GCed) then the capacity should be freed (restored to 2 in this case).

Here's the issue: let's say I have the following code

c <- newBTChan 1
_ <- dupBTChan c  -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c

Causing the BTChan to look like:

BTChan 1 (TVar 0) (TVar []) (TVar 1)             -->   -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2)             -->   -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) -->   -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2)       -- OH NO!

Notice at the end the read count for "hello" is still 1? That means the message is not considered gone (even though it will get GCed in the real implementation) and our count will never decrement. Because the channel is at capacity (1 element maximum) the writers will always block.

I want a finalizer created each time dupBTChan is called. When a dupped (or original) channel is collected all elements remaining to be read on that channel will get the per-element count decremented, also the nrDups variable will be decremented. As a result, future writes will have the correct count (a count that doesn't reserve space for variables not-read by GCed channels).

Solution 1 - Manual Resource Management (what I want to avoid)

JNB's bounded-tchan actually has manual resource management for this reason. See the cancelBTChan. I'm going for something harder for the user to get wrong (not that manual management isn't the right way to go in many cases).

Solution 2 - Use exceptions by blocking on TVars (GHC can't do this how I want)

EDIT this solution, and solution 3 which is just a spin-off, does not work! Due to bug 5055 (WONTFIX) the GHC compiler sends exceptions to both blocked threads, even though one is sufficient (which is theoretically determinable, but not practical with the GHC GC).

If all the ways to get a BTChan are IO, we can forkIO a thread that reads/retries on an extra (dummy) TVar field unique to the given BTChan. The new thread will catch an exception when all other references to the TVar are dropped, so it will know when to decrement the nrDups and individual element counters. This should work but forces all my users to use IO to get their BTChans:

data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }

dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
       ... as before ...
       d <- newTVarIO ()
       let chan = BTChan ... d
       forkIO $ watchChan chan
       return chan

watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
    catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
    case fromException e of
        BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
            ls <- readTVar (channel b)
            writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
            readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
        _ -> watchBTChan b

EDIT: Yes, this is a poor mans finalizer and I don't have any particular reason to avoid using addFinalizer. That would be the same solution, still forcing use of IO afaict.

Solution 3: A cleaner API than solution 2, but GHC still doesn't support it

Users start a manager thread by calling initBTChanCollector, which will monitor a set of these dummy TVars (from solution 2) and do the needed clean-up. Basically, it shoves the IO into another thread that knows what to do via a global (unsafePerformIOed) TVar. Things work basically like solution 2, but the creation of BTChan's can still be STM. Failure to run initBTChanCollector would result in an ever-growing list (space leak) of tasks as the process runs.

Solution 4: Never allow discarding BTChans

This is akin to ignoring the problem. If the user never drops a dupped BTChan then the issue disappears.

Solution 5
I see ezyang's answer (totally valid and appreciated), but really would like to keep the current API just with a 'dup' function.

** Solution 6**
Please tell me there's a better option.

EDIT:
I implemented solution 3 (totally untested alpha release) and handled the potential space leak by making the global itself a BTChan - that chan should probably have a capacity of 1 so forgetting to run init shows up really quick, but that's a minor change. This works in GHCi (7.0.3) but that seems to be incidental. GHC throws exceptions to both blocked threads (the valid one reading the BTChan and the watching thread) so my if you are blocked reading a BTChan when another thread discards it's reference then you die.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

空心↖ 2024-11-02 20:37:26

这是另一个解决方案:要求对有界通道副本的所有访问都由在退出时释放其资源的函数括起来(通过异常或正常情况)。您可以使用带有 2 级运行器的 monad 来防止重复的通道泄漏。它仍然是手动的,但是类型系统使得做一些顽皮的事情变得更加困难。

你真的不想依赖真正的 IO 终结器,因为 GHC 不保证何时运行终结器:据你所知,它可能会等到程序结束后再运行终结器,这意味着你陷入了死锁直到那时。

Here is another solution: require all accesses to the the bounded channel duplicate to be bracketed by a function that releases its resources on exit (by an exception or normally). You can use a monad with a rank-2 runner to prevent duplicated channels from leaking out. It's still manual, but the type system makes it a lot harder to do naughty things.

You really don't want to rely on true IO finalizers, because GHC gives no guarantees about when a finalizer may be run: for all you know it may wait until the end of the program before running the finalizer, which means you're deadlocked until then.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文