当前位置:科学 > 正文

事务提交之后异步执行工具类封装

2023-07-01 10:08:40  来源:言沫东

一、背景

许多时候,我们期望在事务提交之后异步执行某些逻辑,调用外部系统,发送MQ,推送ES等等;当事务回滚时,异步操作也不执行,这些异步操作需要等待事务完成后才执行;比如出入库的事务执行完毕后,异步发送MQ给报表系统、ES等等

二、猜想

我们在项目中大多都是使用声明式事务(@Transactional注解) ,spring会基于动态代理机制对我们的业务方法进行增强,控制connection,从而达到事务的目的。那么我们能否在此找寻一些蛛丝马迹。我们来看下spring事务的相关核心类(装配流程不详细叙述)


(相关资料图)

TransactionInterceptor

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {  @Override  @Nullable  public Object invoke(MethodInvocation invocation) throws Throwable {     // Work out the target class: may be {@code null}.     // The TransactionAttributeSource should be passed the target class     // as well as the method, which may be from an interface.     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);     // Adapt to TransactionAspectSupport"s invokeWithinTransaction...     return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);  }}

TransactionAspectSupport(重点关注事务提交之后做了哪些事情,有哪些扩展点)

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {   // If the transaction attribute is null, the method is non-transactional.   TransactionAttributeSource tas = getTransactionAttributeSource();   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);   final TransactionManager tm = determineTransactionManager(txAttr);   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {            throw new TransactionUsageException(                  "Unsupported annotated transaction on suspending function detected: " + method +                  ". Use TransactionalOperator.transactional extensions instead.");         }         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());         if (adapter == null) {            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +                  method.getReturnType());         }         return new ReactiveTransactionSupport(adapter);      });      return txSupport.invokeWithinTransaction(            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);   }   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {      // 创建事务,此处也会创建connection      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);      Object retVal;      try {         // 执行目标方法         retVal = invocation.proceedWithInvocation();      }      catch (Throwable ex) {         // 目标方法异常时处理         completeTransactionAfterThrowing(txInfo, ex);         throw ex;      }      finally { // 重置TransactionInfo ThreadLocal         cleanupTransactionInfo(txInfo);      }      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {         // Set rollback-only in case of Vavr failure matching our rollback rules...         TransactionStatus status = txInfo.getTransactionStatus();         if (status != null && txAttr != null) {            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);         }      }  // 业务方法成功执行,提交事务(重点关注此处),最终会调用AbstractPlatformTransactionManager#commit方法      commitTransactionAfterReturning(txInfo);      return retVal;   }   else {      final ThrowableHolder throwableHolder = new ThrowableHolder();      // It"s a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.      try {         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);            try {               Object retVal = invocation.proceedWithInvocation();               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {                  // Set rollback-only in case of Vavr failure matching our rollback rules...                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);               }               return retVal;            }            catch (Throwable ex) {               if (txAttr.rollbackOn(ex)) {                  // A RuntimeException: will lead to a rollback.                  if (ex instanceof RuntimeException) {                     throw (RuntimeException) ex;                  }                  else {                     throw new ThrowableHolderException(ex);                  }               }               else {                  // A normal return value: will lead to a commit.                  throwableHolder.throwable = ex;                  return null;               }            }            finally {               cleanupTransactionInfo(txInfo);            }         });         // Check result state: It might indicate a Throwable to rethrow.         if (throwableHolder.throwable != null) {            throw throwableHolder.throwable;         }         return result;      }      catch (ThrowableHolderException ex) {         throw ex.getCause();      }      catch (TransactionSystemException ex2) {         if (throwableHolder.throwable != null) {            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);            ex2.initApplicationException(throwableHolder.throwable);         }         throw ex2;      }      catch (Throwable ex2) {         if (throwableHolder.throwable != null) {            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);         }         throw ex2;      }   }}}

AbstractPlatformTransactionManager

public final void commit(TransactionStatus status) throws TransactionException {   if (status.isCompleted()) {      throw new IllegalTransactionStateException(            "Transaction is already completed - do not call commit or rollback more than once per transaction");   }   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;   if (defStatus.isLocalRollbackOnly()) {      if (defStatus.isDebug()) {         logger.debug("Transactional code has requested rollback");      }      processRollback(defStatus, false);      return;   }   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {      if (defStatus.isDebug()) {         logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");      }      processRollback(defStatus, true);      return;   }   // 事务提交处理   processCommit(defStatus);}private void processCommit(DefaultTransactionStatus status) throws TransactionException {   try {      boolean beforeCompletionInvoked = false;      try {         boolean unexpectedRollback = false;         prepareForCommit(status);         triggerBeforeCommit(status);         triggerBeforeCompletion(status);         beforeCompletionInvoked = true;         if (status.hasSavepoint()) {            if (status.isDebug()) {               logger.debug("Releasing transaction savepoint");            }            unexpectedRollback = status.isGlobalRollbackOnly();            status.releaseHeldSavepoint();         }         else if (status.isNewTransaction()) {            if (status.isDebug()) {               logger.debug("Initiating transaction commit");            }            unexpectedRollback = status.isGlobalRollbackOnly();            doCommit(status);         }         else if (isFailEarlyOnGlobalRollbackOnly()) {            unexpectedRollback = status.isGlobalRollbackOnly();         }         // Throw UnexpectedRollbackException if we have a global rollback-only         // marker but still didn"t get a corresponding exception from commit.         if (unexpectedRollback) {            throw new UnexpectedRollbackException(                  "Transaction silently rolled back because it has been marked as rollback-only");         }      }      catch (UnexpectedRollbackException ex) {         // can only be caused by doCommit         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);         throw ex;      }      catch (TransactionException ex) {         // can only be caused by doCommit         if (isRollbackOnCommitFailure()) {            doRollbackOnCommitException(status, ex);         }         else {            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);         }         throw ex;      }      catch (RuntimeException | Error ex) {         if (!beforeCompletionInvoked) {            triggerBeforeCompletion(status);         }         doRollbackOnCommitException(status, ex);         throw ex;      }      // Trigger afterCommit callbacks, with an exception thrown there      // propagated to callers but the transaction still considered as committed.      try { // 在事务提交后触发(追踪到这里就离真相不远了)         triggerAfterCommit(status);      }      finally {         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);      }   }   finally {      cleanupAfterCompletion(status);   }}

TransactionSynchronizationUtils

public abstract class TransactionSynchronizationUtils {  public static void triggerAfterCommit() {     // TransactionSynchronizationManager: 事务同步器管理     invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());  }  public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {     if (synchronizations != null) {        for (TransactionSynchronization synchronization : synchronizations) {   // 调用TransactionSynchronization#afterCommit方法,默认实现为空,留给子类扩展   // 那么我们想在事务提交之后做一些异步操作,实现此方法即可           synchronization.afterCommit();        }     }  }}

TransactionSynchronization

public interface TransactionSynchronization extends Flushable {   default void afterCommit() {}}

过程中我们发现TransactionSynchronizationManager、TransactionSynchronization、TransactionSynchronizationAdapter 等相关类涉及aop的整个流程,篇幅有限,在此不详细展开,当然我们的一些扩展也是离不开这些基础类的

三、实现

1、事务提交之后异步执行,我们需自定义synchronization.afterCommit,结合线程池一起使用,定义线程池TaskExecutor

@Beanpublic TaskExecutor taskExecutor() {    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();    taskExecutor.setCorePoolSize(******);    taskExecutor.setMaxPoolSize(******);    taskExecutor.setKeepAliveSeconds(******);    taskExecutor.setQueueCapacity(******);    taskExecutor.setThreadNamePrefix(******);    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());    taskExecutor.initialize();    return taskExecutor;}

2、定义AfterCommitExecutor接口

public interface AfterCommitExecutor extends Executor { }

3、定义AfterCommitExecutorImpl实现类,注意需继承TransactionSynchronizationAdapter类

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.core.NamedThreadLocal;import org.springframework.core.task.TaskExecutor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.transaction.support.TransactionSynchronizationAdapter;import org.springframework.transaction.support.TransactionSynchronizationManager;import java.util.List;import java.util.ArrayList;@Componentpublic class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);    // 保存要运行的任务线程    private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable");    // 设置线程池    @Autowired    private TaskExecutor taskExecutor;    /**     * 异步执行     *     * @param runnable 异步线程     */    @Override    public void execute(Runnable runnable) {        LOGGER.info("Submitting new runnable {} to run after commit", runnable);        // 如果事务已经提交,马上进行异步处理        if (!TransactionSynchronizationManager.isSynchronizationActive()) {            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);            runnable.run();            return;        }        // 同一个事务的合并到一起处理(注意:没有初始化则初始化,并注册)        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();        if (null == threadRunnableList) {            threadRunnableList = new ArrayList<>();            RUNNABLE_THREAD_LOCAL.set(threadRunnableList);            TransactionSynchronizationManager.registerSynchronization(this);        }        threadRunnableList.add(runnable);    }    /**     * 监听到事务提交之后执行方法     */    @Override    public void afterCommit() {        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();        LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size());        for (Runnable runnable : threadRunnableList) {            try {                taskExecutor.execute(runnable);            } catch (RuntimeException e) {                LOGGER.error("Failed to execute runnable " + runnable, e);            }        }    }    /**     * 事务提交/回滚执行     *     * @param status (STATUS_COMMITTED-0、STATUS_ROLLED_BACK-1、STATUS_UNKNOWN-2)     */    @Override    public void afterCompletion(int status) {        LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");        RUNNABLE_THREAD_LOCAL.remove();    }}

4、使用

工具类封装好了,使用上那就很简便了;注入AfterCommitExecutor ,调用AfterCommitExecutor .execute(runnable)即可

四、总结

spring如此庞大,找准切入点,许多问题都是可以找到解决思路、或者方案;

你对spring了解多少......

关键词:

推荐阅读

月壤形成的主要原因 月壤与土壤有什么区别

月壤形成的主要原因月壤形成过程没有生物活动参与,没有有机质,还极度缺水干燥;组成月壤的矿物粉末基本是由陨石撞击破砰形成,因此,粉末 【详细】

域名抢注是是什么意思?投资角度来看什么域名好?

域名抢注是是什么意思域名抢注是通过抢先注册的方式获得互联网删除的域名的使用权。域名是由点分隔的一串数字,用于标记一台计算机或一组计 【详细】

捷达保养费用是多少?捷达是哪个国家的品牌?

捷达保养费用是多少?全新捷达的保修期为2年或6万公里,以先到者为准,新车可享受一次免费保养,首次免费保养在5000-7500km或1年内进行。如 【详细】

天然气泄露会造成爆炸吗?天然气泄漏怎么办?

天然气泄露会造成爆炸吗?家里用的天然气如果泄露是会发生爆炸的。当空气中含有混合天然气时,在与火源接触的一系列爆炸危险中,就会发生爆 【详细】

四部门明确App收集个人信息范围 个人信息保护范围判断标准

四部门明确App收集个人信息范围近日,国家互联网信息办公室、工业和信息化部、公安部、国家市场监督管理总局联合印发《常见类型移动互联网 【详细】

关于我们  |  联系方式  |  免责条款  |  招聘信息  |  广告服务  |  帮助中心

联系我们:85 572 98@qq.com备案号:粤ICP备18023326号-40

科技资讯网 版权所有