等待 Spring Integration 中的所有线程完成

发布于 2024-12-28 02:27:04 字数 1673 浏览 0 评论 0原文

我有一个自动执行的 jar 程序,它严重依赖 Spring Integration。我遇到的问题是程序在其他 Spring bean 完全完成之前终止

下面是我正在使用的代码的精简版本,如果需要,我可以提供更多代码/配置。入口点是 main() 方法,它引导 Spring 并启动导入过程:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

DataImporter 包含一个简单的循环,该循环将消息发送到 Spring Integration 网关。这为流程提供了一种主动的“推送”方法,而不是轮询数据的常见方法。这就是我的问题所在:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

为了完整性,流程 XML 看起来像这样:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

流程启动并有效地处理项目,但是一旦 startImport() 循环完成,主线程就会终止并立即拆除所有 Spring Integration 线程。这会导致竞争条件,当程序终止时,最后 (n) 项没有完全处理。

我有一个想法,维护我正在处理的项目的引用计数,但这被证明是相当复杂的,因为流程经常将消息分割/路由到多个服务激活器 - 这意味着很难确定每个项目是否具有“完成的”。

我认为我需要某种方法来检查是否没有 Spring bean 仍在执行,或者标记发送到网关的所有项目在终止之前已完全处理。

我的问题是,我该如何去做其中任何一个,或者是否有更好的方法来解决我没有想到的问题?

I have a self-executable jar program that relies heavily on Spring Integration. The problem I am having is that the program is terminating before the other Spring beans have completely finished.

Below is a cut-down version of the code I'm using, I can supply more code/configuration if needed. The entry point is a main() method, which bootstraps Spring and starts the import process:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

The DataImporter contains a simple loop that fires messages to a Spring Integration gateway. This delivers an active "push" approach to the flow, rather than the common approach of polling for data. This is where my problem comes in:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

For completeness, the flow XML looks something like this:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

The flow starts and processes items effectively, but once the startImport() loop finishes the main thread terminates and tears down all the Spring Integration threads immediately. This results in a race condition, the last (n) items are not completely processed when the program terminates.

I have an idea of maintaining a reference count of the items I am processing, but this is proving to be quite complicated, since the flow often splits/routes the messages to multiple service activators - meaning it is difficult to determine if each item has "finished".

What I think I need is some way to either check that no Spring beans are still executing, or to flag that all items sent to the gateway have been completely processed before terminating.

My question is, how might I go about doing either of these, or is there a better approach to my problem I haven't thought of?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

饭团 2025-01-04 02:27:04

您在这里没有使用请求-响应模式。

outbound-channel-adapter 是一个即发即忘的操作,如果你想等待响应,你应该使用一个将等待响应的出站网关,并将响应连接到原始网关,然后在 java sendAndReceive 中不仅仅是发布。

You're not using a request-response pattern here.

outbound-channel-adapter is a fire and forget action, if you want to wait for the response you should use an outbound-gateway that will wait for response, and connect the response to the original gateway, then in java sendAndReceive not just publish.

荒岛晴空 2025-01-04 02:27:04

如果您可以获取一个 Item 来确定是否仍然需要它(processingFinished() 或在后端阶段执行的类似操作),您可以注册所有 Item

如果这种方法可行,您甚至可以考虑将项目打包到 FutureTask 对象中,或者利用 java.util.concurrent 中的类似概念。

编辑:第二个想法:

你有没有想过让渠道变得更加智能化?一旦发送者不再发送任何数据,它就会关闭通道。在这种情况下,worker-bean 不必是守护线程,但可以根据关闭和空的输入通道确定其终止标准。

If you can get an Item to determine, whether it is still needed or not (processingFinished() or something similar executed in the back-end-stages), you can register all Items at a central authority, which keeps track of the number of non-finished Items and effecitvely determines a termination-condition.

If this approach is feasible, you could even think of packaging the items into FutureTask-objects or make use of similar concepts from java.util.concurrent.

Edit: Second Idea:

Have you thought about making the channels more intelligent? A sender closes the channel once it does not send any more data. In this scenario, the worker-beans do not have to be deamon threads but can determine their termination-criterion based on a closed and empty input channel.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文