使用 Java 并发 API 建模动态数据流的技术
编辑:这基本上是一个“如何在 Java 中正确实现数据流引擎”的问题,我觉得这不能用一个答案来充分回答(这就像问“如何正确实现 ORM 层”并让某人写出Hibernate之类的细节),所以考虑这个问题“关闭”。
有没有一种优雅的方法在 Java 中对动态数据流进行建模?通过数据流,我的意思是有各种类型的任务,并且这些任务可以任意“连接”,这样当一个任务完成时,后续任务使用已完成的任务输出作为输入并行执行,或者当多个任务完成时,它们的后续任务将被并行执行。输出聚合在后续任务中(请参阅基于流程的编程)。通过动态,我的意思是任务完成时后继任务的类型和数量取决于该已完成任务的输出,因此例如,如果任务 A 具有特定输出,则它可能会生成任务 B,但如果有特定输出,则可能会生成任务 C不同的输出。另一种说法是每个任务(或一组任务)负责确定下一个任务是什么。
用于渲染网页的示例数据流:我的任务类型有:文件下载器、HTML/CSS 渲染器、HTML 解析器/DOM 构建器、图像渲染器、JavaScript 解析器、JavaScript 解释器。
- HTML 文件的文件下载器任务
- HTML 解析器/DOM 构建器任务
- 每个嵌入文件/链接的文件下载器任务
- 如果是图像,则图像渲染器
- 如果是外部 JavaScript,则为 JavaScript 解析器
- JavaScript 解释器
- 否则,只需存储在 HTML 解析器任务中的某个 var/field 中即可
- 每个嵌入脚本的 JavaScript 解析器
- JavaScript 解释器
- 等待上述任务完成,然后是 HTML/CSS 渲染器(显然不是最佳或完全正确,但这很简单)
- 每个嵌入文件/链接的文件下载器任务
- HTML 解析器/DOM 构建器任务
我并不是说解决方案需要某种综合框架(事实上,更接近的框架 ) JDK API,更好),我绝对不想要像 Spring Web Flow 或某些声明性标记或其他 DSL 这样的重量级东西。
更具体地说,我试图想出一种在 Java 中使用 Callables、Executors、ExecutorCompletionServices 以及可能的各种同步器类(如 Semaphore 或 CountDownLatch)对此进行建模的好方法。有几个用例和要求:
- 不要对任务将在哪个执行器上运行做出任何假设。事实上,为了简单起见,只需假设只有一个执行者。它可以是固定的线程池执行器,因此简单的实现可能会导致死锁(例如,想象一个任务提交另一个任务,然后阻塞直到该子任务完成,现在想象其中几个任务耗尽了所有线程)。
- 为了简化,假设数据不在任务之间流动(任务输出->后续任务输入) - 完成任务和后续任务不必一起存在,因此后续任务的输入数据不会被更改前面的任务(因为它已经完成)。
- 数据流“引擎”应该能够处理的只有几个操作:
- 一种任务可以对更多任务进行排队的机制
- 一种机制,在所有必需的输入任务完成之前,后续任务不会排队
- 主线程(或其他不受执行器管理的线程)阻塞直到流程完成的机制
- 主线程(或其他不受执行程序管理的线程)阻塞直到某些任务完成的机制
- 由于数据流是动态的(取决于任务的输入/状态),因此这些机制的激活应该发生在任务内代码,例如 Callable 中的代码本身负责对更多 Callable 进行排队。
- 数据流“内部结构”不应该暴露给任务(Callables)本身 - 只有上面列出的操作应该可供任务使用。
- 请注意,所有任务的数据类型不一定相同,例如文件下载任务可能接受文件作为输入,但会输出字符串。
- 如果任务抛出未捕获的异常(指示某些致命错误,需要停止所有数据流处理),则它必须尽快传播到启动数据流的线程并取消所有任务(或者像致命错误处理程序这样更奇特的东西)。
- 应尽快启动任务。这与之前的要求一起应该排除简单的 Future 轮询 + Thread.sleep()。
- 作为奖励,我希望数据流引擎本身在每次任务完成时或自上次任务完成以来 X 时间内没有完成时执行一些操作(如日志记录)。类似于:ExecutorCompletionService
电子学分; while (hasTasks()) { Future ;未来 = ecs.poll(1 分钟); some_action_like_logging(); if (future != null) { future.get() ... } ... }
是否有简单的方法可以使用 Java 并发 API 完成所有这些操作?或者,如果无论 JDK 中的可用内容如何,它都会变得复杂,是否有一个轻量级库可以满足要求?我已经有了一个适合我的特定用例的部分解决方案(它在某种程度上作弊,因为我使用了两个执行器,而且你知道,它与我上面给出的网络浏览器示例完全无关),但我希望看到一个更通用和更优雅的解决方案。
EDIT: This is basically a "how to properly implement a data flow engine in Java" question, and I feel this cannot be adequately answered in a single answer (it's like asking, "how to properly implement an ORM layer" and getting someone to write out the details of Hibernate or something), so consider this question "closed".
Is there an elegant way to model a dynamic dataflow in Java? By dataflow, I mean there are various types of tasks, and these tasks can be "connected" arbitrarily, such that when a task finishes, successor tasks are executed in parallel using the finished tasks output as input, or when multiple tasks finish, their output is aggregated in a successor task (see flow-based programming). By dynamic, I mean that the type and number of successors tasks when a task finishes depends on the output of that finished task, so for example, task A may spawn task B if it has a certain output, but may spawn task C if has a different output. Another way of putting it is that each task (or set of tasks) is responsible for determining what the next tasks are.
Sample dataflow for rendering a webpage: I have as task types: file downloader, HTML/CSS renderer, HTML parser/DOM builder, image renderer, JavaScript parser, JavaScript interpreter.
- File downloader task for HTML file
- HTML parser/DOM builder task
- File downloader task for each embedded file/link
- If image, image renderer
- If external JavaScript, JavaScript parser
- JavaScript interpreter
- Otherwise, just store in some var/field in HTML parser task
- JavaScript parser for each embedded script
- JavaScript interpreter
- Wait for above tasks to finish, then HTML/CSS renderer (obviously not optimal or perfectly correct, but this is simple)
- File downloader task for each embedded file/link
- HTML parser/DOM builder task
I'm not saying the solution needs to be some comprehensive framework (in fact, the closer to the JDK API, the better), and I absolutely don't want something as heavyweight is say Spring Web Flow or some declarative markup or other DSL.
To be more specific, I'm trying to think of a good way to model this in Java with Callables, Executors, ExecutorCompletionServices, and perhaps various synchronizer classes (like Semaphore or CountDownLatch). There are a couple use cases and requirements:
- Don't make any assumptions on what executor(s) the tasks will run on. In fact, to simplify, just assume there's only one executor. It can be a fixed thread pool executor, so a naive implementation can result in deadlocks (e.g. imagine a task that submits another task and then blocks until that subtask is finished, and now imagine several of these tasks using up all the threads).
- To simplify, assume that the data is not streamed between tasks (task output->succeeding task input) - the finishing task and succeeding task don't have to exist together, so the input data to the succeeding task will not be changed by the preceeding task (since it's already done).
- There are only a couple operations that the dataflow "engine" should be able to handle:
- A mechanism where a task can queue more tasks
- A mechanism whereby a successor task is not queued until all the required input tasks are finished
- A mechanism whereby the main thread (or other threads not managed by the executor) blocks until the flow is finished
- A mechanism whereby the main thread (or other threads not managed by the executor) blocks until certain tasks have finished
- Since the dataflow is dynamic (depends on input/state of the task), the activation of these mechanisms should occur within the task code, e.g. the code in a Callable is itself responsible for queueing more Callables.
- The dataflow "internals" should not be exposed to the tasks (Callables) themselves - only the operations listed above should be available to the task.
- Note that the type of the data is not necessarily the same for all tasks, e.g. a file download task may accept a File as input but will output a String.
- If a task throws an uncaught exception (indicating some fatal error requiring all dataflow processing to stop), it must propagate up to the thread that initiated the dataflow as quickly as possible and cancel all tasks (or something fancier like a fatal error handler).
- Tasks should be launched as soon as possible. This along with the previous requirement should preclude simple Future polling + Thread.sleep().
- As a bonus, I would like to dataflow engine itself to perform some action (like logging) every time task is finished or when no has finished in X time since last task has finished. Something like:
ExecutorCompletionService<T> ecs; while (hasTasks()) { Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) { future.get() ... } ... }
Are there straightforward ways to do all this with Java concurrency API? Or if it's going to complex no matter what with what's available in the JDK, is there a lightweight library that satisfies the requirements? I already have a partial solution that fits my particular use case (it cheats in a way, since I'm using two executors, and just so you know, it's not related at all to the web browser example I gave above), but I'd like to see a more general purpose and elegant solution.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
如何定义接口,例如:
您的“数据流引擎”只需要管理任务对象的集合,即允许新的任务对象排队等待执行,并允许查询给定任务的状态(所以可能是上面的接口需要扩展以包含 id 和/或类型)。当任务完成时(当然当引擎启动时),引擎必须查询任何未启动的任务以查看它们现在是否准备好,如果准备好,则将它们传递给执行器运行。正如您所提到的,然后也可以完成任何日志记录等。
另一件可能有帮助的事情是使用 Guice (http://code.google.com/ p/google-guice/)或类似的轻量级 DI 框架,以帮助正确连接所有对象(例如,确保创建正确的执行程序类型,并确保需要访问数据流引擎的任务(例如,对于他们的 isReady 方法或排队其他任务)可以提供一个实例,而无需引入复杂的循环关系,
但如果我错过了任何关键方面,请发表评论......
保罗.
How about defining interface such as:
Your "dataflow engine" would then just need to manage a collection of Task objects i.e. allow new Task objects to be queued for excecution and allow queries as to the status of a given task (so maybe the interface above needs extending to include id and/or type). When a task completes (and when the engine starts of course) the engine must just query any unstarted tasks to see if they are now ready, and if so pass them to be run on the executor. As you mention, any logging, etc. could also be done then.
One other thing that may help is to use Guice (http://code.google.com/p/google-guice/) or a similar lightweight DI framework to help wire up all the objects correctly (e.g. to ensure that the correct executor type is created, and to make sure that Tasks that need access to the dataflow engine (either for their isReady method or for queuing other tasks, say) can be provided with an instance without introducing complex circular relationships.
HTH, but please do comment if I've missed any key aspects...
Paul.
看看 https://github.com/rfqu/df4j — 一个简单但功能强大的数据流库。如果缺少一些所需的功能,可以轻松添加它们。
Look at https://github.com/rfqu/df4j — a simple but powerful dataflow library. If it lacks some desired features, they can be added easily.