多个 Scala actor 服务于一项任务

发布于 2024-10-26 07:21:04 字数 805 浏览 9 评论 0原文

我需要并行处理多个数据值(“SIMD”)。我可以使用 java.util.concurrent API (Executors.newFixedThreadPool()) 使用 Future 实例并行处理多个值:

import java.util.concurrent.{Executors, Callable}

class ExecutorsTest {
  private class Process(value: Int)
      extends Callable[Int] {
    def call(): Int = {
      // Do some time-consuming task
      value
    }
  }

  val executorService = {
    val threads = Runtime.getRuntime.availableProcessors
    Executors.newFixedThreadPool(threads)
  }

  val processes = for (process <- 1 to 1000) yield new Process(process)

  val futures = executorService.invokeAll(processes)

  // Wait for futures
}

如何我使用演员做同样的事情?我不认为我想将所有进程“馈送到”单个参与者,因为参与者随后将按顺序执行它们。

我是否需要创建多个“处理器”参与者,并使用“调度程序”参与者向每个“处理器”参与者发送相同数量的进程?

I need to process multiple data values in parallel ("SIMD"). I can use the java.util.concurrent APIs (Executors.newFixedThreadPool()) to process several values in parallels using Future instances:

import java.util.concurrent.{Executors, Callable}

class ExecutorsTest {
  private class Process(value: Int)
      extends Callable[Int] {
    def call(): Int = {
      // Do some time-consuming task
      value
    }
  }

  val executorService = {
    val threads = Runtime.getRuntime.availableProcessors
    Executors.newFixedThreadPool(threads)
  }

  val processes = for (process <- 1 to 1000) yield new Process(process)

  val futures = executorService.invokeAll(processes)

  // Wait for futures
}

How do I do the same thing using Actors? I do not believe that I want to "feed" all of the processes to a single actor because the actor will then execute them sequentially.

Do I need to create multiple "processor" actors with a "dispatcher" actor that sends an equal number of processes to each "processor" actor?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

厌倦 2024-11-02 07:21:04

如果您只想“即发即忘”处理,为什么不使用 Scala future?

import scala.actors.Futures._
def example = {
  val answers = (1 to 4).map(x => future {
    Thread.sleep(x*1000)
    println("Slept for "+x)
    x
  })
  val t0 = System.nanoTime
  awaitAll(1000000,answers: _*)  // Number is timeout in ms
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  answers.map(_()).sum
}

scala> example
Slept for 1
Slept for 2
Slept for 3
Slept for 4
4.000 seconds elapsed
res1: Int = 10

基本上,您所做的就是将所需的代码放入 future { } 块中,它将立即返回一个 future;应用它来获取答案(它将阻塞直到完成),或者使用带有超时的 awaitAll 来等待每个人都完成。


更新:从 2.11 开始,可以使用 scala.concurrent.Future 来执行此操作。上述代码的翻译是:

import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

def example = {
  val answers = Future.sequence(
    (1 to 4).map(x => Future {
      Thread.sleep(x*1000)
      println("Slept for "+x)
      x
    })
  )
  val t0 = System.nanoTime
  val completed = Await.result(answers, Duration(1000, SECONDS))
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  completed.sum
}

If you just want fire-and-forget processing, why not use Scala futures?

import scala.actors.Futures._
def example = {
  val answers = (1 to 4).map(x => future {
    Thread.sleep(x*1000)
    println("Slept for "+x)
    x
  })
  val t0 = System.nanoTime
  awaitAll(1000000,answers: _*)  // Number is timeout in ms
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  answers.map(_()).sum
}

scala> example
Slept for 1
Slept for 2
Slept for 3
Slept for 4
4.000 seconds elapsed
res1: Int = 10

Basically, all you do is put the code you want inside a future { } block, and it will immediately return a future; apply it to get the answer (it will block until done), or use awaitAll with a timeout to wait until everyone is done.


Update: As of 2.11, the way to do this is with scala.concurrent.Future. A translation of the above code is:

import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

def example = {
  val answers = Future.sequence(
    (1 to 4).map(x => Future {
      Thread.sleep(x*1000)
      println("Slept for "+x)
      x
    })
  )
  val t0 = System.nanoTime
  val completed = Await.result(answers, Duration(1000, SECONDS))
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  completed.sum
}
别念他 2024-11-02 07:21:04

如果您可以使用 Akka,请查看 ActorPool 支持: http://doc.akka.io/routing -scala

它允许您指定有关要并行运行的参与者数量的参数,然后将工作分派给这些参与者。

If you can use Akka, take a look at the ActorPool support: http://doc.akka.io/routing-scala

It lets you specify parameters about how many actors you want running in parallel and then dispatches work to those actors.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文