- Seata 是什么
- Seata术语
- Seata常见问题
- 用户文档
- 开发者指南
- 运维指南
- 博客文章
- Seata 基于改良版雪花算法的分布式UUID生成器分析
- Seata 新特性支持 undo_log 压缩
- ConcurrentHashMap 导致的 Seata 死锁问题
- Seata 应用侧启动过程剖析 注册中心与配置中心模块
- Seata 应用侧启动过程剖析 RM & TM 如何与 TC 建立连接
- Spring Cloud 集成 Seata 分布式事务 TCC 模式
- Seata 配置管理原理解析
- seata-golang 通信模型详解
- Seata 数据源代理解析
- 分布式事务 Seata 源码-Client 端启动流程
- Mac 下的 Seata Demo 环境搭建(AT模式)
- 分布式事务 Seata 源码 - Server 端启动流程
- 分布式事务如何实现?深入解读 Seata 的 XA 模式
- Seata 极简入门
- Seata config 模块源码分析
- 源码分析 Seata-XID 传递 Dubbo 篇
- Seata tcc 模块源码分析
- 通过 AOP 动态创建/关闭 Seata 分布式事务
- Seata core 模块源码分析
- Seata 动态配置订阅与降级实现原理
- Seata 配置中心实现原理
- Docker 部署 Seata 与 Nacos 整合
- Seata 分布式事务启用 Nacos 做配置中心
- 透过源码解决 Seata AT 模式整合 Mybatis-Plus 失去 MP 特性的问题
- SpringBoot+Dubbo+MybatisPlus 整合 seata 分布式事务
- Seata 客户端需要同时启动 RM 和 TM 吗?
- Seata AT 模式启动源码分析
- 基于 Seata Saga 设计更有弹性的金融应用
- 分布式事务 Seata 及其三种模式详解
- 分布式事务中间件 Seata 的设计原理
- Seata分布式Go Server正式开源-TaaS设计简介
- Seata(Fescar)分布式事务 整合 Spring Cloud
- Fescar 与 Spring Cloud 集成源码深度剖析
- 深度剖析一站式分布式事务方案Seata-Server
- TCC适用模型与适用场景分析
- TCC 理论及设计实现指南介绍
- 如何使用Seata保证Dubbo微服务间的一致性
- Fescar分布式事务原理解析探秘
- MT 模式
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
源码分析 Seata-XID 传递 Dubbo 篇
1.首先来看下包结构,在seata-dubbo和seata-dubbo-alibaba下有统一由TransactionPropagationFilter这个类,分别对应apache-dubbo跟alibaba-dubbo.
分析源码
package io.seata.integration.dubbo;
import io.seata.core.context.RootContext;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//获取本地XID
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
//获取Dubbo隐式传参中的XID
String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
//传递XID
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
} else {
if (rpcXid != null) {
//绑定XID
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
//进行剔除已完成事务的XID
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
}
//如果发现解绑的XID并不是当前接收到的XID
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
if (unbindXid != null) {
//重新绑定XID
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
}
}
}
}
}
/**
* get rpc xid
* @return
*/
private String getRpcXid() {
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (rpcXid == null) {
rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
}
return rpcXid;
}
}
1.根据源码,我们可以推出相应的逻辑处理
要点知识
1.Dubbo @Activate注解:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件。
* <br />
* 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
* <br />
* 如没有Group设置,则不过滤。
*/
String[] group() default {};
/**
* Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
* <p/>
* 示例:<br/>
* 注解的值 <code>@Activate("cache,validatioin")</code>,
* 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
* <br/>
* 如没有设置,则不过滤。
*/
String[] value() default {};
/**
* 排序信息,可以不提供。
*/
String[] before() default {};
/**
* 排序信息,可以不提供。
*/
String[] after() default {};
/**
* 排序信息,可以不提供。
*/
int order() default 0;
}
可以分析得知,Seata的dubbo过滤器上的注解@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100),表示dubbo的服务提供方跟消费方都会触发到这个过滤器,所以我们的Seata发起者会产生一个XID的传递,上述流程图跟代码已经很清晰的表示了.
2.Dubbo隐式传参可以通过 RpcContext
上的 setAttachment
和 getAttachment
在服务消费方和提供方之间进行参数的隐式传递。
获取:RpcContext.getContext().getAttachment(RootContext.KEY_XID);
传递:RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
总结
更多源码阅读请访问Seata官网
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论