异步通知中事务的处理
公司项目之前因为涉及到国产化自主可控改造 ,所以将数据库从oracle换成了性能相对低很多的国产数据库,所以设计了分库分表的方案 。有分必有合,那么在设计到台账之类的集团公司查询的时候就会存在聚合查询的问题。所以引入了es,其方案如下 :
原数据同步方案

由于可能存在事务问题,此处使用了MQ的延迟队列+事务表来实现,消费时查询事务表是否存在事务,如果存在则说明事务提交,消息正常消费 ;如果不存在,则说明事务回滚,则不消费直接确认消息。并添加了3次失败重试+延时机制,来应对长事务。
优化方案
然而本文的主角并非mq,最近调研发现SpingEvent完全可以解决这个问题 ,原来我们自己造了个人家已经有的轮子,并且还不如人家的。
先来介绍下他
Spring Event 是 Spring 提供的一套“应用内发布–订阅(Observer)机制”,
用来在同一个 Spring 容器内解耦模块之间的通信。并且他的响应时间是在ns~µs 级别的,完全不影响性能。
Spring Event只在同一个JVM中生效,不能跨JVM
生产者
公共模块中定义全局的事件实体接口
/**
* 业务事件服务
* @author LONG
*/
public interface BizEvent extends Serializable {
/**
* 业务类型
*/
EventType getBizType(); // EXAM / SURVEY
/**
* 业务动作
*/
EventAction getBizAction(); // FINISH / PASS
/**
* 业务ID
*/
Long getBizId();
/**
* 用户ID
*/
Long getUserId();
}事件总线接口
/**
* 事件总线
*/
public interface BizEventBus {
/**
* 发布事件
* @param event
*/
void publish(BizEvent event);
}事件发布者实现
/**
* @author LONG
* SpringEventBus实现
*/
@Component
@RequiredArgsConstructor
public class SpringBizEventBus implements BizEventBus {
private final ApplicationEventPublisher publisher;
@Override
public void publish(BizEvent event) {
publisher.publishEvent(event);
}
}消费者
普通解耦不支持事务
@EventListener
public void onExamFinish(BizEvent event) {
//业务处理
}异步且支持事务
@Async异步线程处理,不阻塞主线程
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 事务提交后再执行。
并且Async 必须配置线程池,否则高并发场景下,会创建很多线程。
@Component
public class EventListener {
@Async("threadPoolTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onPointEvent(BizEvent event) {
System.out.println(
"【异步】事务提交后发积分,userId=" + event.getUserId()
);
// 这里写业务逻辑
}
}MQ发送消息
在EventListener 中发送消息即可 ,这样就实现了事务使用SpringEvent做了一层转发 ,在@ConditionalOnProperty 可以做开关来控制。
@Component
@Slf4j
@ConditionalOnProperty(name = "point.enabled", havingValue = "true")
public class PointEventListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@Async("threadPoolTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onPointEvent(BizEvent event) {
log.info("积分事件:{}", event);
rabbitTemplate.convertAndSend("point-exchange", "point-routing", event);
}
}
License:
CC BY 4.0