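Spring transaction management is the mechanism the Spring Framework provides for managing database transactions. It supports two styles: declarative and programmatic.

1. Basics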


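1.1. Declarative transaction management

With declarative transaction management, transactions are configured through annotations or XML. The most common annotation is @Transactional, which can be placed on a class or a method to indicate that the class or method runs under transaction management.

@Transactional
public class UserService {
    public void updateUser(User user) {
        // update the user record
    }
}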

In XML configuration, transaction attributes are declared with <tx:advice> and <tx:attributes>.

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="update*" propagation="REQUIRED"/>
    </tx:attributes>
</tx:advice>

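1.2. Programmatic transaction management

With programmatic transaction management, transactions are controlled from code. Spring provides TransactionTemplate to simplify this.

import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

public class UserService {
    private final TransactionTemplate transactionTemplate;

    // The template is injected; the original snippet left the field unassigned.
    public UserService(TransactionTemplate transactionTemplate) {
        this.transactionTemplate = transactionTemplate;
    }

    public void updateUser(User user) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                // update the user record
            }
        });
    }
}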

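1.3. Transaction propagation

Propagation behavior defines how a transaction carries over when one transactional method calls another. Common propagation behaviors include REQUIRED, REQUIRES_NEW, and NESTED.

@Transactional(propagation = Propagation.REQUIRED)
public void updateUserInfo(User user) {
    // update the user record
}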

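1.4. Transaction isolation levels

The isolation level defines how strongly concurrent transactions are isolated from each other. The levels are READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, and SERIALIZABLE.

@Transactional(isolation = Isolation.READ_COMMITTED)
public void updateUserInfo(User user) {
    // update the user record
}

Spring transaction management is flexible and powerful: it helps manage database transactions and keep data consistent and intact.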

1.5. Definitions in the source code

public interface TransactionDefinition {
    // propagation behaviors
    int PROPAGATION_REQUIRED = 0;
    int PROPAGATION_SUPPORTS = 1;
    int PROPAGATION_MANDATORY = 2;
    int PROPAGATION_REQUIRES_NEW = 3;
    int PROPAGATION_NOT_SUPPORTED = 4;
    int PROPAGATION_NEVER = 5;
    int PROPAGATION_NESTED = 6;

    // isolation levels
    int ISOLATION_DEFAULT = -1;
    int ISOLATION_READ_UNCOMMITTED = 1;  // same as java.sql.Connection.TRANSACTION_READ_UNCOMMITTED;
    int ISOLATION_READ_COMMITTED = 2;  // same as java.sql.Connection.TRANSACTION_READ_COMMITTED;
    int ISOLATION_REPEATABLE_READ = 4;  // same as java.sql.Connection.TRANSACTION_REPEATABLE_READ;
    int ISOLATION_SERIALIZABLE = 8;  // same as java.sql.Connection.TRANSACTION_SERIALIZABLE;

    int TIMEOUT_DEFAULT = -1;
}
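The comments in the listing say that the isolation constants mirror those of java.sql.Connection. The following sketch uses only the JDK to print the JDBC values so they can be compared with the listing; the class name IsolationConstantsDemo and its helper method are made up for this illustration and are not part of Spring.

```java
import java.sql.Connection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical demo class (not Spring API): lists the JDBC isolation constants. */
class IsolationConstantsDemo {

    /** Maps each isolation-level name to the value defined on java.sql.Connection. */
    static Map<String, Integer> jdbcIsolationLevels() {
        Map<String, Integer> levels = new LinkedHashMap<>();
        levels.put("READ_UNCOMMITTED", Connection.TRANSACTION_READ_UNCOMMITTED);
        levels.put("READ_COMMITTED", Connection.TRANSACTION_READ_COMMITTED);
        levels.put("REPEATABLE_READ", Connection.TRANSACTION_REPEATABLE_READ);
        levels.put("SERIALIZABLE", Connection.TRANSACTION_SERIALIZABLE);
        return levels;
    }

    public static void main(String[] args) {
        // Print name/value pairs in the order they were inserted.
        jdbcIsolationLevels().forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

ISOLATION_DEFAULT (-1) has no JDBC counterpart; it tells Spring to leave the connection at whatever level the data store uses by default.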

2. Usage

2.1. Main annotations

  • @EnableTransactionManagement
  • @Transactional

// @Configuration makes the repeated dataSource() calls below return the same bean,
// so JdbcTemplate and the transaction manager share one DataSource.
@Configuration
@ComponentScan("org.springframework.gang")
@EnableAspectJAutoProxy
@EnableTransactionManagement
public class AppConfig {

    // ...

    @Bean
    public JdbcTemplate jdbcTemplate() {
        return new JdbcTemplate(dataSource());
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(dataSource());
        return transactionManager;
    }

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        return dataSource;
    }

}

2.2. @EnableTransactionManagement

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    // ...
}

The annotation imports the TransactionManagementConfigurationSelector class:

TransactionManagementConfigurationSelector.java

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
    @Override
    protected String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                // PROXY is the default mode
                return new String[] {AutoProxyRegistrar.class.getName(),
                        ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                // AspectJ weaving
                return new String[] {determineTransactionAspectClass()};
            default:
                return null;
        }
    }

    private String determineTransactionAspectClass() {
        return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
                TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
                TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
    }

}

In the default PROXY mode, two classes are registered with the Spring container:

  • AutoProxyRegistrar.java
  • ProxyTransactionManagementConfiguration.java

2.2.1. AutoProxyRegistrar

This class in effect registers InfrastructureAdvisorAutoProxyCreator.java with the container.

@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) {
    // register InfrastructureAdvisorAutoProxyCreator
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

InfrastructureAdvisorAutoProxyCreator turns on AOP auto-proxying: it looks up the Advisor beans in the Spring container and applies them to matching beans. Unlike AnnotationAwareAspectJAutoProxyCreator, it does not parse AspectJ annotations. Both are subclasses of AbstractAdvisorAutoProxyCreator, but they do different jobs.


2.2.2. ProxyTransactionManagementConfiguration

This class registers three more beans with the container:

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {

        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();

        advisor.setTransactionAttributeSource(transactionAttributeSource);

        // the advice, i.e. the proxy logic
        advisor.setAdvice(transactionInterceptor);
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource);
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }

}

  1. BeanFactoryTransactionAttributeSourceAdvisor: simply an Advisor. Its factory method takes a TransactionInterceptor parameter, which is the bean returned by the third method.
  2. TransactionAttributeSource (here an AnnotationTransactionAttributeSource): the source the pointcut consults. It parses the attributes of the @Transactional annotation in determineTransactionAttribute(AnnotatedElement element).
  3. TransactionInterceptor: the proxy logic for transactions.

2.2.3. BeanFactoryTransactionAttributeSourceAdvisor

This class is the Advisor for transactions, so it must have two parts: a Pointcut and an Advice. Start with the Pointcut. The class has a pointcut field, defined as follows:

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    @Override
    @Nullable
    protected TransactionAttributeSource getTransactionAttributeSource() {
        return transactionAttributeSource;
    }
};

A Pointcut filters in two ways: by class (ClassFilter) and by method (MethodMatcher). So take a closer look at TransactionAttributeSourcePointcut, starting with its constructor:

protected TransactionAttributeSourcePointcut() {
    setClassFilter(new TransactionAttributeSourceClassFilter());
}


private class TransactionAttributeSourceClassFilter implements ClassFilter {

    @Override
    public boolean matches(Class<?> clazz) {
        // Exclude the transaction infrastructure's own classes first.
        if (TransactionalProxy.class.isAssignableFrom(clazz) ||
            TransactionManager.class.isAssignableFrom(clazz) ||
            PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
            return false;
        }
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.isCandidateClass(clazz));
    }
}

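TransactionAttributeSourceClassFilter is an inner class that implements the matches method of ClassFilter.

It first rejects a few classes that belong to the transaction infrastructure itself.

It then uses tas to decide whether the class is a candidate for @Transactional. The call chain is:

  1. org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#isCandidateClass

  2. parser.isCandidateClass(targetClass)

  3. org.springframework.transaction.annotation.SpringTransactionAnnotationParser#isCandidateClass

    @Override
    public boolean isCandidateClass(Class<?> targetClass) {
        return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
    }


    As the code shows, this checks whether the class is a candidate for carrying the Transactional annotation. It is a cheap pre-filter rather than proof that the annotation is present; the actual lookup happens in the method-level check.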

TransactionAttributeSourcePointcut also contains the method-level filter:

@Override
public boolean matches(Method method, Class<?> targetClass) {
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

tas.getTransactionAttribute(method, targetClass) parses the matching TransactionAttribute and caches it.
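The parse-and-cache idea can be sketched in plain Java. This is a simplified model, not Spring's code: the @Tx annotation, the AttributeCacheDemo class and its methods are invented for the illustration, and the cache is keyed by Method alone, whereas the real implementation also takes the target class into account. The sentinel object stands for "already looked, nothing found", so that methods without the annotation are not parsed again.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of "parse once, then serve from a cache". */
class AttributeCacheDemo {

    /** Hypothetical stand-in for @Transactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Tx {
        boolean readOnly() default false;
    }

    /** Sentinel cached for methods that carry no annotation. */
    private static final Object NO_ATTRIBUTE = new Object();

    private final Map<Method, Object> cache = new ConcurrentHashMap<>();

    /** Counts how often the annotation was actually looked up. */
    final AtomicInteger parseCount = new AtomicInteger();

    /** Returns the annotation for the method, or null if it is not transactional. */
    Tx getAttribute(Method method) {
        Object cached = cache.get(method);
        if (cached == null) {
            parseCount.incrementAndGet();
            Tx parsed = method.getAnnotation(Tx.class);
            cached = (parsed != null ? parsed : NO_ATTRIBUTE);
            cache.put(method, cached);
        }
        return (cached == NO_ATTRIBUTE ? null : (Tx) cached);
    }

    /** Sample target with one annotated and one plain method. */
    static class Sample {
        @Tx(readOnly = true)
        public void annotated() { }

        public void plain() { }
    }

    /** Looks up a public method of Sample, wrapping the checked exception. */
    static Method sampleMethod(String name) {
        try {
            return Sample.class.getMethod(name);
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        AttributeCacheDemo demo = new AttributeCacheDemo();
        System.out.println(demo.getAttribute(sampleMethod("annotated")) != null);
        System.out.println(demo.getAttribute(sampleMethod("plain")) != null);
        System.out.println("lookups: " + demo.parseCount.get());
    }
}
```

Caching the negative result matters because the pointcut asks this question for every method of every candidate bean.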

Inside that method, execution reaches AbstractFallbackTransactionAttributeSource#computeTransactionAttribute, which parses the @Transactional annotation and produces a TransactionAttribute. The code that does the parsing is:

SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

    Propagation propagation = attributes.getEnum("propagation");
    rbta.setPropagationBehavior(propagation.value());
    Isolation isolation = attributes.getEnum("isolation");
    rbta.setIsolationLevel(isolation.value());

    rbta.setTimeout(attributes.getNumber("timeout").intValue());
    String timeoutString = attributes.getString("timeoutString");
    Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
                  "Specify 'timeout' or 'timeoutString', not both");
    rbta.setTimeoutString(timeoutString);

    rbta.setReadOnly(attributes.getBoolean("readOnly"));
    rbta.setQualifier(attributes.getString("value"));
    rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));

    List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
        rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
        rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
        rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
        rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    rbta.setRollbackRules(rollbackRules);

    return rbta;
}

3. How Spring processes a transaction

3.1. TransactionInterceptor

@Transactional
public void insert() {
    jdbcTemplate.execute("insert into user(name,sex,age) values('test','男',18)");
}

The main entry point of the proxy logic is the invoke() method of TransactionInterceptor.

public Object invoke(MethodInvocation invocation) throws Throwable {

    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
        @Override
        @Nullable
        public Object proceedWithInvocation() throws Throwable {

            // Run the remaining interceptors and then the proxied method, e.g. userService.insert()
            return invocation.proceed();
        }
        @Override
        public Object getTarget() {
            return invocation.getThis();
        }
        @Override
        public Object[] getArguments() {
            return invocation.getArguments();
        }
    });
}

Next, step into invokeWithinTransaction(), which holds the core logic of Spring transactions.

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                         final InvocationCallback invocation) throws Throwable {

    // A TransactionAttribute holds the settings made on @Transactional
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // Read the attribute values of the @Transactional annotation
    final TransactionAttribute txAttr = (
        tas != null ?
        tas.getTransactionAttribute(method, targetClass) :
        null
    );

    // Resolve the TransactionManager bean from the Spring container
    final TransactionManager tm = determineTransactionManager(txAttr);

    // ReactiveTransactionManager is rarely used; only its execution style is reactive, the flow is the same
    // ...

    // Cast tm to PlatformTransactionManager, which is why the bean must be declared as a PlatformTransactionManager
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);

    // Unique identifier of the joinpoint: the name of the method being executed
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // CallbackPreferringPlatformTransactionManager is a PlatformTransactionManager with callback support
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.

        // ☆ ->
        // Create a transaction if necessary; this is where propagation behavior is implemented
        // A TransactionInfo represents one logical transaction; two logical transactions may share one physical transaction
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // Invoke the next interceptor or the method on the target object
            retVal = invocation.proceedWithInvocation(); // e.g. insert()
        } catch (Throwable ex) {
            // target invocation exception
            // An exception was thrown: roll back (or commit, if the rollback rules say so)
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        } finally {
            cleanupTransactionInfo(txInfo);
        }

        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
            // Set rollback-only in case of Vavr failure matching our rollback rules...
            TransactionStatus status = txInfo.getTransactionStatus();
            if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }

        // Commit the transaction
        commitTransactionAfterReturning(txInfo);
        return retVal;
    } else {
        // ...
    }
}
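Stripped of Spring's types, invokeWithinTransaction() is an around advice with a fixed shape: begin, call the target, roll back on an exception, clean up in finally, commit on normal return. The sketch below models only that control flow. AroundAdviceDemo and its event log are invented for the illustration, and it always rolls back on an exception, whereas Spring consults the rollback rules first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical sketch of the around-advice control flow; not Spring code. */
class AroundAdviceDemo {

    /** Records the order in which the transaction steps run. */
    final List<String> events = new ArrayList<>();

    <T> T invokeWithinTransaction(Callable<T> target) throws Exception {
        events.add("begin");            // stands for createTransactionIfNecessary
        T result;
        try {
            result = target.call();     // stands for proceedWithInvocation
        } catch (Exception ex) {
            events.add("rollback");     // stands for completeTransactionAfterThrowing
            throw ex;
        } finally {
            events.add("cleanup");      // stands for cleanupTransactionInfo
        }
        events.add("commit");           // stands for commitTransactionAfterReturning
        return result;
    }

    /** Runs a target that either succeeds or fails and returns the recorded steps. */
    static List<String> run(boolean fail) {
        AroundAdviceDemo demo = new AroundAdviceDemo();
        try {
            demo.invokeWithinTransaction(() -> {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                return "ok";
            });
        } catch (Exception expected) {
            // The advice rethrows the target's exception after rolling back.
        }
        return demo.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Note that cleanup runs before commit on the success path, because it sits in the finally block of the try that wraps only the target call.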

Line 32 of invokeWithinTransaction() calls createTransactionIfNecessary(), which decides whether a new transaction is needed and starts one if so.

3.2. How a new transaction is started

protected TransactionInfo createTransactionIfNecessary(
    @Nullable PlatformTransactionManager tm,
    @Nullable TransactionAttribute txAttr,
    final String joinpointIdentification
) {

    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // Every logical transaction gets its own TransactionStatus,
    // and the TransactionStatus records whether the underlying physical transaction is new
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {

            // ☆ ->
            // Start (or join) a transaction
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
    }

    // Bundle manager, attribute and status into a TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

Line 25 calls tm.getTransaction(txAttr), which is implemented in:

AbstractPlatformTransactionManager.java

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // Get a new DataSourceTransactionObject
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // Check whether a transaction already exists
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        // A transaction already exists
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // No transaction exists
    // Check definition settings for new transaction.
    // Validate @Transactional(timeout=?)
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        // MANDATORY requires an existing transaction, so throw
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED
    ) {
        // With no transaction on the current thread, these three propagation behaviors are equivalent

        // There is no transaction to suspend, but TransactionSynchronizations may still need suspending
        // suspendedResources holds what was suspended on this thread (database connection, TransactionSynchronizations)
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }

        try {
            // ☆ ->
            // After the transaction starts, transaction holds a database connection and isTransactionActive is true
            // Returns a TransactionStatus that carries a lot of state, including the suspended resources
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

The method first checks whether a transaction already exists. If it does, the method delegates to handleExistingTransaction(def, transaction, debugEnabled); and returns its result directly. Otherwise execution continues.

How does Spring know that a transaction already exists?

The answer is a ThreadLocal. Line 8 of the code above, Object transaction = doGetTransaction();, leads to:

org.springframework.jdbc.datasource.DataSourceTransactionManager

@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

The getResource call on line 5 eventually reaches:

org.springframework.transaction.support.TransactionSynchronizationManager

private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

@Nullable
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

The resources field above is a ThreadLocal. In its Map, the key is the DataSource and the value is the ConnectionHolder.
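A minimal model of this thread-bound registry, written against the JDK only, shows why a lookup on the same thread finds the connection while another thread does not. ThreadBoundResourcesDemo and its methods are invented for the illustration; they loosely mirror bindResource, getResource and unbindResource but are not Spring's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a per-thread resource registry; not Spring code. */
class ThreadBoundResourcesDemo {

    /** One map per thread: key (e.g. a data source) -> value (e.g. a connection holder). */
    private static final ThreadLocal<Map<Object, Object>> RESOURCES = new ThreadLocal<>();

    /** Binds a value for the key on the current thread; rejects double binding. */
    static void bind(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            map = new HashMap<>();
            RESOURCES.set(map);
        }
        if (map.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound on this thread: " + key);
        }
    }

    /** Returns the value bound on the current thread, or null. */
    static Object get(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        return (map != null ? map.get(key) : null);
    }

    /** Removes and returns the value bound on the current thread, or null. */
    static Object unbind(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            return null;
        }
        Object value = map.remove(key);
        if (map.isEmpty()) {
            RESOURCES.remove();     // drop the whole ThreadLocal entry once it is empty
        }
        return value;
    }

    /** Performs the same lookup from a freshly started thread. */
    static Object getFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = get(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object dataSource = new Object();
        bind(dataSource, "connection-1");
        System.out.println("same thread:  " + get(dataSource));
        System.out.println("other thread: " + getFromOtherThread(dataSource));
        unbind(dataSource);
    }
}
```

This is also why a transaction does not follow work handed to another thread: the new thread starts with an empty registry.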

First, the case where a transaction already exists:

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition,
        Object transaction,
        boolean debugEnabled
) throws TransactionException {

    // Check the propagation behavior requested by the definition

    // NEVER: fail immediately
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        // Runs non-transactionally; throws if a transaction exists
        throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // Runs non-transactionally; suspends the current transaction if one exists

        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }

        // Suspend the current transaction without starting a new one
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition,
                null,
                false,
                newSynchronization,
                debugEnabled,
                suspendedResources
        );
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // Always starts a new transaction, whether or not one exists

        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
        }

        // Suspend the current transaction first
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {

            // Start a new transaction
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // If a transaction exists, run as a nested transaction inside it; with no transaction, this behaves like PROPAGATION_REQUIRED
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);

            // Create a savepoint
            status.createAndHoldSavepoint();
            return status;
        } else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }

    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }

    // Otherwise (for example Propagation.REQUIRED): join the existing transaction
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
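Taken together, getTransaction() and handleExistingTransaction() amount to a decision table over two inputs: the requested propagation and whether a transaction already exists on the thread. The sketch below restates that table in plain Java. The PropagationDecisionDemo class and its Action names are invented for the illustration, and the NESTED row assumes the savepoint branch, which is what the code above takes when useSavepointForNestedTransaction() returns true.

```java
/** Hypothetical restatement of the propagation decision table; not Spring code. */
class PropagationDecisionDemo {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        THROW,
        START_NEW,
        RUN_WITHOUT_TRANSACTION,
        JOIN_EXISTING,
        SUSPEND_AND_RUN_WITHOUT_TRANSACTION,
        SUSPEND_AND_START_NEW,
        CREATE_SAVEPOINT
    }

    static Action decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            // Mirrors the order of checks in handleExistingTransaction.
            switch (propagation) {
                case NEVER:
                    return Action.THROW;
                case NOT_SUPPORTED:
                    return Action.SUSPEND_AND_RUN_WITHOUT_TRANSACTION;
                case REQUIRES_NEW:
                    return Action.SUSPEND_AND_START_NEW;
                case NESTED:
                    return Action.CREATE_SAVEPOINT;
                default:
                    // REQUIRED, SUPPORTS and MANDATORY all participate.
                    return Action.JOIN_EXISTING;
            }
        }
        // Mirrors the "no existing transaction" half of getTransaction.
        switch (propagation) {
            case MANDATORY:
                return Action.THROW;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.START_NEW;
            default:
                // SUPPORTS, NOT_SUPPORTED and NEVER run without a transaction.
                return Action.RUN_WITHOUT_TRANSACTION;
        }
    }

    public static void main(String[] args) {
        for (Propagation propagation : Propagation.values()) {
            System.out.println(propagation
                    + " | none: " + decide(propagation, false)
                    + " | existing: " + decide(propagation, true));
        }
    }
}
```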

3.3. suspend

Back on the main path in AbstractPlatformTransactionManager#getTransaction(): when the current thread has no transaction, suspend is still called first. Since there is no transaction to suspend, the argument is null, that is, suspend(null).

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // synchronizations is a ThreadLocal<Set<TransactionSynchronization>>
    // Code anywhere can add a TransactionSynchronization to the current thread through TransactionSynchronizationManager.

    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        // Call suspend() on every TransactionSynchronization, then clear and return all of them for the current thread
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {

                // Suspend the transaction: clear the Connection in transaction, remove the key-value pair from resources,
                // and return the database connection holder
                suspendedResources = doSuspend(transaction);
            }

            // Read and clear the current thread's TransactionSynchronizationManager settings,
            // in preparation for a new transaction (suspending usually means another one is about to start)
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);

            // Bundle the thread's database connection, TransactionSynchronization objects and
            // TransactionSynchronizationManager settings into one holder that represents the suspended resources
            return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);

        } catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    } else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    } else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

When there is no transaction, TransactionSynchronizationManager.isSynchronizationActive() is normally false as well. It becomes true only after TransactionSynchronizationManager#initSynchronization() has run, and that method is called indirectly at the end of org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction, inside prepareSynchronization().

So what does suspending actually do? Step into doSuspend():

DataSourceTransactionManager#doSuspend

@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

  • Clear the connectionHolder on txObject
  • Remove the DataSource entry from the resources ThreadLocal
  • Return the connection holder

Beyond what doSuspend() does, the rest of suspend() reads the current transaction's name, its readOnly flag and similar properties from TransactionSynchronizationManager (all of which the manager stores in ThreadLocals), stores them in a SuspendedResourcesHolder, and returns that holder.

Seen this way, suspending is simply saving the current context so that it can be restored later.
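The save-and-restore nature of suspension can be sketched with two ThreadLocals and a snapshot object. This is a simplified model under stated assumptions: SuspendResumeDemo, Suspended and the begin/end helpers are invented names, and only a resource and a transaction name are tracked, whereas SuspendedResourcesHolder also carries the synchronizations, the read-only flag, the isolation level and the active flag.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of suspend/resume as "save context, restore context"; not Spring code. */
class SuspendResumeDemo {

    /** Per-thread resources, e.g. data source -> connection holder. */
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    /** Per-thread name of the current transaction. */
    private static final ThreadLocal<String> CURRENT_NAME = new ThreadLocal<>();

    /** Snapshot of what was taken off the thread. */
    static final class Suspended {
        final Object resource;
        final String name;

        Suspended(Object resource, String name) {
            this.resource = resource;
            this.name = name;
        }
    }

    /** Binds a resource and a transaction name to the current thread. */
    static void begin(Object key, Object resource, String name) {
        RESOURCES.get().put(key, resource);
        CURRENT_NAME.set(name);
    }

    /** Unbinds the resource and clears the transaction name. */
    static void end(Object key) {
        RESOURCES.get().remove(key);
        CURRENT_NAME.remove();
    }

    static Object currentResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static String currentName() {
        return CURRENT_NAME.get();
    }

    /** Takes the current context off the thread and returns it as a snapshot. */
    static Suspended suspend(Object key) {
        Suspended snapshot = new Suspended(RESOURCES.get().remove(key), CURRENT_NAME.get());
        CURRENT_NAME.remove();
        return snapshot;
    }

    /** Puts a previously suspended context back onto the thread. */
    static void resume(Object key, Suspended snapshot) {
        RESOURCES.get().put(key, snapshot.resource);
        CURRENT_NAME.set(snapshot.name);
    }

    public static void main(String[] args) {
        Object dataSource = new Object();
        begin(dataSource, "outer-connection", "outer");
        Suspended outer = suspend(dataSource);          // thread is now clean
        begin(dataSource, "inner-connection", "inner"); // e.g. a REQUIRES_NEW transaction
        end(dataSource);                                // inner transaction completes
        resume(dataSource, outer);                      // outer context is back
        System.out.println(currentName() + " / " + currentResource(dataSource));
        end(dataSource);
    }
}
```

Because the snapshot holds the outer connection while the inner transaction runs, a REQUIRES_NEW call occupies two connections at once.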

Spring also provides an extension point here. Suppose that while userService.insert() runs we need to do something when the current transaction is suspended, resumed, about to commit, or has committed. We can register a TransactionSynchronization with TransactionSynchronizationManager, like this:

@Transactional
public void insert() {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @Override
        public void suspend() {
            TransactionSynchronization.super.suspend();
        }

        @Override
        public void resume() {
            TransactionSynchronization.super.resume();
        }

        @Override
        public void beforeCommit(boolean readOnly) {
            TransactionSynchronization.super.beforeCommit(readOnly);
        }

        @Override
        public void afterCommit() {
            TransactionSynchronization.super.afterCommit();
        }
    });


    jdbcTemplate.execute("insert into user(name,sex,age) values('test','男',18)");
}

When do the registered synchronizations run? For suspension, the logic is in doSuspendSynchronization().

private List<TransactionSynchronization> doSuspendSynchronization() {
    List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        synchronization.suspend();
    }
    TransactionSynchronizationManager.clearSynchronization();
    return suspendedSynchronizations;
}

As the code shows, it fetches all registered synchronizations and calls the suspend() hook on each one in turn.
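The callback mechanism can be modelled with a per-thread list of listeners. In this sketch the Sync interface and the SynchronizationDemo class are invented stand-ins for TransactionSynchronization and TransactionSynchronizationManager; only the suspend and resume hooks are modelled, and the resume half is an assumption about the mirror-image step rather than code shown above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of per-thread synchronization callbacks; not Spring code. */
class SynchronizationDemo {

    /** Stand-in for the callback interface; both hooks default to doing nothing. */
    interface Sync {
        default void suspend() { }
        default void resume() { }
    }

    /** Null means "synchronization not active" on this thread. */
    private static final ThreadLocal<List<Sync>> SYNCS = new ThreadLocal<>();

    /** Activates synchronization for the current thread. */
    static void init() {
        SYNCS.set(new ArrayList<>());
    }

    static boolean isActive() {
        return SYNCS.get() != null;
    }

    /** Registers a callback; only allowed while synchronization is active. */
    static void register(Sync sync) {
        if (!isActive()) {
            throw new IllegalStateException("Synchronization is not active");
        }
        SYNCS.get().add(sync);
    }

    /** Notifies every callback of the suspension, then deactivates and returns them. */
    static List<Sync> suspendAll() {
        List<Sync> suspended = new ArrayList<>(SYNCS.get());
        for (Sync sync : suspended) {
            sync.suspend();
        }
        SYNCS.remove();
        return suspended;
    }

    /** Reactivates synchronization and re-registers the callbacks, notifying each one. */
    static void resumeAll(List<Sync> suspended) {
        init();
        for (Sync sync : suspended) {
            sync.resume();
            register(sync);
        }
    }

    public static void main(String[] args) {
        init();
        register(new Sync() {
            @Override
            public void suspend() {
                System.out.println("suspended");
            }

            @Override
            public void resume() {
                System.out.println("resumed");
            }
        });
        List<Sync> held = suspendAll();
        resumeAll(held);
        SYNCS.remove();
    }
}
```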


3.4. startTransaction

startTransaction(def, transaction, debugEnabled, suspendedResources); contains the logic for starting a new transaction.

private TransactionStatus startTransaction(
    TransactionDefinition definition,
    Object transaction,
    boolean debugEnabled,
    @Nullable SuspendedResourcesHolder suspendedResources
) {
    // Whether to activate a new TransactionSynchronization
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

    // Status of the transaction being started: its definition, the object holding the
    // database connection, whether the transaction is new, whether the synchronization is new
    DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

    // ☆ -> DataSourceTransactionManager
    // Begin the transaction
    doBegin(transaction, definition);

    // If a new TransactionSynchronization is needed, copy the new transaction's
    // state into TransactionSynchronizationManager
    prepareSynchronization(status, definition);
    return status;
}

3.5. doBegin

doBegin(transaction, definition);开启事务

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // If the transaction object has no ConnectionHolder yet, or its holder is already
        // synchronized with a transaction, obtain a new connection from the DataSource
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // Apply the readOnly flag and isolation level configured on @Transactional to the Connection
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // Switch autoCommit off, remembering that it must be restored afterwards
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        // Set the transaction timeout (in seconds) on the ConnectionHolder
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the new ConnectionHolder to resources, which is a ThreadLocal<Map<Object, Object>>:
        // the key is the DataSource configured on the transaction manager, the value is the holder
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
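In short, doBegin() does three things:

  • Obtains a database connection
  • Sets the related properties (read-only flag, isolation level, autoCommit, timeout)
  • Binds the connection to the current thread through a ThreadLocal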
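
The thread-binding step can be illustrated with a small stand-alone model. This is only a sketch of the idea behind TransactionSynchronizationManager.bindResource(), not Spring's code; the class name ThreadBoundResources and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of a thread-bound resource registry.
final class ThreadBoundResources {
    // One map per thread: key = resource factory (e.g. a DataSource), value = the bound holder.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private ThreadBoundResources() {
    }

    // Bind a holder for the given key; binding the same key twice is an error.
    static void bind(Object key, Object holder) {
        Object previous = RESOURCES.get().putIfAbsent(key, holder);
        if (previous != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    // Look up the holder bound to the current thread, or null if none is bound.
    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    // Remove and return the holder bound to the current thread.
    static Object unbind(Object key) {
        Object holder = RESOURCES.get().remove(key);
        if (holder == null) {
            throw new IllegalStateException("No value bound for key " + key);
        }
        return holder;
    }
}
```

Because the map lives in a ThreadLocal, code running later on the same thread can look up the same connection by its DataSource key, while other threads see nothing.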

Once suspension is complete, doBegin() runs, followed by the prepareSynchronization() method mentioned earlier, which publishes the new transaction's state to TransactionSynchronizationManager:

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
            definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
            definition.getIsolationLevel() :
            null
        );
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}

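After this logic runs, the business method either throws, in which case the transaction is rolled back, or returns normally, in which case it is committed.

In TransactionAspectSupport#invokeWithinTransaction(), these two outcomes map to the following methods:

  1. commitTransactionAfterReturning(txInfo): commits the transaction
  2. completeTransactionAfterThrowing(txInfo, ex): rolls back the transaction when the exception matches the rollback rules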
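
The overall shape of that around-advice can be sketched as follows. This is a minimal model under stated assumptions, not Spring's implementation: RecordingTxManager is a hypothetical stand-in that only records calls, and the sketch always rolls back on an exception, whereas the real method first consults rollbackOn(ex).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class AroundTransactionSketch {
    // Hypothetical stand-in for a transaction manager; it only records what was called.
    static final class RecordingTxManager {
        final List<String> events = new ArrayList<>();

        void begin() { events.add("begin"); }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    private AroundTransactionSketch() {
    }

    // Run the business method between begin and commit, rolling back if it throws.
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Supplier<T> businessMethod) {
        tm.begin();
        T result;
        try {
            result = businessMethod.get();
        } catch (RuntimeException ex) {
            // Counterpart of completeTransactionAfterThrowing(txInfo, ex)
            tm.rollback();
            throw ex;
        }
        // Counterpart of commitTransactionAfterReturning(txInfo)
        tm.commit();
        return result;
    }
}
```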

3.6. commit

AbstractPlatformTransactionManager#commit

@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;

    // Check whether the code explicitly requested a rollback, which can be done with
    // TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    // Check whether this transaction was marked rollback-only earlier;
    // this is related to globalRollbackOnParticipationFailure
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}
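  • defStatus.isLocalRollbackOnly(): checks whether the code explicitly forced a rollback. The following call makes the transaction roll back even when no exception is thrown:

    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();

  • defStatus.isGlobalRollbackOnly(): checks whether the underlying transaction was marked rollback-only globally, for example by a participating method that failed earlier.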
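
The order of these two checks can be condensed into a small decision function. This is only an illustrative sketch; the class CommitDecisionSketch and its Outcome enum are hypothetical names, and the three booleans stand in for the corresponding checks in commit().

```java
final class CommitDecisionSketch {
    enum Outcome {
        ROLLBACK,            // processRollback(defStatus, false)
        UNEXPECTED_ROLLBACK, // processRollback(defStatus, true)
        COMMIT               // processCommit(defStatus)
    }

    private CommitDecisionSketch() {
    }

    // Mirrors the order of the checks in commit(): the local flag wins over the global one.
    static Outcome decide(boolean localRollbackOnly,
                          boolean globalRollbackOnly,
                          boolean shouldCommitOnGlobalRollbackOnly) {
        if (localRollbackOnly) {
            return Outcome.ROLLBACK;
        }
        if (!shouldCommitOnGlobalRollbackOnly && globalRollbackOnly) {
            return Outcome.UNEXPECTED_ROLLBACK;
        }
        return Outcome.COMMIT;
    }
}
```

The second outcome is called unexpected because the caller asked for a commit but gets a rollback instead.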

Next, execution enters processCommit():

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;

            // Callbacks of the synchronizations registered earlier
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();

                // ☆ ->
                doCommit(status);
            } else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                    "Transaction silently rolled back because it has been marked as rollback-only");
            }
        } catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            // Trigger the callbacks
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        } catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            } else {
                // Trigger the callbacks
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        } catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                // Trigger the callbacks
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        } finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    } finally {

        // Restore any suspended resources to the current thread
        cleanupAfterCompletion(status);
    }
}

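In the method above, the else if (status.isNewTransaction()) branch checks whether the current transaction was created by the method being executed (for example, userService's insert method). If it was, the transaction can be committed now. If it was not, the transaction was probably propagated from a caller, so it cannot be committed here; the commit has to happen in the method that created it.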
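
The order in which processCommit() fires the synchronization callbacks is easy to lose in the nested try blocks, so here is a stripped-down model of it. It is a sketch under assumptions: the Sync interface is a hypothetical simplification that borrows method names from TransactionSynchronization, statuses are plain strings, and only the RuntimeException failure path is modeled.

```java
import java.util.List;

final class CommitCallbackOrderSketch {
    // Hypothetical callback interface; method names mirror TransactionSynchronization.
    interface Sync {
        default void beforeCommit() {}
        default void beforeCompletion() {}
        default void afterCommit() {}
        default void afterCompletion(String status) {}
    }

    private CommitCallbackOrderSketch() {
    }

    // Helper that writes every callback into the given log, to make the order visible.
    static Sync recording(List<String> log) {
        return new Sync() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void beforeCompletion() { log.add("beforeCompletion"); }
            @Override public void afterCommit() { log.add("afterCommit"); }
            @Override public void afterCompletion(String status) { log.add("afterCompletion:" + status); }
        };
    }

    static void processCommit(List<Sync> syncs, Runnable doCommit) {
        boolean beforeCompletionInvoked = false;
        try {
            syncs.forEach(Sync::beforeCommit);
            syncs.forEach(Sync::beforeCompletion);
            beforeCompletionInvoked = true;
            doCommit.run();
        } catch (RuntimeException ex) {
            // Simplified failure path: make sure beforeCompletion ran, then report a rollback.
            if (!beforeCompletionInvoked) {
                syncs.forEach(Sync::beforeCompletion);
            }
            syncs.forEach(s -> s.afterCompletion("ROLLED_BACK"));
            throw ex;
        }
        try {
            syncs.forEach(Sync::afterCommit);
        } finally {
            // afterCompletion runs even if an afterCommit callback throws.
            syncs.forEach(s -> s.afterCompletion("COMMITTED"));
        }
    }
}
```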


3.7. doCommit

The key method is doCommit():

DataSourceTransactionManager#doCommit

@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {

        // Commit
        con.commit();
    } catch (SQLException ex) {
        throw translateException("JDBC commit", ex);
    }
}

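Is the transaction finished at this point? Not quite. If an earlier transaction was suspended, it still has to be resumed.

After doCommit() returns, the finally block of processCommit() calls cleanupAfterCompletion():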

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        // This releases the database connection
        doCleanupAfterCompletion(status.getTransaction());
    }

    // Restore any suspended resources to the current thread
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);

        // Resume
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
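  • status.isNewTransaction(): checks whether the current transaction was created by this method (for example, insert()). If it was, the connection must also be released. If the transaction was propagated from a caller, no connection cleanup happens here; that is left to the method that created the transaction. See doCleanupAfterCompletion() for details.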
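
The suspend-then-resume cycle around an inner transaction can be modeled in a few lines. This is a simplified sketch, not Spring's implementation: SuspendResumeSketch and its methods are hypothetical, plain objects stand in for the DataSource key and the connection holders, and synchronization state is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class SuspendResumeSketch {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private SuspendResumeSketch() {
    }

    static void bind(Object key, Object holder) {
        RESOURCES.get().put(key, holder);
    }

    static Object current(Object key) {
        return RESOURCES.get().get(key);
    }

    // Suspend: unbind the outer holder and return it so the caller can keep it.
    static Object suspend(Object key) {
        return RESOURCES.get().remove(key);
    }

    // Cleanup of the inner transaction: unbind its holder, then rebind the suspended one.
    static void cleanupAndResume(Object key, Object suspendedHolder) {
        RESOURCES.get().remove(key);
        if (suspendedHolder != null) {
            RESOURCES.get().put(key, suspendedHolder);
        }
    }

    // Runs body with innerHolder bound, restoring the outer holder afterwards.
    static <T> T runInNewTransaction(Object key, Object innerHolder, Supplier<T> body) {
        Object suspended = suspend(key);
        bind(key, innerHolder);
        try {
            return body.get();
        } finally {
            cleanupAndResume(key, suspended);
        }
    }
}
```

The finally block plays the role of cleanupAfterCompletion(): whether the inner work succeeds or fails, the outer holder ends up bound to the thread again.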

What remains is the rollback path for a failed transaction, completeTransactionAfterThrowing(txInfo, ex):

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                         "] after exception: " + ex);
        }

        // The transactionAttribute implementation is RuleBasedTransactionAttribute,
        // whose parent class is DefaultTransactionAttribute
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        } else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

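txInfo.transactionAttribute.rollbackOn(ex) checks whether the thrown exception matches the rollback rules configured on @Transactional, such as the rollbackFor and noRollbackFor attributes.

The matching logic: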

RuleBasedTransactionAttribute#rollbackOn

public boolean rollbackOn(Throwable ex) {
    RollbackRuleAttribute winner = null;
    int deepest = Integer.MAX_VALUE;

    if (this.rollbackRules != null) {
        // Iterate over every RollbackRuleAttribute and check whether the thrown exception ex
        // is the exception type named by the rule or a subclass of it
        for (RollbackRuleAttribute rule : this.rollbackRules) {
            int depth = rule.getDepth(ex);
            if (depth >= 0 && depth < deepest) {
                deepest = depth;
                winner = rule;
            }
        }
    }

    // Use superclass behavior (rollback on unchecked) if no rule matches.
    if (winner == null) {
        return super.rollbackOn(ex);
    }

    // The winning rule may be a NoRollbackRuleAttribute, in which case
    // the exception ex does not trigger a rollback
    return !(winner instanceof NoRollbackRuleAttribute);
}
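
To see why the rule with the smallest depth wins, consider a stand-alone model of the matching. It is a sketch under assumptions: the Rule class is hypothetical, and depth is computed purely by walking the superclass chain, whereas Spring's RollbackRuleAttribute can also match on an exception name pattern.

```java
import java.util.List;

final class RollbackRuleSketch {
    // Hypothetical rule: an exception type plus whether a match means "roll back".
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }
    }

    private RollbackRuleSketch() {
    }

    // Number of superclass hops from exType up to ruleType, or -1 if ruleType is not an ancestor.
    static int depth(Class<?> exType, Class<?> ruleType) {
        int hops = 0;
        for (Class<?> c = exType; c != null; c = c.getSuperclass()) {
            if (c == ruleType) {
                return hops;
            }
            hops++;
        }
        return -1;
    }

    static boolean rollbackOn(Throwable ex, List<Rule> rules) {
        Rule winner = null;
        int deepest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int d = depth(ex.getClass(), rule.type);
            if (d >= 0 && d < deepest) {
                deepest = d;
                winner = rule;
            }
        }
        if (winner == null) {
            // Default when no rule matches: roll back on unchecked exceptions and errors only.
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }
}
```

With a rollback rule for Exception and a no-rollback rule for IllegalArgumentException, throwing an IllegalArgumentException matches both, but the no-rollback rule is the closer match (depth 0 versus depth 2), so the transaction is not rolled back.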

3.8. rollback

@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

With defStatus in hand, it calls processRollback() directly:

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // Trigger the beforeCompletion callbacks before rolling back
            triggerBeforeCompletion(status);

            if (status.hasSavepoint()) {

                // Roll back to the held savepoint
                status.rollbackToHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }

                // ☆ ->
                // If the current method started this transaction, roll back directly
                doRollback(status);
            } else {
                // Participating in larger transaction
                // The current method joined an existing transaction and threw an exception;
                // whether the whole transaction rolls back depends on the configuration
                if (status.hasTransaction()) {

                    // Suppose one transaction spans two methods and the second one throws: the second
                    // method has failed and needs a rollback. If globalRollbackOnParticipationFailure
                    // is true, the first method's work is rolled back too, even though it threw nothing
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }

                        // Set rollbackOnly on the ConnectionHolder, meaning that every SQL
                        // statement in the whole transaction must be rolled back
                        doSetRollbackOnly(status);
                    } else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                } else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        } catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
        }
    } finally {
        cleanupAfterCompletion(status);
    }
}
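
The participating-transaction branch is the subtle one: the inner method cannot roll back a transaction it does not own, so it only leaves a marker that the owner sees at commit time. The following sketch models that hand-off. It assumes globalRollbackOnParticipationFailure is true, SharedTx is a hypothetical stand-in for the shared ConnectionHolder state, and IllegalStateException stands in for UnexpectedRollbackException.

```java
final class RollbackBranchSketch {
    // Hypothetical shared state of one physical transaction.
    static final class SharedTx {
        boolean rollbackOnly;
        boolean rolledBack;
        boolean committed;
    }

    private RollbackBranchSketch() {
    }

    // Rollback requested by a method; newTransaction tells whether that method created the transaction.
    static void rollback(SharedTx tx, boolean newTransaction) {
        if (newTransaction) {
            tx.rolledBack = true;   // counterpart of doRollback(status)
        } else {
            tx.rollbackOnly = true; // counterpart of doSetRollbackOnly(status)
        }
    }

    // Commit requested by the method that created the transaction.
    static void commit(SharedTx tx) {
        if (tx.rollbackOnly) {
            tx.rolledBack = true;
            throw new IllegalStateException(
                "Transaction rolled back because it has been marked as rollback-only");
        }
        tx.committed = true;
    }
}
```

This is why catching an exception from an inner @Transactional method in the outer method does not save the outer transaction: the marker is already set, and the outer commit turns into a rollback.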

In processRollback() above, note the call status.rollbackToHeldSavepoint(), which relies on savepoints.


3.8.1. SavePoint

This corresponds to a savepoint in MySQL, used as follows:

begin;

insert into user(name,sex,age) values('test1','男',18);
insert into user(name,sex,age) values('test2','男',18);
insert into user(name,sex,age) values('test3','男',18);

savepoint x;

insert into user(name,sex,age) values('test4','男',18);
insert into user(name,sex,age) values('test5','男',18);

rollback to x;

commit;

In this case only the test4 and test5 rows are rolled back; test1 through test3 are committed.
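
The same behavior can be reproduced with an in-memory model that needs no database. This is only an illustration of the savepoint idea, not JDBC code: SavepointSketch is hypothetical, and a savepoint is represented simply as the number of rows written so far. With a real connection, the equivalent calls would be java.sql.Connection#setSavepoint() and Connection#rollback(Savepoint).

```java
import java.util.ArrayList;
import java.util.List;

final class SavepointSketch {
    // Rows written inside the current transaction but not yet committed.
    private final List<String> pending = new ArrayList<>();

    void insert(String row) {
        pending.add(row);
    }

    // A savepoint in this model is just the number of rows written so far.
    int savepoint() {
        return pending.size();
    }

    // Discard every row written after the savepoint; earlier rows are kept.
    void rollbackTo(int savepoint) {
        pending.subList(savepoint, pending.size()).clear();
    }

    // Return the rows that survive to commit.
    List<String> commit() {
        return new ArrayList<>(pending);
    }
}
```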


3.9. doRollback

It calls rollback() on the Connection to roll back directly.

@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    } catch (SQLException ex) {
        throw translateException("JDBC rollback", ex);
    }
}