React on futures

Posted on 2024-09-26 02:32:39

I am trying to use a divide-and-conquer (aka fork/join) approach for a number-crunching problem. Here is the code:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

It runs (with a nice speed-up), but the future's apply method seems to block a thread and the thread pool grows tremendously. And when too many threads are created, the computation gets stuck.

Is there a kind of react method for futures which releases the thread? Or any other way to achieve that behavior?

EDIT: I am using scala 2.8.0.final

Comments (1)

喜你已久 2024-10-03 02:32:39

Don't claim (apply) your Futures, since that forces the calling thread to block and wait for an answer; as you've seen, this can lead to deadlocks. Instead, use them monadically to tell them what to do when they complete. Instead of:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Try this:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

The result of this will be a Responder[Result] (essentially a Future[Result]) containing the merged results; you can do something effectful with this final value using respond() or foreach(), or you can map() or flatMap() it to another Responder[T]. No blocking necessary, just keep scheduling computations for the future!
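
For example, a minimal sketch of consuming that value could look like the following (it reuses f1, f2, merge and Result from the code above; the merged name and the println are just illustrative placeholders for whatever effect you actually need):

import scala.actors.Actor.actor

val merged: Responder[Result] =
  for {
    result1 <- f1
    result2 <- f2
  } yield merge(result1, result2)

// Schedule a side effect for when both results are ready. As the claim helper
// further down also does, respond on these actor futures should be run from
// inside an actor (or future) block.
actor {
  merged.respond(result => println("merged result: " + result))
}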

Edit 1:

Ok, the signature of the compute function is going to have to change to Responder[Result] now, so how does that affect the recursive calls? Let's try this:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Now you no longer need to wrap the calls to compute with future(...) because they're already returning Responder (a superclass of Future).

Edit 2:

One upshot of using this continuation-passing style is that your top-level code--whatever calls compute originally--doesn't block at all any more. If it's being called from main(), and that's all the program does, this will be a problem, because now it will just spawn a bunch of futures and then immediately shut down, having finished everything it was told to do. What you need to do is block on all these futures, but only once, at the top level, and only on the results of all the computations, not any intermediate ones.

Unfortunately, this Responder thing that's being returned by compute() no longer has a blocking apply() method like the Future did. I'm not sure why flatMapping Futures produces a generic Responder instead of a Future; this seems like an API mistake. But in any case, you should be able to make your own:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

So now you can create a blocking call to compute in your main method like so:

val finalResult = claim(compute(input))
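
For completeness, here is a self-contained sketch (not from the original answer) of how the pieces above can fit together. The object name and the Input, Result, SIZE_LIMIT, computeSequential and merge definitions are throwaway placeholders (summing a list of Ints split in halves) standing in for the asker's real types:

import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
import scala.actors.Futures.future

object ForkJoinSketch {
  // Placeholders for the asker's Input/Result/merge/computeSequential.
  type Input = List[Int]
  type Result = Long
  val SIZE_LIMIT = 1000

  def computeSequential(input: Input): Result = input.map(_.toLong).sum
  def merge(a: Result, b: Result): Result = a + b

  def compute(input: Input): Responder[Result] =
    if (input.size < SIZE_LIMIT)
      future(computeSequential(input))      // base case: a Future is already a Responder
    else {
      val (input1, input2) = input.splitAt(input.size / 2)
      for {
        result1 <- compute(input1)          // flatMap/map on Responder; nothing blocks here
        result2 <- compute(input2)
      } yield merge(result1, result2)
    }

  // Block exactly once, at the very top, to extract the final value.
  def claim[A](r: Responder[A]): A = {
    val q = new ArrayBlockingQueue[A](1)
    actor { r.respond(a => q.put(a)) }      // respond is wrapped in an actor, as above
    q.take
  }

  def main(args: Array[String]) {
    val finalResult = claim(compute((1 to 100000).toList))
    println(finalResult)
  }
}

The only blocking point is the single claim call in main; every recursive compute call just schedules more futures.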