在 F# 中实现 CCR 交错仲裁器

发布于 2024-11-04 13:09:49 字数 182 浏览 6 评论 0原文

我想在 F# 中实现 CCR 框架端口的概念(因为 .Net 4.0 不正式支持 CCR)。我知道可以使用 F# 中的 MailboxProcessor 类来执行此操作。 这非常适合简单的接收仲裁器,但我需要交错仲裁器的概念,即我想控制哪些消息是独占处理的,哪些消息是并发处理的。 到目前为止,我还不知道如何在 F# 中实现这一点,我将感谢您的帮助。

I want to implement the concept of a Port of the CCR Framework in F# (as CCR is not officially supported for .Net 4.0). I know that one can use the MailboxProcessor class in F# to do this.
This works perfectly for simple Receive Arbiters but I need the concept of the Interleave Arbiter, i.e. I want to control which messages are processed exclusively and which are processed concurrently.
So far I've got no idea to implement this in F# and I would be grateful for your help.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

很酷又爱笑 2024-11-11 13:09:49

我对 CCR 不太熟悉,但我会尝试回答 - 我的理解是交错仲裁器的行为有点像 ReaderWriterLock 。也就是说,您可以指定一些可以并行运行的操作(读取)和一些独占的操作(写入)。

以下代理是实现它的一种方法(未经测试,但类型检查:-))。该代理公开了两个供公众使用的操作。最后一个是内部的:

type Message<'T> =
  | PerformReadOperation of ('T -> Async<unit>)
  | PerformWriteOperation of ('T -> Async<'T>)
  | ReadOperationCompleted
  • 通过发送代理PerformReadOperation,您将为其提供一个应使用状态运行(一次)的操作,并且可能与其他读取操作并行运行。

  • 通过发送代理PerformWriteOperation,您将为其提供一个计算新状态的操作,并且必须在所有读取操作完成后执行。 (如果您使用不可变状态,这将使事情变得更简单 - 您不必等到读者完成!但下面的实现实现了等待)。

代理从一些初始状态开始:

let initial = // initial state

代理的其余部分是使用两个循环实现的:

let interleaver = MailboxProcessor.Start(fun mbox ->

  // Asynchronously wait until all read operations complete
  let rec waitUntilReadsComplete reads = 
    if reads = 0 then async { return () }
    else mbox.Scan(fun msg ->
      match msg with
      | ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
      | _ -> None)

  let rec readingLoop state reads = async {
    let! msg = mbox.Receive()
    match msg with
    | ReadOperationCompleted ->
        // Some read operation completed - decrement counter
        return! readingLoop state (reads - 1) 
    | PerformWriteOperation(op) ->
        do! waitUntilReadsComplete reads
        let! newState = op state
        return! readingLoop newState 0
    | PerformReadOperation(op) ->
        // Start the operation in background & increment counter
        async { do! op state
                mbox.Post(ReadOperationCompleted) }
        |> Async.Start
        return! readingLoop state (reads + 1) }
  readingLoop initial 0)

I'm not very familiar with CCR, but I'll try to answer - my understanding is that interleave arbiter behaves a bit like ReaderWriterLock. That is, you can specify some operations that can run in parallel (reads) and some operations that are exclusive (writes).

The following agent is one way to implement it (not tested, but type checks :-)). The agent exposes two operations that are intended for public use. The last one is internal:

type Message<'T> =
  | PerformReadOperation of ('T -> Async<unit>)
  | PerformWriteOperation of ('T -> Async<'T>)
  | ReadOperationCompleted
  • By sending the agent PerformReadOperation, you're giving it an operation that should be run (once) using the state and possibly in parallel with other read operations.

  • By sending the agent PerformWriteOperation, you're giving it an operation that calculates a new state and must be executed after all read operations complete. (If you were working with immutable state, that would make things simpler - you wouldn't have to wait until readers complete! But the implementation below implements the waiting).

The agent starts with some initial state:

let initial = // initial state

And the rest of the agent is implemented using two loops:

let interleaver = MailboxProcessor.Start(fun mbox ->

  // Asynchronously wait until all read operations complete
  let rec waitUntilReadsComplete reads = 
    if reads = 0 then async { return () }
    else mbox.Scan(fun msg ->
      match msg with
      | ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
      | _ -> None)

  let rec readingLoop state reads = async {
    let! msg = mbox.Receive()
    match msg with
    | ReadOperationCompleted ->
        // Some read operation completed - decrement counter
        return! readingLoop state (reads - 1) 
    | PerformWriteOperation(op) ->
        do! waitUntilReadsComplete reads
        let! newState = op state
        return! readingLoop newState 0
    | PerformReadOperation(op) ->
        // Start the operation in background & increment counter
        async { do! op state
                mbox.Post(ReadOperationCompleted) }
        |> Async.Start
        return! readingLoop state (reads + 1) }
  readingLoop initial 0)
仅冇旳回忆 2024-11-11 13:09:49

只是为了添加 Tomas 建议的解决方案,以防您不想向邮箱的使用者公开“ReadOperationCompleted”消息(因为此消息是内部消息,并且在当前实现中可以由邮箱的任何使用者发送)可以在主邮箱处理器函数中创建一个单独的邮箱,该邮箱将接受两条消息:ReadOperationCompleted 和 WaitForReadCompleted(主邮箱将与 PostAndAsyncReply 一起使用该消息),因为只有当所有读操作都完成时才会对此消息做出响应操作已完成。此外,由“读取”表示的“读取”计数将被移动到该新的内部邮箱,因为该状态被该内部邮箱封装。

Just to add on to Tomas suggested solution, in case you do not want to expose the "ReadOperationCompleted" message to the consumer of the mail box (as this message is internal and in current implementation can be sent by any consumer of the mail box) a separate mail box can be created inside the main mail box processor function which will accept two messages: ReadOperationCompleted and WaitForReadCompleted (this one will be used with PostAndAsyncReply by the main mail box) as the response to this message will only come when all the read operations are completed. Also the "read" count represented by "reads" will be moved to this new internal mail box as that state be encapsulated by this internal mail box.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文