Spring Integration: splitting messages again after aggregation?

Posted on 2024-11-27 21:52:36

In my Spring Integration-powered project I have a splitter and a payload-type-router that send my data to various transformers. The new "transformed" objects are then passed back to an aggregator and processed.

Now I want to split up my aggregated results so they are persisted properly, since I need to route some of the objects to a separate outbound-channel-adapter. To achieve this, I added a second splitter after my aggregator, but it seems that only the first element in the aggregated collection is passed to the router.

This is my current flow:

<splitter ref="articleContentExtractor" />

<!-- This router works exactly as expected -->
<payload-type-router>
    ... routing to various transformers ...
    ... results are sent to articleOutAggregateChannel ...
</payload-type-router>

<aggregator ref="articleAggregator" />

<splitter />

<!-- This is where it seems to go wrong, the second
        splitter returns only the first object in the collection -->    
<payload-type-router resolution-required="true">
    <mapping type="x.y.z.AbstractContent" channel="contentOutChannel" />
    <mapping type="x.y.z.Staff" channel="staffOutChannel" />
</payload-type-router>

<outbound-channel-adapter id="contentSaveService" ref="contentExporter" 
    method="persist" channel="contentOutChannel" />

<outbound-channel-adapter id="staffSaveService" ref="staffExporter" 
    method="persist" channel="staffOutChannel" />

And my Aggregator code:

@Aggregator
public List<? super BaseObject> compileArticle(List<? super BaseObject> parts) {

    // Search for the required objects for referencing
    Iterator<? super BaseObject> it = parts.iterator();
    Article article = null;
    List<Staff> authors = new ArrayList<Staff>();

    while (it.hasNext()) {
        Object part = it.next();
        if (part instanceof Article) {
            article = (Article)part;
        }
        else if (part instanceof Staff) {
            authors.add((Staff)part);
        }
    }

    // Apply references
    article.setAuthors(authors);

    return parts;
}

What am I doing wrong? Am I using my aggregator properly?

Note: If I just remove both the aggregator and second splitter altogether, the rest of the flow works perfectly.

Comments (1)

纸伞微斜 2024-12-04 21:52:36

It is hard to tell everything that is going on since I don't have all your code, but I was able to make this flow work the way I think you wanted. The baseObjectTransformer does nothing but set a flag on the objects passing through it:

...
<context:component-scan base-package="net.grogscave.example" />

<channel id="inputChannel" />

<splitter ref="articleContentExtractor" input-channel="inputChannel"
    output-channel="splitArticleStaff" />

<channel id="splitArticleStaff" />

<payload-type-router input-channel="splitArticleStaff">
    <mapping type="net.grogscave.example.domain.Article" channel="articleChannel" />
    <mapping type="net.grogscave.example.domain.Staff" channel="staffChannel" />
</payload-type-router>

<channel id="articleChannel" />

<transformer input-channel="articleChannel" output-channel="articleOutAggregateChannel"
    ref="baseObjectTransformer"/>

<channel id="staffChannel" />

<transformer input-channel="staffChannel" output-channel="articleOutAggregateChannel"
    ref="baseObjectTransformer"/>

<channel id="articleOutAggregateChannel" />

<aggregator ref="articleAggregator" input-channel="articleOutAggregateChannel" output-channel="splitArticleInChannel"/>

<channel id="splitArticleInChannel" />

<splitter input-channel="splitArticleInChannel" output-channel="splitArticleOutChannel" />

<channel id="splitArticleOutChannel" />

<payload-type-router resolution-required="true" input-channel="splitArticleOutChannel">
    <mapping type="net.grogscave.example.domain.Article" channel="contentOutChannel" />
    <mapping type="net.grogscave.example.domain.Staff" channel="staffOutChannel" />
</payload-type-router>

<channel id="contentOutChannel">
    <queue capacity="10" />
</channel>

<channel id="staffOutChannel">
    <queue capacity="10" />
</channel>
...
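The baseObjectTransformer referenced in the configuration above is not shown in the answer; it is only described as setting a flag on each object that passes through. A minimal sketch of what such a bean might look like follows. The class names, the `transformed` flag, and its accessors are assumptions made for illustration, not the answerer's actual code:

```java
// Hypothetical base class: the "transformed" flag and its accessors are
// assumptions, standing in for whatever flag the real transformer sets.
abstract class BaseObject {
    private boolean transformed;

    boolean isTransformed() { return transformed; }

    void setTransformed(boolean transformed) { this.transformed = transformed; }
}

// Hypothetical stand-in for the Staff domain class.
class Staff extends BaseObject {
    private final String name;

    Staff(String name) { this.name = name; }

    String getName() { return name; }
}

// Plain Java sketch of the bean a <transformer ref="baseObjectTransformer"/>
// element could point at. It marks the payload and returns the same instance,
// so the aggregator downstream receives the original objects.
class BaseObjectTransformer {
    <T extends BaseObject> T transform(T payload) {
        payload.setTransformed(true);
        return payload;
    }
}
```

Returning the same instance rather than a copy keeps object identity intact, which matters if a later step (such as the aggregator) links the objects to each other.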

I then just exercise this flow with the following code in a test case:

    ...
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
            "/META-INF/spring/split-agg-split.xml",
            SplitAggSplitTests.class);

    MessageChannel inputChannel = context.getBean("inputChannel",
            MessageChannel.class);

    PollableChannel contentOutChannel = context.getBean("contentOutChannel",
            PollableChannel.class);

    PollableChannel staffOutChannel = context.getBean("staffOutChannel",
            PollableChannel.class);

    inputChannel.send(new GenericMessage<String>("Dewey Wins!,A. Fool, C. Lewlis"));

    logger.info("==> Article received: "
            + contentOutChannel.receive(0).getPayload());

    for(int i = 0; i < 2; i++)
    {
        logger.info("==> Staff received: "
                + staffOutChannel.receive(0).getPayload());
    }
    ...

My logging output produces all three entities.

All that being said, have you considered removing the aggregator and the second splitter altogether and moving the setAuthors logic into your first splitter? I don't know all the details of your flow, but it would seem to simplify things.
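A rough sketch of that simplification is below: the first splitter builds the Article and Staff objects, links them with setAuthors, and only then emits them as separate parts. The domain classes are hypothetical stand-ins, and the "title,author,author" input format is only inferred from the test message above, so treat this as one possible shape rather than the actual articleContentExtractor:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the domain classes from the question.
abstract class BaseObject { }

class Staff extends BaseObject {
    private final String name;

    Staff(String name) { this.name = name; }

    String getName() { return name; }
}

class Article extends BaseObject {
    private final String title;
    private List<Staff> authors = new ArrayList<>();

    Article(String title) { this.title = title; }

    String getTitle() { return title; }

    List<Staff> getAuthors() { return authors; }

    void setAuthors(List<Staff> authors) { this.authors = authors; }
}

// Sketch of a splitter bean. In a Spring Integration flow this is the method
// a <splitter ref="articleContentExtractor"/> element would invoke (it could
// also carry the @Splitter annotation); here it is plain Java.
class ArticleContentExtractor {
    List<BaseObject> split(String raw) {
        // Assumed format: first field is the title, the rest are author names.
        String[] fields = raw.split(",");
        Article article = new Article(fields[0].trim());

        List<Staff> authors = new ArrayList<>();
        for (int i = 1; i < fields.length; i++) {
            authors.add(new Staff(fields[i].trim()));
        }

        // Apply the references here, before the parts are sent downstream,
        // so no aggregator is needed to reassemble them later.
        article.setAuthors(authors);

        List<BaseObject> parts = new ArrayList<>();
        parts.add(article);
        parts.addAll(authors);
        return parts;
    }
}
```

With the references applied up front, each element of the returned list can go straight to the payload-type-router and on to its outbound-channel-adapter. This only works if the transformers return the same instances (or the references are not needed until after transformation), which depends on details of the flow that the question does not show.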
