Apache Camel - Warning when sending a JMS message

Posted on 2024-11-29 17:48:00

I want to send a message to a JMS queue with Camel. The body is a byte array, and I would expect Camel to convert it to a javax.jms.BytesMessage and send it without a problem, but I get the following warning.

Warning

[org.apache.camel.component.jms.JmsBinding] (Camel (camel) thread #0 -
JmsConsumer[devrechTransactionsQueue@z4smq_4001) 
Cannot determine specific JmsMessage type to use from body class. 
Will use generic JmsMessage. Body class: org.apache.camel.impl.DefaultMessage.  
If you want to send a POJO then your class might need to implement java.io.Serializable,
or you can force a specific type by setting the jmsMessageType option on the JMS endpoint.

If I try to set the type manually (exchange.getIn().setHeader("CamelJmsMessageType", JmsMessageType.Bytes);), I get a NullPointerException.

Exception

[org.apache.camel.processor.DefaultErrorHandler] (Camel (camel) thread #0 - JmsConsumer[devrechTransactionsQueue@z4smq_4001) Failed delivery for exchangeId: ID-madansportapp02-51560-1313509081079-0-1. Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
    at com.swiftmq.tools.util.DataByteArrayOutputStream.write(Unknown Source)
    at com.swiftmq.jms.BytesMessageImpl.writeBytes(Unknown Source)
    at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:506)
    at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:443)
    at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:267)
    at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:225)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:198)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:141)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$3.doInJms(JmsConfiguration.java:175)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:172)
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:347)
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:303)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:101)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:114)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:286)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:109)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:99)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:318)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:209)
    at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:305)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:116)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:79)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:102)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:104)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:91)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)
...

Code

public class MessageTranslator implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        RechargeType transaction = (RechargeType) exchange.getIn().getBody();
        exchange.getIn().setBody(createMessage(transaction));
    }

    private Message createMessage(RechargeType transaction) throws DataException, IOException {
        Message message = new DefaultMessage(); // camel message
        Request request = new Request(transaction); // our request

        final byte[] serializedPayload = RequestSerializationHelper.getSerializedPayload(request); // serialized request
        message.setBody(serializedPayload);

        return message;
    }
}

Comments (2)

苍景流年 2024-12-06 17:48:01

Alternatively, if you want to send a serializable object:

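import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;

// Field in a Camel-managed bean; Camel injects a template bound to this endpoint.
@Produce(uri = "activemq:queue:RechargeType")
private ProducerTemplate producerTemplate;

// Inside a method of the same bean: send the object as the message body.
RechargeType rechargeType = new RechargeType();
producerTemplate.sendBody(rechargeType);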

and receive it in the Processor class:

RechargeType request = (RechargeType) exchange.getIn().getBody();

However, RechargeType and the types of all its non-transient fields must implement java.io.Serializable.
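
The Serializable requirement covers more than the top-level class: Java serialization walks the whole object graph, so a field whose type is not Serializable makes the write fail. The sketch below shows this with the JDK only. The classes Account, BadPayload and GoodPayload are hypothetical stand-ins, not types from the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCheck {

    /** Hypothetical field type that does NOT implement Serializable. */
    static class Account {
        String id = "A-1";
    }

    /** Hypothetical payload: Serializable itself, but holds a non-serializable field. */
    static class BadPayload implements Serializable {
        private static final long serialVersionUID = 1L;
        Account account = new Account();
    }

    /** Hypothetical payload made only of serializable fields. */
    static class GoodPayload implements Serializable {
        private static final long serialVersionUID = 1L;
        String accountId = "A-1";
    }

    /** Returns true if Java serialization accepts the whole object graph. */
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object or one of its fields is not Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new GoodPayload())); // prints "true"
        System.out.println(canSerialize(new BadPayload()));  // prints "false"
    }
}
```

A check like this can be run in a unit test against the real payload class before it is ever handed to a JMS producer.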

伴梦长久 2024-12-06 17:48:00

You should not set the body to a Camel Message; set it to the plain byte[] you want to use as the payload.

So the createMessage method should return the byte[] instead.
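
The warning in the question says the JMS message type is chosen from the body class, which is why wrapping the bytes in another object hides them. The sketch below is a simplified model of that kind of dispatch, written against the JDK only. It is an assumption for illustration, not Camel's actual code, and Kind, Wrapper and kindFor are hypothetical names.

```java
import java.io.Serializable;
import java.util.Map;

public class BodyTypeSketch {

    /** Hypothetical stand-in for the JMS message kinds a binding can choose from. */
    enum Kind { BYTES, TEXT, MAP, OBJECT, GENERIC }

    /** Hypothetical stand-in for a non-serializable wrapper around the payload. */
    static final class Wrapper {
        final Object body;

        Wrapper(Object body) {
            this.body = body;
        }
    }

    /** Simplified, assumed dispatch on the body's class; not Camel's real logic. */
    static Kind kindFor(Object body) {
        if (body instanceof byte[]) {
            return Kind.BYTES;
        }
        if (body instanceof String) {
            return Kind.TEXT;
        }
        if (body instanceof Map) {
            return Kind.MAP;
        }
        if (body instanceof Serializable) {
            return Kind.OBJECT;
        }
        // Nothing matched: the situation the warning describes.
        return Kind.GENERIC;
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        System.out.println(kindFor(payload));              // prints "BYTES"
        System.out.println(kindFor(new Wrapper(payload))); // prints "GENERIC"
    }
}
```

In the question's processor, the equivalent fix is to pass serializedPayload straight to exchange.getIn().setBody(...) instead of first wrapping it in a DefaultMessage.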
