Akka - load balancing and full processor utilization

I wrote a matrix multiplication algorithm that uses parallel collections to speed up the multiplication.

It goes like this:

(0 until M1_ROWS).grouped(PARTITION_ROWS).toList.par.map( i => 
  singleThreadedMultiplicationFAST(i.toArray.map(m1(_)), m2) 
).reduce(_++_) 
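
singleThreadedMultiplicationFAST itself is not shown in the question; a minimal sketch of what such a single-threaded row-block multiplier could look like (hypothetical, only to keep the snippet self-contained) is:

// Hypothetical sketch: multiplies a block of m1's rows by the full m2.
def singleThreadedMultiplicationFAST(
    m1: Array[Array[Double]], m2: Array[Array[Double]]): Array[Array[Double]] = {
  val rows = m1.length
  val inner = m2.length
  val cols = m2(0).length
  val result = Array.ofDim[Double](rows, cols)
  var i = 0
  while (i < rows) {
    var k = 0
    while (k < inner) {
      val a = m1(i)(k)   // hoist the multiplier out of the innermost loop
      val m2k = m2(k)
      var j = 0
      while (j < cols) {
        result(i)(j) += a * m2k(j)
        j += 1
      }
      k += 1
    }
    i += 1
  }
  result
}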

Now I would like to do the same in Akka, so here is what I did:

val multiplyer = actorOf[Pool] 
multiplyer start 
val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map( i => 
  multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 
futures.map(_.get match { case res: Array[Array[Double]] => res }).reduce(_++_) 

class Multiplyer extends akka.actor.Actor { 
  protected def receive = { 
    case MultiplyMatrix(m1, m2) => self reply singleThreadedMultiplicationFAST(m1, m2) 
  } 
} 
class Pool extends Actor with DefaultActorPool 
  with FixedCapacityStrategy with RoundRobinSelector { 

  def receive = _route 
  def partialFill = false 
  def selectionCount = 1 
  def instance = actorOf[Multiplyer] 
  def limit = 32 // I tried 256 with no effect either 
} 

It turned out that the actor-based version of this algorithm uses only
200% CPU on my i7 Sandy Bridge, while the parallel collections version
uses 600% and is 4-5x faster.
I thought it might be the dispatcher and tried this:

self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, mailboxCapacity = 100) 

and this (which I shared between the actors):

val messageDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("d1")
  .withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(100)
  .setCorePoolSize(16)
  .setMaxPoolSize(128)
  .setKeepAliveTimeInMillis(60000).build 

But I didn't observe any change: still only 200% processor usage, and
the algorithm is 4-5 times slower than the parallel collections
version.

I am sure I am doing something silly, so please help!!! :)

Answer from 看透却不说透 (2024-12-26 21:32:25):

This expression:

val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map( i => 
  multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 

creates a lazy collection (grouped returns an Iterator, and map on an Iterator is lazy as well), so your _.get makes your entire program serial: each ask is only sent once the previous result has been retrieved, so only one multiplication runs at a time.
So the solution is to make that expression strict by adding toList or similar.
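
For instance, forcing the iterator into a strict List up front dispatches every ask before the first blocking get (a minimal sketch, reusing the names from the question):

val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).toList.map( i => 
  multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 
// All requests are now in flight; only now do we block on the results. 
futures.map(_.get match { case res: Array[Array[Double]] => res }).reduce(_++_) 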
