使用自定义容器工厂自动装配 jms 模板或侦听器内的其他 bean 为 null

发布于 01-13 07:11 字数 9927 浏览 3 评论 0原文

我创建了一个自定义的 JmsListenerContainerFactory 和 MessageListenerContainer 用于批处理。问题是:当我在侦听器类的 @JmsListener(containerFactory="customContainerFactoryq1") 中指定这个自定义容器工厂时,该类中所有通过 @Autowired 自动装配的组件都是 null。但是,当我在 @JmsListener 注解中使用 DefaultJmsListenerContainerFactory 类型的 containerFactory 时,一切工作正常。

Config class

@EnableJms
@Configuration
public class SpringBatchJmsConfig {
    
    @Bean
    public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory =
                new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
        return activeMQConnectionFactory;
    }

    @Bean(name="customContainerFactoryq1")
    public CustomJmsListenerContainerFactory customJmsListenerContainerFactory() {
        CustomJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
        factory.setConnectionFactory(receiverActiveMQConnectionFactory());
        return factory;
    }
    
    @Bean(name="defaultContainerFactoryq1")
    public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(receiverActiveMQConnectionFactory());
        return factory;
    }
 
    @Bean(name="myCustomJmsTemplateMq1")
    public JmsTemplate customJmsTemplateMq1() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(receiverActiveMQConnectionFactory()); 
        return jmsTemplate;
    }

Container

    public static final int DEFAULT_BATCH_SIZE = 20;

    private int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Creates the batch listener container and switches it to a transacted
     * session, which the batch receive loop relies on to commit a whole batch
     * at once.
     */
    public CustomJmsListenerContainer() {
        super();
        setSessionTransacted(true);
    }
    public int getBatchSize() {
        return batchSize;
    }
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }
    /**
     * Receives up to {@code batchSize} messages and hands them to the listener as one batch.
     *
     * <p>Any session or consumer passed in is reused; when one is missing it is created here
     * and closed again in the {@code finally} block. Receiving stops at the first {@code null}
     * receive or once {@code batchSize} messages have been collected.
     *
     * <p>NOTE(review): the {@code status} argument is not consulted in this body.
     *
     * @param invoker  passed through to {@code noMessageReceived} when the batch is empty
     * @param session  session to reuse, or {@code null} to create one for this call
     * @param consumer consumer to reuse, or {@code null} to create one for this call
     * @param status   transaction status supplied by the caller (unused here)
     * @return {@code true} if at least one message was received, {@code false} otherwise
     * @throws JMSException if resource setup fails or the listener fails with a JMSException
     */
    @Override
    protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException {

        // Only resources created inside this call are tracked here for cleanup.
        Connection conToClose = null;
        MessageConsumer consumerToClose = null;
        Session sessionToClose = null;

        try {
            Session sessionToUse = session;
            MessageConsumer consumerToUse = consumer;

            if (sessionToUse == null) {
                Connection conToUse = null;
                if (sharedConnectionEnabled()) {
                    // The shared connection is not recorded for closing.
                    conToUse = getSharedConnection();
                } else {
                    conToUse = createConnection();
                    conToClose = conToUse;
                    conToUse.start();
                }
                sessionToUse = createSession(conToUse);
                sessionToClose = sessionToUse;
            }

            if (consumerToUse == null) {
                consumerToUse = createListenerConsumer(sessionToUse);
                consumerToClose = consumerToUse;
            }

            List<Message> messages = new ArrayList<Message>();

            // Collect messages until a receive returns null or the batch is full.
            int count = 0;
            Message message = null;
            do {
                message = receiveMessage(consumerToUse);
                if (message != null) {
                    messages.add(message);
                }
            }
         
           while ((message != null) && (++count < batchSize));

            if (messages.size() > 0) {
                try {
                    // The whole batch is processed and then committed in one step.
                    doExecuteListener(sessionToUse, messages);
                    sessionToUse.commit();
                } catch (Throwable ex) {
                    // Every failure is reported to the handler; only JMSException is rethrown.
                    handleListenerException(ex);
                    if (ex instanceof JMSException) {
                        throw (JMSException) ex;
                    }
                }
                return true;
            }

            noMessageReceived(invoker, sessionToUse);
            return false;
        } finally {
            // Variables still null here belong to the caller (or are shared) and are passed as null.
            JmsUtils.closeMessageConsumer(consumerToClose);
            JmsUtils.closeSession(sessionToClose);
            ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
        }
    }

    protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
        System.out.println("Message Size inside container:" + messages.size());
        if (!isAcceptMessagesWhileStopping() && !isRunning()) {
            if (logger.isWarnEnabled()) {
                logger.warn("Rejecting received messages because of the listener container "
                        + "having been stopped in the meantime: " + messages);
            }
            rollbackIfNecessary(session);
            throw new JMSException("Rejecting received messages as listener container is stopping");
        }
      MessagingMessageListenerAdapter container = (MessagingMessageListenerAdapter)getMessageListener();
