如何取消Akka演员?
我有一个 akka actor(worker),它接收请求并回复它。请求处理可能需要 3-60 分钟。来电者(也是演员)当前正在使用!并等待 future.get,但是如果需要,可以更改 Caller actor 的设计。另外,我目前正在使用 EventDriven 调度程序。
如何取消(用户发起的)请求处理,以便释放工作参与者并返回到就绪状态以接收新请求?我希望有一个类似于 java.util.concurrent.Future 的取消方法的方法,但在 Akka 1.1.3 中找不到
编辑:
我们尝试使用 获得我们正在寻找的行为CompleteWithException :
object Cancel {
def main(args: Array[String]) {
val actor = Actor.actorOf[CancelActor].start
EventHandler.info(this, "Getting future")
val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
Thread.sleep(500L)
EventHandler.info(this, "Cancelling")
future.completeWithException(new Exception("cancel"))
EventHandler.info(this, "Future is " + future.get)
}
}
class CancelActor extends Actor {
def receive = {
case "request" =>
EventHandler.info(this, "start")
(1 to 5).foreach(x => {
EventHandler.info(this, "I am a long running process")
Thread.sleep(200L)
})
self reply "response"
EventHandler.info(this, "stop")
}
}
但这并没有停止长时间运行的过程。
[INFO] [9/16/11 1:46 PM] [main] [Cancel$] Getting future
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
[ERROR] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture]
java.lang.Exception: cancel
at kozo.experimental.Cancel$.main(Cancel.scala:15)
...
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop
相反,考虑 java.util.concurrent.Future 的行为:
object Cancel2 {
def main(args: Array[String]) {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
EventHandler.info(this, "Getting future")
val future = executor.submit(new Runnable {
def run() {
EventHandler.info(this, "start")
(1 to 5).foreach(x => {
EventHandler.info(this, "I am a long running process")
Thread.sleep(200L)
})
}
})
Thread.sleep(500L)
EventHandler.info(this, "Cancelling")
future.cancel(true)
EventHandler.info(this, "Future is " + future.get)
}
}
这确实会停止长时间运行的进程
[INFO] [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
Exception in thread "main" java.util.concurrent.CancellationException
...
[INFO] [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling
I have an akka actor(worker) that receives a request and replies to it. The request processing can take 3-60 minutes. Caller(also an actor) is currently using !!! and waiting on future.get, however the design of Caller actor can be changed if required. Also, I'm currently using EventDriven dispatcher.
How do i Cancel(user initiated) the request processing so that the worker actor is freed up and returns to the ready state to receive new requests? I was hoping for a method similar to java.util.concurrent.Future's cancel method but couldn't find in Akka 1.1.3
Edit:
We tried to get the behavior we are looking for with completeWithException
:
object Cancel {
def main(args: Array[String]) {
val actor = Actor.actorOf[CancelActor].start
EventHandler.info(this, "Getting future")
val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
Thread.sleep(500L)
EventHandler.info(this, "Cancelling")
future.completeWithException(new Exception("cancel"))
EventHandler.info(this, "Future is " + future.get)
}
}
class CancelActor extends Actor {
def receive = {
case "request" =>
EventHandler.info(this, "start")
(1 to 5).foreach(x => {
EventHandler.info(this, "I am a long running process")
Thread.sleep(200L)
})
self reply "response"
EventHandler.info(this, "stop")
}
}
But that did not stop the long-running process.
[INFO] [9/16/11 1:46 PM] [main] [Cancel$] Getting future
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
[ERROR] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture]
java.lang.Exception: cancel
at kozo.experimental.Cancel$.main(Cancel.scala:15)
...
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop
In contrast consider the behavior of of a java.util.concurrent.Future:
object Cancel2 {
def main(args: Array[String]) {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
EventHandler.info(this, "Getting future")
val future = executor.submit(new Runnable {
def run() {
EventHandler.info(this, "start")
(1 to 5).foreach(x => {
EventHandler.info(this, "I am a long running process")
Thread.sleep(200L)
})
}
})
Thread.sleep(500L)
EventHandler.info(this, "Cancelling")
future.cancel(true)
EventHandler.info(this, "Future is " + future.get)
}
}
Which does stop the long running process
[INFO] [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
Exception in thread "main" java.util.concurrent.CancellationException
...
[INFO] [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您还可以检查 Actor 中 Future 的状态。
这要求消息发送时带有“?”或者“问”。
希望有帮助。
You could also check the status of the Future in the Actor.
This requires the message to be sent with '?' or 'ask' though.
Hope it helps.
如果您只是在虚拟机中,您可以将 AtomicBoolean 与您的 Job 消息一起传递,并间歇性地在您的 Actor 中检查它,看看是否应该中止。
If you're just in-VM you can just pass along an AtomicBoolean with your Job message and check that in your actor intermittently to see if you should abort.