Executing a simple task on another thread in Scala
I was wondering if there is a way to execute very simple tasks on another thread in Scala that does not have a lot of overhead?
Basically I would like to make a global 'executor' that can handle executing an arbitrary number of tasks. I can then use the executor to build up additional constructs.
Additionally, it would be nice if blocking or non-blocking considerations did not have to be considered by the clients.
I know that the Scala actors library is built on top of the Doug Lea FJ stuff, and also that it supports, to a limited degree, what I am trying to accomplish. However, from my understanding, I would have to pre-allocate an 'Actor Pool' to accomplish this.
I would like to avoid making a global thread pool for this, as from what I understand it is not all that good at fine-grained parallelism.
Here is a simple example:
import concurrent.SyncVar

object SimpleExecutor {
  import actors.Actor._

  def exec[A](task: => A): SyncVar[A] = {
    //what goes here?
    //This is what I currently have
    val x = new concurrent.SyncVar[A]
    //The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }

  //Not really sure what to stick here
  def execBlocker[A](task: => A): SyncVar[A] = exec(task)
}
and now an example of using exec:
object Examples {
  //Benchmarks a task
  def benchmark(blk: => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  //Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " + at.toDouble / bt)
  }

  //Simple example for simple non-blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i => exec(i + 5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i => i + 5)

  //Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi: Int) = (0 until hi) map (i => exec(sleep(i))) foreach (_.get)
  def singSle(hi: Int) = (0 until hi) foreach (i => sleep(i))
}
Finally, to run the examples (you might want to do it a few times so HotSpot can warm up):
import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
1 Answer
That's what Futures was made for. Just import scala.actors.Futures._, use future to create new futures, and use methods like awaitAll to wait on the results for a while, apply or respond to block until the result is received, isSet to see whether it's ready or not, and so on. You don't need to create a thread pool either; or, at least, normally you don't. Why do you think you do?
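To make that concrete, here is a minimal sketch of the API the answer is describing (assuming a Scala 2.8-era scala.actors library; the computed values and the 500 ms timeout are only illustrative):
import scala.actors.Future
import scala.actors.Futures._

// future evaluates its body on another thread and hands back a Future
val f = future { 1 + 1 }

println(f.isSet)  // has the result arrived yet?
println(f())      // apply blocks until the result is available

// awaitAll waits up to a timeout (in milliseconds) for several futures at once
val fs = List[Future[Any]](future(2 * 2), future(3 * 3))
awaitAll(500, fs: _*)
println(fs map (fut => fut()))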
EDIT
You can't gain performance by parallelizing something as simple as an integer addition, because that's even faster than a function call. Concurrency only brings performance gains by avoiding time lost to blocking I/O and by using multiple CPU cores to execute tasks in parallel. In the latter case, the task must be computationally expensive enough to offset the cost of dividing the workload and merging the results.
One other reason to go for concurrency is to improve the responsiveness of the application. That's not making it faster; it's making it respond faster to the user, and one way of doing that is offloading even relatively fast operations to another thread so that the threads handling what the user sees or does can respond more quickly. But I digress.
There's a serious problem with your code:
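(The original answer quoted the offending code at this point; the quote has not survived in this copy, but from the question it is presumably the paraAdd definition:)
def paraAdd(hi: Int) = (0 until hi) map (i => exec(i + 5)) foreach (_.get)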
Or, translating into futures,
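(Again the quoted snippet is missing; the mechanical translation swaps exec for scala.actors.Futures.future and the SyncVar's get for the future's apply, roughly:)
import scala.actors.Futures._
def paraAdd(hi: Int) = (0 until hi) map (i => future(i + 5)) foreach (_())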
You might think paraAdd is doing the tasks in parallel, but it isn't, because Range has a non-strict implementation of map (that's up to Scala 2.7; starting with Scala 2.8.0, Range is strict). You can look it up on other Scala questions. What happens is this:
1. A range is created from 0 until hi.
2. A non-strict collection is created by map, where each element is stored as a function that will compute future(i + 5) when called; no future is actually created at this point.
3. For each element of the non-strict collection produced by map (i => future(i + 5)), the element is evaluated (foreach is strict) and then apply is called on the resulting future.
So, because future is not called in step 2, but only in step 3, you'll wait for each future
to complete before doing the next one. You can fix it with something like the following.
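(The answer's original snippet is not preserved in this copy; this sketch, with scala.actors.Futures._ imported as above, forces the range into a strict List before mapping, so all the futures are created before any of them is awaited:)
def paraAdd(hi: Int) =
  (0 until hi).toList map (i => future(i + 5)) foreach (_())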
Which will give you better performance, but never as good as a simple immediate addition. On the other hand, suppose you give each future something genuinely expensive to compute, and then compare the sequential and parallel versions:
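(The answer's own comparison code is also missing here; this is a plausible stand-in under the same idea, not the original. The naive fib, the cost of fib(30), and the count of 100 are made up for illustration, and cmp is the helper from the question:)
// a task expensive enough to be worth offloading
def fib(n: Int): Long = if (n < 2) n else fib(n - 1) + fib(n - 2)

def singFib(hi: Int) = (0 until hi) foreach (_ => fib(30))
def paraFib(hi: Int) =
  (0 until hi).toList map (_ => future(fib(30))) foreach (_())

cmp(paraFib(100), singFib(100))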
You may start seeing gains (it will depend on the number of cores and processor speed).