Method method = null;
String method Name = null;
        try {
method = container.getClass().getDeclaredMethod("getHandlerMethod");
method.setAccessible(true);
          InvocableHandlerMethod methodNameObject = (InvocableHandlerMethod)method.invoke(container);
methodName = methodNameObject.getMethod().getName();
Class.forName("com.demo.jms.SampleListener").getMethod(methodName, List.class)invoke(Class.forName("com.demo.jms.SampleListener").newInstance(), message);
        } catch (JMSException ex) {
            rollbackOnExceptionIfNecessary(session, ex);
            throw ex;
        } catch (RuntimeException ex) {
            rollbackOnExceptionIfNecessary(session, ex);
            throw ex;
        } catch (Error err) {
            rollbackOnExceptionIfNecessary(session, err);
            throw err;
        }
    }


    /**
     * Validates the container configuration, additionally requiring a positive batch size.
     *
     * @throws IllegalArgumentException if {@code batchSize} is zero or negative
     */
    @Override
    protected void validateConfiguration() {
        // Run the inherited validation as well, so that overriding this method
        // does not silently drop the checks done by the parent container.
        super.validateConfiguration();
        if (batchSize <= 0) {
            throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
        }
    }

    /**
     * Accepts only {@code true}: the batch receive loop commits the session after
     * each batch, which requires a transacted session.
     *
     * @param transacted must be {@code true}
     * @throws IllegalArgumentException if {@code transacted} is {@code false}
     */
    @Override
    public void setSessionTransacted(boolean transacted) {
        if (!transacted) {
            throw new IllegalArgumentException("Batch Listener requires a transacted Session");
        }
        super.setSessionTransacted(transacted);
    }
}

