并行流水线

发布于 2024-08-05 17:24:04 字数 645 浏览 7 评论 0原文

 (fileNameToCharStream "bigfile"
 |>> fuse [length;
           splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length;
           splitBy (fun x -> x = '\n') keepEmpty |>> length;
         ])
  (*fuse "fuses" the three functions to run concurrently*)
 |> run 2  (*forces to run in parallel on two threads*)
 |> (fun [num_chars; num_words; num_lines] -> 
       printfn "%d %d %d"
           num_chars num_words, num_lines))

我想让这段代码按以下方式工作: 将原始流正好在中间分成两部分;然后 对于每一半运行一个单独的计算 计算三件事:长度(即字符数), 字数、行数。 但是,我不想出现问题,如果 我错误地分割了一个单词。这必须是 得到照顾。该文件只能读取一次。

我应该如何编写指定的函数和运算符|>>? 是否可以?

 (fileNameToCharStream "bigfile"
 |>> fuse [length;
           splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length;
           splitBy (fun x -> x = '\n') keepEmpty |>> length;
         ])
  (*fuse "fuses" the three functions to run concurrently*)
 |> run 2  (*forces to run in parallel on two threads*)
 |> (fun [num_chars; num_words; num_lines] -> 
       printfn "%d %d %d"
           num_chars num_words, num_lines))

I want to make this code work in the following way:
split the original stream into two exactly in the middle; then
for each half run a separate computation that
computes 3 things: the length (i.e. number of chars),
the number of words, the number of lines.
However, I do not want to have a problem if
I erroneously split over a word. This has to be
taken care of. The file should be read only once.

How should I program the functions specified and the operator |>>?
Is it possible?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

醉酒的小男人 2024-08-12 17:24:04

看来你的要求有点高了。我将让您自行弄清楚字符串操作,但我将向您展示如何定义一个并行执行一系列操作的运算符。

第 1 步:编写 fuse 函数

您的 fusion 函数似乎使用多个函数映射单个输入,这很容易编写如下:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list
let fuse functionList input = [ for f in functionList -> f input]

请注意,所有映射函数需要有相同的类型。

第 2 步:定义并行执行函数的运算符

标准并行映射函数可以编写如下:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array
let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

据我所知,Async.Parallel 将并行执行异步操作,其中在任何给定时间执行的并行任务数量等于机器上的核心数量(如果我错了,有人可以纠正我)。因此,在双核机器上,当调用此函数时,我们的机器上最多应该有 2 个线程运行。这是一件好事,因为我们不希望通过每个核心运行多个线程来提高速度(事实上,额外的上下文切换可能会减慢速度)。

我们可以根据pmapfuse定义一个运算符|>>

//val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array
let (|>>) input functionList = pmap (fuse functionList) input

所以|>> code> 运算符接受一堆输入并使用许多不同的输出来映射它们。到目前为止,如果我们将所有这些放在一起,我们会得到以下结果(在 fsi 中):

> let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];;

val countOccurrences : 'a -> seq<'a> -> int
val length : string -> int
val testData : string [] =
  [|"Juliet is awesome"; "Someone should give her a medal"|]
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|]

testOutput 包含两个元素,这两个元素都是并行计算的。

第 3 步:将元素聚合到单个输出

好吧,现在我们有了由数组中的每个元素表示的部分结果,并且我们希望将部分结果合并到单个聚合中。我假设数组中的每个元素应该合并相同的函数,因为输入中的每个元素都具有相同的数据类型。

这是我为这项工作编写的一个非常丑陋的函数:

> let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);;

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list

> reduceMany (+) testOutput;;
val it : int list = [48; 1; 4]

reduceMany 接受 n 长度序列的序列,并返回一个 n 长度数组作为输出。如果您能想出更好的方法来编写此函数,请成为我的客人:)

解码上面的输出:

  • 48 = 我的两个输入字符串的长度之和。请注意,原始字符串有 49 个字符,但将其拆分为“|”每个“|”消耗一个字符。
  • 1 = 我的输入中所有“J”实例的总和
  • 4 = 所有“O”实例的总和。

第 4 步:将所有内容放在一起

let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

let fuse functionList input = [ for f in functionList -> f input]

let (|>>) input functionList = pmap (fuse functionList) input

let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ])

let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o']
    |> reduceMany (+)

It looks like your asking for quite a bit. I'll leave it up to you to figure out the string manipulation, but I'll show you how to define an operator which executes a series of operations in parallel.

Step 1: Write a fuse function

Your fuse function appears to map a single input using multiple functions, which is easy enough to write as follows:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list
let fuse functionList input = [ for f in functionList -> f input]

Note that all of your mapping functions need to have the same type.

Step 2: Define operator to execute functions in parallel

The standard parallel map function can be written as follows:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array
let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

To my knowledge, Async.Parallel will execute async operations in parallel, where the number of parallel tasks executing at any given time is equal to the number of cores on a machine (someone can correct me if I'm wrong). So on a dual core machine, we should have at most 2 threads running on my machine when this function is called. This is a good thing, since we don't expect any speedup by running more than one thread per core (in fact the extra context switching might slow things down).

We can define an operator |>> in terms of pmap and fuse:

//val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array
let (|>>) input functionList = pmap (fuse functionList) input

So the |>> operator takes a bunch of inputs and maps them using lots of different outputs. So far, if we put all this together, we get the following (in fsi):

> let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];;

val countOccurrences : 'a -> seq<'a> -> int
val length : string -> int
val testData : string [] =
  [|"Juliet is awesome"; "Someone should give her a medal"|]
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|]

testOutput contains two elements, both of which were computed in parallel.

Step 3: Aggregate elements into a single output

Alright, so now we have partial results represented by each element in our array, and we want to merge our partial results into a single aggregate. I assume each element in the array should be merged the same function, since each element in the input has the same datatype.

Here's a really ugly function I wrote for the job:

> let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);;

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list

> reduceMany (+) testOutput;;
val it : int list = [48; 1; 4]

reduceMany takes sequence of n-length sequences, and it returns an n-length array as an output. If you can think of a better way to write this function, be my guest :)

To decode the output above:

  • 48 = sum of the lengths of my two input strings. Note the original string was 49 chars, but splitting it on the "|" ate up one char per "|".
  • 1 = sum of all instances of 'J' in my input
  • 4 = sum of all instances of 'O'.

Step 4: Put everything together

let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

let fuse functionList input = [ for f in functionList -> f input]

let (|>>) input functionList = pmap (fuse functionList) input

let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ])

let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o']
    |> reduceMany (+)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文