使用 akka future 和 actor 并行化列表

发布于 2024-12-09 09:13:36 字数 1689 浏览 1 评论 0原文

我想向参与者发送消息列表,在未来立即收到回复,然后等待所有未来完成,然后再返回调用方法。通过阅读 akka 文档,我相信 Future.sequence 是可行的方法,但我无法让以下代码正常工作。我从编译器中收到此错误:

  found   : akka.dispatch.ActorCompletableFuture
  required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
            futures += secondary ? GetRandom
                                 ^

我确信我遗漏了一些明显的内容,但根据示例和 API 文档,下面的代码似乎是“正确的”。

import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer

object Commands {
    trait Command

    case class GetRandom() extends Command
    case class GenRandomList() extends Command  
}

class Secondary() extends Actor {
    val randomGenerator = new Random()

    override def receive = {
        case GetRandom() =>
            self reply randomGenerator.nextInt(100)
    }
}

class Primary() extends Actor {
    private val secondary = Actor.actorOf[Secondary]

    override def receive = {

        case GenRandomList() =>

            val futures = new ListBuffer[Future[Integer]]

            for (i <- 0 until 10) {
                futures += secondary ? GetRandom
            }

            val futureWithList = Future.sequence(futures)

            futureWithList.map { foo =>
                println("Shouldn't foo be an integer now? " + foo)
            }.get
    }

    override def preStart() = {
        secondary.start()
    }
}

class Starter extends App {
    println("Starting")
    var master = Actor.actorOf(new Primary()).start()
    master ! GenRandomList()
}

向参与者发送一系列消息、接收未来并在所有未来完成后返回(可以选择将每个未来的结果存储在列表中并返回)的正确方法是什么?

I want to send a list of messages to an actor, receive a reply immediately in a future and then wait for all futures to complete before returning to the calling method. From reading the akka docs, I believe Future.sequence is the way to go but I have not been able to get the following code to work correctly. I get this error from the compiler:

  found   : akka.dispatch.ActorCompletableFuture
  required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
            futures += secondary ? GetRandom
                                 ^

I'm sure I am missing something obvious but the code below seems to be "correct" per the examples and API docs.

import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer

object Commands {
    trait Command

    case class GetRandom() extends Command
    case class GenRandomList() extends Command  
}

class Secondary() extends Actor {
    val randomGenerator = new Random()

    override def receive = {
        case GetRandom() =>
            self reply randomGenerator.nextInt(100)
    }
}

class Primary() extends Actor {
    private val secondary = Actor.actorOf[Secondary]

    override def receive = {

        case GenRandomList() =>

            val futures = new ListBuffer[Future[Integer]]

            for (i <- 0 until 10) {
                futures += secondary ? GetRandom
            }

            val futureWithList = Future.sequence(futures)

            futureWithList.map { foo =>
                println("Shouldn't foo be an integer now? " + foo)
            }.get
    }

    override def preStart() = {
        secondary.start()
    }
}

class Starter extends App {
    println("Starting")
    var master = Actor.actorOf(new Primary()).start()
    master ! GenRandomList()
}

What is the correct way to send a series of messages to an actor, receive a future and return once all the futures have completed (optionally storing the results from each future in a List and returning it).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

〆凄凉。 2024-12-16 09:13:36
(secondary ? GetRandom).mapTo[Int]
(secondary ? GetRandom).mapTo[Int]
梦亿 2024-12-16 09:13:36

Akka ? 返回一个 Future[Any] 但您需要一个 Future[Int]

因此,您可以定义一个接受所有类型 future 的列表:

val futures = new ListBuffer[Future[Any]]

或者在结果可用时将其转换为 Int

for (i <- 0 until 10) {
  futures += (secondary ? GetRandom) map {
    _.asInstanceOf[Int]
  }
}

顺便说一句,要使其工作,您需要更改 GetRandom 定义:

case object GetRandom extends Command

并将其与以下内容匹配:

case GetRandom =>

Akka ? returns a Future[Any] but you need a Future[Int].

Thus you can either define a list which accept all kind of futures:

val futures = new ListBuffer[Future[Any]]

or cast the result as an Int as soon as it is available:

for (i <- 0 until 10) {
  futures += (secondary ? GetRandom) map {
    _.asInstanceOf[Int]
  }
}

BTW, to make it work, you need to change GetRandom definition:

case object GetRandom extends Command

and match it with:

case GetRandom =>
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文