ActiveMQ 上的重复消息
我使用 ActiveMQ 作为 JMS 代理和消费者,使用 jmsTemplate 发送消息,目前有 1 个非持久主题。在高峰时段,我每秒大约有 100 条消息。
队列中有多少消息并不重要,但我经常收到重复的消息。我提出的临时解决方案是在表上设置索引 - 目前所有消息仅保存在数据库中。
我的第一个问题 - 如果我指定了非持久主题并且不需要响应,为什么消息会重复?
发件人:
@Component
public class QueueSender
{
private Logger log = Logger.getLogger(getClass());
@Autowired
protected JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Autowired
public QueueSender( final JmsTemplate jmsTemplate )
{
this.jmsTemplate = jmsTemplate;
this.jmsTemplate.setDeliveryPersistent(false);
System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+
" getDeliveryMode "+jmsTemplate.getDeliveryMode()+
" getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+
" getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode());
}
public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setMessageIdEnabled(true);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("price", price);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("size", size);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
}
听众:
public void onMessage(Message message)
{
if (message instanceof MapMessage)
{
try
{
MapMessage mapMessage = (MapMessage) message;
if(null != mapMessage.getString("price"))
{
priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
} else{
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
}
}
catch (final JMSException e)
{
exceptionListener.onException(e);
}
}
}
春天:
<amq:broker useJmx="true" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors> </amq:broker>
<amq:topic id="topicDest" physicalName="Quotez"/>
<amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="jmsFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="pubSubDomain" value="true"/>
<property name="defaultDestinationName" value="Quotez"/>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDest"/>
<property name="messageListener" ref="jdbcListener" />
</bean>
第二个问题是关于jmsContainer配置。上面的代码和下面的代码有什么区别?上面的代码为我提供了主题作为订阅者,下面的代码为我提供了队列。
<jms:listener-container concurrency="10" connection-factory="connectionFactory">
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" />
</jms:listener-container>
我发现,Camel 及其幂等消费者应该解决重复问题 - 当然,最好首先知道为什么会发生这种情况。第三个问题涉及Camel的配置。我有这个配置(默认):
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:0"/>
</bean>
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<header>messageId</header>
<to uri="mock:result"/>
</idempotentConsumer>
</route>
</camelContext>
它适用于所有队列还是我应该进行显式订阅?我想它会检查每个主题/队列和所有传入消息。目前的问题是,所有消息的 messageId=null 并且过滤器将其作为参数。
2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} }
我没有找到设置 messageId 的简单方法。我的问题 - 设置 messageId 是否足够,并且它将正常工作,或者配置有问题,例如我必须指定将使用哪个主题。
谢谢,
齐达斯
I use ActiveMQ as JMS broker and consumer, jmsTemplate to send the messages, 1 non-durable Topic for the moment. During the peak time I have ~100 messages/second.
It doesn't matter how many messages are in the queue, but I frequently get duplicated messages. The temporary solution that I came up is to set up index on table - for the moment all messages are only saved in database.
My first question - why messages are duplicated, if I specified non-durable Topic and the response is not required?
Sender:
@Component
public class QueueSender
{
private Logger log = Logger.getLogger(getClass());
@Autowired
protected JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Autowired
public QueueSender( final JmsTemplate jmsTemplate )
{
this.jmsTemplate = jmsTemplate;
this.jmsTemplate.setDeliveryPersistent(false);
System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+
" getDeliveryMode "+jmsTemplate.getDeliveryMode()+
" getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+
" getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode());
}
public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setMessageIdEnabled(true);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("price", price);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("size", size);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
}
Listener:
public void onMessage(Message message)
{
if (message instanceof MapMessage)
{
try
{
MapMessage mapMessage = (MapMessage) message;
if(null != mapMessage.getString("price"))
{
priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
} else{
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
}
}
catch (final JMSException e)
{
exceptionListener.onException(e);
}
}
}
Spring:
<amq:broker useJmx="true" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors> </amq:broker>
<amq:topic id="topicDest" physicalName="Quotez"/>
<amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="jmsFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="pubSubDomain" value="true"/>
<property name="defaultDestinationName" value="Quotez"/>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDest"/>
<property name="messageListener" ref="jdbcListener" />
</bean>
The second question is concerning jmsContainer configuration. What is the difference between the code above and the code below? The code above gives me Topic as subscriber and the code below gives me Queue.
<jms:listener-container concurrency="10" connection-factory="connectionFactory">
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" />
</jms:listener-container>
I found, that Camel and its idempotentConsumer suppose to solve duplication problem - of course, it would be nice to know why it happens in first place. The third question concerns Camel's configuration. I have this configuration (default):
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:0"/>
</bean>
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<header>messageId</header>
<to uri="mock:result"/>
</idempotentConsumer>
</route>
</camelContext>
Does it apply for all queues or should I make explicit subscription? I suppose it will check every topic/queue and all incoming messages. The problem at the moment, that all messages have messageId=null and the filter takes it as the parameter.
2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} }
I didn't find a easy way of setting messageId. My question - is it enough to set messageId and it will work as excepted or something is wrong with configuration, for example I have to specify which topic will be used.
Thanks,
Dzidas
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
使用 JMS 主题时,您需要将并发/最大并发消费者设置为“1”,否则您将得到重复项。如果您需要多线程消耗和/或负载平衡,请改用虚拟主题。
when using a JMS topic, you need to set the concurrent/max concurrent consumers to "1" or you will get duplicates. if you need multi-threaded consumption and/or load balancing, then use virtual topics instead.