How do I execute multiple tasks in Scala?

Posted on 2024-10-08 22:56:37

I have 50,000 tasks and want to execute them with 10 threads.
In Java I would create a pool with Executors.newFixedThreadPool(10), pass Runnables to it, and then wait for all of them to be processed. As I understand it, Scala is especially useful for this kind of task, but I can't find a solution in the docs.


Comments (4)

戏剧牡丹亭 2024-10-15 22:56:37

Scala 2.9.3 and later

The simplest approach is to use the scala.concurrent.Future class and associated infrastructure. The scala.concurrent.future method asynchronously evaluates the block passed to it and immediately returns a Future[A] representing the asynchronous computation. Futures can be manipulated in a number of non-blocking ways, including mapping, flatMapping, filtering, recovering from errors, etc.

For example, here's a sample that creates 10 tasks, where each task sleeps an arbitrary amount of time and then returns the square of the value passed to it.

import scala.concurrent._  // provides Future, future and Await used below
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

val aggregated: Future[Seq[Int]] = Future.sequence(tasks)

val squares: Seq[Int] = Await.result(aggregated, 15.seconds)
println("Squares: " + squares)

In this example, we first create a sequence of individual asynchronous tasks that each provide an Int when complete. We then use Future.sequence to combine those async tasks into a single async task, which swaps the positions of Future and Seq in the type (Seq[Future[Int]] becomes Future[Seq[Int]]). Finally, we block the current thread for up to 15 seconds while waiting for the result. The example uses the global execution context, which is backed by a fork/join thread pool. For non-trivial examples, you would probably want to use an application-specific ExecutionContext.

Generally, blocking should be avoided when at all possible. There are other combinators available on the Future class that can help program in an asynchronous style, including onSuccess, onFailure, and onComplete.
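
As a rough sketch of the non-blocking style described above, the aggregated future can be transformed with map and observed with onComplete instead of being awaited. The names NonBlockingSquares, sumOfSquares and report are made up for this illustration and are not part of the original answer.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object NonBlockingSquares {
  // Hypothetical helper: squares 1..n in separate futures and sums the
  // results, without blocking the calling thread.
  def sumOfSquares(n: Int)(implicit ec: ExecutionContext): Future[Int] = {
    val tasks: Seq[Future[Int]] = (1 to n).map(i => Future(i * i))
    Future.sequence(tasks).map(_.sum)
  }

  // Registers a callback on the result instead of calling Await.result.
  def report(n: Int)(implicit ec: ExecutionContext): Unit =
    sumOfSquares(n).onComplete {
      case Success(total) => println("Sum of squares: " + total)
      case Failure(error) => println("A task failed: " + error.getMessage)
    }
}
```

With scala.concurrent.ExecutionContext.Implicits.global in scope, calling NonBlockingSquares.report(10) returns immediately and the callback prints once all ten futures have completed. In a short-lived script the JVM may exit before the callback fires, so a program that uses this style needs something else to keep it running.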

Also, consider investigating the Akka library, which provides actor-based concurrency for Scala and Java, and interoperates with scala.concurrent.

Scala 2.9.2 and prior

The simplest approach is to use Scala's Future class, which is a sub-component of the Actors framework. The scala.actors.Futures.future method creates a Future for the block passed to it. You can then use scala.actors.Futures.awaitAll to wait for all tasks to complete.

For example, here's a sample that creates 10 tasks, where each task sleeps an arbitrary amount of time and then returns the square of the value passed to it.

import scala.actors.Futures._

val tasks = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

val squares = awaitAll(20000L, tasks: _*)
println("Squares: " + squares)

浮光之海 2024-10-15 22:56:37

You want to look at either the Scala actors library or Akka. Akka has cleaner syntax, but either will do the trick.

So it sounds like you need to create a pool of actors that know how to process your tasks. An Actor can basically be any class with a receive method - from the Akka tutorial (http://doc.akkasource.org/tutorial-chat-server-scala):

class MyActor extends Actor {
  def receive = {
    case "test" => println("received test")
    case _      => println("received unknown message")
  }
}

val myActor = Actor.actorOf[MyActor]
myActor.start

You'll want to create a pool of actor instances and fire your tasks to them as messages. Here's a post on Akka actor pooling that may be helpful: http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/

In your case, one actor per task may be appropriate (actors are extremely lightweight compared to threads so you can have a LOT of them in a single VM), or you might need some more sophisticated load balancing between them.

EDIT:
Using the example actor above, sending it a message is as easy as this:

myActor ! "test"

The actor will then output "received test" to standard output.

Messages can be of any type, and when combined with Scala's pattern matching, you have a powerful pattern for building flexible concurrent applications.
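
To make the point about message types and pattern matching a bit more concrete, here is a small sketch that runs without Akka. The message types (Square, Greet, Stop) and the handle function are hypothetical; handle is an ordinary function standing in for the body of an actor's receive block, not an Akka API.

```scala
object MessageMatching {
  // Hypothetical message types; any Scala value could serve as a message.
  sealed trait Message
  final case class Square(n: Int) extends Message
  final case class Greet(name: String) extends Message
  case object Stop extends Message

  // The kind of pattern match an actor's receive block would contain,
  // written as a plain function so it can run on its own.
  def handle(message: Any): String = message match {
    case Square(n)   => "square = " + (n * n)
    case Greet(name) => "hello, " + name
    case Stop        => "stopping"
    case other       => "received unknown message: " + other
  }
}
```

For instance, MessageMatching.handle(MessageMatching.Square(7)) yields "square = 49", while an unexpected value such as the string "test" falls through to the last case.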

In general, Akka actors will "do the right thing" in terms of thread sharing, and for the OP's needs I imagine the defaults are fine. But if you need to, you can set the dispatcher the actor should use to one of several types:

* Thread-based
* Event-based
* Work-stealing
* HawtDispatch-based event-driven

It's trivial to set a dispatcher for an actor:

class MyActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch")
    .withNewThreadPoolWithBoundedBlockingQueue(100)
    .setCorePoolSize(10)
    .setMaxPoolSize(10)
    .setKeepAliveTimeInMillis(10000)
    .build
}

See http://doc.akkasource.org/dispatchers-scala

In this way, you could limit the thread pool size, but again, the original use case could probably be satisfied with 50K Akka actor instances using default dispatchers and it would parallelize nicely.

This really only scratches the surface of what Akka can do. It brings a lot of what Erlang offers to the Scala language. Actors can monitor other actors and restart them, creating self-healing applications. Akka also provides Software Transactional Memory and many other features. It's arguably the "killer app" or "killer framework" for Scala.

忆依然 2024-10-15 22:56:37

If you want to "execute them with 10 threads", then use threads. Scala's actor model, which is usually what people are talking about when they say Scala is good for concurrency, hides such details so you won't see them.

With actors or futures, if all you have are simple computations, you'd just create 50,000 of them and let them run. You might run into issues, but they are of a different nature.
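
For the "just use threads" route, one possible sketch uses a fixed-size java.util.concurrent pool directly from Scala. The object FixedPoolRunner, the method runAll and the one-minute wait are assumptions made for this illustration. The thread-name bookkeeping is only there to show that no more than the requested number of threads is used.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FixedPoolRunner {
  // Runs task(1) .. task(numTasks) on a pool of numThreads threads.
  // Returns (tasks completed so far, distinct threads that ran a task);
  // the first number is below numTasks only if the wait below times out
  // or a task throws.
  def runAll(numTasks: Int, numThreads: Int)(task: Int => Unit): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(numThreads)
    val completed = new AtomicInteger(0)
    val threadNames = ConcurrentHashMap.newKeySet[String]()
    try {
      for (i <- 1 to numTasks) {
        pool.execute(new Runnable {
          def run(): Unit = {
            task(i)
            threadNames.add(Thread.currentThread().getName)
            completed.incrementAndGet()
          }
        })
      }
    } finally {
      pool.shutdown() // stop accepting new work; queued tasks still run
    }
    pool.awaitTermination(1, TimeUnit.MINUTES) // arbitrary upper bound
    (completed.get, threadNames.size)
  }
}
```

A call such as FixedPoolRunner.runAll(50000, 10)(i => ()) queues all 50,000 tasks up front and lets the ten pool threads drain the queue, which is essentially the Java approach described in the question.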

泪冰清 2024-10-15 22:56:37

Here's another answer, similar to mpilquist's response but without the deprecated API, and with the thread count set via a custom ExecutionContext:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._

val numJobs = 50000
val numThreads = 10  // never reassigned, so a val is enough

// customize the execution context to use the specified number of threads
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads))


// define the tasks
val tasks = for (i <- 1 to numJobs) yield Future {
  // do something more fancy here
  i
}

// aggregate and wait for final result
val aggregated = Future.sequence(tasks)
val oneToNSum = Await.result(aggregated, 15.seconds).sum
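
One thing the snippet above leaves open is that the fixed thread pool is never shut down, so its non-daemon threads can keep the JVM alive after the work is done. A possible variation, assuming it is acceptable to block until all jobs finish, wraps the pool with ExecutionContext.fromExecutorService and shuts it down afterwards. The names PooledFutures and sumWithPool, and the one-minute timeout, are invented for this sketch.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object PooledFutures {
  // Runs jobs 1..numJobs as Futures on a fixed pool, sums their results,
  // and shuts the pool down afterwards whether or not the wait succeeded.
  def sumWithPool(numJobs: Int, numThreads: Int): Long = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))
    try {
      // do something more fancy than returning the index here
      val tasks = (1 to numJobs).map(i => Future(i.toLong))
      Await.result(Future.sequence(tasks), 1.minute).sum
    } finally {
      ec.shutdown()
    }
  }
}
```

Calling PooledFutures.sumWithPool(50000, 10) mirrors the numJobs and numThreads values above; the sum is computed as a Long to leave headroom for larger job counts.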