activemq-cpp 如何获取状态发送或接收超时
当我向代理发送消息时,我已经为 activemq 连接设置了超时属性。
但是当发送超时时我无法得到任何异常或返回。
我无法获得发送成功或超时的状态。
当我使用 receive(long long timeout); 时也会发生这种情况
有什么方法可以区分这两种状态吗?
版本 Activemq 5.4.2 activemq-cpp 3.2.5
URI:
failover:(tcp://192.168.32.11:61617) without any option, all use default.
连接代码:
bool CActiveMqProducer::Initial()
{
try
{
activemq::library::ActiveMQCPP::initializeLibrary();
//sure has been cleaned up before initial
if (!m_bCleanUp)
CleanUp();
if(m_strBrokerURI == "" || m_strDestURI == "")
{
printf("MQ initial failed for m_strBrokerURI == \"\" || m_strDestURI == \"\"\n");
return false;
}
//create a connection factory
auto_ptr<ActiveMQConnectionFactory> ConnFactoryPtr(new ActiveMQConnectionFactory(m_strBrokerURI, m_strAccount, m_strPsw));
// Create a Connection
try
{
m_pConnObj = ConnFactoryPtr->createConnection();
if(m_pConnObj != NULL)
{
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(m_pConnObj);
amqConnection->setSendTimeout(m_unSendTimeout);
//here set send timeout option
}
else
{
return false;
}
m_pConnObj->start();
}
catch (CMSException& e)
{
e.printStackTrace();
throw e;
}
// Create a Session
if (m_bClientAck)
{
m_pSession = m_pConnObj->createSession(Session::CLIENT_ACKNOWLEDGE);
if(m_pSession == NULL)
return false;
}
else
{
m_pSession = m_pConnObj->createSession(Session::AUTO_ACKNOWLEDGE);
if(m_pSession == NULL)
return false;
}
// Create the destination (Topic or Queue)
if (m_bUseTopic)
{
m_pMsgDest = m_pSession->createTopic(m_strDestURI);
if(m_pMsgDest == NULL)
return false;
}
else
{
m_pMsgDest = m_pSession->createQueue(m_strDestURI);
if(m_pMsgDest == NULL)
return false;
}
// Create a MessageProducer from the Session to the Topic or Queue
m_pMsgProducer = m_pSession->createProducer(m_pMsgDest);
if(m_pMsgProducer == NULL)
return false;
if(m_bPresistent)
{
m_pMsgProducer->setDeliveryMode(DeliveryMode::PERSISTENT);
}
else
{
m_pMsgProducer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
}
//control the logic
m_bInitialized = true;
m_bCleanUp = false;
}
catch (CMSException& e)
{
e.printStackTrace();
return false;
}
return true;
}
发送代码:
bool CActiveMqProducer::SendTextMessage(const char* msg, int deliveryMode, int priority, long long timeToLive, std::map<std::string,std::string> property)
{
try
{
if(!m_bInitialized)
{
printf("MQ client has not been initialized!\n");
return false;
}
TextMessage * tmsg = m_pSession->createTextMessage();
tmsg->setText(msg);
std::map<std::string, std::string>::iterator it = property.begin();
for(; it != property.end(); it++)
{
tmsg->setStringProperty(it->first,it->second);
}
m_pMsgProducer->send(tmsg, deliveryMode, priority, timeToLive);
delete tmsg;
}
catch(MessageFormatException &e)
{
e.printStackTrace();
return false;
}
catch(InvalidDestinationException &e)
{
e.printStackTrace();
return false;
}
catch(UnsupportedOperationException &e)
{
e.printStackTrace();
return false;
}
catch(CMSException &e)
{
//if an internal error occurs while sending the message.
e.printStackTrace();
return false;
}
return true;
}
I have set the timeout property for a activemq connection when I send message to broker.
But I couldn't get any exception or return when it send timeout.
I couldn't gain the status of sending succeed or timeout.
This also happened when I use receive(long long timeout);
Is there any way to distinguish these two states?
Version Activemq 5.4.2 activemq-cpp 3.2.5
URI:
failover:(tcp://192.168.32.11:61617) without any option, all use default.
Connection code:
bool CActiveMqProducer::Initial()
{
try
{
activemq::library::ActiveMQCPP::initializeLibrary();
//sure has been cleaned up before initial
if (!m_bCleanUp)
CleanUp();
if(m_strBrokerURI == "" || m_strDestURI == "")
{
printf("MQ initial failed for m_strBrokerURI == \"\" || m_strDestURI == \"\"\n");
return false;
}
//create a connection factory
auto_ptr<ActiveMQConnectionFactory> ConnFactoryPtr(new ActiveMQConnectionFactory(m_strBrokerURI, m_strAccount, m_strPsw));
// Create a Connection
try
{
m_pConnObj = ConnFactoryPtr->createConnection();
if(m_pConnObj != NULL)
{
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(m_pConnObj);
amqConnection->setSendTimeout(m_unSendTimeout);
//here set send timeout option
}
else
{
return false;
}
m_pConnObj->start();
}
catch (CMSException& e)
{
e.printStackTrace();
throw e;
}
// Create a Session
if (m_bClientAck)
{
m_pSession = m_pConnObj->createSession(Session::CLIENT_ACKNOWLEDGE);
if(m_pSession == NULL)
return false;
}
else
{
m_pSession = m_pConnObj->createSession(Session::AUTO_ACKNOWLEDGE);
if(m_pSession == NULL)
return false;
}
// Create the destination (Topic or Queue)
if (m_bUseTopic)
{
m_pMsgDest = m_pSession->createTopic(m_strDestURI);
if(m_pMsgDest == NULL)
return false;
}
else
{
m_pMsgDest = m_pSession->createQueue(m_strDestURI);
if(m_pMsgDest == NULL)
return false;
}
// Create a MessageProducer from the Session to the Topic or Queue
m_pMsgProducer = m_pSession->createProducer(m_pMsgDest);
if(m_pMsgProducer == NULL)
return false;
if(m_bPresistent)
{
m_pMsgProducer->setDeliveryMode(DeliveryMode::PERSISTENT);
}
else
{
m_pMsgProducer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
}
//control the logic
m_bInitialized = true;
m_bCleanUp = false;
}
catch (CMSException& e)
{
e.printStackTrace();
return false;
}
return true;
}
Send code:
bool CActiveMqProducer::SendTextMessage(const char* msg, int deliveryMode, int priority, long long timeToLive, std::map<std::string,std::string> property)
{
try
{
if(!m_bInitialized)
{
printf("MQ client has not been initialized!\n");
return false;
}
TextMessage * tmsg = m_pSession->createTextMessage();
tmsg->setText(msg);
std::map<std::string, std::string>::iterator it = property.begin();
for(; it != property.end(); it++)
{
tmsg->setStringProperty(it->first,it->second);
}
m_pMsgProducer->send(tmsg, deliveryMode, priority, timeToLive);
delete tmsg;
}
catch(MessageFormatException &e)
{
e.printStackTrace();
return false;
}
catch(InvalidDestinationException &e)
{
e.printStackTrace();
return false;
}
catch(UnsupportedOperationException &e)
{
e.printStackTrace();
return false;
}
catch(CMSException &e)
{
//if an internal error occurs while sending the message.
e.printStackTrace();
return false;
}
return true;
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
首先,我建议使用最新版本 v3.4.1,有许多修复可能会影响此问题。接下来,当您向 receive 传递超时值时,您应该会得到一个 NULL 返回。
至于您说要设置的超时选项,您应该更清楚您要设置的选项,因为有几个选项。有一个 requestTimeout 用于在同步发送消息时超时,并且故障转移传输上有一个超时选项,当代理连接断开时也会发挥作用。
由于您已经显示了任何代码或 URI,所以很难说这里发生了什么,但是对于发送来说,如果您没有设置 AlwaysSyncSend 选项,那么消息可能会异步发送。
First off I recommend using the latest release v3.4.1 there are many fixes that could affect this. Next you should get a NULL returned from receive when you pass it a timeout value.
As for the timeout option you say you are setting you should be more clear on what option you are setting as there are a couple. There is a requestTimeout that is used to timeout when a message is sent synchronously, and there is a timeout option on the failover transport that also comes into play when the broker connection is down.
Since you have shown any code or the URI its hard to say what all is going on here but for a send if you haven't set the AlwaysSyncSend option then its possible the message is getting sent async.