使用自定义容器工厂时,侦听器内自动装配的 JmsTemplate 或其他 bean 为 null
我创建了一个自定义的 JmsListenerContainerFactory 和 MessageListenerContainer 用于批处理。问题是,当我在侦听器类的 @JmsListener(containerFactory="customContainerFactoryq1") 中传入自定义容器工厂时,我在该类中自动装配的所有组件都为 null。但是,当我在 @JmsListener 注解中使用 DefaultJmsListenerContainerFactory 类型的 containerFactory 时,一切工作正常。
Config class
@EnableJms
@Configuration
public class SpringBatchJmsConfig {
@Bean
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
return activeMQConnectionFactory;
}
@Bean(name="customContainerFactoryq1")
public CustomJmsListenerContainerFactory customJmsListenerContainerFactory() {
CustomJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
factory.setConnectionFactory(receiverActiveMQConnectionFactory());
return factory;
}
@Bean(name="defaultContainerFactoryq1")
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(receiverActiveMQConnectionFactory());
return factory;
}
@Bean(name="myCustomJmsTemplateMq1")
public JmsTemplate customJmsTemplateMq1() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(receiverActiveMQConnectionFactory());
return jmsTemplate;
}
Container
/** Batch size used when {@code setBatchSize} is never called. */
public static final int DEFAULT_BATCH_SIZE = 20;
/** Upper bound on how many messages one receive cycle collects before handing them to the listener. */
private int batchSize = DEFAULT_BATCH_SIZE;
/**
 * Creates the container and forces a transacted session, since a batch is
 * acknowledged by committing the session after the listener has run.
 */
public CustomJmsListenerContainer() {
    super();
    setSessionTransacted(true);
}
/**
 * @return the maximum number of messages collected per receive cycle
 */
public int getBatchSize() {
return batchSize;
}
/**
 * Sets the maximum number of messages collected per receive cycle.
 * The value is not checked here; {@code validateConfiguration()} rejects values <= 0.
 *
 * @param batchSize the new batch size
 */
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
 * Receives up to {@code batchSize} messages and passes them to
 * {@code doExecuteListener(Session, List)} as one list, committing the session afterwards.
 *
 * @param invoker  passed through to {@code noMessageReceived} when nothing was received
 * @param session  session to use; when {@code null} a session is created here and closed on exit
 * @param consumer consumer to use; when {@code null} a consumer is created here and closed on exit
 * @param status   not referenced in this method body
 * @return {@code true} if at least one message was received (even when the listener then
 *         failed with a non-JMS exception), {@code false} if no message was received
 * @throws JMSException if receiving fails, or if listener execution or the commit throws a JMSException
 */
@Override
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException {
// Only resources created inside this call are tracked in the *ToClose variables.
Connection conToClose = null;
MessageConsumer consumerToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = session;
MessageConsumer consumerToUse = consumer;
if (sessionToUse == null) {
Connection conToUse = null;
if (sharedConnectionEnabled()) {
// A shared connection is not owned by this call, so it is not scheduled for release.
conToUse = getSharedConnection();
} else {
conToUse = createConnection();
conToClose = conToUse;
conToUse.start();
}
sessionToUse = createSession(conToUse);
sessionToClose = sessionToUse;
}
if (consumerToUse == null) {
consumerToUse = createListenerConsumer(sessionToUse);
consumerToClose = consumerToUse;
}
List<Message> messages = new ArrayList<Message>();
int count = 0;
Message message = null;
// Keep receiving until a receive returns null or batchSize messages have been collected.
do {
message = receiveMessage(consumerToUse);
if (message != null) {
messages.add(message);
}
}
while ((message != null) && (++count < batchSize));
if (messages.size() > 0) {
try {
doExecuteListener(sessionToUse, messages);
// Commit only after the listener returned normally.
sessionToUse.commit();
} catch (Throwable ex) {
// Every failure goes to handleListenerException; only JMSException is rethrown,
// anything else is swallowed here and the method still returns true.
handleListenerException(ex);
if (ex instanceof JMSException) {
throw (JMSException) ex;
}
}
return true;
}
noMessageReceived(invoker, sessionToUse);
return false;
} finally {
// Null arguments mean "nothing was created locally" for that resource.
JmsUtils.closeMessageConsumer(consumerToClose);
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
}
}
protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
System.out.println("Message Size inside container:" + messages.size());
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received messages because of the listener container "
+ "having been stopped in the meantime: " + messages);
}
rollbackIfNecessary(session);
throw new JMSException("Rejecting received messages as listener container is stopping");
}
MessagingMessageListenerAdapter container = (MessagingMessageListenerAdapter)getMessageListener();
Method method = null;
String method Name = null;
try {
method = container.getClass().getDeclaredMethod("getHandlerMethod");
method.setAccessible(true);
InvocableHandlerMethod methodNameObject = (InvocableHandlerMethod)method.invoke(container);
methodName = methodNameObject.getMethod().getName();
Class.forName("com.demo.jms.SampleListener").getMethod(methodName, List.class)invoke(Class.forName("com.demo.jms.SampleListener").newInstance(), message);
} catch (JMSException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
} catch (RuntimeException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
} catch (Error err) {
rollbackOnExceptionIfNecessary(session, err);
throw err;
}
}
/**
 * Validates the container configuration: runs the superclass checks first, then
 * requires a positive batch size.
 *
 * @throws IllegalArgumentException if {@code batchSize} is not greater than 0
 */
