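Exploring the Spring Transaction Source Code
Spring transaction management is the mechanism the Spring Framework provides for managing database transactions. It supports two styles: declarative and programmatic.
1. Basics
1.1. Declarative transaction management
With declarative transaction management, transactions are configured through annotations or XML. The most common annotation is @Transactional, which can be placed on a class or a method to indicate that it should run inside a transaction.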
@Transactional
public class UserService {
    public void updateUser(User user) {
        // update the user record
    }
}
In XML configuration, transaction attributes are declared with <tx:advice> and <tx:attributes>.
<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="update*" propagation="REQUIRED"/>
    </tx:attributes>
</tx:advice>
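1.2. Programmatic transaction management
Programmatic transaction management means demarcating transactions in code. Spring provides TransactionTemplate to simplify this.
public class UserService {
    private final TransactionTemplate transactionTemplate;

    // The template is injected; without this the field would never be set.
    public UserService(TransactionTemplate transactionTemplate) {
        this.transactionTemplate = transactionTemplate;
    }

    public void updateUser(User user) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                // update the user record
            }
        });
    }
}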
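1.3. Transaction propagation
Propagation behavior defines how a transaction carries over when one transactional method calls another. Common settings include REQUIRED, REQUIRES_NEW, and NESTED.
@Transactional(propagation = Propagation.REQUIRED)
public void updateUserInfo(User user) {
    // update the user record
}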
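1.4. Transaction isolation levels
The isolation level defines how strongly concurrent transactions are isolated from one another. The levels are READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, and SERIALIZABLE.
@Transactional(isolation = Isolation.READ_COMMITTED)
public void updateUserInfo(User user) {
    // update the user record
}
Together, these options let Spring manage database transactions flexibly while keeping data consistent and intact.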
1.5. Definitions in the source code
public interface TransactionDefinition {
    // Propagation behavior
    int PROPAGATION_REQUIRED = 0;
    int PROPAGATION_SUPPORTS = 1;
    int PROPAGATION_MANDATORY = 2;
    int PROPAGATION_REQUIRES_NEW = 3;
    int PROPAGATION_NOT_SUPPORTED = 4;
    int PROPAGATION_NEVER = 5;
    int PROPAGATION_NESTED = 6;

    // Isolation levels
    int ISOLATION_DEFAULT = -1;
    int ISOLATION_READ_UNCOMMITTED = 1;  // same as java.sql.Connection.TRANSACTION_READ_UNCOMMITTED
    int ISOLATION_READ_COMMITTED = 2;    // same as java.sql.Connection.TRANSACTION_READ_COMMITTED
    int ISOLATION_REPEATABLE_READ = 4;   // same as java.sql.Connection.TRANSACTION_REPEATABLE_READ
    int ISOLATION_SERIALIZABLE = 8;      // same as java.sql.Connection.TRANSACTION_SERIALIZABLE

    int TIMEOUT_DEFAULT = -1;
}
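The comments in the listing say the isolation constants equal the ones on java.sql.Connection. The sketch below checks that claim using only the JDK. The class IsolationLevelSketch and its methods are hypothetical names introduced for illustration; they are not part of Spring.

```java
import java.sql.Connection;

// Hypothetical helper, not a Spring class: mirrors the ISOLATION_* values listed above.
final class IsolationLevelSketch {

    static final int ISOLATION_DEFAULT = -1;
    static final int ISOLATION_READ_UNCOMMITTED = 1;
    static final int ISOLATION_READ_COMMITTED = 2;
    static final int ISOLATION_REPEATABLE_READ = 4;
    static final int ISOLATION_SERIALIZABLE = 8;

    /** True when every non-default level equals the matching java.sql.Connection constant. */
    static boolean mirrorsJdbc() {
        return ISOLATION_READ_UNCOMMITTED == Connection.TRANSACTION_READ_UNCOMMITTED
                && ISOLATION_READ_COMMITTED == Connection.TRANSACTION_READ_COMMITTED
                && ISOLATION_REPEATABLE_READ == Connection.TRANSACTION_REPEATABLE_READ
                && ISOLATION_SERIALIZABLE == Connection.TRANSACTION_SERIALIZABLE;
    }

    /** ISOLATION_DEFAULT means "leave the connection's own level alone". */
    static boolean needsExplicitIsolation(int level) {
        return level != ISOLATION_DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println("mirrors JDBC constants: " + mirrorsJdbc());
    }
}
```

Because the values line up, a level other than ISOLATION_DEFAULT can in principle be handed to Connection.setTransactionIsolation without any translation step.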
2. Usage
2.1. Main annotations
The two main annotations are @EnableTransactionManagement and @Transactional.
// @Configuration matters here: it makes the dataSource() calls below return the same
// singleton bean, so JdbcTemplate and the transaction manager share one DataSource.
@Configuration
@ComponentScan("org.springframework.gang")
@EnableAspectJAutoProxy
@EnableTransactionManagement
public class AppConfig {

    // ...

    @Bean
    public JdbcTemplate jdbcTemplate() {
        return new JdbcTemplate(dataSource());
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(dataSource());
        return transactionManager;
    }

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        return dataSource;
    }

}
2.2. EnableTransactionManagement
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    // ...
}
The annotation imports the TransactionManagementConfigurationSelector class:
TransactionManagementConfigurationSelector.java
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
    @Override
    protected String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                // PROXY is the default
                return new String[] {AutoProxyRegistrar.class.getName(),
                        ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                // Use AspectJ weaving
                return new String[] {determineTransactionAspectClass()};
            default:
                return null;
        }
    }

    private String determineTransactionAspectClass() {
        return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
                TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
                TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
    }

}
In the default PROXY mode, two classes are registered with the Spring container:
- AutoProxyRegistrar.java
- ProxyTransactionManagementConfiguration.java
2.2.1. AutoProxyRegistrar
What this class actually does is register InfrastructureAdvisorAutoProxyCreator with the container:
@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) {
    // Register InfrastructureAdvisorAutoProxyCreator
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
This switches on AOP auto-proxying: the creator finds the Advisor beans in the Spring container and builds proxies from them. Unlike AnnotationAwareAspectJAutoProxyCreator, it does not parse AspectJ annotations. Both are subclasses of AbstractAdvisorAutoProxyCreator, but they serve different purposes.
2.2.2. ProxyTransactionManagementConfiguration
This class registers three more beans with the container:
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {

        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();

        advisor.setTransactionAttributeSource(transactionAttributeSource);

        // The proxy logic (the advice)
        advisor.setAdvice(transactionInterceptor);
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource);
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }

}
- BeanFactoryTransactionAttributeSourceAdvisor: this is simply an Advisor. Its factory method takes a TransactionInterceptor parameter, which is the bean returned by the third method.
- TransactionAttributeSource: the AnnotationTransactionAttributeSource instance supplies the matching logic behind the pointcut. It parses the attributes of the @Transactional annotation in determineTransactionAttribute(AnnotatedElement element).
- TransactionInterceptor: the proxy logic for transactions.
2.2.3. BeanFactoryTransactionAttributeSourceAdvisor
This class is the Advisor for transactions, so it must have two parts: a Pointcut and an Advice. Start with the Pointcut. The class holds a pointcut field defined as follows:
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    @Override
    @Nullable
    protected TransactionAttributeSource getTransactionAttributeSource() {
        return transactionAttributeSource;
    }
};
A Pointcut filters either by class (ClassFilter) or by method (MethodMatcher), so look one level deeper at TransactionAttributeSourcePointcut. Its constructor and class filter are:
protected TransactionAttributeSourcePointcut() {
    setClassFilter(new TransactionAttributeSourceClassFilter());
}

private class TransactionAttributeSourceClassFilter implements ClassFilter {

    @Override
    public boolean matches(Class<?> clazz) {
        // Rule out the transaction infrastructure types first.
        if (TransactionalProxy.class.isAssignableFrom(clazz) ||
                TransactionManager.class.isAssignableFrom(clazz) ||
                PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
            return false;
        }
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.isCandidateClass(clazz));
    }
}
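TransactionAttributeSourceClassFilter is an inner class that implements the class-level matches method. It works in two steps:
- First it rules out a few transaction infrastructure types (TransactionalProxy, TransactionManager, PersistenceExceptionTranslator). These are types, not annotations.
- Then it asks tas whether the class is a candidate. The call chain is:
  - org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#isCandidateClass
  - parser.isCandidateClass(targetClass)
  - org.springframework.transaction.annotation.SpringTransactionAnnotationParser#isCandidateClass
@Override
public boolean isCandidateClass(Class<?> targetClass) {
    return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
}
As the code shows, this checks whether the class is a candidate for carrying the Transactional annotation. It is a cheap pre-filter rather than a proof that the annotation is present; the actual annotation lookup happens during method matching.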
TransactionAttributeSourcePointcut also contains the method-level matching logic:
@Override
public boolean matches(Method method, Class<?> targetClass) {
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
tas.getTransactionAttribute(method, targetClass) resolves and caches the matching TransactionAttribute. Following the call leads to AbstractFallbackTransactionAttributeSource#computeTransactionAttribute, which parses the @Transactional annotation and produces a TransactionAttribute. The parsing code is:
SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

    Propagation propagation = attributes.getEnum("propagation");
    rbta.setPropagationBehavior(propagation.value());
    Isolation isolation = attributes.getEnum("isolation");
    rbta.setIsolationLevel(isolation.value());

    rbta.setTimeout(attributes.getNumber("timeout").intValue());
    String timeoutString = attributes.getString("timeoutString");
    Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
            "Specify 'timeout' or 'timeoutString', not both");
    rbta.setTimeoutString(timeoutString);

    rbta.setReadOnly(attributes.getBoolean("readOnly"));
    rbta.setQualifier(attributes.getString("value"));
    rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));

    List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
        rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
        rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
        rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
        rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    rbta.setRollbackRules(rollbackRules);

    return rbta;
}
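The two-stage matching walked through in this section (a class filter first, then a per-method lookup whose parsed result is cached) can be sketched in plain Java with reflection. This is a simplified illustration and not Spring's implementation: the @Tx annotation, the Infrastructure marker interface, TxAttribute, and every method name here are hypothetical stand-ins, and the sketch ignores interfaces, superclasses, and meta-annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class TxPointcutSketch {

    // Hypothetical stand-in for @Transactional.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Tx {
        boolean readOnly() default false;
    }

    // Hypothetical marker for infrastructure types that must never be proxied.
    interface Infrastructure {}

    // Hypothetical stand-in for TransactionAttribute.
    static final class TxAttribute {
        final boolean readOnly;
        TxAttribute(boolean readOnly) { this.readOnly = readOnly; }
    }

    // Parsed attributes are cached per method (keyed by Method only, for brevity).
    private static final Map<Method, Optional<TxAttribute>> CACHE = new ConcurrentHashMap<>();

    // Stage 1: cheap class filter. Excludes infrastructure and JDK types.
    static boolean matchesClass(Class<?> clazz) {
        if (Infrastructure.class.isAssignableFrom(clazz)) {
            return false;
        }
        return !clazz.getName().startsWith("java.");
    }

    // Stage 2: look for the annotation on the method, then on the class, and cache the result.
    static Optional<TxAttribute> attributeFor(Method method, Class<?> targetClass) {
        return CACHE.computeIfAbsent(method, m -> {
            Tx tx = m.getAnnotation(Tx.class);
            if (tx == null) {
                tx = targetClass.getAnnotation(Tx.class);
            }
            if (tx == null) {
                return Optional.<TxAttribute>empty();
            }
            return Optional.of(new TxAttribute(tx.readOnly()));
        });
    }

    // Runs both stages for a public no-arg method looked up by name.
    static boolean isTransactional(Class<?> targetClass, String methodName) {
        if (!matchesClass(targetClass)) {
            return false;
        }
        try {
            Method method = targetClass.getMethod(methodName);
            return attributeFor(method, targetClass).isPresent();
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No public no-arg method: " + methodName, ex);
        }
    }

    static class UserService {
        @Tx(readOnly = true)
        public void find() {}
        public void ping() {}
    }

    static class FakeTxManager implements Infrastructure {
        @Tx
        public void commit() {}
    }

    public static void main(String[] args) {
        System.out.println(isTransactional(UserService.class, "find"));
        System.out.println(isTransactional(UserService.class, "ping"));
        System.out.println(isTransactional(FakeTxManager.class, "commit"));
    }
}
```

In this sketch FakeTxManager#commit is rejected by the class filter even though it carries the annotation, which mirrors why the infrastructure types are excluded before any method is inspected.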
3. How Spring processes a transaction
3.1. TransactionInterceptor
@Transactional
public void insert() {
    jdbcTemplate.execute("insert into user(name,sex,age) values('test','男',18)");
}
The main entry point of the proxy logic is the invoke() method of TransactionInterceptor.
public Object invoke(MethodInvocation invocation) throws Throwable {

    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
        @Override
        @Nullable
        public Object proceedWithInvocation() throws Throwable {
            // Run the remaining interceptors and then the proxied method, e.g. userService.insert()
            return invocation.proceed();
        }
        @Override
        public Object getTarget() {
            return invocation.getThis();
        }
        @Override
        public Object[] getArguments() {
            return invocation.getArguments();
        }
    });
}
Next, step into invokeWithinTransaction(). This method holds the core of Spring's transaction logic.
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // A TransactionAttribute holds the settings declared on @Transactional
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // Read the attribute values from the @Transactional annotation
    final TransactionAttribute txAttr = (
        tas != null ?
        tas.getTransactionAttribute(method, targetClass) :
        null
    );

    // Look up the TransactionManager bean in the Spring container
    final TransactionManager tm = determineTransactionManager(txAttr);

    // ReactiveTransactionManager is rarely used; only the execution style is reactive,
    // the overall flow is the same as the regular one
    // ...

    // Cast tm to PlatformTransactionManager, which is why the bean we define
    // has to be of type PlatformTransactionManager
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);

    // Unique identifier of the joinpoint, i.e. the name of the method being executed
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // CallbackPreferringPlatformTransactionManager is a PlatformTransactionManager with callback support
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.

        // ☆ ->
        // Create a transaction if necessary; this is where propagation is implemented.
        // A TransactionInfo represents one logical transaction; two logical transactions
        // may belong to the same physical transaction.
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // Run the next Interceptor or the method on the proxied object
            retVal = invocation.proceedWithInvocation();
        } catch (Throwable ex) {
            // target invocation exception
            // The target threw: complete the transaction, normally by rolling back
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        } finally {
            cleanupTransactionInfo(txInfo);
        }

        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
            // Set rollback-only in case of Vavr failure matching our rollback rules...
            TransactionStatus status = txInfo.getTransactionStatus();
            if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }

        // Commit the transaction
        commitTransactionAfterReturning(txInfo);
        return retVal;
    } else {
        // ...
    }
}
Step into createTransactionIfNecessary(), the call marked ☆ above. This method decides whether a new transaction is needed and, if so, starts it.
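Before following that call, it may help to see the overall shape of invokeWithinTransaction() in isolation: begin, run the target, roll back or commit when it throws, commit when it returns. The sketch below is a plain-Java illustration under stated assumptions. RecordingTxManager and demo are hypothetical names, and the rule "roll back only on RuntimeException" is a simplification of Spring's default rollback rule (which also covers Error).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class AroundTxSketch {

    // Hypothetical transaction manager that only records what it was asked to do.
    static final class RecordingTxManager {
        final List<String> events = new ArrayList<>();
        void begin() { events.add("begin"); }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    // Around advice: the target runs between begin and commit/rollback.
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Callable<T> target) throws Exception {
        tm.begin();
        T result;
        try {
            result = target.call();
        } catch (Exception ex) {
            // Simplified rollback rule: unchecked exceptions roll back, checked ones commit.
            if (ex instanceof RuntimeException) {
                tm.rollback();
            } else {
                tm.commit();
            }
            throw ex; // the caller still sees the original exception
        }
        tm.commit();
        return result;
    }

    // Runs one invocation and returns the recorded events; pass null for a successful call.
    static List<String> demo(Exception failure) {
        RecordingTxManager tm = new RecordingTxManager();
        try {
            invokeWithinTransaction(tm, () -> {
                if (failure != null) {
                    throw failure;
                }
                return "ok";
            });
        } catch (Exception expected) {
            tm.events.add("rethrown");
        }
        return tm.events;
    }

    public static void main(String[] args) {
        System.out.println(demo(null));
        System.out.println(demo(new IllegalStateException("boom")));
        System.out.println(demo(new java.io.IOException("checked")));
    }
}
```

The checked-exception case is the one that most often surprises people: with the default rule the transaction commits even though the method failed, which is why @Transactional offers rollbackFor.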
3.2. How a new transaction is started
protected TransactionInfo createTransactionIfNecessary(
        @Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr,
        final String joinpointIdentification
) {

    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // Every logical transaction gets its own TransactionStatus, and the TransactionStatus
    // records whether the underlying physical transaction is a new one
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {

            // ☆ ->
            // Start the transaction
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
    }

    // Build the TransactionInfo for this invocation
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
Step into tm.getTransaction(txAttr), the call marked ☆ above:
AbstractPlatformTransactionManager.java
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // Obtain a new DataSourceTransactionObject
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // Is there already a transaction?
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // No transaction exists
    // Check definition settings for new transaction.
    // Validate @Transactional(timeout=?)
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        // MANDATORY requires an existing transaction, so throw straight away
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED
    ) {
        // When the current thread has no transaction, these three propagation settings are equivalent

        // There is no transaction to suspend, though TransactionSynchronizations may need suspending.
        // suspendedResources is the holder for whatever was suspended on this thread
        // (database connection, TransactionSynchronizations)
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }

        try {
            // ☆ ->
            // After this call, transaction holds a database connection and isTransactionActive is true.
            // The returned TransactionStatus carries a lot of state, including the suspended resources
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}
The method first checks whether a transaction already exists. If one does, it delegates to handleExistingTransaction(def, transaction, debugEnabled) and returns immediately. Otherwise it carries on.
How does it know that a transaction already exists?
A reasonable guess is a ThreadLocal. The answer starts at the call Object transaction = doGetTransaction(); near the top of the method above.
org.springframework.jdbc.datasource.DataSourceTransactionManager
@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
The getResource call in this method eventually reaches:
org.springframework.transaction.support.TransactionSynchronizationManager
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

@Nullable
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}
So resources is indeed a ThreadLocal. The key of the Map is the DataSource and the value is a ConnectionHolder.
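The thread-bound map described above can be reproduced with a few lines of plain Java. The sketch below is an illustration of the idea only: ResourceRegistrySketch and its methods are hypothetical names, plain Objects stand in for the DataSource key and the ConnectionHolder value, and "a resource is bound" is used as a simplified stand-in for "a transaction exists".

```java
import java.util.HashMap;
import java.util.Map;

final class ResourceRegistrySketch {

    // One map per thread: key plays the role of the DataSource, value the ConnectionHolder.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES = new ThreadLocal<>();

    static void bind(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            map = new HashMap<>();
            RESOURCES.set(map);
        }
        if (map.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound on this thread: " + key);
        }
    }

    static Object get(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        return map == null ? null : map.get(key);
    }

    static Object unbind(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            return null;
        }
        Object value = map.remove(key);
        if (map.isEmpty()) {
            RESOURCES.remove(); // drop the whole ThreadLocal entry once it is empty
        }
        return value;
    }

    // Checks from a freshly started thread whether the key is visible there.
    static boolean visibleInOtherThread(Object key) {
        boolean[] seen = new boolean[1];
        Thread worker = new Thread(() -> seen[0] = get(key) != null);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        bind("dataSource", "connection-1");
        System.out.println(get("dataSource"));
        System.out.println(visibleInOtherThread("dataSource"));
        unbind("dataSource");
    }
}
```

The second check is the important one: a resource bound on one thread is invisible to any other, which is why work handed off to another thread does not join the caller's transaction.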
First, the case where a transaction already exists:
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition,
        Object transaction,
        boolean debugEnabled
) throws TransactionException {

    // Branch on the propagation behavior of the new definition

    // NEVER: fail immediately
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        // Run non-transactionally; throw if a transaction exists
        throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // Run non-transactionally; suspend the current transaction if one exists

        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }

        // Suspend the current transaction without creating a new one
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
            definition,
            null,
            false,
            newSynchronization,
            debugEnabled,
            suspendedResources
        );
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // Always create a new transaction, whether or not one exists

        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
        }

        // Suspend the current transaction first
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {

            // Start a new transaction
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // If a transaction exists, run as a nested transaction inside it;
        // if none exists, this setting is equivalent to PROPAGATION_REQUIRED
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);

            // Create a savepoint
            status.createAndHoldSavepoint();
            return status;
        } else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }

    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }

    // Still Propagation.REQUIRED (or similar): join the existing transaction
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
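Taken together, getTransaction() and handleExistingTransaction() amount to a decision table over two inputs: the requested propagation and whether a transaction already exists. The sketch below condenses that table into one function. It is a reading aid, not Spring code: the Propagation and Action enums and the decide method are hypothetical, and the NESTED branch assumes nested transactions are allowed and implemented with savepoints.

```java
final class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        START_NEW,                  // begin a physical transaction
        JOIN_EXISTING,              // participate in the current one
        SUSPEND_AND_START_NEW,      // park the current one, begin another
        SUSPEND_AND_RUN_WITHOUT_TX, // park the current one, run non-transactionally
        CREATE_SAVEPOINT,           // nested transaction via savepoint (assumed allowed)
        RUN_WITHOUT_TX,             // "empty" transaction, no physical transaction
        FAIL                        // IllegalTransactionStateException in Spring
    }

    static Action decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            // Mirrors the branch order of handleExistingTransaction.
            switch (propagation) {
                case NEVER:
                    return Action.FAIL;
                case NOT_SUPPORTED:
                    return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:
                    return Action.SUSPEND_AND_START_NEW;
                case NESTED:
                    return Action.CREATE_SAVEPOINT;
                default: // REQUIRED, SUPPORTS, MANDATORY
                    return Action.JOIN_EXISTING;
            }
        }
        // Mirrors the "no existing transaction" half of getTransaction.
        switch (propagation) {
            case MANDATORY:
                return Action.FAIL;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.START_NEW;
            default: // SUPPORTS, NOT_SUPPORTED, NEVER
                return Action.RUN_WITHOUT_TX;
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": none=" + decide(p, false) + ", existing=" + decide(p, true));
        }
    }
}
```

Reading the table column by column shows why REQUIRED, REQUIRES_NEW, and NESTED are indistinguishable on the outermost call: they only diverge once a transaction is already in progress.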
3.3. suspend
Back on the main path in AbstractPlatformTransactionManager#getTransaction(): when the current thread has no transaction, the code still calls suspend first. With no transaction there is no transaction object to suspend, which is why the argument is null: suspend(null).
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // synchronizations is a ThreadLocal<Set<TransactionSynchronization>>.
    // Code anywhere can add a TransactionSynchronization to the current thread
    // through TransactionSynchronizationManager.

    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        // Call suspend() on every TransactionSynchronization, then clear and return
        // all synchronizations registered on the current thread
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {

                // Suspend the transaction: clear the Connection held by transaction, remove the
                // key-value pair from resources, and return the connection holder
                suspendedResources = doSuspend(transaction);
            }

            // Read and clear the per-thread settings kept in TransactionSynchronizationManager,
            // ready for a new transaction (suspending implies another one is about to start)
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);

            // Bundle the connection holder, the TransactionSynchronizations, and the
            // TransactionSynchronizationManager settings of this thread into one object:
            // the holder for everything that was suspended
            return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);

        } catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    } else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    } else {
        // Neither transaction nor synchronization active.
        return null;
    }
}
When there is no transaction, TransactionSynchronizationManager.isSynchronizationActive() is false as well. That flag is switched on by TransactionSynchronizationManager#initSynchronization(), which is called indirectly at the end of org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction, inside prepareSynchronization().
So what does suspending actually do? Step into doSuspend():
DataSourceTransactionManager#doSuspend
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
- Clear the connectionHolder on txObject.
- Remove the DataSource entry from the resources ThreadLocal.
- Return the connection holder.
Beyond what doSuspend() does, the rest of suspend() reads the remaining state from TransactionSynchronizationManager, such as the current transaction name and the readOnly flag (all stored as ThreadLocals in the synchronization manager), copies it into a SuspendedResourcesHolder, and returns that holder.
Seen this way, suspending a transaction is essentially saving the current context so it can be restored later.
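The save-and-restore idea can be shown in a few lines. The sketch below keeps two pieces of per-thread state, snapshots and clears them on suspend, and writes them back on resume. All names (SuspendSketch, Suspended, begin, end) are hypothetical, and the state is reduced to a transaction name and a connection object for brevity.

```java
final class SuspendSketch {

    // Per-thread state, reduced to two fields for illustration.
    private static final ThreadLocal<String> CURRENT_NAME = new ThreadLocal<>();
    private static final ThreadLocal<Object> CURRENT_CONNECTION = new ThreadLocal<>();

    // Snapshot of whatever was bound when suspend() was called.
    static final class Suspended {
        final String name;
        final Object connection;
        Suspended(String name, Object connection) {
            this.name = name;
            this.connection = connection;
        }
    }

    static void begin(String name, Object connection) {
        CURRENT_NAME.set(name);
        CURRENT_CONNECTION.set(connection);
    }

    static void end() {
        CURRENT_NAME.remove();
        CURRENT_CONNECTION.remove();
    }

    // Save the current context and leave the thread looking as if no transaction existed.
    static Suspended suspend() {
        Suspended snapshot = new Suspended(CURRENT_NAME.get(), CURRENT_CONNECTION.get());
        end();
        return snapshot;
    }

    // Put the saved context back on the thread.
    static void resume(Suspended snapshot) {
        begin(snapshot.name, snapshot.connection);
    }

    static String currentName() {
        return CURRENT_NAME.get();
    }

    static Object currentConnection() {
        return CURRENT_CONNECTION.get();
    }

    public static void main(String[] args) {
        begin("outer", "connection-1");
        Suspended outer = suspend();          // outer is parked
        begin("inner", "connection-2");       // inner runs on its own connection
        System.out.println(currentName());
        end();
        resume(outer);                        // outer continues where it left off
        System.out.println(currentName());
        end();
    }
}
```

This is also why a REQUIRES_NEW call needs a second database connection while the outer one is parked: the suspended holder keeps the first connection open until resume.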
Spring also offers an extension point here. Suppose that while userService.insert() runs we need to perform some action when the current transaction is suspended, resumed, about to commit, or has committed. We can register a TransactionSynchronization with TransactionSynchronizationManager:
@Transactional
public void insert() {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @Override
        public void suspend() {
            TransactionSynchronization.super.suspend();
        }

        @Override
        public void resume() {
            TransactionSynchronization.super.resume();
        }

        @Override
        public void beforeCommit(boolean readOnly) {
            TransactionSynchronization.super.beforeCommit(readOnly);
        }

        @Override
        public void afterCommit() {
            TransactionSynchronization.super.afterCommit();
        }
    });

    jdbcTemplate.execute("insert into user(name,sex,age) values('test','男',18)");
}
When does a registered synchronization run? For the suspend callback, the logic lives in doSuspendSynchronization().
private List<TransactionSynchronization> doSuspendSynchronization() {
    List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        synchronization.suspend();
    }
    TransactionSynchronizationManager.clearSynchronization();
    return suspendedSynchronizations;
}
As the code shows, it fetches every registered synchronization and calls its suspend() hook one by one.
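The register-then-notify pattern behind this hook can be sketched without Spring. In the illustration below, Sync is a hypothetical two-method stand-in for TransactionSynchronization, and SyncSketch keeps the registered callbacks in a ThreadLocal list; the resumeAll method is an assumption about how the reverse direction might look and is not taken from the article.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncSketch {

    // Hypothetical stand-in for TransactionSynchronization, reduced to two hooks.
    interface Sync {
        default void suspend() {}
        default void resume() {}
    }

    private static final ThreadLocal<List<Sync>> SYNCS = new ThreadLocal<>();

    static void init() {
        SYNCS.set(new ArrayList<>());
    }

    static boolean isActive() {
        return SYNCS.get() != null;
    }

    static void register(Sync sync) {
        if (!isActive()) {
            throw new IllegalStateException("Synchronization is not active on this thread");
        }
        SYNCS.get().add(sync);
    }

    // Notify every callback, then clear the thread's list and hand the callbacks back.
    static List<Sync> suspendAll() {
        List<Sync> suspended = new ArrayList<>(SYNCS.get());
        for (Sync sync : suspended) {
            sync.suspend();
        }
        SYNCS.remove();
        return suspended;
    }

    // Assumed reverse operation: re-activate, notify, and re-register the callbacks.
    static void resumeAll(List<Sync> suspended) {
        init();
        for (Sync sync : suspended) {
            sync.resume();
            register(sync);
        }
    }

    // Registers one logging callback, suspends, resumes, and returns what was logged.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        init();
        register(new Sync() {
            @Override
            public void suspend() { log.add("suspend"); }
            @Override
            public void resume() { log.add("resume"); }
        });
        List<Sync> parked = suspendAll();
        log.add("active=" + isActive());
        resumeAll(parked);
        SYNCS.remove();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```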
3.4. startTransaction
startTransaction(def, transaction, debugEnabled, suspendedResources) holds the logic for starting a new transaction.
private TransactionStatus startTransaction(
        TransactionDefinition definition,
        Object transaction,
        boolean debugEnabled,
        @Nullable SuspendedResourcesHolder suspendedResources
) {
    // Whether to activate a new TransactionSynchronization
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

    // Status of the transaction being started: the definition, the object that holds the
    // database connection, whether it is a new transaction, whether synchronization is new
    DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

    // ☆ -> DataSourceTransactionManager
    // Begin the transaction
    doBegin(transaction, definition);

    // If a new TransactionSynchronization is needed, copy the state of the new
    // transaction into TransactionSynchronizationManager
    prepareSynchronization(status, definition);
    return status;
}
3.5. doBegin
doBegin(transaction, definition) begins the transaction.
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // If the DataSource used on this thread has no connection yet, obtain one
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // Apply readOnly and the isolation level from @Transactional to the Connection
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // Turn off autoCommit, remembering to restore it afterwards
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        // Set the transaction timeout on the connection holder
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the new connection to resources, the ThreadLocal<Map<Object, Object>>:
        // the key is the DataSource configured on the transaction manager, the value is the connection holder
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
- Obtain a database connection.
- Configure the connection (read-only flag, isolation level, autoCommit, timeout).
- Bind the connection to the ThreadLocal.
Once suspension is done, doBegin() runs, followed by the prepareSynchronization() method mentioned earlier, which records the new transaction in TransactionSynchronizationManager:
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
            definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
            definition.getIsolationLevel() :
            null
        );
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}
After this, the business method runs, and the transaction ends in one of two ways: an exception leads to a rollback, or a normal return leads to a commit.
In TransactionAspectSupport#invokeWithinTransaction() these correspond to two methods:
- commitTransactionAfterReturning(txInfo): commits the transaction.
- completeTransactionAfterThrowing(txInfo, ex): rolls the transaction back.
3.6. commit
AbstractPlatformTransactionManager#commit
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;

    // Can be set via TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
    // Check whether a rollback has been forced
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    // Check whether this transaction was marked for rollback earlier;
    // related to globalRollbackOnParticipationFailure
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}
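- defStatus.isLocalRollbackOnly(): checks whether the code has explicitly forced a rollback. A transaction can be forced to roll back, even when no exception is thrown, by calling TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
- defStatus.isGlobalRollbackOnly(): checks whether the transaction was already marked rollback-only earlier, which is related to globalRollbackOnParticipationFailure.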
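The order of these two checks matters, so here is a small sketch of the decision that commit() makes. It only mirrors the branching shown in the listing above; the class CommitDecisionSketch, the Outcome enum, and the decide method are hypothetical names introduced for illustration.

```java
/** Hypothetical model of the branching inside commit(); not Spring code. */
final class CommitDecisionSketch {

    enum Outcome {
        ROLLBACK,            // processRollback(defStatus, false)
        UNEXPECTED_ROLLBACK, // processRollback(defStatus, true)
        COMMIT               // processCommit(defStatus)
    }

    static Outcome decide(boolean localRollbackOnly,
                          boolean globalRollbackOnly,
                          boolean shouldCommitOnGlobalRollbackOnly) {
        // 1. The code itself asked for a rollback: roll back quietly.
        if (localRollbackOnly) {
            return Outcome.ROLLBACK;
        }
        // 2. Someone else marked the transaction rollback-only, yet commit was requested.
        if (!shouldCommitOnGlobalRollbackOnly && globalRollbackOnly) {
            return Outcome.UNEXPECTED_ROLLBACK;
        }
        // 3. Nothing stands in the way of a commit.
        return Outcome.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, false));
        System.out.println(decide(false, true, false));
        System.out.println(decide(false, false, false));
    }
}
```

The second outcome is the interesting one: the caller asked for a commit but gets a rollback, which is why processRollback() receives unexpected = true in that branch.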
Continuing on, we enter the processCommit() method:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;

            // The callbacks registered earlier, as mentioned above
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();

                // ☆ ->
                doCommit(status);
            } else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        } catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            // Trigger the callbacks
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        } catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            } else {
                // Trigger the callbacks
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        } catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                // Trigger the callbacks
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        } finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    } finally {

        // Restore suspended resources to the current thread
        cleanupAfterCompletion(status);
    }
}
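In the method above, the else if (status.isNewTransaction()) branch checks whether the current transaction was created by the method now completing (for example, userService's insert method). If so, it can be committed right away. If not, the transaction was probably propagated from a caller, so it cannot be committed yet; the commit has to happen in the method that created it.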
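A minimal sketch of this "only the originator commits" rule follows. It models a single thread with at most one active transaction; OriginatorCommitSketch, its Status class, and the method name LogService.write are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: only the scope that started the transaction commits it. */
final class OriginatorCommitSketch {

    /** Remembers which method this status belongs to and whether it started the transaction. */
    static final class Status {
        final String method;
        final boolean newTransaction;

        Status(String method, boolean newTransaction) {
            this.method = method;
            this.newTransaction = newTransaction;
        }
    }

    private final List<String> physicalCommits = new ArrayList<>();
    private boolean active;

    /** Starts a transaction if none is active; otherwise joins the existing one. */
    Status begin(String method) {
        boolean isNew = !active;
        active = true;
        return new Status(method, isNew);
    }

    /** Performs the physical commit only for the originating scope. */
    void commit(Status status) {
        if (status.newTransaction) {
            physicalCommits.add(status.method);
            active = false;
        }
        // A participating scope does nothing here; its caller commits later.
    }

    List<String> physicalCommits() {
        return physicalCommits;
    }

    public static void main(String[] args) {
        OriginatorCommitSketch tm = new OriginatorCommitSketch();
        Status outer = tm.begin("UserService.insert"); // starts the transaction
        Status inner = tm.begin("LogService.write");   // joins it
        tm.commit(inner); // no physical commit yet
        tm.commit(outer); // the one and only physical commit
        System.out.println(tm.physicalCommits());
    }
}
```

In this model the inner call's commit is a no-op, so the list of physical commits contains only the outer method.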
3.7. doCommit
The most important method is doCommit():
DataSourceTransactionManager#doCommit
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {

        // Commit
        con.commit();
    } catch (SQLException ex) {
        throw translateException("JDBC commit", ex);
    }
}
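Is the whole transaction finished at this point? Not yet. We still have to check whether an earlier transaction was suspended and, if so, resume it.
After doCommit() returns, the finally block of processCommit() calls cleanupAfterCompletion(), shown below: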
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        // The database connection is released here
        doCleanupAfterCompletion(status.getTransaction());
    }

    // Restore suspended resources to the current thread
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);

        // Resume
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
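status.isNewTransaction(): checks whether the current transaction was created by this method (for example, insert()). If it was, the connection also has to be released. If the transaction was propagated from a caller, there is nothing to release here, because the connection still belongs to the method that created the transaction. See doCleanupAfterCompletion() for details.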
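The suspend and resume bookkeeping can be sketched as follows, assuming the only thread-bound resource is a connection name. The class SuspendResumeSketch and its methods are hypothetical; the real code stores a SuspendedResourcesHolder on the status object and does considerably more.

```java
/** Hypothetical model of suspending an outer transaction and resuming it afterwards. */
final class SuspendResumeSketch {

    // The connection currently bound to this thread (null when no transaction is active).
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Carries whatever was suspended when this transaction began. */
    static final class Status {
        final String suspended;

        Status(String suspended) {
            this.suspended = suspended;
        }
    }

    /** Unbinds whatever is current, binds the new connection, and remembers the old one. */
    static Status beginRequiresNew(String newConnection) {
        String suspended = CURRENT.get();
        CURRENT.set(newConnection);
        return new Status(suspended);
    }

    /** Releases this transaction's connection and rebinds the suspended one, if any. */
    static void cleanupAfterCompletion(Status status) {
        CURRENT.remove();
        if (status.suspended != null) {
            CURRENT.set(status.suspended);
        }
    }

    static String current() {
        return CURRENT.get();
    }

    public static void main(String[] args) {
        Status outer = beginRequiresNew("conn-outer");
        Status inner = beginRequiresNew("conn-inner"); // suspends conn-outer
        System.out.println("during inner: " + current());
        cleanupAfterCompletion(inner);                 // resumes conn-outer
        System.out.println("after inner:  " + current());
        cleanupAfterCompletion(outer);
        System.out.println("after outer:  " + current());
    }
}
```

Keeping the suspended resource on the inner status object is what allows the cleanup step to restore the outer transaction without any global registry.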
What remains is the rollback flow for a failed transaction, completeTransactionAfterThrowing(txInfo, ex);
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }

        // The transactionAttribute implementation is RuleBasedTransactionAttribute, whose parent class is DefaultTransactionAttribute
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        } else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}
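txInfo.transactionAttribute.rollbackOn(ex) checks whether the exception matches the rollback rules configured on @Transactional(), such as the rollbackFor and noRollbackFor parameters.
The matching logic: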
RuleBasedTransactionAttribute#rollbackOn
public boolean rollbackOn(Throwable ex) {
    RollbackRuleAttribute winner = null;
    int deepest = Integer.MAX_VALUE;

    if (this.rollbackRules != null) {
        // Iterate over all RollbackRuleAttributes and check whether the thrown exception ex
        // is the exception type named by the rule or one of its subclasses
        for (RollbackRuleAttribute rule : this.rollbackRules) {
            int depth = rule.getDepth(ex);
            if (depth >= 0 && depth < deepest) {
                deepest = depth;
                winner = rule;
            }
        }
    }

    // User superclass behavior (rollback on unchecked) if no rule matches.
    if (winner == null) {
        return super.rollbackOn(ex);
    }

    // The matching rule may be a NoRollbackRuleAttribute; if it is,
    // the exception ex does not trigger a rollback
    return !(winner instanceof NoRollbackRuleAttribute);
}
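To see why the smallest depth wins, here is a simplified sketch of the matching. It assumes rules hold exception classes and compares them directly while walking the superclass chain, which is a simplification of how RollbackRuleAttribute matches; RollbackRuleSketch and its Rule class are hypothetical names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of rule-based rollback matching. */
final class RollbackRuleSketch {

    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback; // false plays the role of a NoRollbackRuleAttribute

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        /** Number of superclass steps from ex's class to the rule's type, or -1 if unrelated. */
        int getDepth(Throwable ex) {
            int depth = 0;
            for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass(), depth++) {
                if (c == type) {
                    return depth;
                }
            }
            return -1;
        }
    }

    static boolean rollbackOn(Throwable ex, List<Rule> rules) {
        Rule winner = null;
        int deepest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.getDepth(ex);
            if (depth >= 0 && depth < deepest) { // the closest match wins
                deepest = depth;
                winner = rule;
            }
        }
        if (winner == null) {
            // Default behavior: roll back on unchecked exceptions and errors only.
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }

    public static void main(String[] args) {
        // Models rollbackFor = Exception.class, noRollbackFor = IOException.class
        List<Rule> rules = Arrays.asList(
                new Rule(Exception.class, true),
                new Rule(IOException.class, false));

        System.out.println(rollbackOn(new FileNotFoundException(), rules));
        System.out.println(rollbackOn(new IllegalStateException(), rules));
        System.out.println(rollbackOn(new IOException(), Collections.<Rule>emptyList()));
    }
}
```

For a FileNotFoundException, the IOException rule sits one step up the hierarchy and the Exception rule two steps up, so the no-rollback rule is the closer match and takes precedence.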
3.8. rollback
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}
After obtaining defStatus, it directly calls the processRollback() method below:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // Trigger the callbacks before rolling back
            triggerBeforeCompletion(status);

            if (status.hasSavepoint()) {

                // Roll back to the held savepoint
                status.rollbackToHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }

                // ☆ ->
                // If the current method started the transaction, roll back directly
                doRollback(status);
            } else {
                // Participating in larger transaction
                // The current method joined an existing transaction and threw an exception;
                // whether the whole transaction rolls back depends on the configuration
                if (status.hasTransaction()) {

                    // If a transaction spans two methods and the second one throws, the second has failed and must roll back.
                    // If globalRollbackOnParticipationFailure is true, the first method's work is rolled back too, even though it threw nothing
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }

                        // Set rollbackOnly on the ConnectionHolder so that all SQL in the transaction is rolled back
                        doSetRollbackOnly(status);
                    } else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                } else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        } catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
        }
    } finally {
        cleanupAfterCompletion(status);
    }
}
In the code above, note the call status.rollbackToHeldSavepoint();
3.8.1. SavePoint
This corresponds to savepoints in MySQL, which are used as follows:
begin;

insert into user(name,sex,age) values('test1','男',18);
insert into user(name,sex,age) values('test2','男',18);
insert into user(name,sex,age) values('test3','男',18);

savepoint x;

insert into user(name,sex,age) values('test4','男',18);
insert into user(name,sex,age) values('test5','男',18);

rollback to x;

commit;
Here only test4 and test5 are rolled back.
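The same behavior can be modeled in memory without a database. The sketch below assumes a savepoint is simply a position in the list of uncommitted rows; SavepointSketch is a hypothetical class for illustration and does not use JDBC or a real connection.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of savepoint semantics; no database involved. */
final class SavepointSketch {

    private final List<String> pending = new ArrayList<>();   // rows written but not committed
    private final List<String> committed = new ArrayList<>(); // rows visible after commit

    void insert(String row) {
        pending.add(row);
    }

    /** A savepoint is just the number of pending rows at this moment. */
    int savepoint() {
        return pending.size();
    }

    /** Discards every pending row written after the savepoint. */
    void rollbackTo(int savepoint) {
        pending.subList(savepoint, pending.size()).clear();
    }

    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        SavepointSketch tx = new SavepointSketch();
        tx.insert("test1");
        tx.insert("test2");
        tx.insert("test3");
        int x = tx.savepoint();
        tx.insert("test4");
        tx.insert("test5");
        tx.rollbackTo(x); // drops test4 and test5 only
        tx.commit();
        System.out.println(tx.committed());
    }
}
```

Rolling back to the savepoint leaves the transaction open, so the rows written before it can still be committed afterwards.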
3.9. doRollback
This calls the connection's rollback() method to roll back directly.
@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    } catch (SQLException ex) {
        throw translateException("JDBC rollback", ex);
    }
}