在退出之前等待完成子级中的所有工作的演员

发布于 2024-10-16 21:43:14 字数 1904 浏览 3 评论 0原文

无法弄清楚如何解决以下问题:我有一些演员(工人)在收到(我的意思是反应)任务时以某种方式执行任务。主要参与者(领班)控制此过程并可以接收停止工作的任务。在这种情况下,主要参与者必须停止创建新任务,并等待工作人员完成所有现有任务,然后主要参与者才应退出。

import actors.Actor
import actors.Actor._

class Foreman extends Actor{
  val workerA = new WorkerA
  val workerB = new WorkerB
  val workerC = new WorkerC
  self.link(workerA)
  self.link(workerB)
  self.link(workerC)
  def act{
    workerA.start
    workerB.start
    workerC.start

    // adding tasks to workers somehow
    //...
    loop{
      case ResultOfTask(res) => //...
      case Stop => //workers mustn't immediately stop but must finish their tasks and then exit
      case ProductionAccident => //...
    }


  }
}

case class Task(activity:String)
case class ResultOfTask(result:String)

trait Worker extends Actor{

  def act{
    loop{
      react{
        case Task(activity) => sender ! processTask(activity)
      }
    }
  }

  def processTask(activity:String):ResultOfTask
}

为了解决这个问题,我编写了以下代码:

def condition = workerA.getState!=State.Suspended  && workerB.getState!=State.Suspended && workerC.getState!=State.Suspended && mailboxSize == 0
case Stop => {
  if(condition) exit("sweet dreams") else continue
}

检查主要演员是否应该退出。另一种变体是在“Worker”特征中包含计算者,当工作人员收到消息时增加它,并在它响应时减少它。

trait Worker extends Actor {
  private var count = 0
  def act {
    loop{
      react{
        case Task(activity) => {
          count += 1
          sender ! processTask(activity)
          count -= 1
        }
      }
    }
  }
  def hasDoneAllTasks = count == 0

  def processTask(activity: String): ResultOfTask
}

“工头”中的“条件”功能将是

def condition = workerA.hasDoneAllTasks   && workerB.hasDoneAllTasks  && workerC.hasDoneAllTasks  && mailboxSize == 0

我希望有更好的解决方案,并且您会提出它们。

Can't figure out how to resolve following problem: I have a few actors (workers) that execute tasks in some way when they recieve (I mean react) them. A main actor (foreman) controls this process and can recieve task to stop work. In this case main actor must stop creating new tasks and wait when workers will finish all existing tasks and only then main actor should exit.

import actors.Actor
import actors.Actor._

class Foreman extends Actor{
  val workerA = new WorkerA
  val workerB = new WorkerB
  val workerC = new WorkerC
  self.link(workerA)
  self.link(workerB)
  self.link(workerC)
  def act{
    workerA.start
    workerB.start
    workerC.start

    // adding tasks to workers somehow
    //...
    loop{
      case ResultOfTask(res) => //...
      case Stop => //workers mustn't immediately stop but must finish their tasks and then exit
      case ProductionAccident => //...
    }


  }
}

case class Task(activity:String)
case class ResultOfTask(result:String)

trait Worker extends Actor{

  def act{
    loop{
      react{
        case Task(activity) => sender ! processTask(activity)
      }
    }
  }

  def processTask(activity:String):ResultOfTask
}

To solve this I wrote following code:

def condition = workerA.getState!=State.Suspended  && workerB.getState!=State.Suspended && workerC.getState!=State.Suspended && mailboxSize == 0
case Stop => {
  if(condition) exit("sweet dreams") else continue
}

to check if main actor should exit. Another variant to have counder in "Worker" trait, increment it when worker recieves message and decrement when it reponses.

trait Worker extends Actor {
  private var count = 0
  def act {
    loop{
      react{
        case Task(activity) => {
          count += 1
          sender ! processTask(activity)
          count -= 1
        }
      }
    }
  }
  def hasDoneAllTasks = count == 0

