How can I use Scala concurrency to compute in parallel?



I am trying to use Scala to find the parameter to a function that yields the largest return value, and I would like to do it in parallel. So for this function:

def f(i: Long): Double = {
  // do something with i and return a double
}  

I want to find the input parameter i over the range (0, x) that gives the maximum value when passed to the function f. This is what I have so far:

import scala.concurrent.ops._

def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  replicate(0, xs.length) { i => results(i) = f(xs(i)) }
  results
}

var results = parMap(i => (f(i), i), List.range(0, x)).max

It might work correctly but I get a java.lang.OutOfMemoryError: Java heap space error. For the problem I am working on, the entire set of results will be too large to fit in memory, so it needs to discard results that are inferior to the best seen so far. If I make the list range small enough for it to all fit in memory, my results Array (before it calls the max method) looks kind of like this:

Array(null, null, (-Infinity,2), (-Infinity,3), null, (-Infinity,5), (-Infinity,6), (-Infinity,7), (-Infinity,8), (-22184.3237904591,9), null, (-22137.315048628963,11)...

The -Infinity values are normal for what I am doing but the nulls are not. I get different nulls each time I run it, so that is random. It is like the replicate method 'gives up' on some of the function calls and gives null instead.
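
To be concrete about what I mean by discarding inferior results, this is the behaviour I am after written sequentially (just a sketch with a made-up name, not my real code):

def bestOf(x: Long, f: Long => Double): (Double, Long) = {
  // keep only the best (value, argument) pair seen so far,
  // so the full result set never has to fit in memory
  var best = (f(0L), 0L)
  var i = 1L
  while (i < x) {
    val v = f(i)
    if (v > best._1) best = (v, i)
    i += 1
  }
  best
}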

Note I am using Scala 2.8.1.

Also, it seems to me accurate documentation on Scala and parallel computing is hard to come by. I would like to learn more, so I can figure out problems like this one on my own. Can anyone suggest a reliable resource I can learn from?


蓝咒 2024-10-14 14:39:35


I'm not fully up to speed with the 2.9 parallel collections, and I'm not sure concurrent.ops is all that well maintained, but it seems to me that your task is perfectly well suited to futures in 2.8:

// Setup--you want to use longs, so you can't use range
val x = 4000000000L  // Note that this doesn't fit in a signed integer
def f(l: Long) = l + 8e9/(3+l)
def longRange(a: Long, b: Long) = new Iterator[Long] {
  private[this] var i = a
  def hasNext = i<b
  def next = { val j = i; i += 1; j }
}

val cpus = 4
val ranges = (1 to cpus).map(i => longRange(((i-1)*x)/cpus, (i*x)/cpus))
val maxes = ranges.map(r => scala.actors.Futures.future(r.map(f).max))
println("Total max is " + maxes.map(_()).max)

Here you split the work up by hand and ask for the max over each portion of the range, with the values delivered on demand by the iterator. These are computed in the future; that is, Futures.future returns a promise that it will deliver the return value eventually. The promise is actually kept when myFuture.apply() is called, which in this case is the _() inside the println. To get the total max, you take the max of the per-range maxes, and that of course can't return until all the work put off to the futures has actually completed.
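
As a minimal, standalone sketch of that future()/apply() handshake (separate from the code above; the names are just for illustration):

import scala.actors.Futures

// future { ... } starts the computation on another thread and returns a handle;
// applying the handle with () blocks until the result is actually available.
val answer = Futures.future { Thread.sleep(1000); 6 * 7 }
println("doing other work while the future runs...")
println("the future delivered: " + answer())   // blocks here until the value is ready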

You can try comparing the runtime of the four-threaded and single-threaded versions if you want to verify that it's working.

(Note that the answer for the function I've provided should be 4.000000001e9.)
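
If you do want to time it, something as crude as the sketch below is enough for a sanity check (use a smaller x if you are impatient; the time helper is an illustration, not a proper benchmark harness):

def time[A](label: String)(body: => A): A = {
  // very rough wall-clock timing, good enough to see whether the extra threads help
  val start = System.nanoTime
  val result = body
  println(label + " took " + (System.nanoTime - start) / 1e9 + " s")
  result
}

val serialMax = time("1 thread") { longRange(0, x).map(f).max }
val parallelMax = time(cpus + " threads") {
  (1 to cpus)
    .map(i => longRange(((i - 1) * x) / cpus, (i * x) / cpus))
    .map(r => scala.actors.Futures.future(r.map(f).max))   // start all futures first
    .map(_())                                               // then block on each result
    .max
}
println("serial = " + serialMax + ", parallel = " + parallelMax)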

Note also that if you really want things to run quickly, you should probably write your own range tests:

// running maximum of f over [a, b): a plain while loop over unboxed Longs,
// so no intermediate tuples or boxed values are allocated
def maxAppliedRange(a: Long, b: Long, f: Long => Double) = {
  var m = f(a)
  var i = a
  while (i < b) {
    val x = f(i)
    if (m < x) m = x
    i += 1
  }
  m
}
val maxes = (1 to cpus).map(i => 
  scala.actors.Futures.future( maxAppliedRange((i-1)*x/cpus,i*x/cpus,f) )
)
println("Total max is " + maxes.map(_()).max)

This gives far better performance because there is no boxing/unboxing, so the garbage collector isn't stressed, and running in parallel therefore pays off much more. This runs ~40x faster for me than the method above, and the same caveat applies to the parallel collections. So be careful! Just throwing more cores at a computation isn't necessarily the way to speed it up, especially when the task generates a lot of garbage.

贩梦商人 2024-10-14 14:39:35


I think you could do this concisely by using futures on the global actor thread pool. In keeping with your original example:

import scala.actors.Futures._

def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  // one future per element; each future writes its own slot of the shared array
  val futures = (0 until xs.length).map { i =>
    future { results(i) = f(xs(i)) }
  }
  // force every future, i.e. block until all the slots have been filled in
  futures.foreach(_())
  results
}

results in:

scala> parMap(l => (l.toDouble,l), List(1,2,3))
res2: Array[(Double, Long)] = Array((1.0,1), (2.0,2), (3.0,3))

This will parallelize the work to be done. If you want to optimize it for the number of processors you have, you can set the size of the actor pool with the actors.corePoolSize and actors.maxPoolSize properties.
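
For example, they can be supplied as JVM system properties, either on the command line or programmatically before anything in scala.actors is first used (the values below are only placeholders):

// Either pass them on the command line:
//   scala -Dactors.corePoolSize=8 -Dactors.maxPoolSize=8 MyApp
// or set them before the first future/actor is created:
System.setProperty("actors.corePoolSize", "8")
System.setProperty("actors.maxPoolSize", "8")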
