Executing a simple task on another thread in Scala

Published 2024-08-03 15:03:38 · 1,742 characters · 5 views · 0 comments


I was wondering if there is a way to execute very simple tasks on another thread in Scala without a lot of overhead.

Basically, I would like to make a global 'executor' that can handle executing an arbitrary number of tasks. I can then use the executor to build up additional constructs.

Additionally, it would be nice if clients did not have to consider blocking or non-blocking behaviour.

I know that the Scala actors library is built on top of Doug Lea's fork/join framework, and that it supports, to a limited degree, what I am trying to accomplish. However, from my understanding I would have to pre-allocate an 'actor pool' to accomplish this.

I would like to avoid making a global thread pool for this, as from what I understand it is not all that good at fine-grained parallelism.

Here is a simple example:

import concurrent.SyncVar

object SimpleExecutor {
  import actors.Actor._

  def exec[A](task: => A): SyncVar[A] = {
    // What goes here? This is what I currently have.
    val x = new SyncVar[A]
    // The overhead of making the actor appears to be a killer.
    actor {
      x.set(task)
    }
    x
  }

  // Not really sure what to put here.
  def execBlocker[A](task: => A): SyncVar[A] = exec(task)
}
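For context, the `exec` above is written against the `scala.actors` library, which was deprecated and later removed from the standard distribution. A minimal sketch of the same idea on current Scala (2.12+), assuming the standard `scala.concurrent` API, hands tasks to the implicit global `ExecutionContext` (a fork/join pool underneath), which is essentially the cheap global executor being asked for:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ModernExec {
  // Schedules `task` on the global pool; Future plays the role of SyncVar.
  def exec[A](task: => A): Future[A] = Future(task)
}

// Blocking on the result, as SyncVar.get did:
val r = Await.result(ModernExec.exec(21 * 2), 1.second)
println(r)
```

`Future` also composes without blocking (`map`, `flatMap`), so clients only pay for blocking when they explicitly `Await`.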

And now an example of using exec:

object Examples {
  // Benchmarks a task.
  def benchmark(blk: => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  // Benchmarks and compares two tasks.
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " + at.toDouble / bt)
  }

  // Simple example for a simple non-blocking comparison.
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i => exec(i + 5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i => i + 5)

  // Simple example for blocking performance.
  import Thread.sleep
  def paraSle(hi: Int) = (0 until hi) map (i => exec(sleep(i))) foreach (_.get)
  def singSle(hi: Int) = (0 until hi) foreach (i => sleep(i))
}

Finally, to run the examples (you might want to run them a few times so HotSpot can warm up):

import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))


Comments (1)

残月升风 2024-08-10 15:03:38


That's what Futures were made for. Just import scala.actors.Futures._, use future to create new futures, and use methods like awaitAll to wait on the results for a while, apply or respond to block until a result is received, and isSet to see whether it's ready yet.

You don't need to create a thread pool either. Or, at least, you normally don't. Why do you think you do?
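The `scala.actors.Futures` API named here disappeared along with the actors library. Assuming a current Scala, the rough modern counterparts are `Future.apply` for `future`, `Await.result` for `apply`/`respond`, `isCompleted` for `isSet`, and `Future.sequence` plus a single `Await` for `awaitAll`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val fs  = (1 to 3).map(i => Future(i * i))            // future(...)   -> Future(...)
val all = Await.result(Future.sequence(fs), 1.second) // awaitAll(...) -> sequence + Await
println(all.mkString(","))                            // prints 1,4,9
println(fs.forall(_.isCompleted))                     // isSet -> isCompleted; prints true
```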

EDIT

You can't gain performance parallelizing something as simple as an integer addition, because that's even faster than a function call. Concurrency will only bring performance by avoiding time lost to blocking i/o and by using multiple CPU cores to execute tasks in parallel. In the latter case, the task must be computationally expensive enough to offset the cost of dividing the workload and merging the results.

One other reason to go for concurrency is to improve the responsiveness of the application. That's not making it faster, that's making it respond faster to the user, and one way of doing that is getting even relatively fast operations offloaded to another thread so that the threads handling what the user sees or does can be faster. But I digress.

There's a serious problem with your code:

  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

Or, translating into futures,

  def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

You might think paraAdd is doing the tasks in parallel, but it isn't, because Range has a non-strict implementation of map (up to Scala 2.7; starting with Scala 2.8.0, Range is strict). You can look it up in other Scala questions. What happens is this:

  1. A range is created from 0 until hi.
  2. A range projection is created, mapping each element i of the range to a function that returns future(i+5) when called.
  3. For each element of the range projection (i => future(i+5)), the element is evaluated (foreach is strict) and then apply is called on the resulting future.

So, because future is not called in step 2, but only in step 3, you'll wait for each future to complete before doing the next one. You can fix it with:

  def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)

Which will give you better performance, but never as good as a simple immediate addition. On the other hand, suppose you do this:

def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) = 
  (0 until n).force map (_ => future(f)) foreach (_.apply)

And then compare:

cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))

You may start seeing gains (it will depend on the number of cores and processor speed).
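The pre-2.8 pitfall described above can still be reproduced on a current Scala by forcing laziness with `.view` (and `scala.concurrent.Future` standing in for the old actors futures, as an assumed substitute). A creation counter makes the difference visible without relying on timings:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val created = new AtomicInteger(0)
def task(): Future[Int] = { created.incrementAndGet(); Future(1) }

// Lazy (the pre-2.8 Range.map behaviour, recreated with .view):
// map builds no futures until foreach pulls each element, so creation
// and waiting interleave, one task at a time.
val lazyFutures = (0 until 8).view.map(_ => task())
println(created.get)                               // prints 0
lazyFutures.foreach(f => Await.result(f, 1.second))
println(created.get)                               // prints 8

// Strict (Range.map since 2.8.0, or .force before that):
// all futures exist before the first wait, so the tasks can overlap.
val strictFutures = (0 until 8).map(_ => task())
println(created.get)                               // prints 16
strictFutures.foreach(f => Await.result(f, 1.second))
```

The strict version is the one where the pool actually gets a chance to run tasks concurrently, which is why `.force` fixed paraAdd above.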
