Spring 系列
- IoC 容器
- AOP
- SpringMVC
- Spring 事务
- Spring 源码故事(瞎编版)
- Spring 整体脉络
- Spring 类解析
- Spring 自定义标签解析
- Spring Scan 包扫描
- Spring 注解工具类
- Spring 别名注册
- Spring 标签解析类
- Spring ApplicationListener
- Spring messageSource
- Spring 自定义属性解析器
- Spring 排序工具
- Spring-import 注解
- Spring-定时任务
- Spring StopWatch
- Spring 元数据
- Spring 条件接口
- Spring MultiValueMap
- Spring MethodOverride
- Spring BeanDefinitionReaderUtils
- Spring PropertyPlaceholderHelper
- Spring-AnnotationFormatterFactory
- Spring-Formatter
- Spring-Parser
- Spring-Printer
- Spring5 新特性
- Spring RMI
- Spring Message
- SpringBoot
- SpringBootBatch
- Spring Cloud
- SpringSecurity
MyBatis
- 基础支持层
- 核心处理层
- 类解析
Netty
- 网络 IO 技术基础
- JDK1.8 NIO 包 核心组件源码剖析
- Netty 粘拆包及解决方案
- Netty 多协议开发
- 基于 Netty 开发服务端及客户端
- Netty 主要组件的源码分析
- Netty 高级特性
- Netty 技术细节源码分析
Dubbo
- 架构设计
- SPI 机制
- 注册中心
- 远程通信
- RPC
- 集群
Tomcat
- Servlet 与 Servlet 容器
- Web 容器
Redis
Nacos
Sentinel
RocketMQ
- RocketMQ NameServer 与 Broker 的通信
- RocketMQ 生产者启动流程
- RocketMQ 消息发送流程
- RocketMQ 消息发送存储流程
- RocketMQ MappedFile 内存映射文件详解
- RocketMQ ConsumeQueue 详解
- RocketMQ CommitLog 详解
- RocketMQ IndexFile 详解
- RocketMQ 消费者启动流程
- RocketMQ 消息拉取流程
- RocketMQ Broker 处理拉取消息请求流程
- RocketMQ 消息消费流程
番外篇(JDK 1.8)
- 基础类库
- 集合
- 并发编程
学习心得
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
Spring JmsTemplate
Spring JmsTemplate
- Author: HuiFer
- 源码阅读仓库: SourceHot-spring
- 源码路径:
org.springframework.jms.core.JmsTemplate
源码分析
send 发送消息
@Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
// 执行.
execute(session -> {
Destination destination = resolveDestinationName(session, destinationName);
doSend(session, destination, messageCreator);
return null;
}, false);
}
@Nullable
public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
Assert.notNull(action, "Callback object must not be null");
Connection conToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
if (sessionToUse == null) {
// 创建链接
conToClose = createConnection();
// 创建session
sessionToClose = createSession(conToClose);
if (startConnection) {
conToClose.start();
}
sessionToUse = sessionToClose;
}
if (logger.isDebugEnabled()) {
logger.debug("Executing callback on JMS Session: " + sessionToUse);
}
/**
* sessionCallback 执行
* {@link JmsTemplate#doSend(Session, javax.jms.Destination, org.springframework.jms.core.MessageCreator)}
*/
return action.doInJms(sessionToUse);
} catch (JMSException ex) {
throw convertJmsAccessException(ex);
} finally {
// 资源释放
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
}
}
- 最后
action.doInJms(sessionToUse)
的操作
Destination destination = resolveDestinationName(session, destinationName);
doSend(session, destination, messageCreator);
return null;
doSend
真正做的发送方法
protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
// 创建消息生产者
MessageProducer producer = createProducer(session, destination);
try {
// 创建消息
Message message = messageCreator.createMessage(session);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + message);
}
// 发送
doSend(producer, message);
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
} finally {
// 关闭消息生产者
JmsUtils.closeMessageProducer(producer);
}
}
createProducer
中通过javax.jms.Session.createProducer
创建MessageProducer
,第三方消息中间件独立实现createMessage
@Override
public javax.jms.Message createMessage(Session session) throws JMSException {
try {
// 消息转换
return this.messageConverter.toMessage(this.message, session);
} catch (Exception ex) {
throw new MessageConversionException("Could not convert '" + this.message + "'", ex);
}
}
消息转换后续在更新
doSend
这里也是第三方消息中间件实现
protected void doSend(MessageProducer producer, Message message) throws JMSException {
if (this.deliveryDelay >= 0) {
producer.setDeliveryDelay(this.deliveryDelay);
}
if (isExplicitQosEnabled()) {
// 发送消息,第三方消息中间件实现
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
} else {
producer.send(message);
}
}
closeMessageProducer
这个方法特别,直接关闭
public static void closeMessageProducer(@Nullable MessageProducer producer) {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
logger.trace("Could not close JMS MessageProducer", ex);
} catch (Throwable ex) {
// We don't trust the JMS provider: It might throw RuntimeException or Error.
logger.trace("Unexpected exception on closing JMS MessageProducer", ex);
}
}
}
receive 接收消息
@Override
@Nullable
public Message receive(String destinationName) throws JmsException {
return receiveSelected(destinationName, null);
}
@Override
@Nullable
public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException {
return execute(session -> {
Destination destination = resolveDestinationName(session, destinationName);
return doReceive(session, destination, messageSelector);
}, true);
}
@Nullable
protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
throws JMSException {
return doReceive(session, createConsumer(session, destination, messageSelector));
}
@Nullable
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
try {
// Use transaction timeout (if available).
long timeout = getReceiveTimeout();
// 链接工厂
ConnectionFactory connectionFactory = getConnectionFactory();
// JMS 资源信息
JmsResourceHolder resourceHolder = null;
if (connectionFactory != null) {
// 从连接对象中获取JMS 资源信息
resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
}
if (resourceHolder != null && resourceHolder.hasTimeout()) {
// 超时时间
timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
}
// 具体的消息
Message message = receiveFromConsumer(consumer, timeout);
if (session.getTransacted()) {
// 事务性操作
// Commit necessary - but avoid commit call within a JTA transaction.
if (isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
} else if (isClientAcknowledge(session)) {
// Manually acknowledge message, if any.
if (message != null) {
message.acknowledge();
}
}
return message;
} finally {
JmsUtils.closeMessageConsumer(consumer);
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论