返回介绍

Spring 系列

MyBatis

Netty

Dubbo

Tomcat

Redis

Nacos

Sentinel

RocketMQ

番外篇(JDK 1.8)

学习心得

Spring EnableJMS

发布于 2024-05-19 21:34:34 字数 18288 浏览 0 评论 0 收藏 0

Spring EnableJms 注解

源码分析

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(JmsBootstrapConfiguration.class)
public @interface EnableJms {
}
  • 该类的切入点在@Import(JmsBootstrapConfiguration.class) , 直接看JmsBootstrapConfiguration就可以了
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class JmsBootstrapConfiguration {

    /**
     * jms 监听注解后处理, 将{@link JmsListener} 注册到{@link JmsListenerContainerFactory}
     * @return
     */
    @Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
        return new JmsListenerAnnotationBeanPostProcessor();
    }


    /**
     * JMS 监听注册
     * @return
     */
    @Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
        return new JmsListenerEndpointRegistry();
    }

}

JmsListenerAnnotationBeanPostProcessor

类图

image-20200304085303580

  • 主要关注

    1. afterSingletonsInstantiated
  • postProcessAfterInitialization

afterSingletonsInstantiated

@Override
    public void afterSingletonsInstantiated() {
        // Remove resolved singleton classes from cache
        this.nonAnnotatedClasses.clear();

        if (this.beanFactory instanceof ListableBeanFactory) {
            // Apply JmsListenerConfigurer beans from the BeanFactory, if any
            // 根据类型获取bean
            Map<String, JmsListenerConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);

            List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
            // 排序 Order
            AnnotationAwareOrderComparator.sort(configurers);
            for (JmsListenerConfigurer configurer : configurers) {
                // 放入jms监听配置,开发者自定义
                configurer.configureJmsListeners(this.registrar);
            }
        }

        if (this.containerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
        }

        if (this.registrar.getEndpointRegistry() == null) {
            // Determine JmsListenerEndpointRegistry bean from the BeanFactory
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }


        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
        }

        // Actually register all listeners
        this.registrar.afterPropertiesSet();
    }
  • 关注最后一行this.registrar.afterPropertiesSet()

        @Override
        public void afterPropertiesSet() {
            registerAllEndpoints();
        }
    
        protected void registerAllEndpoints() {
            Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
            synchronized (this.mutex) {
                for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                    // 注册监听
                    this.endpointRegistry.registerListenerContainer(
                            descriptor.endpoint, resolveContainerFactory(descriptor));
                }
                this.startImmediately = true;  // trigger immediate startup
            }
        }
    
  • 注册监听在下面分析会讲详见下文

postProcessAfterInitialization

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
                bean instanceof JmsListenerEndpointRegistry) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        // 获取 bean 的代理对象.class
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
                        Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, JmsListener.class, JmsListeners.class);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @JmsListener annotations found on bean type: " + targetClass);
                }
            } else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, listeners) ->
                        listeners.forEach(listener -> processJmsListener(listener, method, bean)));
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
    protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());

        // 设置 监听方法信息
        MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
        endpoint.setBean(bean);
        endpoint.setMethod(invocableMethod);
        endpoint.setMostSpecificMethod(mostSpecificMethod);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setId(getEndpointId(jmsListener));
        endpoint.setDestination(resolve(jmsListener.destination()));
        if (StringUtils.hasText(jmsListener.selector())) {
            endpoint.setSelector(resolve(jmsListener.selector()));
        }
        if (StringUtils.hasText(jmsListener.subscription())) {
            endpoint.setSubscription(resolve(jmsListener.subscription()));
        }
        if (StringUtils.hasText(jmsListener.concurrency())) {
            endpoint.setConcurrency(resolve(jmsListener.concurrency()));
        }

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            } catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        // 注册监听点 到 JmsListenerContainerFactory
        this.registrar.registerEndpoint(endpoint, factory);
    }
  • 将监听点注册的重要方法 org.springframework.jms.config.JmsListenerEndpointRegistrar#registerEndpoint(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>)
    public void registerEndpoint(JmsListenerEndpoint endpoint, @Nullable JmsListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");

        // Factory may be null, we defer the resolution right before actually creating the container
        // jms 监听点描述
        JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);

        synchronized (this.mutex) {
            if (this.startImmediately) {  // register and start immediately
                Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
                // 注册
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }
  • org.springframework.jms.config.JmsListenerEndpointRegistry#registerListenerContainer(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>, boolean)

    public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
                                              boolean startImmediately) {
    
            Assert.notNull(endpoint, "Endpoint must not be null");
            Assert.notNull(factory, "Factory must not be null");
            String id = endpoint.getId();
            Assert.hasText(id, "Endpoint id must be set");
    
            synchronized (this.listenerContainers) {
                if (this.listenerContainers.containsKey(id)) {
                    throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
                }
                // 创建消息监听容器
                MessageListenerContainer container = createListenerContainer(endpoint, factory);
                this.listenerContainers.put(id, container);
                if (startImmediately) {
                    // 启动消息监听容器
                    startIfNecessary(container);
                }
            }
        }
    
  • org.springframework.jms.config.JmsListenerEndpointRegistry#createListenerContainer

    /**
     * Create and start a new container using the specified factory.
     * 创建监听容器
     */
    protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,
                                                               JmsListenerContainerFactory<?> factory) {

        // 创建监听 容器
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) {
            try {
                // 后置方法
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            } catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
                        this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }
  • 关键接口JmsListenerContainerFactory<C extends MessageListenerContainer>

    public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {
    
        /**
         * Create a {@link MessageListenerContainer} for the given {@link JmsListenerEndpoint}.
         * 创建肩痛容器
         * @param endpoint the endpoint to configure
         * @return the created container
         */
        C createListenerContainer(JmsListenerEndpoint endpoint);
    
    }
    

    image-20200304092154712

  • 注册完成后是否立即启动

            this.listenerContainers.put(id, container);
                if (startImmediately) {
                    // 启动消息监听容器
                    startIfNecessary(container);
                }
    
        private void startIfNecessary(MessageListenerContainer listenerContainer) {
            if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
                listenerContainer.start();
            }
        }
    
    • 具体实现: org.springframework.jms.listener.AbstractJmsListeningContainer#start
  • 执行完start方法就结束了processJmsListener的调用链路, postProcessAfterInitialization 也结束了

JmsListenerEndpointRegistry

  • 这个类辅助JmsListenerAnnotationBeanPostProcessor 处理

registerListenerContainer

    /**
     * Create a message listener container for the given {@link JmsListenerEndpoint}.
     * <p>This create the necessary infrastructure to honor that endpoint
     * with regards to its configuration.
     * <p>The {@code startImmediately} flag determines if the container should be
     * started immediately.
     * <p>
     * 注册监听容器
     *
     * @param endpoint         the endpoint to add
     *                         监听点
     * @param factory          the listener factory to use
     *                         监听容器工厂
     * @param startImmediately start the container immediately if necessary
     *                         是否立即启动容器
     * @see #getListenerContainers()
     * @see #getListenerContainer(String)
     */
    public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
                                          boolean startImmediately) {

        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");
        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must be set");

        synchronized (this.listenerContainers) {
            if (this.listenerContainers.containsKey(id)) {
                throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
            }
            // 创建消息监听容器
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (startImmediately) {
                // 启动消息监听容器
                startIfNecessary(container);
            }
        }
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文