当 8-10 个 Actor 同时运行时,某些 Scala Actor 会进入等待状态

发布于 2024-10-14 06:25:21 字数 1469 浏览 2 评论 0原文

在我的模型中大约有 8-9 个 Scala Actor。 每个 Actor 在 RabbitMQ Server 上都有自己的队列

,在每个 Actor 的 act 方法中,它不断地列到队列中 就像

def act {
    this ! 1
    loop {
      react {
        case 1 => processMessage(QManager.getMessage); this ! 1
      }
    }
  } 

我的rabbitMq QManager getMessage方法

def getMessage: MyObject = {
    getConnection
    val durable = true
    channel.exchangeDeclare(EXCHANGE, "direct", durable)
    channel.queueDeclare(QUEUE, durable, false, false, null)
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
    consumer = new QueueingConsumer(channel)
    channel basicConsume (QUEUE, false, consumer)

    var obj = new MyObject
    try {
      val delivery = consumer.nextDelivery
      val msg = new java.io.ObjectInputStream(
        new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
      obj = msg.asInstanceOf[MyObject]
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    } catch {
      case e: Exception =>logger.error("error in Get Message", e);endConnection
    }
    endConnection
    obj
  }

一样,所有9个Actors都有自己的对象类型和

GetMessage中自己的QManager,我正在使用Rabbitmq QueueConsumer

 val delivery = consumer.nextDelivery

,并且nextDelivery方法在在队列中找到时返回一个对象 当我启动所有 8 个演员时,此方法将演员置于等待状态,

其中只有 4 个演员工作正常,其他未说明。 我已经测试了每个相互依赖地运行的演员,它们在单独启动时工作正常。

当我启动超过 4 个演员时,就会出现问题,

scala 演员的线程是否存在任何问题。

In my model there are about 8-9 Scala Actors.
Each actor has its own queue on RabbitMQ Server

in act method of each Actor .It is continuously listing to the queue
like

def act {
    this ! 1
    loop {
      react {
        case 1 => processMessage(QManager.getMessage); this ! 1
      }
    }
  } 

I a rabbitMq QManager getMessage Method

def getMessage: MyObject = {
    getConnection
    val durable = true
    channel.exchangeDeclare(EXCHANGE, "direct", durable)
    channel.queueDeclare(QUEUE, durable, false, false, null)
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
    consumer = new QueueingConsumer(channel)
    channel basicConsume (QUEUE, false, consumer)

    var obj = new MyObject
    try {
      val delivery = consumer.nextDelivery
      val msg = new java.io.ObjectInputStream(
        new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
      obj = msg.asInstanceOf[MyObject]
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    } catch {
      case e: Exception =>logger.error("error in Get Message", e);endConnection
    }
    endConnection
    obj
  }

All 9 Actors has its own object type and a own QManager

in a GetMessage I am using Rabbitmq QueueConsumer

 val delivery = consumer.nextDelivery

and the nextDelivery method returns a object when itfounds in a queue
this method puts actor in waiting state

when i start all 8 actors only 4 of them works fine other are not stated.
I have test each and every actor running interdependently they works fine when started Alone

The problem occurs when i start more that 4 actors

is therer any Problem with threading of scala actors.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

一身仙ぐ女味 2024-10-21 06:25:21

免责声明:我是 Akka 的 PO

正如 Rex 所说,您正在忙于等待,占用共享线程池上的线程。

我不知道您是否可以选择测试 Akka,但我们支持 AMQP 消费者(和生产者)作为参与者: Akka-AMQP

生成 AMQP 消息:

    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")

使用 AMQP 消息:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
  case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))

另一种选择是使用 Akka-Camel 使用 Actor 消费和生成 AMQP 消息

Disclaimer: I am the PO of Akka

As Rex says, you're busy-waiting, hogging threads, on a shared pool of threads.

I don't know if you have the option to test Akka, but we have support for AMQP consumers (and producers) as actors: Akka-AMQP

Producing AMQP messages:

    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")

Consuming AMQP messages:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
  case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))

Another option is to use Akka-Camel to consume and produce AMQP messages with actors

时光清浅 2024-10-21 06:25:21

所有的演员都在不停地奔跑;他们从不休息。由于参与者在公共线程池中共享,这意味着幸运的获胜者参与者一直在运行,而不幸的失败者根本没有任何时间。如果您希望有一个实体始终占用整个线程,那么通常最好使用 java Thread,或者至少使用 receive 而不是 <代码>反应。您还可以增加演员池的大小以匹配演员的数量,但通常如果您有大量的演员并且所有演员都一直在运行,您应该更仔细地考虑如何构建您的程序。

All your actors are running all the time; they never take a break. Since actors are shared across a common pool of threads, this means that the lucky winner actors run all the time and the unlucky losers never get any time at all. If you want to have an entity that takes an entire thread for itself all the time, it's generally better to use a java Thread, or at least to use receive instead of react. You could also increase the size of the actor pool to match the number of actors, but generally if you have a very large number of actors all of which run all the time, you should think more carefully about how you're structuring your program.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文