Container Factory

    @Nullable
    private Executor taskExecutor;
    @Nullable
    private PlatformTransactionManager transactionManager;
    @Nullable
    private Integer cacheLevel;
    @Nullable
    private String cacheLevelName;
    @Nullable
    private String concurrency;
    @Nullable
    private Integer maxMessagesPerTask;
    @Nullable
    private Long receiveTimeout;
    @Nullable
    private Long recoveryInterval;
    @Nullable
    private BackOff backOff;

    public CustomJmsListenerContainerFactory() {
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public void setCacheLevel(Integer cacheLevel) {
        this.cacheLevel = cacheLevel;
    }

    public void setCacheLevelName(String cacheLevelName) {
        this.cacheLevelName = cacheLevelName;
    }

    public void setConcurrency(String concurrency) {
        this.concurrency = concurrency;
    }

    public void setMaxMessagesPerTask(Integer maxMessagesPerTask) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }

    public void setReceiveTimeout(Long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public void setRecoveryInterval(Long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    public void setBackOff(BackOff backOff) {
        this.backOff = backOff;
    }

    /**
     * Builds the batch listener container handed out by this factory.
     *
     * @return a new container with a batch size of 60, consumer-level caching,
     *         and exactly one concurrent consumer
     */
    protected CustomJmsListenerContainer createContainerInstance() {
        final CustomJmsListenerContainer batchContainer = new CustomJmsListenerContainer();

        // Batch and caching settings.
        batchContainer.setBatchSize(60);
        batchContainer.setCacheLevel(CustomJmsListenerContainer.CACHE_CONSUMER);

        // Both the initial and the maximum consumer count are fixed at one.
        batchContainer.setConcurrentConsumers(1);
        batchContainer.setMaxConcurrentConsumers(1);

        return batchContainer;
    }

    /**
     * Copies every optional setting that was configured on this factory onto the
     * given container. Settings left {@code null} on the factory are not applied.
     *
     * <p>An explicit {@code cacheLevel} takes precedence over {@code cacheLevelName},
     * and a {@code backOff} takes precedence over {@code recoveryInterval}.
     *
     * @param container the container to configure
     */
    protected void initializeContainer(CustomJmsListenerContainer container) {
        if (this.taskExecutor != null) {
            container.setTaskExecutor(this.taskExecutor);
        }

        if (this.transactionManager != null) {
            container.setTransactionManager(this.transactionManager);
        }

        if (this.cacheLevel != null) {
            container.setCacheLevel(this.cacheLevel);
        } else if (this.cacheLevelName != null) {
            container.setCacheLevelName(this.cacheLevelName);
        }

        if (this.concurrency != null) {
            container.setConcurrency(this.concurrency);
        }

        if (this.maxMessagesPerTask != null) {
            container.setMaxMessagesPerTask(this.maxMessagesPerTask);
        }

        if (this.receiveTimeout != null) {
            container.setReceiveTimeout(this.receiveTimeout);
        }

        if (this.backOff != null) {
            container.setBackOff(this.backOff);
            if (this.recoveryInterval != null) {
                // Name this factory in the message so the log points at the right class.
                this.logger.info("Ignoring recovery interval in CustomJmsListenerContainerFactory in favor of BackOff");
            }
        } else if (this.recoveryInterval != null) {
            container.setRecoveryInterval(this.recoveryInterval);
        }

    }
}

Listener

@Component
public class Sample Listener { 

@Autowired
@Qualifier("myCustomJmsTemplateMq1")
private JmsTemplate jmsTemplate;

@JmsListener(containerFactory="customContainerFactoryq1", destination="myQueue")
public void getMessages(List<Message> msgs) {
   //some logic
  }
} 

有人可以帮我解决这个问题吗?

I have created a custom JmsListenerContainerFactory and MessageListenerContainer for batch processing. The problem is that when I pass my custom container factory via @JmsListener(containerFactory="customContainerFactoryq1") in the listener class, every component that I autowire in that class is null. However, it works fine when I use a containerFactory of type DefaultJmsListenerContainerFactory in the @JmsListener annotation.

Config class

@EnableJms
@Configuration
public class SpringBatchJmsConfig {
    
    @Bean
    public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory =
                new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
        return activeMQConnectionFactory;
    }

    @Bean(name="customContainerFactoryq1")
    public CustomJmsListenerContainerFactory customJmsListenerContainerFactory() {
        CustomJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
        factory.setConnectionFactory(receiverActiveMQConnectionFactory());
        return factory;
    }
    
    @Bean(name="defaultContainerFactoryq1")
    public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(receiverActiveMQConnectionFactory());
        return factory;
    }
 
    @Bean(name="myCustomJmsTemplateMq1")
    public JmsTemplate customJmsTemplateMq1() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(receiverActiveMQConnectionFactory()); 
        return jmsTemplate;
    }

Container

    public static final int DEFAULT_BATCH_SIZE = 20;

    private int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Creates the batch listener container and switches it to a transacted
     * session, which the batch receive loop relies on to commit a whole batch
     * at once.
     */
    public CustomJmsListenerContainer() {
        super();
        setSessionTransacted(true);
    }
    public int getBatchSize() {
        return batchSize;
    }
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }
    @Override
    protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException {

        Connection conToClose = null;
        MessageConsumer consumerToClose = null;
        Session sessionToClose = null;

        try {
            Session sessionToUse = session;
            MessageConsumer consumerToUse = consumer;

            if (sessionToUse == null) {
                Connection conToUse = null;
                if (sharedConnectionEnabled()) {
                    conToUse = getSharedConnection();
                } else {
                    conToUse = createConnection();
                    conToClose = conToUse;
                    conToUse.start();
                }
                sessionToUse = createSession(conToUse);
                sessionToClose = sessionToUse;
            }

            if (consumerToUse == null) {
                consumerToUse = createListenerConsumer(sessionToUse);
                consumerToClose = consumerToUse;
            }

            List<Message> messages = new ArrayList<Message>();

            int count = 0;
            Message message = null;
            do {
                message = receiveMessage(consumerToUse);
                if (message != null) {
                    messages.add(message);
                }
            }
         
           while ((message != null) && (++count < batchSize));

            if (messages.size() > 0) {
                try {
                    doExecuteListener(sessionToUse, messages);
                    sessionToUse.commit();
                } catch (Throwable ex) {
                    handleListenerException(ex);
                    if (ex instanceof JMSException) {
                        throw (JMSException) ex;
                    }
                }
                return true;
            }

            noMessageReceived(invoker, sessionToUse);
            return false;
        } finally {
            JmsUtils.closeMessageConsumer(consumerToClose);
            JmsUtils.closeSession(sessionToClose);
            ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
        }
    }

    protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
        System.out.println("Message Size inside container:" + messages.size());
        if (!isAcceptMessagesWhileStopping() && !isRunning()) {
            if (logger.isWarnEnabled()) {
                logger.warn("Rejecting received messages because of the listener container "
                        + "having been stopped in the meantime: " + messages);
            }
            rollbackIfNecessary(session);
            throw new JMSException("Rejecting received messages as listener container is stopping");
        }
      MessagingMessageListenerAdapter container = (MessagingMessageListenerAdapter)getMessageListener();
