Oracle AQ/JMS - Why is the queue cleared when the application shuts down?
I have an application that enqueues and dequeues messages from Oracle AQ using the JMS interface. While the application is running, items are enqueued and dequeued, and I can see the queued items in the queue table. However, once the application shuts down, the queue table is cleared and the application cannot access the previously queued items. Any idea what might cause this behavior?
The Oracle AQ is created using this code:
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'schema.my_queuetable',
    sort_list          => 'priority,enq_time',
    comment            => 'Queue table to hold my data',
    multiple_consumers => FALSE, -- This is necessary so that a message is only processed by a single consumer
    queue_payload_type => 'SYS.AQ$_JMS_OBJECT_MESSAGE',
    compatible         => '10.0.0',
    storage_clause     => 'TABLESPACE LGQUEUE_IRM01');
END;
/
BEGIN
  dbms_aqadm.create_queue(
    queue_name  => 'schema.my_queue',
    queue_table => 'schema.my_queuetable');
END;
/
BEGIN
  dbms_aqadm.start_queue(queue_name => 'schema.my_queue');
END;
/
I also have a Java class for connecting to the queue, queueing items and processing dequeued items like this:
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
import javax.sql.DataSource;

import oracle.jms.AQjmsFactory;
// Log and LogFactory are assumed to come from Apache Commons Logging.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MyOperationsQueueImpl implements MyOperationsQueue {
    private static final Log LOGGER = LogFactory.getLog(MyOperationsQueueImpl.class);

    private final QueueConnection queueConnection;
    private final QueueSession producerQueueSession;
    private final QueueSession consumerQueueSession;
    private final String queueName;
    private final QueueSender queueSender;
    private final QueueReceiver queueReceiver;
    private MyOperationsQueue.MyOperationEventReceiver eventReceiver;

    public MyOperationsQueueImpl(DBUtils dbUtils, String queueName) throws MyException {
        this.eventReceiver = null;
        this.queueName = queueName;
        try {
            DataSource ds = dbUtils.getDataSource();
            QueueConnectionFactory connectionFactory = AQjmsFactory.getQueueConnectionFactory(ds);
            this.queueConnection = connectionFactory.createQueueConnection();
            // We create separate producer and consumer sessions because that is what is recommended by the docs
            // See: https://docs.oracle.com/javaee/6/api/javax/jms/Session.html
            // Both sessions are non-transacted and use automatic acknowledgement.
            this.producerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.consumerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.queueSender = this.producerQueueSession.createSender(this.producerQueueSession.createQueue(this.queueName));
            this.queueReceiver = this.consumerQueueSession.createReceiver(this.consumerQueueSession.createQueue(this.queueName));
            // Start delivery of incoming messages on this connection.
            this.queueConnection.start();
        } catch (JMSException | NamingException exception) {
            throw new MyOperationException("Failed to create MyOperationsQueue", exception);
        }
    }

    // JMS resources are released only if and when the garbage collector finalizes this object.
    @Override
    protected void finalize() throws Throwable {
        this.queueReceiver.close();
        this.queueSender.close();
        this.consumerQueueSession.close();
        this.producerQueueSession.close();
        this.queueConnection.close();
        super.finalize();
    }

    @Override
    public void submitMyOperation(MyOperationParameters myParameters) throws MyOperationException {
        try {
            // Wrap the parameters in an ObjectMessage and enqueue it.
            ObjectMessage message = this.producerQueueSession.createObjectMessage(myParameters);
            this.queueSender.send(message);
            synchronized (this) {
                if (this.eventReceiver != null) {
                    this.eventReceiver.onOperationSubmitted(message.getJMSMessageID(), myParameters);
                }
            }
        } catch (JMSException exc) {
            throw new MyOperationException("Failed to submit my operation", exc);
        }
    }

    @Override
    public void setMyOperationEventReceiver(MyOperationEventReceiver operationReceiver) throws MyOperationException {
        LOGGER.debug("Setting my operation event receiver");
        synchronized (this) {
            if (this.eventReceiver != null) {
                throw new IllegalStateException("Cannot set an operation event receiver if it is already set");
            }
            this.eventReceiver = operationReceiver;
            try {
                // Messages are delivered asynchronously to this listener on the consumer session.
                this.queueReceiver.setMessageListener(message -> {
                    LOGGER.debug("New message received from queue receiver");
                    try {
                        ObjectMessage objectMessage = (ObjectMessage) message;
                        eventReceiver.onOperationReady(message.getJMSMessageID(), (MyOperationParameters) objectMessage.getObject());
                    } catch (Exception exception) {
                        // Report the failure to the event receiver; log if even the message ID is unavailable.
                        try {
                            eventReceiver.onOperationRetrievalFailed(message.getJMSMessageID(), exception);
                        } catch (JMSException innerException) {
                            LOGGER.error("Failed to get message ID for JMS Message: " + message, innerException);
                        }
                    }
                });
            } catch (JMSException exc) {
                throw new MyOperationException("Failed to set My message listener", exc);
            }
        }
    }
}
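A side note on the cleanup path in the class above: it releases its JMS resources in finalize(), which the JVM does not guarantee to run before exit and which has been deprecated since Java 9. Below is a minimal sketch of explicit, deterministic cleanup that could replace it. OrderedCloser and the three stand-in resources are hypothetical and are not part of the original code or of the Oracle AQ API. In the real class the registered items would be the receiver, sender, sessions and connection, for example via method references such as queueConnection::close, since the JMS 1.1 interfaces do not extend AutoCloseable. This is not offered as an explanation of why the queue table is cleared; it only makes the shutdown sequence predictable while investigating.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical helper: closes registered resources in reverse
 * registration order, so dependents are closed before what they depend on.
 */
public class OrderedCloser implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    /** Registers a resource to be closed later. */
    public void register(AutoCloseable resource) {
        resources.push(resource); // push/pop gives last-in, first-out order
    }

    @Override
    public void close() throws Exception {
        Exception first = null;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                // Keep closing the remaining resources; remember every failure.
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        // Stand-ins for the JMS connection, session and receiver.
        try (OrderedCloser closer = new OrderedCloser()) {
            closer.register(() -> log.add("connection closed"));
            closer.register(() -> log.add("session closed"));
            closer.register(() -> log.add("receiver closed"));
        }
        // prints [receiver closed, session closed, connection closed]
        System.out.println(log);
    }
}
```

With something like this, MyOperationsQueueImpl could implement AutoCloseable itself and be closed from the application's shutdown code, instead of relying on the garbage collector.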