@Override
protected void validateConfiguration() {
    // Keep the inherited validation instead of silently replacing it.
    super.validateConfiguration();
    if (batchSize <= 0) {
        throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
    }
}
/**
 * Accepts only {@code true}: the batch is acknowledged via {@code Session.commit()},
 * which requires a transacted session.
 *
 * @param transacted must be {@code true}
 * @throws IllegalArgumentException if {@code transacted} is {@code false}
 */
@Override
public void setSessionTransacted(boolean transacted) {
    if (!transacted) {
        throw new IllegalArgumentException("Batch Listener requires a transacted Session");
    }
    super.setSessionTransacted(transacted);
}
}
Container Factory
// Optional settings. Each one stays null until its setter is called; initializeContainer
// copies only the non-null values onto the container it configures.
@Nullable
private Executor taskExecutor;
@Nullable
private PlatformTransactionManager transactionManager;
// Takes precedence over cacheLevelName when both are set (see initializeContainer).
@Nullable
private Integer cacheLevel;
@Nullable
private String cacheLevelName;
@Nullable
private String concurrency;
@Nullable
private Integer maxMessagesPerTask;
@Nullable
private Long receiveTimeout;
// Ignored when backOff is set (see initializeContainer).
@Nullable
private Long recoveryInterval;
@Nullable
private BackOff backOff;
/** Creates a factory with none of the optional settings configured. */
public CustomJmsListenerContainerFactory() {
}
/** @param taskExecutor executor to set on created containers */
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/** @param transactionManager transaction manager to set on created containers */
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
/** @param cacheLevel cache level to set on created containers; wins over the cache level name */
public void setCacheLevel(Integer cacheLevel) {
this.cacheLevel = cacheLevel;
}
/** @param cacheLevelName cache level name to set on created containers when no cache level is given */
public void setCacheLevelName(String cacheLevelName) {
this.cacheLevelName = cacheLevelName;
}
/** @param concurrency concurrency string to set on created containers */
public void setConcurrency(String concurrency) {
this.concurrency = concurrency;
}
/** @param maxMessagesPerTask max-messages-per-task value to set on created containers */
public void setMaxMessagesPerTask(Integer maxMessagesPerTask) {
this.maxMessagesPerTask = maxMessagesPerTask;
}
/** @param receiveTimeout receive timeout to set on created containers */
public void setReceiveTimeout(Long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
/** @param recoveryInterval recovery interval to set on created containers; unused when a back-off is set */
public void setRecoveryInterval(Long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
/** @param backOff back-off to set on created containers; overrides the recovery interval */
public void setBackOff(BackOff backOff) {
this.backOff = backOff;
}
/**
 * Creates a new batching container with fixed defaults: batch size 60,
 * {@code CACHE_CONSUMER} cache level, and both concurrent and max concurrent consumers set to 1.
 *
 * @return the newly created container
 */
protected CustomJmsListenerContainer createContainerInstance() {
CustomJmsListenerContainer container =new CustomJmsListenerContainer();
container.setBatchSize(60);
container.setCacheLevel(CustomJmsListenerContainer.CACHE_CONSUMER);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
return container;
}
/**
 * Copies every optional setting that was configured on this factory onto the container.
 * Settings left {@code null} are skipped, so the container keeps its own value for them.
 *
 * <p>{@code cacheLevel} wins over {@code cacheLevelName}, and {@code backOff} wins over
 * {@code recoveryInterval} (an info message is logged when the latter is ignored).
 *
 * @param container the container to configure
 */
@Override
protected void initializeContainer(CustomJmsListenerContainer container) {
    if (this.taskExecutor != null) {
        container.setTaskExecutor(this.taskExecutor);
    }
    if (this.transactionManager != null) {
        container.setTransactionManager(this.transactionManager);
    }
    if (this.cacheLevel != null) {
        container.setCacheLevel(this.cacheLevel);
    } else if (this.cacheLevelName != null) {
        container.setCacheLevelName(this.cacheLevelName);
    }
    if (this.concurrency != null) {
        container.setConcurrency(this.concurrency);
    }
    if (this.maxMessagesPerTask != null) {
        container.setMaxMessagesPerTask(this.maxMessagesPerTask);
    }
    if (this.receiveTimeout != null) {
        container.setReceiveTimeout(this.receiveTimeout);
    }
    if (this.backOff != null) {
        container.setBackOff(this.backOff);
        if (this.recoveryInterval != null) {
            // Name this factory, not the Spring class the code was copied from.
            this.logger.info("Ignoring recovery interval in CustomJmsListenerContainerFactory in favor of BackOff");
        }
    } else if (this.recoveryInterval != null) {
        container.setRecoveryInterval(this.recoveryInterval);
    }
}
}
Listener
@Component
public class Sample Listener {
@Autowired
@Qualifier("myCustomJmsTemplateMq1")
private JmsTemplate jmsTemplate;
@JmsListener(containerFactory="customContainerFactoryq1", destination="myQueue")
public void getMessages(List<Message> msgs) {
//some logic
}
}
有人可以帮我解决这个问题吗?
I have created a custom JmsListenerContainerFactory and MessageListenerContainer for batch processing. The problem is that when I pass my custom container factory in @JmsListener(containerFactory="customContainerFactoryq1") in the listener class, all the components that I autowire in that class are null. It works fine when I use a containerFactory of type DefaultJmsListenerContainerFactory in the @JmsListener annotation.
Config class
@EnableJms
@Configuration
public class SpringBatchJmsConfig {
@Bean
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
return activeMQConnectionFactory;
}
@Bean(name="customContainerFactoryq1")
public CustomJmsListenerContainerFactory customJmsListenerContainerFactory() {
CustomJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
factory.setConnectionFactory(receiverActiveMQConnectionFactory());
return factory;
}
@Bean(name="defaultContainerFactoryq1")
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(receiverActiveMQConnectionFactory());
return factory;
}
@Bean(name="myCustomJmsTemplateMq1")
public JmsTemplate customJmsTemplateMq1() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(receiverActiveMQConnectionFactory());
return jmsTemplate;
}
Container
/** Batch size used when {@code setBatchSize} is never called. */
public static final int DEFAULT_BATCH_SIZE = 20;
/** Upper bound on how many messages one receive cycle collects before handing them to the listener. */
private int batchSize = DEFAULT_BATCH_SIZE;
/**
 * Creates the container and forces a transacted session, since a batch is
 * acknowledged by committing the session after the listener has run.
 */
