- Seata 是什么
- Seata术语
- Seata常见问题
- 用户文档
- 开发者指南
- 运维指南
- 博客文章
- Seata 基于改良版雪花算法的分布式UUID生成器分析
- Seata 新特性支持 undo_log 压缩
- ConcurrentHashMap 导致的 Seata 死锁问题
- Seata 应用侧启动过程剖析 注册中心与配置中心模块
- Seata 应用侧启动过程剖析 RM & TM 如何与 TC 建立连接
- Spring Cloud 集成 Seata 分布式事务 TCC 模式
- Seata 配置管理原理解析
- seata-golang 通信模型详解
- Seata 数据源代理解析
- 分布式事务 Seata 源码-Client 端启动流程
- Mac 下的 Seata Demo 环境搭建(AT模式)
- 分布式事务 Seata 源码 - Server 端启动流程
- 分布式事务如何实现?深入解读 Seata 的 XA 模式
- Seata 极简入门
- Seata config 模块源码分析
- 源码分析 Seata-XID 传递 Dubbo 篇
- Seata tcc 模块源码分析
- 通过 AOP 动态创建/关闭 Seata 分布式事务
- Seata core 模块源码分析
- Seata 动态配置订阅与降级实现原理
- Seata 配置中心实现原理
- Docker 部署 Seata 与 Nacos 整合
- Seata 分布式事务启用 Nacos 做配置中心
- 透过源码解决 Seata AT 模式整合 Mybatis-Plus 失去 MP 特性的问题
- SpringBoot+Dubbo+MybatisPlus 整合 seata 分布式事务
- Seata 客户端需要同时启动 RM 和 TM 吗?
- Seata AT 模式启动源码分析
- 基于 Seata Saga 设计更有弹性的金融应用
- 分布式事务 Seata 及其三种模式详解
- 分布式事务中间件 Seata 的设计原理
- Seata分布式Go Server正式开源-TaaS设计简介
- Seata(Fescar)分布式事务 整合 Spring Cloud
- Fescar 与 Spring Cloud 集成源码深度剖析
- 深度剖析一站式分布式事务方案Seata-Server
- TCC适用模型与适用场景分析
- TCC 理论及设计实现指南介绍
- 如何使用Seata保证Dubbo微服务间的一致性
- Fescar分布式事务原理解析探秘
- MT 模式
分布式事务 Seata 源码-Client 端启动流程
【分布式事务Seata源码解读二】Client端启动流程
本文从源码的角度分析一下AT模式下Client端启动流程,所谓的Client端,即业务应用方。分布式事务分为三个模块:TC、TM、RM。其中TC位于seata-server端,而TM、RM通过SDK的方式运行在client端。
下图展示了Seata官方Demo的一个分布式事务场景,分为如下几个微服务,共同实现了一个下订单、扣库存、扣余额的分布式事务。
- BusinessService: 业务服务,下单服务的入口
- StorageService: 库存微服务,用于扣减商品库存
- OrderService: 订单微服务,创建订单
- AccountService: 账户微服务,扣减用户账户的余额
从上图也可以看出,在AT模式下Seata Client端主要通过如下三个模块来实现分布式事务:
- GlobalTransactionScanner: GlobalTransactionScanner负责初始TM、RM模块,并为添加分布式事务注解的方法添加拦截器,拦截器负责全局事务的开启、提交或回滚
- DatasourceProxy: DatasourceProxy为DataSource添加拦截,拦截器会拦截所有SQL执行,并作为RM事务参与方的角色参与分布式事务执行。
- Rpc Interceptor: 在上一篇分布式事务Seata源码解读一中有提到分布式事务的几个核心要点,其中有一个是分布式事务的跨服务实例传播。Rpc Interceptor的职责就是负责在多个微服务之间传播事务。
seata-spring-boot-starter
引用seata分布式事务SDK有两种方式,依赖seata-all或者seata-spring-boot-starter,推荐使用seata-spring-boot-starter,因为该starter已经自动注入了上面提到的三个模块,用户只要添加相应的配置,在业务代码添加全局分布式事务注解即可。下面从seata-spring-boot-starter项目中的代码入手:
如下图所示是seata-spring-boot-starter的项目结构: 主要分为以下几个模块:
- properties: properties目录下都是Springboot 适配seata的相关配置类,即可以通过SpringBoot的配置方式来Seata的相关参数
- provider: provider目录下的类负责把Springboot、SpringCloud的配置适配到Seata配置中
- resources: resources目录下主要有两个文件,spring.factories用于注册Springboot的自动装配类,ExtConfigurationProvider用于注册SpringbootConfigurationProvider类,该Provider类负责把SpringBoot的相关配置类适配到Seata中。
对于springboot-starter项目,我们先查看resources/META-INF/spring.factories文件:
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration
可以看到在spring.factories中配置了自动装配类:SeataAutoConfiguration,在该装配类中主要注入了GlobalTransactionScanner和seataAutoDataSourceProxyCreator两个实例。代码如下:
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled",
havingValue = "true",
matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
...
// GlobalTransactionScanner负责为添加GlobalTransaction注解的方法添加拦截器,
// 并且负责初始化RM、TM
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties,
FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(),
seataProperties.getTxServiceGroup(),
failureHandler);
}
// SeataAutoDataSourceProxyCreator负责为Spring中的所有DataSource生成代理对象,
// 从而实现拦截所有SQL的执行
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {
"enableAutoDataSourceProxy", "enable-auto" +
"-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying());
}
}
GlobalTransactionScanner
GlobalTransactionScanner继承于AutoProxyCreator,AutoProxyCreator是Spring中实现AOP的一种方式,可以拦截Spring中的所有实例,判断是否需要进行代理。下面列出了GlobalTransactionScanner中一些比较重要的字段和拦截代理的核心方法:
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean {
...
// interceptor字段是对应一个代理对象的拦截器,
// 可以认为是一个临时变量,有效期是一个被代理对象
private MethodInterceptor interceptor;
// globalTransactionalInterceptor是通用的Interceptor,
// 非TCC事务方式的都使用该Interceptor
private MethodInterceptor globalTransactionalInterceptor;
// PROXYED_SET存储已经代理过的实例,防止重复处理
private static final Set<String> PROXYED_SET = new HashSet<>();
// applicationId是一个服务的唯一标识,
// 对应springcloud项目中的spring.application.name
private final String applicationId;
// 事务的分组标识,参考文章wiki:http://seata.io/zh-cn/docs/user/txgroup/transaction-group.html
private final String txServiceGroup;
...
// 判断是否需要代理目标对象,如果需要代理,则生成拦截器赋值到类变量interceptor中
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 判断是否禁用分布式事务
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
// 每次处理一个被代理对象时先把interceptor置为null,所以interceptor的
// 生命周期是一个被代理对象,由于是在另外一个方法getAdvicesAndAdvisorsForBean
// 中使用interceptor,所以该interceptor要定义为一个类变量
interceptor = null;
// 判断是否是TCC事务模式,判断的主要依据是方法上是否有TwoPhaseBusinessAction注解
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName,
applicationContext)) {
// 创建一个TCC事务的拦截器
interceptor =
new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
// 获取待处理对象的class类型
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
// 获取待处理对象继承的所有接口
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// 如果待处理对象的class或者继承的接口上有GlobalTransactional注解,
// 或者待处理对象的class的任一个方法上有GlobalTransactional或者
// GlobalLock注解则返回true,即需要被代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
// 如果interceptor为null,即不是TCC模式,
// 则使用globalTransactionalInterceptor作为拦截器
if (interceptor == null) {
// globalTransactionalInterceptor只会被创建一次
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor =
new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener) globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
if (!AopUtils.isAopProxy(bean)) {
// 如果bean本身不是Proxy对象,则直接调用父类的wrapIfNecessary生成代理对象即可
// 在父类中会调用getAdvicesAndAdvisorsForBean获取到上面定义的interceptor
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 如果该bean已经是代理对象了,则直接在代理对象的拦截调用链AdvisedSupport
// 上直接添加新的interceptor即可。
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName,
getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
// 标识该beanName已经处理过了
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
// 返回wrapIfNecessary方法中计算出的interceptor对象
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName,
TargetSource customTargetSource)
throws BeansException {
return new Object[]{interceptor};
}
}
上面介绍了GlobalTransactionScanner是如何通过注解拦截全局事务的,具体拦截器实现为TccActionInterceptor和GlobalTransactionalInterceptor,对于AT模式来说我们主要关心GlobalTransactionalInterceptor,在后续的文章中会介绍GlobalTransactionalInterceptor的具体实现。
另外GloabalTransactionScanner还负责TM、RM的初始化工作,是在initClient方法中实现的:
private void initClient() {
...
//初始化TM
TMClient.init(applicationId, txServiceGroup);
...
//初始化RM
RMClient.init(applicationId, txServiceGroup);
...
// 注册Spring shutdown的回调,用来释放资源
registerSpringShutdownHook();
}
TMClient、RMClient都是Seata基于Netty实现的Rpc框架的客户端类,只是业务逻辑不同,由于TMClient相对来说更简单一些,我们以RMClient为例看一下源码:
public class RMClient {
// RMClient的init是一个static方法,创建了一个RmNettyRemotingClient实例,并调用init方法
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient =
RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
}
RmNettyRemotingClient的实现如下:
@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
// ResourceManager负责处理事务参与方,支持AT、TCC、Saga三种模式
private ResourceManager resourceManager;
// RmNettyRemotingClient单例
private static volatile RmNettyRemotingClient instance;
private final AtomicBoolean initialized = new AtomicBoolean(false);
// 微服务的唯一标识
private String applicationId;
// 分布式事务分组名称
private String transactionServiceGroup;
// RMClient中init方法会调用该init方法
public void init() {
// 注册Seata自定义Rpc的Processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
// 调用父类的init方法,在父类中负责Netty的初始化,与Seata-Server建立连接
super.init();
}
}
// 注册Seata自定义Rpc的Processor
private void registerProcessor() {
// 1.注册Seata-Server发起branchCommit的处理Processor
RmBranchCommitProcessor rmBranchCommitProcessor =
new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor,
messageExecutor);
// 2.注册Seata-Server发起branchRollback的处理Processor
RmBranchRollbackProcessor rmBranchRollbackProcessor =
new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor
, messageExecutor);
// 3.注册Seata-Server发起删除undoLog的处理Processor
RmUndoLogProcessor rmUndoLogProcessor =
new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor,
messageExecutor);
// 4.注册Seata-Server返回Response的处理Processor,ClientOnResponseProcessor
// 用于处理由Client主动发起Request,Seata-Server返回的Response。
// ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
// 返回的Response对应起来,从而实现Rpc
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(),
getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor,
null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
// 5. 处理Seata-Server返回的Pong消息
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor,
null);
}
}
上面的逻辑看起来比较复杂,相关类也比较多,如:各种Processor、各种MessageType、TransactionMessageHandler、ResourceManager。其实本质上就是Rpc调用,分为Rm主动调用和Seata主动调用。
- Rm主动调用方法: 如:注册分支、汇报分支状态、申请全局锁等。Rm主动调用的方法都需要在ClientOnResponseProcessor中处理Seata-Server返回的Response
- Seata-Server主动调用方法: 如:提交分支事务、回滚分支事务、删除undolog日志。Seata-Server主动调用的方法,Client端分别对应不同的Processor来处理,并且处理结束后要返回给Seata-Server处理结果Response。而事务提交、回滚的核心实现逻辑都在TransactionMessageHandler、ResourceManager中。
关于TransactionMessageHandler、ResourceManager的具体实现也会在后续的章节中详细描述。
下一篇会介绍一下SeataAutoDataSourceProxyCreator、Rpc Interceptor是如何初始化以及拦截的。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论