Akka - 负载平衡和处理器的充分利用
我编写了一个矩阵乘法算法,它使用并行集合来加速乘法。
事情是这样的:
(0 until M1_ROWS).grouped(PARTITION_ROWS).toList.par.map( i =>
singleThreadedMultiplicationFAST(i.toArray.map(m1(_)), m2)
).reduce(_++_)
现在我想在 Akka 中做同样的事情,所以我所做的是:
val multiplyer = actorOf[Pool]
multiplyer start
val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map( i =>
multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2)
)
futures.map(_.get match { case res :Array[Array[Double]] => res }).reduce(_++_)
class Multiplyer extends akka.actor.Actor{
protected def receive = {
case MultiplyMatrix(m1, m2) => self reply singleThreadedMultiplicationFAST (m1,m2)
}
}
class Pool extends Actor with DefaultActorPool
with FixedCapacityStrategy with RoundRobinSelector {
def receive = _route
def partialFill = false
def selectionCount = 1
def instance = actorOf[Multiplyer]
def limit = 32 // I tried 256 with no effect either
}
事实证明,该算法的基于 actor 的版本仅使用 在我的 i7 sandybridge 上 200%,而并行集合版本是 使用 600% 的处理器,速度提高 4-5 倍。 我认为这可能是调度员并尝试了这个:
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, mailboxCapacity = 100)
和这个(我在演员之间共享了这个):
val messageDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("d1")
.withNewBoundedThrea dPoolWithLinkedBlockingQueueWithUnboundedCapacity(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000).build
但我没有观察到任何变化。仍然只有 200% 处理器使用率并且 该算法比并行集合慢 4-5 倍 版本。
我确信我在做一些愚蠢的事情所以请帮忙!!!:)
I wrote a matrix multiplication algorithm, which uses parallel collections, to speed up the multiplication.
It goes like that:
(0 until M1_ROWS).grouped(PARTITION_ROWS).toList.par.map( i =>
singleThreadedMultiplicationFAST(i.toArray.map(m1(_)), m2)
).reduce(_++_)
Now I would like to do the same in Akka, so what I did is:
val multiplyer = actorOf[Pool]
multiplyer start
val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map( i =>
multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2)
)
futures.map(_.get match { case res :Array[Array[Double]] => res }).reduce(_++_)
class Multiplyer extends akka.actor.Actor{
protected def receive = {
case MultiplyMatrix(m1, m2) => self reply singleThreadedMultiplicationFAST (m1,m2)
}
}
class Pool extends Actor with DefaultActorPool
with FixedCapacityStrategy with RoundRobinSelector {
def receive = _route
def partialFill = false
def selectionCount = 1
def instance = actorOf[Multiplyer]
def limit = 32 // I tried 256 with no effect either
}
It turned out that actor based version of this algorithm is using only
200% on my i7 sandybridge, while the parallel collections version is
using 600% of processor and is 4-5x faster.
I thought it might be the dispatcher and tried this:
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, mailboxCapacity = 100)
and this(I shared this one between actors):
val messageDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("d1")
.withNewBoundedThrea dPoolWithLinkedBlockingQueueWithUnboundedCapacity(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000).build
But I didn't observe any changes. Still 200% processor usage only and
the algorithm is 4-5 times slower than the parallel collections
version.
I am sure I am doing something silly so please help!!!:)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这个表达式:
创建一个惰性集合,所以你的 _.get 使你的整个程序串行。
因此,解决方案是通过添加 toList 或类似内容来使该表达式严格。
This expression:
creates a lazy collection, so your _.get makes your entire program serial.
So the solution is to make that expression strict by adding toList or similar.