  def processTask(activity: String): ResultOfTask
}

And "condition" function in "Foreman" will be

def condition = workerA.hasDoneAllTasks   && workerB.hasDoneAllTasks  && workerC.hasDoneAllTasks  && mailboxSize == 0

I hope there are better solutions and you will propose them.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

贪恋 2024-10-23 21:43:14

如果工头总是期望从工人那里得到答案,那么解决方案很简单:工头维护一个计数器,每次发送一条消息时,它都会递增它,每次从工人那里收到一条消息时,它都会递减它。只要计数器为零,它就可以自行停止(假设没有其他人向工作人员发送消息)。

如果工头并不总是期待工人的答复,您可以通过发送无内容的消息

case object Done { }

并让工人在完成后回复该消息来实现这种情况。然后,参见上文。

如果工头不是唯一与工人交谈的人,或者你希望后台少一些闲聊,那么工头和工人就必须进行谈判。工头可以发送消息

case object RequestStop { }

,工人们会做一些优雅的事情,并在完成后回复 Done。当工头收到的 Done 消息与发送的 RequestStop 消息一样多时,它就可以自由退出。

If the foreman always expects an answer from the workers, the solution is easy: the foreman maintains a counter, and each time it sends a message it increments it and each time it receives one from a worker it decrements it. Any time the counter is zero it is free to stop itself (assuming that no-one else sends messages to the workers).

If the foreman does not always expect an answer from the workers, you can make this the case by having a no-content message

case object Done { }

and having the workers reply with that when they're finished. Then, see above.

If the foreman is not the only one talking to the workers, or you want there to be less chatter in the background, then the foreman and the workers will have to negotiate. The foreman can send

case object RequestStop { }

and the workers will do something graceful and reply with Done when they're done. When the foreman receives as many Done messages as it has sent RequestStops, it is free to exit.

挽袖吟 2024-10-23 21:43:14

为什么在向 Workers 发送作业时不包含对 Foreman actor 的引用?然后,当工人关闭时,他们可以向工头发送通知。每次 Foreman 收到工人关闭消息时,它都会记录该消息并查看所有工人是否已完成。如果是这样,它也会自行关闭。

Why not include a reference to the Foreman actor when sending jobs to the Workers? Then, when the workers shut down they can send a notification to the Foreman. Each time the Foreman receives a worker shut down message it logs it and sees if all the workers have completed. If so, it shuts itself down too.

好倦 2024-10-23 21:43:14

我的方法是在 Foreman 中完成所有计算。

您没有写领班如何创建任务,所以我假设它是为了响应消息

class Foreman extends Actor{

  var numTasks: Int = 0
  var shouldExit = false

