目录

分布式事务解决方案对比与实践:从理论到落地

前言

在跨境电商和短链平台项目中,我们频繁遇到分布式事务问题:

  • 优惠券核销:扣减库存 + 创建用户优惠券 + 记录日志
  • 订单支付:扣减余额 + 创建订单 + 扣减库存
  • 短链创建:生成短链码 + 扣减流量包 + 记录统计

本文将系统性地对比各种分布式事务方案,并分享我们的实战经验。

方案选择决策树

本地消息表执行流程

一、分布式事务基础

1.1 CAP定理

分布式系统无法同时满足:

  • C(Consistency):一致性
  • A(Availability):可用性
  • P(Partition Tolerance):分区容错性

选择: P必须满足,在C和A之间权衡。

1.2 BASE理论

  • Basically Available:基本可用
  • Soft state:软状态
  • Eventually consistent:最终一致性

核心思想: 放弃强一致性,追求最终一致性。

1.3 分布式事务分类

类型 代表方案 一致性级别 性能 复杂度
强一致性 XA/2PC 强一致
最终一致性 TCC、Saga、本地消息表 最终一致

二、方案对比

2.1 XA/2PC(两阶段提交)

原理:

阶段一(投票):
  Coordinator -> 询问所有Participant是否可以提交
  Participant -> 执行本地事务,锁定资源,返回Yes/No

阶段二(提交):
  如果所有Participant返回Yes -> Coordinator发送Commit
  如果有Participant返回No -> Coordinator发送Rollback

优点:

  • ✅ 强一致性
  • ✅ 标准化(JTA规范)

缺点:

  • ❌ 同步阻塞,性能差
  • ❌ 单点故障(Coordinator宕机)
  • ❌ 资源锁定时间长

适用场景: 金融核心系统(银行转账)

2.2 TCC(Try-Confirm-Cancel)

原理:

Try:预留资源(业务检查 + 资源预留)
Confirm:确认执行(真正执行业务)
Cancel:取消执行(释放预留资源)

示例:

@Service
public class CouponTCCService {
    
    @Autowired
    private CouponMapper couponMapper;
    
    @Autowired
    private UserCouponMapper userCouponMapper;
    
    /**
     * Try阶段:预留资源
     */
    @Transactional
    public boolean tryGrab(Long userId, Long couponId) {
        // 1. 检查库存
        Coupon coupon = couponMapper.selectById(couponId);
        if (coupon.getRemainStock() <= 0) {
            return false;
        }
        
        // 2. 检查是否已领取
        if (userCouponMapper.exists(userId, couponId)) {
            return false;
        }
        
        // 3. 预留资源(冻结库存)
        int affected = couponMapper.freezeStock(couponId, 1);
        if (affected == 0) {
            return false;
        }
        
        // 4. 创建预领取记录(状态:PENDING)
        UserCoupon userCoupon = new UserCoupon();
        userCoupon.setUserId(userId);
        userCoupon.setCouponId(couponId);
        userCoupon.setStatus(UserCouponStatus.PENDING);
        userCouponMapper.insert(userCoupon);
        
        return true;
    }
    
    /**
     * Confirm阶段:确认执行
     */
    @Transactional
    public boolean confirmGrab(Long userId, Long couponId) {
        // 1. 扣减实际库存
        couponMapper.decreaseStock(couponId, 1);
        
        // 2. 释放冻结库存
        couponMapper.unfreezeStock(couponId, 1);
        
        // 3. 更新用户优惠券状态
        userCouponMapper.updateStatus(userId, couponId, UserCouponStatus.ACTIVE);
        
        return true;
    }
    
    /**
     * Cancel阶段:取消执行
     */
    @Transactional
    public boolean cancelGrab(Long userId, Long couponId) {
        // 1. 释放冻结库存
        couponMapper.unfreezeStock(couponId, 1);
        
        // 2. 删除预领取记录
        userCouponMapper.deletePending(userId, couponId);
        
        return true;
    }
}

优点:

  • ✅ 性能较好(无全局锁)
  • ✅ 资源锁定时间短

缺点:

  • ❌ 业务侵入性强(需拆分为3个接口)
  • ❌ 代码复杂度高
  • ❌ Confirm/Cancel需幂等

适用场景: 支付、库存扣减(需要预留资源)

2.3 Saga模式

原理:

长事务拆分为多个本地事务
每个本地事务有对应的补偿操作
如果某个步骤失败,执行前面步骤的补偿操作

两种实现方式:

1. 编排式(Choreography)

服务A完成 -> 发送事件 -> 服务B执行 -> 发送事件 -> 服务C执行
服务B失败 -> 发送补偿事件 -> 服务A执行补偿

2. 编排式(Orchestration)

Saga协调器统一调度:
Step1: 调用服务A
Step2: 调用服务B
Step3: 调用服务C
如果Step3失败 -> 执行Step2补偿 -> 执行Step1补偿