public CustomJmsListenerContainer() {
    super();
    setSessionTransacted(true);
}
/**
 * @return the maximum number of messages collected per receive cycle
 */
public int getBatchSize() {
return batchSize;
}
/**
 * Sets the maximum number of messages collected per receive cycle.
 * The value is not checked here; {@code validateConfiguration()} rejects values <= 0.
 *
 * @param batchSize the new batch size
 */
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
 * Receives up to {@code batchSize} messages and passes them to
 * {@code doExecuteListener(Session, List)} as one list, committing the session afterwards.
 *
 * @param invoker  passed through to {@code noMessageReceived} when nothing was received
 * @param session  session to use; when {@code null} a session is created here and closed on exit
 * @param consumer consumer to use; when {@code null} a consumer is created here and closed on exit
 * @param status   not referenced in this method body
 * @return {@code true} if at least one message was received (even when the listener then
 *         failed with a non-JMS exception), {@code false} if no message was received
 * @throws JMSException if receiving fails, or if listener execution or the commit throws a JMSException
 */
@Override
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException {
// Only resources created inside this call are tracked in the *ToClose variables.
Connection conToClose = null;
MessageConsumer consumerToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = session;
MessageConsumer consumerToUse = consumer;
if (sessionToUse == null) {
Connection conToUse = null;
if (sharedConnectionEnabled()) {
// A shared connection is not owned by this call, so it is not scheduled for release.
conToUse = getSharedConnection();
} else {
conToUse = createConnection();
conToClose = conToUse;
conToUse.start();
}
sessionToUse = createSession(conToUse);
sessionToClose = sessionToUse;
}
if (consumerToUse == null) {
consumerToUse = createListenerConsumer(sessionToUse);
consumerToClose = consumerToUse;
}
List<Message> messages = new ArrayList<Message>();
int count = 0;
Message message = null;
// Keep receiving until a receive returns null or batchSize messages have been collected.
do {
message = receiveMessage(consumerToUse);
if (message != null) {
messages.add(message);
}
}
while ((message != null) && (++count < batchSize));
if (messages.size() > 0) {
try {
doExecuteListener(sessionToUse, messages);
// Commit only after the listener returned normally.
sessionToUse.commit();
} catch (Throwable ex) {
// Every failure goes to handleListenerException; only JMSException is rethrown,
// anything else is swallowed here and the method still returns true.
handleListenerException(ex);
if (ex instanceof JMSException) {
throw (JMSException) ex;
}
}
return true;
}
noMessageReceived(invoker, sessionToUse);
return false;
} finally {
// Null arguments mean "nothing was created locally" for that resource.
JmsUtils.closeMessageConsumer(consumerToClose);
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
}
}
protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
System.out.println("Message Size inside container:" + messages.size());
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received messages because of the listener container "
+ "having been stopped in the meantime: " + messages);
}
rollbackIfNecessary(session);
throw new JMSException("Rejecting received messages as listener container is stopping");
}
MessagingMessageListenerAdapter container = (MessagingMessageListenerAdapter)getMessageListener();
Method method = null;
String method Name = null;
try {
method = container.getClass().getDeclaredMethod("getHandlerMethod");
method.setAccessible(true);
InvocableHandlerMethod methodNameObject = (InvocableHandlerMethod)method.invoke(container);
methodName = methodNameObject.getMethod().getName();
Class.forName("com.demo.jms.SampleListener").getMethod(methodName, List.class)invoke(Class.forName("com.demo.jms.SampleListener").newInstance(), message);
} catch (JMSException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
} catch (RuntimeException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
} catch (Error err) {
rollbackOnExceptionIfNecessary(session, err);
throw err;
}
}
/**
 * Validates the container configuration: runs the superclass checks first, then
 * requires a positive batch size.
 *
 * @throws IllegalArgumentException if {@code batchSize} is not greater than 0
 */