  def act = loop {
   react {
     case t: Task =>
       if (!shouldExit)  {
         numTasks += 1
         selectWorker ! t
       } else {
         // send some kind of error status to sender
       }
     case ResultOfTask(rest) =>
       numTasks -= 1
       // ....
       if (numTasks == 0 && shouldExit) exit
     case Stop() => shoudExit = true
}

然后改进是优先考虑停止,以便即使队列中有任务消息也能处理它

  def act = loop {
   reactWithin(0) {
     case Stop() => shouldStop = true
     case TIMEOUT => react {
       case t: Task =>
         if (!shouldExit)  {
           numTasks += 1
           selectWorker ! t
         } else {
           // send some kind of error status to sender
         }
       case ResultOfTask(rest) =>
         numTasks -= 1
         // ....
         if (numTasks == 0 && shouldExit) exit
       case Stop() => shoudExit = true
  }
}

My approach is to do all calculations in the Foreman.

You didn't write how the foreman creates tasks, so I'll assume it is in response to a message

class Foreman extends Actor{

  var numTasks: Int = 0
  var shouldExit = false

  def act = loop {
   react {
     case t: Task =>
       if (!shouldExit)  {
         numTasks += 1
         selectWorker ! t
       } else {
         // send some kind of error status to sender
       }
     case ResultOfTask(rest) =>
       numTasks -= 1
       // ....
       if (numTasks == 0 && shouldExit) exit
     case Stop() => shoudExit = true
}

An improvement then is to prioritize Stop so that it is handled even if there are Task messages in the queue

  def act = loop {
   reactWithin(0) {
     case Stop() => shouldStop = true
     case TIMEOUT => react {
       case t: Task =>
         if (!shouldExit)  {
           numTasks += 1
           selectWorker ! t
         } else {
           // send some kind of error status to sender
         }
       case ResultOfTask(rest) =>
         numTasks -= 1
         // ....
         if (numTasks == 0 && shouldExit) exit
       case Stop() => shoudExit = true
  }
}
丑疤怪 2024-10-23 21:43:14

您可以利用的一件事是 trapExit、case Exit 和 exit、system。在你的主要参与者中,你可以将 trapExit 设置为 true:

// Foreman

def act() {
  trapExit = true
  link(workerA)
  link(workerB)
  ...
}

这意味着当工作进程终止时,你的领班参与者将收到退出消息,其中包括原因:

// Foreman

def act() {
  ....
  loop { react {
    case Exit (worker: Actor, reason: AnyRef) => {
      // decrement counter, or list of workers, and exit if empty
    }
    ...
   }}
}

你可以对原因参数进行模式匹配。例如,在您的工人类中,您可能会使用各种案例类来指示工头应该做什么:

// Worker:

exit(WorkComplete)
exit(Emergency)

等等,等等。当您的工作线程抛出异常时,它将终止并向链接的进程发送包含异常的退出消息。考虑到这种情况,你最终可能会得到这样的结果:

// Foreman

def act() {
  ....
  loop { react {
    case Exit (worker: Actor, reason: WorkComplete) => {
      // decrement counter, or list of workers, and exit if empty
    }

    case Exit (worker: Actor, reason: TasksExhausted) => {
      // log something, or make shut down worker if too many are exhausted
    }

    case Exit (worker: Actor, reason: Exception) => {
      // log the exception, then restart the actor
    }

    ...
   }}
}

从你最初的问题来看,你是否希望工人继续工作,即使他们已经完成,直到工头告诉他们应该在时间到了时退出。如果是这种情况,可以向工作人员发送一条消息,告诉他们“完成后退出”,并且您可以使用 trapExit 机制告诉他们已经完成。

希望这能激发出一个有趣的解决方案!

One thing you can take advantage of is the trapExit, case Exit and exit, system. In your main actor, you can set trapExit to true:

// Foreman

def act() {
  trapExit = true
  link(workerA)
  link(workerB)
  ...
}

This means that your foreman actor will get an Exit message when the worker process terminates, including a reason:

// Foreman

def act() {
  ....
  loop { react {
    case Exit (worker: Actor, reason: AnyRef) => {
      // decrement counter, or list of workers, and exit if empty
    }
    ...
   }}
}

You can pattern match on the reason parameter. For instance, in your worker class, you might exit using a variety of case classes indicating what the foreman should do:

// Worker:

exit(WorkComplete)
exit(Emergency)

etc, and so on. When your worker throws an exception, it'll terminate and send the linked process an Exit message containing the exception. Given this sort of thing, you can end up with something like:

// Foreman

def act() {
  ....
  loop { react {
    case Exit (worker: Actor, reason: WorkComplete) => {
      // decrement counter, or list of workers, and exit if empty
    }

    case Exit (worker: Actor, reason: TasksExhausted) => {
      // log something, or make shut down worker if too many are exhausted
    }

    case Exit (worker: Actor, reason: Exception) => {
      // log the exception, then restart the actor
    }

    ...
   }}
}

It's unclear from your initial question if you want the workers to keep on working even when they're done until the foreman tells them they should exit when it's time. If that's the case, sending the workers a messages telling them to "exit when finished" works, and you can tell they've finished by using the trapExit mechanism.

Hope this spurs an interesting solution!

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文