Method method = null;
String method Name = null;
        try {
method = container.getClass().getDeclaredMethod("getHandlerMethod");
method.setAccessible(true);
          InvocableHandlerMethod methodNameObject = (InvocableHandlerMethod)method.invoke(container);
methodName = methodNameObject.getMethod().getName();
Class.forName("com.demo.jms.SampleListener").getMethod(methodName, List.class)invoke(Class.forName("com.demo.jms.SampleListener").newInstance(), message);
        } catch (JMSException ex) {
            rollbackOnExceptionIfNecessary(session, ex);
            throw ex;
        } catch (RuntimeException ex) {
            rollbackOnExceptionIfNecessary(session, ex);
            throw ex;
        } catch (Error err) {
            rollbackOnExceptionIfNecessary(session, err);
            throw err;
        }
    }


    @Override
    protected void validateConfiguration() {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
        }
    }

    public void setSessionTransacted(boolean transacted) {
        if (!transacted) {
            throw new IllegalArgumentException("Batch Listener requires a transacted Session");
        }
        super.setSessionTransacted(transacted);
    }
}

Container Factory

    @Nullable
    private Executor taskExecutor;
    @Nullable
    private PlatformTransactionManager transactionManager;
    @Nullable
    private Integer cacheLevel;
    @Nullable
    private String cacheLevelName;
    @Nullable
    private String concurrency;
    @Nullable
    private Integer maxMessagesPerTask;
    @Nullable
    private Long receiveTimeout;
    @Nullable
    private Long recoveryInterval;
    @Nullable
    private BackOff backOff;

    public CustomJmsListenerContainerFactory() {
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public void setCacheLevel(Integer cacheLevel) {
        this.cacheLevel = cacheLevel;
    }

    public void setCacheLevelName(String cacheLevelName) {
        this.cacheLevelName = cacheLevelName;
    }

    public void setConcurrency(String concurrency) {
        this.concurrency = concurrency;
    }

    public void setMaxMessagesPerTask(Integer maxMessagesPerTask) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }

    public void setReceiveTimeout(Long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public void setRecoveryInterval(Long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    public void setBackOff(BackOff backOff) {
        this.backOff = backOff;
    }

    protected CustomJmsListenerContainer createContainerInstance() {
        CustomJmsListenerContainer container =new CustomJmsListenerContainer();
        container.setBatchSize(60);
        container.setCacheLevel(CustomJmsListenerContainer.CACHE_CONSUMER);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        return container;
    }

    protected void initializeContainer(CustomJmsListenerContainer container) {
        if (this.taskExecutor != null) {
            container.setTaskExecutor(this.taskExecutor);
        }

        if (this.transactionManager != null) {
            container.setTransactionManager(this.transactionManager);
        }

        if (this.cacheLevel != null) {
            container.setCacheLevel(this.cacheLevel);
        } else if (this.cacheLevelName != null) {
            container.setCacheLevelName(this.cacheLevelName);
        }

        if (this.concurrency != null) {
            container.setConcurrency(this.concurrency);
        }

        if (this.maxMessagesPerTask != null) {
            container.setMaxMessagesPerTask(this.maxMessagesPerTask);
        }

        if (this.receiveTimeout != null) {
            container.setReceiveTimeout(this.receiveTimeout);
        }

        if (this.backOff != null) {
            container.setBackOff(this.backOff);
            if (this.recoveryInterval != null) {
                this.logger.info("Ignoring recovery interval in DefaultJmsListenerContainerFactory in favor of BackOff");
            }
        } else if (this.recoveryInterval != null) {
            container.setRecoveryInterval(this.recoveryInterval);
        }

    }
}

Listener

@Component
public class Sample Listener { 

@Autowired
@Qualifier("myCustomJmsTemplateMq1")
private JmsTemplate jmsTemplate;

@JmsListener(containerFactory="customContainerFactoryq1", destination="myQueue")
public void getMessages(List<Message> msgs) {
   //some logic
  }
} 

Can someone please help me on this.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

﹉夏雨初晴づ2025-01-20 07:11:57

我觉得一切都很好;您认为到底哪里出了问题?

输入图片此处描述

All looks good to me; exactly what do you think is wrong?

enter image description here

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文