@Override
protected void validateConfiguration() {
    // Keep the inherited validation instead of silently replacing it.
    super.validateConfiguration();
    if (batchSize <= 0) {
        throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
    }
}
/**
 * Accepts only {@code true}: the batch is acknowledged via {@code Session.commit()},
 * which requires a transacted session.
 *
 * @param transacted must be {@code true}
 * @throws IllegalArgumentException if {@code transacted} is {@code false}
 */
@Override
public void setSessionTransacted(boolean transacted) {
    if (!transacted) {
        throw new IllegalArgumentException("Batch Listener requires a transacted Session");
    }
    super.setSessionTransacted(transacted);
}
}
Container Factory
// Optional settings. Each one stays null until its setter is called; initializeContainer
// copies only the non-null values onto the container it configures.
@Nullable
private Executor taskExecutor;
@Nullable
private PlatformTransactionManager transactionManager;
// Takes precedence over cacheLevelName when both are set (see initializeContainer).
@Nullable
private Integer cacheLevel;
@Nullable
private String cacheLevelName;
@Nullable
private String concurrency;
@Nullable
private Integer maxMessagesPerTask;
@Nullable
private Long receiveTimeout;
// Ignored when backOff is set (see initializeContainer).
@Nullable
private Long recoveryInterval;
@Nullable
private BackOff backOff;
/** Creates a factory with none of the optional settings configured. */
public CustomJmsListenerContainerFactory() {
}
/** @param taskExecutor executor to set on created containers */
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/** @param transactionManager transaction manager to set on created containers */
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
/** @param cacheLevel cache level to set on created containers; wins over the cache level name */
public void setCacheLevel(Integer cacheLevel) {
this.cacheLevel = cacheLevel;
}
/** @param cacheLevelName cache level name to set on created containers when no cache level is given */
public void setCacheLevelName(String cacheLevelName) {
this.cacheLevelName = cacheLevelName;
}
/** @param concurrency concurrency string to set on created containers */
public void setConcurrency(String concurrency) {
this.concurrency = concurrency;
}
/** @param maxMessagesPerTask max-messages-per-task value to set on created containers */
public void setMaxMessagesPerTask(Integer maxMessagesPerTask) {
this.maxMessagesPerTask = maxMessagesPerTask;
}
/** @param receiveTimeout receive timeout to set on created containers */
public void setReceiveTimeout(Long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
/** @param recoveryInterval recovery interval to set on created containers; unused when a back-off is set */
public void setRecoveryInterval(Long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
/** @param backOff back-off to set on created containers; overrides the recovery interval */
public void setBackOff(BackOff backOff) {
this.backOff = backOff;
}
/**
 * Creates a new batching container with fixed defaults: batch size 60,
 * {@code CACHE_CONSUMER} cache level, and both concurrent and max concurrent consumers set to 1.
 *
 * @return the newly created container
 */
protected CustomJmsListenerContainer createContainerInstance() {
CustomJmsListenerContainer container =new CustomJmsListenerContainer();
container.setBatchSize(60);
container.setCacheLevel(CustomJmsListenerContainer.CACHE_CONSUMER);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
return container;
}
/**
 * Copies every optional setting that was configured on this factory onto the container.
 * Settings left {@code null} are skipped, so the container keeps its own value for them.
 *
 * <p>{@code cacheLevel} wins over {@code cacheLevelName}, and {@code backOff} wins over
 * {@code recoveryInterval} (an info message is logged when the latter is ignored).
 *
 * @param container the container to configure
 */
@Override
protected void initializeContainer(CustomJmsListenerContainer container) {
    if (this.taskExecutor != null) {
        container.setTaskExecutor(this.taskExecutor);
    }
    if (this.transactionManager != null) {
        container.setTransactionManager(this.transactionManager);
    }
    if (this.cacheLevel != null) {
        container.setCacheLevel(this.cacheLevel);
    } else if (this.cacheLevelName != null) {
        container.setCacheLevelName(this.cacheLevelName);
    }
    if (this.concurrency != null) {
        container.setConcurrency(this.concurrency);
    }
    if (this.maxMessagesPerTask != null) {
        container.setMaxMessagesPerTask(this.maxMessagesPerTask);
    }
    if (this.receiveTimeout != null) {
        container.setReceiveTimeout(this.receiveTimeout);
    }
    if (this.backOff != null) {
        container.setBackOff(this.backOff);
        if (this.recoveryInterval != null) {
            // Name this factory, not the Spring class the code was copied from.
            this.logger.info("Ignoring recovery interval in CustomJmsListenerContainerFactory in favor of BackOff");
        }
    } else if (this.recoveryInterval != null) {
        container.setRecoveryInterval(this.recoveryInterval);
    }
}
}
Listener
@Component
public class Sample Listener {
@Autowired
@Qualifier("myCustomJmsTemplateMq1")
private JmsTemplate jmsTemplate;
@JmsListener(containerFactory="customContainerFactoryq1", destination="myQueue")
public void getMessages(List<Message> msgs) {
//some logic
}
}
Can someone please help me with this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

我觉得一切都很好;您认为到底哪里出了问题?
All looks good to me; exactly what do you think is wrong?