使用 akka future 和 actor 并行化列表
我想向参与者发送消息列表,在未来立即收到回复,然后等待所有未来完成,然后再返回调用方法。通过阅读 akka 文档,我相信 Future.sequence 是可行的方法,但我无法让以下代码正常工作。我从编译器中收到此错误:
found : akka.dispatch.ActorCompletableFuture
required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
futures += secondary ? GetRandom
^
我确信我遗漏了一些明显的内容,但根据示例和 API 文档,下面的代码似乎是“正确的”。
import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer
object Commands {
trait Command
case class GetRandom() extends Command
case class GenRandomList() extends Command
}
class Secondary() extends Actor {
val randomGenerator = new Random()
override def receive = {
case GetRandom() =>
self reply randomGenerator.nextInt(100)
}
}
class Primary() extends Actor {
private val secondary = Actor.actorOf[Secondary]
override def receive = {
case GenRandomList() =>
val futures = new ListBuffer[Future[Integer]]
for (i <- 0 until 10) {
futures += secondary ? GetRandom
}
val futureWithList = Future.sequence(futures)
futureWithList.map { foo =>
println("Shouldn't foo be an integer now? " + foo)
}.get
}
override def preStart() = {
secondary.start()
}
}
class Starter extends App {
println("Starting")
var master = Actor.actorOf(new Primary()).start()
master ! GenRandomList()
}
向参与者发送一系列消息、接收未来并在所有未来完成后返回(可以选择将每个未来的结果存储在列表中并返回)的正确方法是什么?
I want to send a list of messages to an actor, receive a reply immediately in a future and then wait for all futures to complete before returning to the calling method. From reading the akka docs, I believe Future.sequence is the way to go but I have not been able to get the following code to work correctly. I get this error from the compiler:
found : akka.dispatch.ActorCompletableFuture
required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
futures += secondary ? GetRandom
^
I'm sure I am missing something obvious but the code below seems to be "correct" per the examples and API docs.
import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer
object Commands {
trait Command
case class GetRandom() extends Command
case class GenRandomList() extends Command
}
class Secondary() extends Actor {
val randomGenerator = new Random()
override def receive = {
case GetRandom() =>
self reply randomGenerator.nextInt(100)
}
}
class Primary() extends Actor {
private val secondary = Actor.actorOf[Secondary]
override def receive = {
case GenRandomList() =>
val futures = new ListBuffer[Future[Integer]]
for (i <- 0 until 10) {
futures += secondary ? GetRandom
}
val futureWithList = Future.sequence(futures)
futureWithList.map { foo =>
println("Shouldn't foo be an integer now? " + foo)
}.get
}
override def preStart() = {
secondary.start()
}
}
class Starter extends App {
println("Starting")
var master = Actor.actorOf(new Primary()).start()
master ! GenRandomList()
}
What is the correct way to send a series of messages to an actor, receive a future and return once all the futures have completed (optionally storing the results from each future in a List and returning it).
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
Akka
?
返回一个Future[Any]
但您需要一个Future[Int]
。因此,您可以定义一个接受所有类型 future 的列表:
或者在结果可用时将其转换为
Int
:顺便说一句,要使其工作,您需要更改
GetRandom
定义:并将其与以下内容匹配:
Akka
?
returns aFuture[Any]
but you need aFuture[Int]
.Thus you can either define a list which accept all kind of futures:
or cast the result as an
Int
as soon as it is available:BTW, to make it work, you need to change
GetRandom
definition:and match it with: