How do I prevent actor starvation in the presence of other long-running actors?

Posted 2024-10-01 09:20:55


This is using Scala 2.8 Actors. I have a long running job which can be parallelized. It consists of about 650,000 units of work. I divide it into 2600 different separate subtasks, and for each of these I create a new actor:

actor {
  val range = (0L to total by limit)
  val latch = new CountDownLatch(range.length)
  range.foreach { offset =>
    actor {
      doExpensiveStuff(offset, limit)
      latch.countDown()
    }
  }
  latch.await()
}

This works fairly well, but overall it takes 2+ hours to complete. The issue is that, in the meantime, any other actors I create to do normal tasks seem to be starved out by the initial 2600 actors, which are also patiently awaiting their turn on a thread but have been waiting longer than any new actors that come along.

How might I go about avoiding this starvation?

Initial thoughts:

  • Instead of 2600 actors, use one actor that sequentially plows through the large pile of work. I'm not fond of this because I'd like this job to finish sooner by splitting it up.
  • Instead of 2600 actors, use two actors, each processing a different half of the total work set. This might work better, but what if my machine has 8 cores? I'd likely want to utilize more than that.
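A middle ground between those two ideas is one worker per core. As a rough sketch (the `total`, `limit`, and no-op `doExpensiveStuff` below are stand-ins for the question's real values and work, not taken from the original post), the 2600 offsets can be fed to a plain fixed-size pool instead of 2600 actors:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ChunkedWork {
  // Stand-ins for the question's values: 650,000 units split into
  // subtasks of `limit` units each. Replace with the real work.
  val total = 650000L
  val limit = 250L

  def doExpensiveStuff(offset: Long, limit: Long): Unit = ()

  def runAll(): Int = {
    // One worker thread per core, instead of 2600 actors on a shared pool.
    val cores = Runtime.getRuntime.availableProcessors
    val pool  = Executors.newFixedThreadPool(cores)
    val offsets = 0L to total by limit
    offsets.foreach { offset =>
      pool.execute(new Runnable {
        override def run(): Unit = doExpensiveStuff(offset, limit)
      })
    }
    pool.shutdown()                        // no new tasks; let the queue drain
    pool.awaitTermination(1, TimeUnit.HOURS)
    offsets.length                         // number of subtasks submitted
  }
}
```

With these numbers this submits 2601 offsets (roughly the question's 2600 subtasks), but never runs more than `cores` of them at once, and nothing here competes with the default actor scheduler.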

UPDATE

Some folks have questioned the use of Actors at all, especially since the message-passing capability was not being used within the workers. I had assumed that the Actor was a very lightweight abstraction around a ThreadPool, at or near the same performance level as simply coding the ThreadPool-based execution manually. So I wrote a little benchmark:

import testing._
import java.util.concurrent._
import actors.Futures._

val count = 100000
val poolSize = 4
val numRuns = 100

val ActorTest = new Benchmark {
  def run = {
    (1 to count).map(i => future {
      i * i
    }).foreach(_())
  }
}

val ThreadPoolTest = new Benchmark {
  def run = {
    val queue = new LinkedBlockingQueue[Runnable]
    val pool = new ThreadPoolExecutor(
          poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
    val latch = new CountDownLatch(count)
    (1 to count).foreach(i => pool.execute(new Runnable {
      override def run = {
        i * i
        latch.countDown()
      }
    }))
    latch.await()
    pool.shutdown()
  }
}

List(ActorTest,ThreadPoolTest).map { b =>
  b.runBenchmark(numRuns).sum.toDouble / numRuns
}

// List[Double] = List(545.45, 44.35)

I used the Future abstraction in ActorTest to avoid passing a message back to another actor to signal that the work was done. I was surprised to find that my actor code was over 10 times slower. Note that I also created my ThreadPoolExecutor with the same initial pool size that the default actor pool is created with.

Looking back, it seems like I've possibly overused the Actor abstraction. I'm going to look into using separate ThreadPools for these distinct, expensive, long-running tasks.
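The separate-pool idea can be sketched with plain `java.util.concurrent` executors (the names `batchPool`, `interactivePool`, and the `demo` method are illustrative, not from the original post): the expensive batch work gets its own fixed pool, so short-lived tasks on the other pool are never starved, no matter how deep the batch queue is.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object SeparatePools {
  // Dedicated pool for the 2600 expensive subtasks...
  val batchPool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
  // ...and a separate pool for ordinary short-lived tasks.
  val interactivePool = Executors.newCachedThreadPool()

  def submitBatch(task: Runnable): Unit = batchPool.execute(task)
  def submitQuick(task: Runnable): Unit = interactivePool.execute(task)

  // Flood the batch pool, then check that a quick task still runs promptly.
  def demo(): Boolean = {
    val done = new CountDownLatch(1)
    (1 to 100).foreach { _ =>
      submitBatch(new Runnable {
        def run(): Unit =
          try Thread.sleep(50) catch { case _: InterruptedException => () }
      })
    }
    submitQuick(new Runnable { def run(): Unit = done.countDown() })
    val ok = done.await(1, TimeUnit.SECONDS) // true => quick task was not starved
    batchPool.shutdownNow()
    interactivePool.shutdown()
    ok
  }
}
```

Even with a hundred queued batch tasks, the quick task completes within the timeout, which is exactly the isolation the 2600-actor version lacked on a shared scheduler.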


Comments (3)

盗琴音 2024-10-08 09:20:55


No matter how many actors you have, if you don't configure your scheduling explicitly, all of them are backed by a single fork/join scheduler (running against a thread pool with capacity 4, if I'm not mistaken). That's where the starvation comes from.

  1. You should try different schedulers for your pool of actors, to find the one that shows the best performance (try ResizableThreadPoolScheduler if you want to maximize parallelism by using as many threads as possible)
  2. You need a separate scheduler for the huge pool of actors (the other actors in your system shouldn't use it)
  3. As @DaGGeRRz suggested, you might try the Akka framework, which offers configurable dispatchers (e.g., a work-stealing load-balancing dispatcher moves events from the mailboxes of busy actors to idle actors)

From the comments on the default Actor implementation:

The run-time system can be configured to use a larger thread pool size (for example, by setting the actors.corePoolSize JVM property). The scheduler method of the Actor trait can be overridden to return a ResizableThreadPoolScheduler, which resizes its thread pool to avoid starvation caused by actors that invoke arbitrary blocking methods. The actors.enableForkJoin JVM property can be set to false, in which case a ResizableThreadPoolScheduler is used by default to execute actors.

In addition: an interesting thread on schedulers at scala-lang.
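For illustration, the JVM properties mentioned in that quote are passed on the command line (the main class `MyApp` and the pool size of 32 are placeholders, not values from the original post):

```shell
# Enlarge the default actor pool (Scala 2.8 actors)
scala -Dactors.corePoolSize=32 MyApp

# Or disable fork/join so a ResizableThreadPoolScheduler is used instead
scala -Dactors.enableForkJoin=false MyApp
```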

尽揽少女心 2024-10-08 09:20:55


It seems from your example that you don't actually need to use actors at all, since you are not passing messages to your work units, replying, or even looping.

Why not just create a load of Futures and then wait for them to finish? That way, the underlying fork/join pool is completely free to decide the appropriate level of parallelism (i.e., number of threads) for your system:

import actors.Futures._
// assumes `total`, `limit`, and `doExpensiveStuff` from the question
def mkFuture(offset: Long) = future {
  doExpensiveStuff(offset, limit)
}
val fs = (0L to total by limit).map(mkFuture)
awaitAll(timeout, fs: _*) // wait for all the work to finish

Note that you will only benefit from running more tasks concurrently than your system has cores if the expensive work is not CPU-bound (e.g., if it is IO-bound).

可可 2024-10-08 09:20:55


I have not used actors with that syntax, but I believe that, by default, all actors in Scala share a thread pool.

See How to designate a thread pool for actors