示例(优惠券核销):

@Component
public class CouponSagaOrchestrator {
    
    @Autowired
    private CouponService couponService;
    
    @Autowired
    private UserCouponService userCouponService;
    
    @Autowired
    private CouponLogService couponLogService;
    
    /**
     * 正向流程
     */
    public void processCouponGrab(Long userId, Long couponId) {
        SagaInstance saga = SagaInstance.builder()
            .id(generateSagaId())
            .status(SagaStatus.STARTED)
            .build();
        
        try {
            // Step 1: 扣减库存
            saga.addStep(SagaStep.builder()
                .action(() -> couponService.decreaseStock(couponId))
                .compensation(() -> couponService.increaseStock(couponId))
                .build());
            
            // Step 2: 创建用户优惠券
            saga.addStep(SagaStep.builder()
                .action(() -> userCouponService.create(userId, couponId))
                .compensation(() -> userCouponService.delete(userId, couponId))
                .build());
            
            // Step 3: 记录日志
            saga.addStep(SagaStep.builder()
                .action(() -> couponLogService.log(userId, couponId, "GRAB"))
                .compensation(() -> couponLogService.deleteLog(userId, couponId))
                .build());
            
            // 执行Saga
            sagaExecutor.execute(saga);
            
        } catch (Exception e) {
            // 执行补偿
            sagaExecutor.compensate(saga);
            throw new BusinessException("领取优惠券失败", e);
        }
    }
}

优点:

  • ✅ 无全局锁,性能好
  • ✅ 适合长事务
  • ✅ 业务相对简单

缺点:

  • ❌ 最终一致性(中间状态可见)
  • ❌ 补偿逻辑复杂
  • ❌ 调试困难

适用场景: 业务流程长、需要补偿的场景(订单、物流)

2.4 本地消息表(我们的选择)

原理:

1. 业务操作和消息记录在同一本地事务中
2. 定时任务扫描消息表,发送消息
3. 消息消费方处理完成后,更新消息状态
4. 兜底机制:消息消费失败可重试

为什么选择本地消息表?

  • ✅ 性能高(异步处理)
  • ✅ 实现相对简单
  • ✅ 适合我们的业务场景(读多写少)
  • ✅ 与Spring生态集成好

三、本地消息表实战

3.1 数据表设计

-- 业务表:用户优惠券
CREATE TABLE user_coupon (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    coupon_id BIGINT NOT NULL,
    status VARCHAR(20) NOT NULL,
    grab_time DATETIME NOT NULL,
    expire_time DATETIME NOT NULL,
    UNIQUE KEY uk_user_coupon (user_id, coupon_id)
);

-- 消息表
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL UNIQUE,
    topic VARCHAR(100) NOT NULL,
    tag VARCHAR(100),
    body TEXT NOT NULL,
    status VARCHAR(20) NOT NULL,  -- PENDING/SENT/SUCCESS/FAIL
    retry_count INT DEFAULT 0,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    send_time DATETIME,
    success_time DATETIME,
    error_msg TEXT,
    INDEX idx_status_time (status, create_time)
);

3.2 核心实现

@Service
public class LocalMessageService {
    
    @Autowired
    private LocalMessageMapper messageMapper;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 保存消息(业务操作和消息在同一事务中)
     */
    @Transactional
    public void saveMessage(String topic, String tag, Object body) {
        LocalMessage message = new LocalMessage();
        message.setMessageId(generateMessageId());
        message.setTopic(topic);
        message.setTag(tag);
        message.setBody(JSON.toJSONString(body));
        message.setStatus(MessageStatus.PENDING);
        message.setCreateTime(LocalDateTime.now());
        message.setUpdateTime(LocalDateTime.now());
        
        messageMapper.insert(message);
    }
    
    /**
     * 定时发送消息(每5秒执行)
     */
    @Scheduled(fixedDelay = 5000)
    public void sendPendingMessages() {
        // 查询待发送消息(限制100条)
        List<LocalMessage> pendingMessages = messageMapper.selectByStatus(
            MessageStatus.PENDING, 100);
        
        for (LocalMessage message : pendingMessages) {
            try {
                // 发送消息
                SendResult result = rocketMQTemplate.syncSend(
                    message.getTopic() + ":" + message.getTag(),
                    message.getBody());
                
                if (result.getSendStatus() == SendStatus.SEND_OK) {
                    // 更新为已发送
                    messageMapper.updateStatus(message.getId(), 
                        MessageStatus.SENT, LocalDateTime.now());
                } else {
                    // 重试次数+1
                    incrementRetry(message.getId());
                }
            } catch (Exception e) {
                log.error("发送消息失败,messageId={}", message.getMessageId(), e);
                incrementRetry(message.getId(), e.getMessage());
            }
        }
    }
    
