来自 jdbc 源的 Mule 消息丰富器
我想用两个变量 LAST_POLL_START 和 LAST_POLL_END 来丰富当前消息。然后,这些变量将用于构建另一个 jdbc 查询。
但是,我不明白应该如何使用浓缩器。我尝试过的代码如下,它的灵感来自于 此处 和 Mule 的博客。在我看来,这应该有效。我使用的是 mule-3.2.0,jdbc 查询可以工作并返回正确的值。
<flow name="MasterFlow" processingStrategy="synchronous">
<quartz:inbound-endpoint jobName="cronJobPoolTime" cronExpression="0 0/1 * * * ?">
<quartz:endpoint-polling-job>
<quartz:job-endpoint ref="jdbc_quartzDummy"/>
</quartz:endpoint-polling-job>
</quartz:inbound-endpoint>
<flow-ref name="GetPollingTimes" />
<!-- Do other things -->
</flow>
<!-- Below is needed when using Quartz trigger -->
<jdbc:endpoint name="jdbc_quartzDummy" connector-ref="tabuConnector" queryKey="quartzDummy" />
<flow name="GetPollingTimes">
<enricher>
<jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
<enrich target="#[variable:last_poll_start]" source="#[map-payload:LAST_POLL_START]"/>
<enrich target="#[variable:last_poll_end]" source="#[map-payload:LAST_POLL_END]"/>
</enricher>
<logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />
</flow>
错误消息是:
WARN 2011-12-27 15:38:00,831 [scheduler-ESB_Worker-1] org.mule.DefaultMuleMessage: setProperty(key, value) called with null value; removing key: last_poll_start; please report the following stack trace to [email protected]
java.lang.Throwable
at org.mule.DefaultMuleMessage.setProperty(DefaultMuleMessage.java:456)
at org.mule.expression.VariableExpressionEnricher.enrich(VariableExpressionEnricher.java:24)
at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:248)
at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:237)
at org.mule.enricher.MessageEnricher.enrich(MessageEnricher.java:69)
at org.mule.enricher.MessageEnricher.process(MessageEnricher.java:43)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:90)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.construct.Flow.process(Flow.java:64)
at org.mule.config.spring.factories.FlowRefFactoryBean$1.process(FlowRefFactoryBean.java:44)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:138)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:190)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:163)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:150)
at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:162)
at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:125)
at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:45)
at org.mule.transport.quartz.jobs.EndpointPollingJob.doExecute(EndpointPollingJob.java:169)
at org.mule.transport.quartz.jobs.AbstractJob.execute(AbstractJob.java:36)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:534)
将丰富源更改为#[payload]:
<enrich target="#[variable:last_poll_start]" source="#[payload]"/>
<logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />
产生(来自日志文件):
org.mule.api.processor.LoggerMessageProcessor: last_poll_start = [{LAST_POLL_START=2011-12-21, LAST_POLL_END=2000-01-01}]
I'd like to enrich the current message with two variables, LAST_POLL_START and LAST_POLL_END. These variables are then going to be used to build another jdbc query.
However, I cannot understand how I should use the enricher. The code I've tried is below, and it is inspired by the code found here and on Mule's blog. In my mind, this should work. I'm using mule-3.2.0, and the jdbc query works and returns the correct values.
<flow name="MasterFlow" processingStrategy="synchronous">
<quartz:inbound-endpoint jobName="cronJobPoolTime" cronExpression="0 0/1 * * * ?">
<quartz:endpoint-polling-job>
<quartz:job-endpoint ref="jdbc_quartzDummy"/>
</quartz:endpoint-polling-job>
</quartz:inbound-endpoint>
<flow-ref name="GetPollingTimes" />
<!-- Do other things -->
</flow>
<!-- Below is needed when using Quartz trigger -->
<jdbc:endpoint name="jdbc_quartzDummy" connector-ref="tabuConnector" queryKey="quartzDummy" />
<flow name="GetPollingTimes">
<enricher>
<jdbc:outbound-endpoint queryKey="getPollTimes" exchange-pattern="request-response" />
<enrich target="#[variable:last_poll_start]" source="#[map-payload:LAST_POLL_START]"/>
<enrich target="#[variable:last_poll_end]" source="#[map-payload:LAST_POLL_END]"/>
</enricher>
<logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />
</flow>
The error message is:
WARN 2011-12-27 15:38:00,831 [scheduler-ESB_Worker-1] org.mule.DefaultMuleMessage: setProperty(key, value) called with null value; removing key: last_poll_start; please report the following stack trace to [email protected]
java.lang.Throwable
at org.mule.DefaultMuleMessage.setProperty(DefaultMuleMessage.java:456)
at org.mule.expression.VariableExpressionEnricher.enrich(VariableExpressionEnricher.java:24)
at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:248)
at org.mule.expression.DefaultExpressionManager.enrich(DefaultExpressionManager.java:237)
at org.mule.enricher.MessageEnricher.enrich(MessageEnricher.java:69)
at org.mule.enricher.MessageEnricher.process(MessageEnricher.java:43)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:90)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.construct.Flow.process(Flow.java:64)
at org.mule.config.spring.factories.FlowRefFactoryBean$1.process(FlowRefFactoryBean.java:44)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:41)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:138)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:99)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:56)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:64)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:87)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:190)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:163)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:150)
at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:162)
at org.mule.transport.quartz.jobs.EndpointPollingJob$1.doInTransaction(EndpointPollingJob.java:125)
at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:45)
at org.mule.transport.quartz.jobs.EndpointPollingJob.doExecute(EndpointPollingJob.java:169)
at org.mule.transport.quartz.jobs.AbstractJob.execute(AbstractJob.java:36)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:534)
Changing the enrich source to #[payload]:
<enrich target="#[variable:last_poll_start]" source="#[payload]"/>
<logger level="INFO" message="last_poll_start = #[variable:last_poll_start]" />
Produces (from log file):
org.mule.api.processor.LoggerMessageProcessor: last_poll_start = [{LAST_POLL_START=2011-12-21, LAST_POLL_END=2000-01-01}]
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
受到前面答案的启发,我注意到可以在丰富器中使用 groovy 语法。如果其他人遇到这个问题,这里是解决方案。
Inspired by the previous answer, I noticed that it is possible to use the groovy syntax in the enricher. Here is the solution, if someone else were to run into this problem.
我找到了这个问题的部分解决方案,它使用 groovy 脚本。然而,这感觉像是某种我宁愿没有的黑客:
I have found a partial solution to this problem, which uses a groovy script. This, however, feels like some sort of hack that I'd rather be without: