使用 Scala 的 Actor 时如何避免竞争条件

发布于 2024-11-02 20:24:01 字数 3176 浏览 5 评论 0原文

我正在编写一段代码,当缓冲区(列表)增长到一定大小时,该代码将填充 mongoDB 集合。

import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

class PopulateDB extends Actor {
  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  def add(info: DBObject = null) {
    if (info != null) buffer += info

    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
      mongoCol.insert(buffer.toList)
      buffer.clear
      println("adding a batch")
    }
  }

  def act() {
    loop {
      react {
        case info: DBObject => add(info)

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }
}

但是,当我运行以下代码时,scala 有时会在“mongoCol.insert(buffer.toList)”行上抛出“ConcurrentModificationException”。我很确定它与“mongoCol.insert”有关。我想知道代码是否有根本性的错误。或者我应该使用 Akka 中的“atomic {...}”之类的东西来避免这个问题。

这是完整的堆栈跟踪:

PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
    at com.mongodb.DBCollection.insert(DBCollection.java:85)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
    at PopulateDB.add(PopulateDB.scala:14)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25)
    at scala.actors.ReactorTask.run(ReactorTask.scala:34)
    at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
    at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5)
    at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
    at PopulateDB.resumeReceiver(PopulateDB.scala:5)
    at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
    at PopulateDB.searchMailbox(PopulateDB.scala:5)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.ReactorTask.run(ReactorTask.scala:36)
    at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
    at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

谢谢, 德里克

I am writing a piece of code that would populate a mongoDB collection when the buffer (list) grow to a certain size.

import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

class PopulateDB extends Actor {
  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  def add(info: DBObject = null) {
    if (info != null) buffer += info

    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
      mongoCol.insert(buffer.toList)
      buffer.clear
      println("adding a batch")
    }
  }

  def act() {
    loop {
      react {
        case info: DBObject => add(info)

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }
}

However, when I run the following code, scala will occasionally throw a "ConcurrentModificationException" on the "mongoCol.insert(buffer.toList)" line. I am pretty sure it has something to do with "mongoCol.insert". I am wondering if there is anything fundamentally wrong with the code. Or should I use something like the "atomic {...}" from Akka to avoid the issue.

Here's the complete stack trace:

PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
    at com.mongodb.DBCollection.insert(DBCollection.java:85)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
    at PopulateDB.add(PopulateDB.scala:14)
    at PopulateDB$anonfun$act$1$anonfun$apply$1.apply(PopulateDB.scala:26)
    at PopulateDB$anonfun$act$1$anonfun$apply$1.apply(PopulateDB.scala:25)
    at scala.actors.ReactorTask.run(ReactorTask.scala:34)
    at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
    at PopulateDB.scala$actors$ReplyReactor$super$resumeReceiver(PopulateDB.scala:5)
    at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
    at PopulateDB.resumeReceiver(PopulateDB.scala:5)
    at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
    at PopulateDB.searchMailbox(PopulateDB.scala:5)
    at scala.actors.Reactor$anonfun$startSearch$1$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.Reactor$anonfun$startSearch$1$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.ReactorTask.run(ReactorTask.scala:36)
    at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
    at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

Thanks,
Derek

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

对你而言 2024-11-09 20:24:01

DBObject 不是线程安全的;您正在将 DBObject 与您的参与者消息一起发送。稍后可能会再次修改,这将导致并发修改问题。

我建议首先尝试在 DBObject 进入 actor 时对其使用 clone() ,并将其放入缓冲区中。它只是一个浅拷贝,但至少应该足以在 LinkedHashMap 上引起并发修改问题,LinkedHashMap 支持 DBObject 上的键(凭借 LHM 保持有序)。

我会尝试:

  def act() {
    loop {
      react {
        case info: DBObject => add(info.clone())

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }

如果这不起作用,请在将 DBObject 发送到 Actor 后查看您正在修改的其他任何地方。

DBObject is not thread safe; you're sending a DBObject in with your actor message. It is likely being modified again later which is going to cause that concurrent modification problem.

I would suggest to start with, trying to use clone() on the DBObject as it comes into the actor, and put that into your buffer. It is only a shallow copy but should at least be enough to cause concurrent modification problems on the LinkedHashMap which backs the keys on DBObject ( which is kept ordered, by virtue of the LHM).

I'd try:

  def act() {
    loop {
      react {
        case info: DBObject => add(info.clone())

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }

If that doesn't work, look at anywhere else you are modifying the DBObject after it is sent to the Actor.

夜空下最亮的亮点 2024-11-09 20:24:01

为什么是下面的class

class PopulateDB extends Actor

您是否保留多个PupulateDB actor?我希望object PopulateDB extends Actor,这样单个演员就可以集中完成这项任务。

除此之外,问题似乎出在 casbah 或 mongodb 本身内部。

Why class below?

class PopulateDB extends Actor

Do you keep multiple PupulateDB actors? I'd expect object PopulateDB extends Actor, so that a single actor would concentrate this task.

Aside from that, the problem seems to be inside casbah or mongodb itself.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文