有没有办法让 F# 中的异步工作流程自动进行管道化?

发布于 2024-11-27 10:50:20 字数 497 浏览 0 评论 0原文

(请注意,我所说的是并行运行独立进程中的管道;与 |> 运算符无关)。

因此,如果我有两个函数,

let incr x = 
  Thread.Sleep 1000
  x + 1

let product x y = 
  Thread.Sleep 1000
  x * y

是否有一种简单的方法可以编写类似(伪代码)的工作流程,

let productOfIncrements x y = 
  async {
    let! x1 = incr x
    let! y1 = incr y
    let! result = product x1 y1
    return result
  }

该工作流程可以管道化前两个独立操作,从而在两秒内执行,或者异步工作流程是解决此问题的错误方法吗?如果有一个解决问题的好方法,是否有一种直接的方法来扩展这种方法,例如在 N+1 秒而不是 2N 秒内进行递归阶乘计算?

(Note I'm talking about pipelining as in running independent processes in parallel; not related to the |> operator).

So if I've got two functions

let incr x = 
  Thread.Sleep 1000
  x + 1

let product x y = 
  Thread.Sleep 1000
  x * y

would there be an easy way to write a workflow something like (pseudocode)

let productOfIncrements x y = 
  async {
    let! x1 = incr x
    let! y1 = incr y
    let! result = product x1 y1
    return result
  }

that pipelines the first two independent operations and thus executes in two seconds, or are async workflows the wrong approach to this problem? If there is a good approach to the problem, is there a straightforward way to extend such an approach to do, say, recursive factorial calculation in N+1 seconds rather than 2N?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

笨笨の傻瓜 2024-12-04 10:50:20

最简单的选择是使用Async.StartChild。该原语在后台(在线程池中)启动异步操作并返回可用于等待操作完成的“令牌”。这不会阻塞工作流程,因此您可以继续运行其他操作:

let productOfIncrements x y = 
  async {
    // Start the 'incr x' operation in background
    let! x1Op = async {return incr x} |> Async.StartChild
    // Continue doing other work 
    // (synchronously since it takes the same time as the above.)
    let y1 = incr y
    // Now wait for the result of 'incr x' (if it is still running)
    let! x1 = x1Op
    // return the product (just call the function synchronously)
    return product x1 y1
  }

如果两个操作返回相同的类型,那么您还可以使用 Async.Parallel,它将多个操作组合起来并行运行。

如果您正在处理纯粹受 CPU 限制的计算并且需要创建大量计算,那么您还可以直接使用 .NET Tasks(例如,请参阅 本文)。任务效率更高,但使用起来不太优雅(并且不能很好地支持异步等待)。

顺便说一句,术语管道通常用于(至少在 F# 或 .NET 世界中)更复杂的事情 - 例如,假设您有一系列相互依赖的步骤。处理多个输入时,您可以并行运行这些步骤(并且仍然限制总并行度)。这也可以使用 F# 异步工作流程来完成 - 请参阅例如本文 。还有一个名为pipelets的框架实现了这个概念。

The easiest option is to use Async.StartChild. This primitve starts an asynchronous operation in background (in a thread pool) and returns a "token" that can be used to wait for a completion of the operation. This doesn't block the workflow, so you can then continue running other operations:

let productOfIncrements x y = 
  async {
    // Start the 'incr x' operation in background
    let! x1Op = async {return incr x} |> Async.StartChild
    // Continue doing other work 
    // (synchronously since it takes the same time as the above.)
    let y1 = incr y
    // Now wait for the result of 'incr x' (if it is still running)
    let! x1 = x1Op
    // return the product (just call the function synchronously)
    return product x1 y1
  }

If the two operations return the same type, then you can also use Async.Parallel, which composes multiple operations to run in parallel.

If you're working with purely CPU-bound computations and you need to create a large number of them, then you can also use .NET Tasks directly (see for example this article). Tasks are more efficient, but are not as elegant to use (and don't support asynchronous waiting nicely).

As a side-note, the term pipeline is usually used (at least in F# or .NET world) for a more complicated thing - For example, say you have a series of steps that depend on each other. When processing multiple inputs, you can run the steps in parallel (and still limit the total paralellism). This can be done using F# async workflows too - see for example this article. There is also a framework named pipelets that implements the concept.

追我者格杀勿论 2024-12-04 10:50:20

好的,这是基于托马斯对等待问题的解决方案的递归阶乘问题的解决方案。我对它还不是100%满意;我觉得内部方法应该返回某种延续元组或其他东西;维护运行总数似乎有点像“作弊”,但无论如何:

open System.Threading
open System

let decr x = 
  Thread.Sleep 1000
  x - 1

let product x y = 
  Thread.Sleep 1000
  x * y

let fact (n:int) :Async<int> = 
  let rec fact2 (x:int) (xMinus1Op:Async<int>) (runningProduct:Async<int>) :Async<int> =
      async {
        if x = 0 then 
          return! runningProduct
        else
          let! xMinus1 = xMinus1Op
          let! xMinus2Op = async {return decr xMinus1} |> Async.StartChild
          let! prod = runningProduct
          let! runningProduct = async {return product x prod} |> Async.StartChild
          // start both threads to execute simultaneously and feed them forward.
          return! fact2 xMinus1 xMinus2Op runningProduct
      }
  fact2 n (async{return decr n}) (async{return 1})

let start = Environment.TickCount
let result = fact 10 |> Async.RunSynchronously
printfn "%A" <| result
printfn "%A" <| Environment.TickCount - start //runs in 11 seconds, not 20.

编辑:也许使用任务更简单:

let fact (n:int) :int = 
    let rec fact2 (x:int) (xMinus1:int) (runningProduct:int) :int =
        if x = 0 then 
            runningProduct
        else
            let xMinus2Op = new Task<int>(fun () -> decr xMinus1)
            let runningProductOp = new Task<int>(fun () -> product x runningProduct)
            xMinus2Op.Start()
            runningProductOp.Start()
            let xMinus2 = xMinus2Op.Result
            let runningProduct = runningProductOp.Result
            fact2 xMinus1 xMinus2 runningProduct
    fact2 n (decr n) (1)

不需要工作流程,只需简单的命令式代码;它甚至可以轻松地转换为 C#。

Okay, here's a solution to the recursive factorial problem based on Tomas's solution to the waiting problem. I'm still not 100% satisfied with it; I feel like the inner method should return some kind of continuation tuple or something; maintaining the running total seems somehow like "cheating", but anyway:

open System.Threading
open System

let decr x = 
  Thread.Sleep 1000
  x - 1

let product x y = 
  Thread.Sleep 1000
  x * y

let fact (n:int) :Async<int> = 
  let rec fact2 (x:int) (xMinus1Op:Async<int>) (runningProduct:Async<int>) :Async<int> =
      async {
        if x = 0 then 
          return! runningProduct
        else
          let! xMinus1 = xMinus1Op
          let! xMinus2Op = async {return decr xMinus1} |> Async.StartChild
          let! prod = runningProduct
          let! runningProduct = async {return product x prod} |> Async.StartChild
          // start both threads to execute simultaneously and feed them forward.
          return! fact2 xMinus1 xMinus2Op runningProduct
      }
  fact2 n (async{return decr n}) (async{return 1})

let start = Environment.TickCount
let result = fact 10 |> Async.RunSynchronously
printfn "%A" <| result
printfn "%A" <| Environment.TickCount - start //runs in 11 seconds, not 20.

Edit: maybe it's more straightforward using tasks:

let fact (n:int) :int = 
    let rec fact2 (x:int) (xMinus1:int) (runningProduct:int) :int =
        if x = 0 then 
            runningProduct
        else
            let xMinus2Op = new Task<int>(fun () -> decr xMinus1)
            let runningProductOp = new Task<int>(fun () -> product x runningProduct)
            xMinus2Op.Start()
            runningProductOp.Start()
            let xMinus2 = xMinus2Op.Result
            let runningProduct = runningProductOp.Result
            fact2 xMinus1 xMinus2 runningProduct
    fact2 n (decr n) (1)

No workflows necessary, just plain imperative code; it might even translate into C# easily.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文