对期货作出反应
我正在尝试使用分而治之(又名分叉/连接)方法来解决数字运算问题。这是代码:
import scala.actors.Futures.future
private def compute( input: Input ):Result = {
if( pairs.size < SIZE_LIMIT ) {
computeSequential()
} else {
val (input1,input2) = input.split
val f1 = future( compute(input1) )
val f2 = future( compute(input2) )
val result1 = f1()
val result2 = f2()
merge(result1,result2)
}
}
它运行(具有很好的加速),但是未来的 apply 方法似乎会阻塞线程并且线程池极大地增加。当创建太多线程时,计算就会被卡住。
是否有一种用于释放线程的 future 的 react 方法?或者任何其他方式来实现这种行为?
编辑:我正在使用 scala 2.8.0.final
I am trying to use a divide-and-conquer (aka fork/join) approach for a number crunching problem. Here is the code:
import scala.actors.Futures.future
private def compute( input: Input ):Result = {
if( pairs.size < SIZE_LIMIT ) {
computeSequential()
} else {
val (input1,input2) = input.split
val f1 = future( compute(input1) )
val f2 = future( compute(input2) )
val result1 = f1()
val result2 = f2()
merge(result1,result2)
}
}
It runs (with a nice speed-up) but the the future apply method seems to block a thread and the thread pool increases tremendously. And when too many threads are created, the computations is stucked.
Is there a kind of react method for futures which releases the thread ? Or any other way to achieve that behavior ?
EDIT: I am using scala 2.8.0.final
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
不要声明(应用)您的
Future
,因为这会迫使他们阻塞并等待答案;正如您所看到的,这可能会导致僵局。相反,使用它们来告诉他们完成后要做什么。而不是:试试这个:
结果将是一个包含合并结果的
Responder[Result]
(本质上是一个Future[Result]
);您可以使用respond()
或foreach()
对此最终值执行一些有效的操作,也可以使用map()
或flatMap ()
将其发送给另一个Responder[T]
。无需阻塞,只需为未来安排计算即可!编辑 1:
好的,
compute
函数的签名现在必须更改为Responder[Result]
,那么这对递归调用有何影响?让我们试试这个:现在您不再需要使用
future(...)
包装对compute
的调用,因为它们已经返回Responder
(Future
的超类)。编辑 2:
使用这种连续传递样式的一个好处是,您的顶级代码(无论最初调用
compute
的内容)都不再阻塞。如果它是从main()
调用的,而这就是程序所做的所有事情,这将是一个问题,因为现在它只会产生一堆 future,然后立即关闭,完成所有事情告诉去做。您需要做的是阻止
所有这些未来,但仅在顶层一次,并且仅针对所有计算的结果,而不是任何中间计算的结果。不幸的是,由
compute()
返回的Responder
不再像Future
那样具有阻塞apply()
方法代码> 做了。我不确定为什么 flatMappingFuture
s 会生成通用的Responder
而不是Future
;这似乎是一个 API 错误。但无论如何,您应该能够自己创建:所以现在您可以在
main
方法中创建一个阻塞调用来计算,如下所示:Don't claim (apply) your
Future
s, since this forces them to block and wait for an answer; as you've seen this can lead to deadlocks. Instead, use them monadically to tell them what to do when they complete. Instead of:Try this:
The result of this will be a
Responder[Result]
(essentially aFuture[Result]
) containing the merged results; you can do something effectful with this final value usingrespond()
orforeach()
, or you canmap()
orflatMap()
it to anotherResponder[T]
. No blocking necessary, just keep scheduling computations for the future!Edit 1:
Ok, the signature of the
compute
function is going to have to change toResponder[Result]
now, so how does that affect the recursive calls? Let's try this:Now you no longer need to wrap the calls to
compute
withfuture(...)
because they're already returningResponder
(a superclass ofFuture
).Edit 2:
One upshot of using this continuation-passing style is that your top-level code--whatever calls
compute
originally--doesn't block at all any more. If it's being called frommain()
, and that's all the program does, this will be a problem, because now it will just spawn a bunch of futures and then immediately shut down, having finished everything it was told to do. What you need to do isblock
on all these futures, but only once, at the top level, and only on the results of all the computations, not any intermediate ones.Unfortunately, this
Responder
thing that's being returned bycompute()
no longer has a blockingapply()
method like theFuture
did. I'm not sure why flatMappingFuture
s produces a genericResponder
instead of aFuture
; this seems like an API mistake. But in any case, you should be able to make your own:So now you can create a blocking call to compute in your
main
method like so: