来自 jdbc 源的 Mule 消息丰富器

发布于 2024-12-23 01:50:29 字数 8119 浏览 0 评论 0原文

我想用两个变量 LAST_POLL_START 和 LAST_POLL_END 来丰富当前消息。然后,这些变量将用于构建另一个 jdbc 查询。

但是,我不明白应该如何使用浓缩器。我尝试过的代码如下,它的灵感来自于 此处Mule 的博客。在我看来,这应该有效。我使用的是 mule-3.2.0,jdbc 查询可以工作并返回正确的值。

<flow name="MasterFlow" processingStrategy="synchronous">
    <quartz:inbound-endpoint jobName="cronJobPoolTime" cronExpression="0 0/1 * * * ?">
        <quartz:endpoint-polling-job>
            <quartz:job-endpoint ref="jdbc_quartzDummy"/>
        </quartz:endpoint-polling-job>
    </quartz:inbound-endpoint>      
    <flow-ref name="GetPollingTimes" />
    <!-- Do other things -->
</flow>

<!-- Below is needed when using Quartz trigger -->
<jdbc:endpoint name="jdbc_quartzDummy" connector-ref="tabuConnector" queryKey="quartzDummy" />

<flow name="GetPollingTimes">
    <enricher>
        <jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
        <enrich target="#[variable:last_poll_start]" source="#[map-payload:LAST_POLL_START]"/>
        <enrich target="#[variable:last_poll_end]" source="#[map-payload:LAST_POLL_END]"/>
    </enricher>
    <logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />
</flow>

错误消息是:

WARN  2011-12-27 15:38:00,831 [scheduler-ESB_Worker-1] org.mule.DefaultMuleMessage: setProperty(key, value) called with null value; removing key: last_poll_start; please report the following stack trace to [email protected]
java.lang.Throwable
    at org.mule.DefaultMuleMessage.setProperty(DefaultMuleMessage.java:456)
    at org.mule.expression.VariableExpressionEnricher.enrich(VariableExpressionEnricher.java:24)
    at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:248)
    at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:237)
    at org.mule.enricher.MessageEnricher.enrich(MessageEnricher.java:69)
    at org.mule.enricher.MessageEnricher.process(MessageEnricher.java:43)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:90)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.construct.Flow.process(Flow.java:64)
    at org.mule.config.spring.factories.FlowRefFactoryBean$1.process(FlowRefFactoryBean.java:44)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:138)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:190)
    at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:163)
    at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:150)
    at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:162)
    at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:125)
    at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:45)
    at org.mule.transport.quartz.jobs.EndpointPollingJob.doExecute(EndpointPollingJob.java:169)
    at org.mule.transport.quartz.jobs.AbstractJob.execute(AbstractJob.java:36)
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:534)

将丰富源更改为#[payload]:

<enrich target="#[variable:last_poll_start]" source="#[payload]"/>
<logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />

产生(来自日志文件):

org.mule.api.processor.LoggerMessageProcessor: last_poll_start = [{LAST_POLL_START=2011-12-21, LAST_POLL_END=2000-01-01}]

I'd like to enrich the current message with two variables, LAST_POLL_START and LAST_POLL_END. These variables are then going to be used to build another jdbc query.

However, I cannot understand how I should use the enricher. The code I've tried is below, and it is inspired by the code found here and on Mule's blog. In my mind, this should work. I'm using mule-3.2.0, and the jdbc query works and returns the correct values.

<flow name="MasterFlow" processingStrategy="synchronous">
    <quartz:inbound-endpoint jobName="cronJobPoolTime" cronExpression="0 0/1 * * * ?">
        <quartz:endpoint-polling-job>
            <quartz:job-endpoint ref="jdbc_quartzDummy"/>
        </quartz:endpoint-polling-job>
    </quartz:inbound-endpoint>      
    <flow-ref name="GetPollingTimes" />
    <!-- Do other things -->
</flow>

<!-- Below is needed when using Quartz trigger -->
<jdbc:endpoint name="jdbc_quartzDummy" connector-ref="tabuConnector" queryKey="quartzDummy" />

<flow name="GetPollingTimes">
    <enricher>
        <jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
        <enrich target="#[variable:last_poll_start]" source="#[map-payload:LAST_POLL_START]"/>
        <enrich target="#[variable:last_poll_end]" source="#[map-payload:LAST_POLL_END]"/>
    </enricher>
    <logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />
</flow>

The error message is:

WARN  2011-12-27 15:38:00,831 [scheduler-ESB_Worker-1] org.mule.DefaultMuleMessage: setProperty(key, value) called with null value; removing key: last_poll_start; please report the following stack trace to [email protected]
java.lang.Throwable
    at org.mule.DefaultMuleMessage.setProperty(DefaultMuleMessage.java:456)
    at org.mule.expression.VariableExpressionEnricher.enrich(VariableExpressionEnricher.java:24)
    at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:248)
    at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:237)
    at org.mule.enricher.MessageEnricher.enrich(MessageEnricher.java:69)
    at org.mule.enricher.MessageEnricher.process(MessageEnricher.java:43)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:90)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.construct.Flow.process(Flow.java:64)
    at org.mule.config.spring.factories.FlowRefFactoryBean$1.process(FlowRefFactoryBean.java:44)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
    at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:138)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
    at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
    at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
    at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:190)
    at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:163)
    at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:150)
    at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:162)
    at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:125)
    at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:45)
    at org.mule.transport.quartz.jobs.EndpointPollingJob.doExecute(EndpointPollingJob.java:169)
    at org.mule.transport.quartz.jobs.AbstractJob.execute(AbstractJob.java:36)
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:534)

Changing the enrich source to #[payload]:

<enrich target="#[variable:last_poll_start]" source="#[payload]"/>
<logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />

Produces (from log file):

org.mule.api.processor.LoggerMessageProcessor: last_poll_start = [{LAST_POLL_START=2011-12-21, LAST_POLL_END=2000-01-01}]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

请爱~陌生人 2024-12-30 01:50:29

受到前面答案的启发,我注意到可以在丰富器中使用 groovy 语法。如果其他人遇到这个问题,这里是解决方案。

<flow name="GetPollingTimes">
    <enricher>
        <jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
        <enrich target="#[variable:last_poll_start]" source="#[groovy:payload.last_poll_start]"/>
        <enrich target="#[variable:last_poll_end]" source="#[groovy:payload.last_poll_end]"/>
    </enricher>
</flow>

Inspired by the previous answer, I noticed that it is possible to use the groovy syntax in the enricher. Here is the solution, if someone else were to run into this problem.

<flow name="GetPollingTimes">
    <enricher>
        <jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
        <enrich target="#[variable:last_poll_start]" source="#[groovy:payload.last_poll_start]"/>
        <enrich target="#[variable:last_poll_end]" source="#[groovy:payload.last_poll_end]"/>
    </enricher>
</flow>
番薯 2024-12-30 01:50:29

我找到了这个问题的部分解决方案,它使用 groovy 脚本。然而,这感觉像是某种我宁愿没有的黑客:

<flow name="GetPollingTimes">
    <jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
    <script:transformer>
        <script:script engine="groovy">
            <script:text>
                message.setInvocationProperty('last_poll_start', payload.last_poll_start)
                message.setInvocationProperty('last_poll_end', payload.last_poll_end)
                return payload
            </script:text>
        </script:script>
    </script:transformer>
</flow>

I have found a partial solution to this problem, which uses a groovy script. This, however, feels like some sort of hack that I'd rather be without:

<flow name="GetPollingTimes">
    <jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
    <script:transformer>
        <script:script engine="groovy">
            <script:text>
                message.setInvocationProperty('last_poll_start', payload.last_poll_start)
                message.setInvocationProperty('last_poll_end', payload.last_poll_end)
                return payload
            </script:text>
        </script:script>
    </script:transformer>
</flow>
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文