事务提交之后异步执行工具类封装
一、背景
许多时候,我们期望在事务提交之后异步执行某些逻辑,调用外部系统,发送MQ,推送ES等等;当事务回滚时,异步操作也不执行,这些异步操作需要等待事务完成后才执行;比如出入库的事务执行完毕后,异步发送MQ给报表系统、ES等等
二、猜想
我们在项目中大多都是使用声明式事务(@Transactional注解) ,spring会基于动态代理机制对我们的业务方法进行增强,控制connection,从而达到事务的目的。那么我们能否在此找寻一些蛛丝马迹。我们来看下spring事务的相关核心类(装配流程不详细叙述)
(相关资料图)
TransactionInterceptor
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { @Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport"s invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }}
TransactionAspectSupport(重点关注事务提交之后做了哪些事情,有哪些扩展点)
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { throw new TransactionUsageException( "Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead."); } ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter == null) { throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType()); } return new ReactiveTransactionSupport(adapter); }); return txSupport.invokeWithinTransaction( method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // 创建事务,此处也会创建connection TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 执行目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 目标方法异常时处理 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 重置TransactionInfo ThreadLocal cleanupTransactionInfo(txInfo); } if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 业务方法成功执行,提交事务(重点关注此处),最终会调用AbstractPlatformTransactionManager#commit方法 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It"s a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } }}}
AbstractPlatformTransactionManager
public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 事务提交处理 processCommit(defStatus);}private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn"t get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { // 在事务提交后触发(追踪到这里就离真相不远了) triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); }}
TransactionSynchronizationUtils
public abstract class TransactionSynchronizationUtils { public static void triggerAfterCommit() { // TransactionSynchronizationManager: 事务同步器管理 invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations()); } public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) { if (synchronizations != null) { for (TransactionSynchronization synchronization : synchronizations) { // 调用TransactionSynchronization#afterCommit方法,默认实现为空,留给子类扩展 // 那么我们想在事务提交之后做一些异步操作,实现此方法即可 synchronization.afterCommit(); } } }}
TransactionSynchronization
public interface TransactionSynchronization extends Flushable { default void afterCommit() {}}
过程中我们发现TransactionSynchronizationManager、TransactionSynchronization、TransactionSynchronizationAdapter 等相关类涉及aop的整个流程,篇幅有限,在此不详细展开,当然我们的一些扩展也是离不开这些基础类的
三、实现
1、事务提交之后异步执行,我们需自定义synchronization.afterCommit,结合线程池一起使用,定义线程池TaskExecutor
@Beanpublic TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(******); taskExecutor.setMaxPoolSize(******); taskExecutor.setKeepAliveSeconds(******); taskExecutor.setQueueCapacity(******); taskExecutor.setThreadNamePrefix(******); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); taskExecutor.initialize(); return taskExecutor;}
2、定义AfterCommitExecutor接口
public interface AfterCommitExecutor extends Executor { }
3、定义AfterCommitExecutorImpl实现类,注意需继承TransactionSynchronizationAdapter类
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.core.NamedThreadLocal;import org.springframework.core.task.TaskExecutor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.transaction.support.TransactionSynchronizationAdapter;import org.springframework.transaction.support.TransactionSynchronizationManager;import java.util.List;import java.util.ArrayList;@Componentpublic class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class); // 保存要运行的任务线程 private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable"); // 设置线程池 @Autowired private TaskExecutor taskExecutor; /** * 异步执行 * * @param runnable 异步线程 */ @Override public void execute(Runnable runnable) { LOGGER.info("Submitting new runnable {} to run after commit", runnable); // 如果事务已经提交,马上进行异步处理 if (!TransactionSynchronizationManager.isSynchronizationActive()) { LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable); runnable.run(); return; } // 同一个事务的合并到一起处理(注意:没有初始化则初始化,并注册) List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get(); if (null == threadRunnableList) { threadRunnableList = new ArrayList<>(); RUNNABLE_THREAD_LOCAL.set(threadRunnableList); TransactionSynchronizationManager.registerSynchronization(this); } threadRunnableList.add(runnable); } /** * 监听到事务提交之后执行方法 */ @Override public void afterCommit() { List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get(); LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size()); for (Runnable runnable : threadRunnableList) { try { taskExecutor.execute(runnable); } catch (RuntimeException e) { LOGGER.error("Failed to execute runnable " + runnable, e); } } } /** * 事务提交/回滚执行 * * @param status (STATUS_COMMITTED-0、STATUS_ROLLED_BACK-1、STATUS_UNKNOWN-2) */ @Override public void afterCompletion(int status) { LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"); RUNNABLE_THREAD_LOCAL.remove(); }}
4、使用
工具类封装好了,使用上那就很简便了;注入AfterCommitExecutor ,调用AfterCommitExecutor .execute(runnable)即可
四、总结
spring如此庞大,找准切入点,许多问题都是可以找到解决思路、或者方案;
你对spring了解多少......
关键词:
推荐阅读
月壤形成的主要原因 月壤与土壤有什么区别
月壤形成的主要原因月壤形成过程没有生物活动参与,没有有机质,还极度缺水干燥;组成月壤的矿物粉末基本是由陨石撞击破砰形成,因此,粉末 【详细】
域名抢注是是什么意思?投资角度来看什么域名好?
域名抢注是是什么意思域名抢注是通过抢先注册的方式获得互联网删除的域名的使用权。域名是由点分隔的一串数字,用于标记一台计算机或一组计 【详细】
捷达保养费用是多少?捷达是哪个国家的品牌?
捷达保养费用是多少?全新捷达的保修期为2年或6万公里,以先到者为准,新车可享受一次免费保养,首次免费保养在5000-7500km或1年内进行。如 【详细】
天然气泄露会造成爆炸吗?天然气泄漏怎么办?
天然气泄露会造成爆炸吗?家里用的天然气如果泄露是会发生爆炸的。当空气中含有混合天然气时,在与火源接触的一系列爆炸危险中,就会发生爆 【详细】
四部门明确App收集个人信息范围 个人信息保护范围判断标准
四部门明确App收集个人信息范围近日,国家互联网信息办公室、工业和信息化部、公安部、国家市场监督管理总局联合印发《常见类型移动互联网 【详细】
相关新闻
- 事务提交之后异步执行工具类封装
- 今日播报!微信支付一日三度回应校园费率问题,超10亿通道成本是什么?
- 【全球快播报】太突然!这杂志明年停刊
- 【全球速看料】2020年夫妻债务如何承担?-天天快看
- 甘油三酯高吃什么中成药(吃什么中药可以治疗高血脂)_每日看点
- 狂降30%!上海二手房6月第4周网签成交2168套!
- dnf找云珂谈谈的任务在哪(dnf找回角色)
- 优秀投资者必备的7种素质
- 事实证明,59岁定居日本的马云,已走上了另一条“康庄大道”
- 世界今热点:停止运营,启动退款!很多人都用过
- 欧盟是干什么的(欧盟是什么?)|环球聚看点
- LCK尴尬一幕!Doran请求暂停结果闹乌龙,队友小花生采访真实|每日信息
- 当前热议!京剧名家云讲堂|“京胡圣手”燕守平专辑(上)
- 全球热资讯!工会送清凉 防暑保安康|广东工会:“智送清凉”普惠户外劳动者
- 焦点快报!嘉峪关酒泉机场组织复航校验飞行 刘凯出席
- 环球今热点:科技进步、大数据扫黄,为何国家还不能关闭所有黄色网站?
- 巨桶方便面卖500元!还遭疯抢?网友:看不懂_每日观点
- 绝世唐门:让女儿住进男生宿舍?唐三为了拿捏霍雨浩费尽了心思!
- 什么时候种玉米(玉米一般几月份种) 每日视点
- 思念是一种病歌词_张震岳蔡健雅思念是一种病-全球新资讯