    /**
     * 处理超时消息(超过5分钟未成功)
     */
    @Scheduled(fixedDelay = 60000)
    public void handleTimeoutMessages() {
        LocalDateTime timeout = LocalDateTime.now().minusMinutes(5);
        List<LocalMessage> timeoutMessages = messageMapper.selectTimeout(
            MessageStatus.SENT, timeout);
        
        for (LocalMessage message : timeoutMessages) {
            if (message.getRetryCount() >= 5) {
                // 超过重试次数,标记为失败
                messageMapper.updateStatus(message.getId(), 
                    MessageStatus.FAIL, null);
                // 发送告警
                sendAlert(message);
            } else {
                // 重新发送
                messageMapper.updateStatus(message.getId(), 
                    MessageStatus.PENDING, null);
            }
        }
    }
}

3.3 业务集成

@Service
public class CouponGrabService {
    
    @Autowired
    private CouponRedisService redisService;
    
    @Autowired
    private LocalMessageService messageService;
    
    /**
     * 领取优惠券(使用本地消息表)
     */
    @Transactional
    public void grabCoupon(Long userId, Long couponId) {
        // 1. Redis预扣减
        Long result = redisService.tryGrabCoupon(userId, couponId, 1);
        if (result != 1) {
            throw new BusinessException("领取优惠券失败");
        }
        
        // 2. 记录本地消息(和业务操作在同一事务中)
        CouponGrabMessage message = CouponGrabMessage.builder()
            .userId(userId)
            .couponId(couponId)
            .grabTime(LocalDateTime.now())
            .build();
        
        messageService.saveMessage("coupon-topic", "grab", message);
    }
}

3.4 消费者端幂等处理

@Component
@RocketMQMessageListener(topic = "coupon-topic", consumerGroup = "coupon-consumer")
public class CouponMessageConsumer implements RocketMQListener<String> {
    
    @Autowired
    private UserCouponService userCouponService;
    
    @Autowired
    private LocalMessageService messageService;
    
    @Override
    @Transactional
    public void onMessage(String messageBody) {
        CouponGrabMessage message = JSON.parseObject(messageBody, CouponGrabMessage.class);
        
        try {
            // 1. 幂等检查
            if (userCouponService.exists(message.getUserId(), message.getCouponId())) {
                log.warn("重复消费,跳过处理");
                return;
            }
            
            // 2. 业务处理
            userCouponService.create(message.getUserId(), message.getCouponId());
            
            // 3. 更新消息状态
            messageService.markSuccess(message.getMessageId());
            
        } catch (Exception e) {
            log.error("处理消息失败", e);
            throw e; // 抛出异常触发重试
        }
    }
}

四、方案选择决策树

是否需要强一致性?
├── 是 -> 使用XA/2PC(金融核心场景)
└── 否 -> 可以最终一致性
    ├── 事务短且简单?
    │   ├── 是 -> 本地消息表(推荐,简单高效)
    │   └── 否 -> 流程长、需要补偿?
    │       ├── 是 -> Saga模式
    │       └── 否 -> 需要预留资源?
    │           ├── 是 -> TCC模式
    │           └── 否 -> 本地消息表

五、我们的实战经验

5.1 业务场景与方案选择

业务场景 方案 原因
优惠券领取 本地消息表 读多写少,简单高效
订单支付 TCC 需要预留库存和余额
短链创建 本地消息表 异步处理流量包扣减
物流状态流转 Saga 流程长,需要补偿

5.2 性能对比

方案 吞吐量(TPS) 平均延迟 数据一致性
XA/2PC 500 200ms 强一致
TCC 3000 50ms 最终一致
Saga 5000 30ms 最终一致
本地消息表 8000 20ms 最终一致

5.3 踩坑记录

坑1:消息顺序问题

  • 问题:同一用户的优惠券领取和核销消息乱序
  • 解决:使用MessageQueue的选择性功能,按userId分片

坑2:消息积压

  • 问题:大促期间消息堆积,消费延迟
  • 解决:增加消费者实例,动态扩缩容

坑3:事务超时

  • 问题:本地事务执行时间过长,导致消息未记录
  • 解决:拆分大事务,核心业务先提交

六、监控与治理

@Component
public class DistributedTxMetrics {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Scheduled(fixedRate = 60000)
    public void recordMetrics() {
        // 统计各状态消息数量
        Map<String, Long> statusCount = messageMapper.countByStatus();
        
        statusCount.forEach((status, count) -> {
            meterRegistry.gauge("message.count", 
                Tags.of("status", status), count);
        });
        
        // 统计平均处理时间
        Double avgProcessTime = messageMapper.avgProcessTime();
        meterRegistry.gauge("message.process.time.avg", avgProcessTime);
    }
}

七、总结

分布式事务没有银弹,关键是:

  1. 能不用就不用:通过业务设计避免分布式事务
  2. 最终一致性优先:大部分场景不需要强一致
  3. 本地消息表够用:简单高效,适合大多数场景
  4. 监控兜底:完善的监控和告警机制

希望这篇文章对你有所帮助!


相关文章: