Java 执行器:当任务完成时,如何在不阻塞的情况下收到通知?
假设我有一个充满任务的队列,我需要将其提交给执行器服务。 我希望一次处理一个。 我能想到的最简单的方法是:
- 从队列中取出一个任务 将
- 其提交给执行程序
- 在返回的 Future 上调用 .get 并阻塞直到结果可用
- 从队列中取出另一个任务...
但是,我正在尝试避免完全阻塞。 如果我有 10,000 个这样的队列,它们需要一次处理一个任务,那么我将耗尽堆栈空间,因为它们中的大多数将保留阻塞的线程。
我想要的是提交任务并提供任务完成时调用的回调。 我将使用该回调通知作为发送下一个任务的标志。 (functionjava和jetlang显然使用了这种非阻塞算法,但我无法理解他们的代码)
我如何使用JDK的java.util.concurrent来做到这一点,而不需要编写自己的执行器服务?
(为我提供这些任务的队列本身可能会阻塞,但这是一个稍后要解决的问题)
Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:
- Take a task from the queue
- Submit it to the executor
- Call .get on the returned Future and block until a result is available
- Take another task from the queue...
However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.
What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)
How can I do that using JDK's java.util.concurrent, short of writing my own executor service?
(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(12)
定义一个回调接口来接收您想要在完成通知中传递的任何参数。 然后在任务结束时调用它。
您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交给 ExecutorService。 或者,请参阅下面的 Java 8 内置机制。
使用
CompletableFuture
,Java 8 包含了一种更复杂的方法来组合管道,其中流程可以异步且有条件地完成。 这是一个人为但完整的通知示例。Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.
You could even write a general wrapper for Runnable tasks, and submit these to
ExecutorService
. Or, see below for a mechanism built into Java 8.With
CompletableFuture
, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.在 Java 8 中,您可以使用 CompletableFuture 。 这是我在代码中的一个示例,我使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个 GUI 应用程序):
它异步执行。 我使用两个私有方法:
mapUsersToUserViews
和updateView
。In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):
It executes asynchronously. I'm using two private methods:
mapUsersToUserViews
andupdateView
.使用 Guava 的可监听 future API 并添加回调。 比照。 来自网站:
Use Guava's listenable future API and add a callback. Cf. from the website :
您可以扩展
FutureTask
类,并重写done()
方法,然后将FutureTask
对象添加到ExecutorService
,因此当FutureTask
立即完成时,done()
方法将被调用。You could extend
FutureTask
class, and override thedone()
method, then add theFutureTask
object to theExecutorService
, so thedone()
method will invoke when theFutureTask
completed immediately.ThreadPoolExecutor
还具有您可以重写和使用的beforeExecute
和afterExecute
挂钩方法。 以下是 ThreadPoolExecutor 的 Javadocs。ThreadPoolExecutor
also hasbeforeExecute
andafterExecute
hook methods that you can override and make use of. Here is the description fromThreadPoolExecutor
's Javadocs.使用
CountDownLatch
。它来自
java.util.concurrent
,它正是等待多个线程完成执行然后再继续的方法。为了实现您正在寻找的回调效果,这确实需要一些额外的额外工作。 也就是说,您自己在一个单独的线程中处理此问题,该线程使用 CountDownLatch 并等待它,然后继续通知您需要通知的任何内容。 没有对回调或类似效果的本机支持。
编辑:现在我进一步理解了你的问题,我认为你太过分了,没有必要。 如果您采用常规
SingleThreadExecutor
,给它所有的任务,它会本地进行排队。Use a
CountDownLatch
.It's from
java.util.concurrent
and it's exactly the way to wait for several threads to complete execution before continuing.In order to achieve the callback effect you're looking after, that does require a little additional extra work. Namely, handling this by yourself in a separate thread which uses the
CountDownLatch
and does wait on it, then goes on about notifying whatever it is you need to notify. There is no native support for callback, or anything similar to that effect.EDIT: now that I further understand your question, I think you are reaching too far, unnecessarily. If you take a regular
SingleThreadExecutor
, give it all the tasks, and it will do the queueing natively.如果您想确保没有任务同时运行,请使用 SingleThreadedExecutor。 任务将按照提交的顺序进行处理。 您甚至不需要保留任务,只需将它们提交给执行人员即可。
If you want to make sure that no tasks will run at the same time then use a SingleThreadedExecutor. The tasks will be processed in the order the are submitted. You don't even need to hold the tasks, just submit them to the exec.
实现
Callback
机制的简单代码使用
ExecutorService
输出:要点:
newFixedThreadPool(5)
withnewFixedThreadPool(1)
如果你想在分析上一个任务的
callback
结果后处理下一个任务,只需取消下面一行的注释你可以替换
newFixedThreadPool()
与其中之一取决于您的用例。
如果你想异步处理回调方法
a. 将共享的 ExecutorService 或 ThreadPoolExecutor 传递给 Callable 任务
b. 将您的
Callable
方法转换为Callable/Runnable
任务c. 请将回调任务推送到 ExecutorService 或 ThreadPoolExecutor
Simple code to implement
Callback
mechanism usingExecutorService
output:
Key notes:
newFixedThreadPool(5)
withnewFixedThreadPool(1)
If you want to process next task after analysing the result from
callback
of previous task,just un-comment below lineYou can replace
newFixedThreadPool()
with one ofdepending on your use case.
If you want to handle callback method asynchronously
a. Pass a shared
ExecutorService or ThreadPoolExecutor
to Callable taskb. Convert your
Callable
method toCallable/Runnable
taskc. Push callback task to
ExecutorService or ThreadPoolExecutor
这是使用 Guava 的
ListenableFuture
对 Pache 答案的扩展。特别是,
Futures.transform()
返回ListenableFuture
,因此可用于链接异步调用。Futures.addCallback()
返回void
,因此不能用于链接,但适合处理异步完成时的成功/失败。注意:除了链接异步任务之外,
Futures.transform()
还允许您在单独的执行器上安排每个任务(本示例中未显示)。This is an extension to Pache's answer using Guava's
ListenableFuture
.In particular,
Futures.transform()
returnsListenableFuture
so can be used to chain async calls.Futures.addCallback()
returnsvoid
, so cannot be used for chaining, but is good for handling success/failure on an async completion.NOTE: In addition to chaining async tasks,
Futures.transform()
also allows you to schedule each task on a separate executor (Not shown in this example).只是为了补充 Matt 的答案(这很有帮助),这里有一个更充实的示例来展示回调的使用。
输出是:
Just to add to Matt's answer, which helped, here is a more fleshed-out example to show the use of a callback.
The output is:
您可以使用 Callable 的实现,其中
CallbackInterface 是非常基本的东西
,现在主类将如下所示
You may use a implementation of Callable such that
where CallbackInterface is something very basic like
and now the main class will look like this
您可以在 java 执行器中实现未来任务,该执行器在任务完成时返回回调。
Follwoing 是从任务返回随机整数值并在主线程上打印的
类输出是:
You can implement future tasks in java executors that returns the callback on the completion of the tasks.
Follwoing is the class that returns random integer values from a task and printed on main thread
The Output is: