分布式事务解决方案对比与实践:从理论到落地
目录
前言
在跨境电商和短链平台项目中,我们频繁遇到分布式事务问题:
- 优惠券核销:扣减库存 + 创建用户优惠券 + 记录日志
- 订单支付:扣减余额 + 创建订单 + 扣减库存
- 短链创建:生成短链码 + 扣减流量包 + 记录统计
本文将系统性地对比各种分布式事务方案,并分享我们的实战经验。
方案选择决策树
本地消息表执行流程
一、分布式事务基础
1.1 CAP定理
分布式系统无法同时满足:
- C(Consistency):一致性
- A(Availability):可用性
- P(Partition Tolerance):分区容错性
选择: P必须满足,在C和A之间权衡。
1.2 BASE理论
- Basically Available:基本可用
- Soft state:软状态
- Eventually consistent:最终一致性
核心思想: 放弃强一致性,追求最终一致性。
1.3 分布式事务分类
| 类型 | 代表方案 | 一致性级别 | 性能 | 复杂度 |
|---|---|---|---|---|
| 强一致性 | XA/2PC | 强一致 | 低 | 中 |
| 最终一致性 | TCC、Saga、本地消息表 | 最终一致 | 高 | 高 |
二、方案对比
2.1 XA/2PC(两阶段提交)
原理:
阶段一(投票):
Coordinator -> 询问所有Participant是否可以提交
Participant -> 执行本地事务,锁定资源,返回Yes/No
阶段二(提交):
如果所有Participant返回Yes -> Coordinator发送Commit
如果有Participant返回No -> Coordinator发送Rollback优点:
- ✅ 强一致性
- ✅ 标准化(JTA规范)
缺点:
- ❌ 同步阻塞,性能差
- ❌ 单点故障(Coordinator宕机)
- ❌ 资源锁定时间长
适用场景: 金融核心系统(银行转账)
2.2 TCC(Try-Confirm-Cancel)
原理:
Try:预留资源(业务检查 + 资源预留)
Confirm:确认执行(真正执行业务)
Cancel:取消执行(释放预留资源)示例:
@Service
public class CouponTCCService {
@Autowired
private CouponMapper couponMapper;
@Autowired
private UserCouponMapper userCouponMapper;
/**
* Try阶段:预留资源
*/
@Transactional
public boolean tryGrab(Long userId, Long couponId) {
// 1. 检查库存
Coupon coupon = couponMapper.selectById(couponId);
if (coupon.getRemainStock() <= 0) {
return false;
}
// 2. 检查是否已领取
if (userCouponMapper.exists(userId, couponId)) {
return false;
}
// 3. 预留资源(冻结库存)
int affected = couponMapper.freezeStock(couponId, 1);
if (affected == 0) {
return false;
}
// 4. 创建预领取记录(状态:PENDING)
UserCoupon userCoupon = new UserCoupon();
userCoupon.setUserId(userId);
userCoupon.setCouponId(couponId);
userCoupon.setStatus(UserCouponStatus.PENDING);
userCouponMapper.insert(userCoupon);
return true;
}
/**
* Confirm阶段:确认执行
*/
@Transactional
public boolean confirmGrab(Long userId, Long couponId) {
// 1. 扣减实际库存
couponMapper.decreaseStock(couponId, 1);
// 2. 释放冻结库存
couponMapper.unfreezeStock(couponId, 1);
// 3. 更新用户优惠券状态
userCouponMapper.updateStatus(userId, couponId, UserCouponStatus.ACTIVE);
return true;
}
/**
* Cancel阶段:取消执行
*/
@Transactional
public boolean cancelGrab(Long userId, Long couponId) {
// 1. 释放冻结库存
couponMapper.unfreezeStock(couponId, 1);
// 2. 删除预领取记录
userCouponMapper.deletePending(userId, couponId);
return true;
}
}优点:
- ✅ 性能较好(无全局锁)
- ✅ 资源锁定时间短
缺点:
- ❌ 业务侵入性强(需拆分为3个接口)
- ❌ 代码复杂度高
- ❌ Confirm/Cancel需幂等
适用场景: 支付、库存扣减(需要预留资源)
2.3 Saga模式
原理:
长事务拆分为多个本地事务
每个本地事务有对应的补偿操作
如果某个步骤失败,执行前面步骤的补偿操作两种实现方式:
1. 编排式(Choreography)
服务A完成 -> 发送事件 -> 服务B执行 -> 发送事件 -> 服务C执行
服务B失败 -> 发送补偿事件 -> 服务A执行补偿2. 编排式(Orchestration)
Saga协调器统一调度:
Step1: 调用服务A
Step2: 调用服务B
Step3: 调用服务C
如果Step3失败 -> 执行Step2补偿 -> 执行Step1补偿示例(优惠券核销):
@Component
public class CouponSagaOrchestrator {
@Autowired
private CouponService couponService;
@Autowired
private UserCouponService userCouponService;
@Autowired
private CouponLogService couponLogService;
/**
* 正向流程
*/
public void processCouponGrab(Long userId, Long couponId) {
SagaInstance saga = SagaInstance.builder()
.id(generateSagaId())
.status(SagaStatus.STARTED)
.build();
try {
// Step 1: 扣减库存
saga.addStep(SagaStep.builder()
.action(() -> couponService.decreaseStock(couponId))
.compensation(() -> couponService.increaseStock(couponId))
.build());
// Step 2: 创建用户优惠券
saga.addStep(SagaStep.builder()
.action(() -> userCouponService.create(userId, couponId))
.compensation(() -> userCouponService.delete(userId, couponId))
.build());
// Step 3: 记录日志
saga.addStep(SagaStep.builder()
.action(() -> couponLogService.log(userId, couponId, "GRAB"))
.compensation(() -> couponLogService.deleteLog(userId, couponId))
.build());
// 执行Saga
sagaExecutor.execute(saga);
} catch (Exception e) {
// 执行补偿
sagaExecutor.compensate(saga);
throw new BusinessException("领取优惠券失败", e);
}
}
}优点:
- ✅ 无全局锁,性能好
- ✅ 适合长事务
- ✅ 业务相对简单
缺点:
- ❌ 最终一致性(中间状态可见)
- ❌ 补偿逻辑复杂
- ❌ 调试困难
适用场景: 业务流程长、需要补偿的场景(订单、物流)
2.4 本地消息表(我们的选择)
原理:
1. 业务操作和消息记录在同一本地事务中
2. 定时任务扫描消息表,发送消息
3. 消息消费方处理完成后,更新消息状态
4. 兜底机制:消息消费失败可重试为什么选择本地消息表?
- ✅ 性能高(异步处理)
- ✅ 实现相对简单
- ✅ 适合我们的业务场景(读多写少)
- ✅ 与Spring生态集成好
三、本地消息表实战
3.1 数据表设计
-- 业务表:用户优惠券
CREATE TABLE user_coupon (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
coupon_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL,
grab_time DATETIME NOT NULL,
expire_time DATETIME NOT NULL,
UNIQUE KEY uk_user_coupon (user_id, coupon_id)
);
-- 消息表
CREATE TABLE local_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE,
topic VARCHAR(100) NOT NULL,
tag VARCHAR(100),
body TEXT NOT NULL,
status VARCHAR(20) NOT NULL, -- PENDING/SENT/SUCCESS/FAIL
retry_count INT DEFAULT 0,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
send_time DATETIME,
success_time DATETIME,
error_msg TEXT,
INDEX idx_status_time (status, create_time)
);3.2 核心实现
@Service
public class LocalMessageService {
@Autowired
private LocalMessageMapper messageMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 保存消息(业务操作和消息在同一事务中)
*/
@Transactional
public void saveMessage(String topic, String tag, Object body) {
LocalMessage message = new LocalMessage();
message.setMessageId(generateMessageId());
message.setTopic(topic);
message.setTag(tag);
message.setBody(JSON.toJSONString(body));
message.setStatus(MessageStatus.PENDING);
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
messageMapper.insert(message);
}
/**
* 定时发送消息(每5秒执行)
*/
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
// 查询待发送消息(限制100条)
List<LocalMessage> pendingMessages = messageMapper.selectByStatus(
MessageStatus.PENDING, 100);
for (LocalMessage message : pendingMessages) {
try {
// 发送消息
SendResult result = rocketMQTemplate.syncSend(
message.getTopic() + ":" + message.getTag(),
message.getBody());
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 更新为已发送
messageMapper.updateStatus(message.getId(),
MessageStatus.SENT, LocalDateTime.now());
} else {
// 重试次数+1
incrementRetry(message.getId());
}
} catch (Exception e) {
log.error("发送消息失败,messageId={}", message.getMessageId(), e);
incrementRetry(message.getId(), e.getMessage());
}
}
}
/**
* 处理超时消息(超过5分钟未成功)
*/
@Scheduled(fixedDelay = 60000)
public void handleTimeoutMessages() {
LocalDateTime timeout = LocalDateTime.now().minusMinutes(5);
List<LocalMessage> timeoutMessages = messageMapper.selectTimeout(
MessageStatus.SENT, timeout);
for (LocalMessage message : timeoutMessages) {
if (message.getRetryCount() >= 5) {
// 超过重试次数,标记为失败
messageMapper.updateStatus(message.getId(),
MessageStatus.FAIL, null);
// 发送告警
sendAlert(message);
} else {
// 重新发送
messageMapper.updateStatus(message.getId(),
MessageStatus.PENDING, null);
}
}
}
}3.3 业务集成
@Service
public class CouponGrabService {
@Autowired
private CouponRedisService redisService;
@Autowired
private LocalMessageService messageService;
/**
* 领取优惠券(使用本地消息表)
*/
@Transactional
public void grabCoupon(Long userId, Long couponId) {
// 1. Redis预扣减
Long result = redisService.tryGrabCoupon(userId, couponId, 1);
if (result != 1) {
throw new BusinessException("领取优惠券失败");
}
// 2. 记录本地消息(和业务操作在同一事务中)
CouponGrabMessage message = CouponGrabMessage.builder()
.userId(userId)
.couponId(couponId)
.grabTime(LocalDateTime.now())
.build();
messageService.saveMessage("coupon-topic", "grab", message);
}
}3.4 消费者端幂等处理
@Component
@RocketMQMessageListener(topic = "coupon-topic", consumerGroup = "coupon-consumer")
public class CouponMessageConsumer implements RocketMQListener<String> {
@Autowired
private UserCouponService userCouponService;
@Autowired
private LocalMessageService messageService;
@Override
@Transactional
public void onMessage(String messageBody) {
CouponGrabMessage message = JSON.parseObject(messageBody, CouponGrabMessage.class);
try {
// 1. 幂等检查
if (userCouponService.exists(message.getUserId(), message.getCouponId())) {
log.warn("重复消费,跳过处理");
return;
}
// 2. 业务处理
userCouponService.create(message.getUserId(), message.getCouponId());
// 3. 更新消息状态
messageService.markSuccess(message.getMessageId());
} catch (Exception e) {
log.error("处理消息失败", e);
throw e; // 抛出异常触发重试
}
}
}四、方案选择决策树
是否需要强一致性?
├── 是 -> 使用XA/2PC(金融核心场景)
└── 否 -> 可以最终一致性
├── 事务短且简单?
│ ├── 是 -> 本地消息表(推荐,简单高效)
│ └── 否 -> 流程长、需要补偿?
│ ├── 是 -> Saga模式
│ └── 否 -> 需要预留资源?
│ ├── 是 -> TCC模式
│ └── 否 -> 本地消息表五、我们的实战经验
5.1 业务场景与方案选择
| 业务场景 | 方案 | 原因 |
|---|---|---|
| 优惠券领取 | 本地消息表 | 读多写少,简单高效 |
| 订单支付 | TCC | 需要预留库存和余额 |
| 短链创建 | 本地消息表 | 异步处理流量包扣减 |
| 物流状态流转 | Saga | 流程长,需要补偿 |
5.2 性能对比
| 方案 | 吞吐量(TPS) | 平均延迟 | 数据一致性 |
|---|---|---|---|
| XA/2PC | 500 | 200ms | 强一致 |
| TCC | 3000 | 50ms | 最终一致 |
| Saga | 5000 | 30ms | 最终一致 |
| 本地消息表 | 8000 | 20ms | 最终一致 |
5.3 踩坑记录
坑1:消息顺序问题
- 问题:同一用户的优惠券领取和核销消息乱序
- 解决:使用MessageQueue的选择性功能,按userId分片
坑2:消息积压
- 问题:大促期间消息堆积,消费延迟
- 解决:增加消费者实例,动态扩缩容
坑3:事务超时
- 问题:本地事务执行时间过长,导致消息未记录
- 解决:拆分大事务,核心业务先提交
六、监控与治理
@Component
public class DistributedTxMetrics {
@Autowired
private MeterRegistry meterRegistry;
@Scheduled(fixedRate = 60000)
public void recordMetrics() {
// 统计各状态消息数量
Map<String, Long> statusCount = messageMapper.countByStatus();
statusCount.forEach((status, count) -> {
meterRegistry.gauge("message.count",
Tags.of("status", status), count);
});
// 统计平均处理时间
Double avgProcessTime = messageMapper.avgProcessTime();
meterRegistry.gauge("message.process.time.avg", avgProcessTime);
}
}七、总结
分布式事务没有银弹,关键是:
- 能不用就不用:通过业务设计避免分布式事务
- 最终一致性优先:大部分场景不需要强一致
- 本地消息表够用:简单高效,适合大多数场景
- 监控兜底:完善的监控和告警机制
希望这篇文章对你有所帮助!
相关文章: