F# 中的并行存在函数

发布于 2024-12-21 16:58:24 字数 1985 浏览 5 评论 0原文

动机

我有一个长时间运行的布尔函数,应该在数组中执行,并且如果数组中的元素满足条件,我想立即返回。我想并行进行搜索,并在第一个完整线程返回正确答案时终止其他线程。

问题

在 F# 中实现并行存在函数的好方法是什么?由于我的目标是性能,因此高效的解决方案优于简单或惯用的解决方案。

测试用例

假设我想查找数组中是否存在一个值。比较函数(equals)被模拟为计算量大的函数:

open System.Diagnostics
open System.Threading

// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
        if (token.IsCancellationRequested) then
            if (throwOnCancel) then token.ThrowIfCancellationRequested()
            false
        else
            let ms = int64 (seconds * 1000.0)
            let sw = new Stopwatch()
            sw.Start()
            let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))

            // Loop to simulate a computationally intensive operation
            let rec loop i = 
                // Periodically check to see if the user has requested 
                // cancellation or if the time limit has passed
                let check = seconds = 0.0 || i % checkInterval = 0
                if check && token.IsCancellationRequested then
                    if throwOnCancel then token.ThrowIfCancellationRequested()
                    false
                elif check && sw.ElapsedMilliseconds > ms then
                    true
                else 
                  loop (i + 1)

            // Start the loop with 0 as the first value
            loop 0

let inline equals x y =
    doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
    x = y

数组由 1000 个随机生成的元素组成,并且搜索值保证在数组的第二半(因此顺序搜索必须至少遍历数组的一半):

let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;

#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential

Motivation

I have a long-running boolean function which should be executed in an array and I want to return immediately if an element in the array satisfies the condition. I would like to do the search in parallel and terminate other threads when the first complete thread returns an correct answer.

Question

What is a good way to implement parallel exists function in F#? Since my goal is performance, an efficient solution is preferred to an easy or idiomatic one.

Test case

Suppose that I want to find whether one value exists in an array or not. And the comparison function (equals) is simulated as a computation-expensive one:

open System.Diagnostics
open System.Threading

// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
        if (token.IsCancellationRequested) then
            if (throwOnCancel) then token.ThrowIfCancellationRequested()
            false
        else
            let ms = int64 (seconds * 1000.0)
            let sw = new Stopwatch()
            sw.Start()
            let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))

            // Loop to simulate a computationally intensive operation
            let rec loop i = 
                // Periodically check to see if the user has requested 
                // cancellation or if the time limit has passed
                let check = seconds = 0.0 || i % checkInterval = 0
                if check && token.IsCancellationRequested then
                    if throwOnCancel then token.ThrowIfCancellationRequested()
                    false
                elif check && sw.ElapsedMilliseconds > ms then
                    true
                else 
                  loop (i + 1)

            // Start the loop with 0 as the first value
            loop 0

let inline equals x y =
    doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
    x = y

The array consists of 1000 randomly generated elements and the searching value is guaranteed in the 2nd half of the array (so sequential search has to go through at least a half of the array):

let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;

#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

你的笑 2024-12-28 16:58:24

我认为您可以采取以下步骤:

  1. 生成多个工作人员(线程或异步计算),并传递每个相同的数组切片和一个由所有工作人员共享的取消令牌

  2. 当工作人员找到搜索到的项目时,它会在令牌上调用取消(每个工作人员应该检查取消状态每次迭代上的令牌,并在需要时保释)

我不这样做目前有时间编写代码,因此可能会省略一些细节。

这个答案以及相关问题可能会有所帮助。

更新

的一个例子

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

let getChunks size array =
  let rec loop s n =
    seq {
      if n > 0 then
        let r = n - size
        if r > 0 then yield (s, size); yield! loop (s + size) r
        else yield (s, size + r)
    }      
  loop 0 (Array.length array)

[<Literal>]
let CHUNK_SIZE = 3

let parallelExists f (array:_[]) =
  use cts = new CancellationTokenSource()
  let rec checkSlice i n = 
    if n > 0 && not cts.IsCancellationRequested then
      if f array.[i] then cts.Cancel()
      else checkSlice (i + 1) (n - 1)
  let workers =
    array
    |> getChunks CHUNK_SIZE
    |> Seq.map (fun (s, c) -> Task.Factory.StartNew(fun () -> checkSlice s c))
    |> Seq.toArray
  try 
    Task.WaitAll(workers, cts.Token)
    false
  with :? OperationCanceledException -> true

这是我的想法

let array = Array.init 10 id
let exists = 
  array |> parallelExists (fun i ->
    Thread.Sleep(500)
    i = 9)
printfn "%b" exists //true

I think you can take the following steps:

  1. Spawn a number of workers (threads or async computations), and pass each an equal slice of the array and a cancellation token which will be shared by all workers

  2. When a worker finds the searched item, it calls Cancel on the token (each worker should check the cancel state of the token on each iteration and bail if needed)

I don't have time at the moment to write the code, so there could be some detail I'm omitting.

This answer, and related question, may be helpful.

UPDATE

This is an example of what I'm thinking

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

let getChunks size array =
  let rec loop s n =
    seq {
      if n > 0 then
        let r = n - size
        if r > 0 then yield (s, size); yield! loop (s + size) r
        else yield (s, size + r)
    }      
  loop 0 (Array.length array)

[<Literal>]
let CHUNK_SIZE = 3

let parallelExists f (array:_[]) =
  use cts = new CancellationTokenSource()
  let rec checkSlice i n = 
    if n > 0 && not cts.IsCancellationRequested then
      if f array.[i] then cts.Cancel()
      else checkSlice (i + 1) (n - 1)
  let workers =
    array
    |> getChunks CHUNK_SIZE
    |> Seq.map (fun (s, c) -> Task.Factory.StartNew(fun () -> checkSlice s c))
    |> Seq.toArray
  try 
    Task.WaitAll(workers, cts.Token)
    false
  with :? OperationCanceledException -> true

Usage

let array = Array.init 10 id
let exists = 
  array |> parallelExists (fun i ->
    Thread.Sleep(500)
    i = 9)
printfn "%b" exists //true
淡淡離愁欲言轉身 2024-12-28 16:58:24

F# Powerpack 具有 PSeq.exists,它映射到 PLINQ 的 ParallelEnumerable.Any,它是 BCL 的一部分。还有 ParallelEnumerable。首先,

我尝试反编译,但无法立即理解发生了什么。因此,我执行了以下副作用代码,以确认它在找到该元素后使用了某种取消操作:

let elems = seq {
    for x = 0 to 1000000 do
        printfn "test"
        yield x }

open System
open System.Linq;;

ParallelEnumerable.First (ParallelEnumerable.AsParallel(elems), Func<_,_>(fun x -> x = 1))

The F# Powerpack has PSeq.exists which maps to PLINQ's ParallelEnumerable.Any which is part of the BCL. There's also ParallelEnumerable.First

I tried to decompile but wouldn't understand right away what was going on. So instead I went and executed the following side-effecting code to confirm that it's using some sort of cancellation once it found the element:

let elems = seq {
    for x = 0 to 1000000 do
        printfn "test"
        yield x }

open System
open System.Linq;;

ParallelEnumerable.First (ParallelEnumerable.AsParallel(elems), Func<_,_>(fun x -> x = 1))
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文