"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 286 篇文章
  • 累计创建 17 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

分布式事务控制

PySuper
2025-06-02 / 0 评论 / 0 点赞 / 32 阅读 / 0 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

1、是什么

在分布式系统中,跨越多个节点(服务、数据库、消息队列等)的事务操作,需要保证这些操作要么全部成功,要么全部失败,以维护数据的一致性。

核心特性:

  • 原子性(Atomicity):所有操作要么全部成功,要么全部回滚

  • 一致性(Consistency):事务完成后系统状态保持一致

  • 隔离性(Isolation):并发事务互不干扰

  • 持久性(Durability):事务结果永久保存

在分布式系统中,传统的ACID事务难以实现,需要特殊机制处理

2、为什么要用

随着微服务架构的普及,业务操作常涉及多个服务,典型场景:

  • 电商下单:扣库存 → 创建订单 → 支付

  • 银行转账:A账户扣款 → B账户加款

  • 酒店预订:锁定房源 → 创建订单 → 积分扣除

不用分布式事务的风险:

  • 数据不一致:部分服务成功部分失败

  • 系统可靠性降低:错误难以恢复

  • 业务逻辑混乱:需手动处理部分成功的情况

3、解决方案

两阶段提交(2PC)

  • 阶段1(Prepare):协调者询问参与者是否可提交

  • 阶段2(Commit/Rollback):根据参与者反馈决定提交或回滚

  • 缺点:同步阻塞、单点故障

TCC(Try-Confirm-Cancel)

  • Try:预留资源(如冻结库存)

  • Confirm:确认执行业务(如扣减库存)

  • Cancel:取消操作(如解冻库存)

  • 优点:高性能、无阻塞

  • 缺点:业务侵入性强

基于消息最终一致性

  • 使用消息队列(如RabbitMQ、Kafka)保证事务最终一致

  • 流程:

    • 执行本地事务

    • 发送事务消息

    • 消费者处理消息

  • 优点:松耦合、高吞吐

  • 缺点:实现复杂

Saga模式

  • 长事务拆分为多个本地事务

  • 每个事务有对应的补偿操作

  • 类型:

    • 协同式:每个服务触发下一个操作

    • 编排式:协调器控制流程

4、项目集成

Java(使用Seata)

<!-- pom.xml 添加依赖 -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.5.2</version>
</dependency>
// 应用启动类
@SpringBootApplication
@EnableFeignClients
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}

// 业务服务
@Service
public class OrderService {
    @GlobalTransactional // 开启全局事务
    public void createOrder(OrderDTO orderDTO) {
        // 1. 扣减库存
        stockFeignClient.reduce(orderDTO.getProductId());
        
        // 2. 创建订单
        orderDAO.create(orderDTO);
        
        // 3. 扣减余额
        accountFeignClient.reduce(orderDTO.getUserId(), orderDTO.getAmount());
    }
}

Python(使用Saga模式)

# 安装依赖
# pip install saga-pattern

from saga_pattern import Saga, SagaBuilder

def create_order_saga(data):
    saga = (
        SagaBuilder()
        .step("reduce_stock")
            .invoke(lambda: stock_service.reduce(data['product_id']))
            .with_compensation(lambda: stock_service.compensate(data['product_id']))
        .step("create_order")
            .invoke(lambda: order_service.create(data))
            .with_compensation(lambda: order_service.cancel(data['order_id']))
        .step("reduce_balance")
            .invoke(lambda: account_service.reduce(data['user_id'], data['amount']))
            .with_compensation(lambda: account_service.compensate(data['user_id'], data['amount']))
        .build()
    )
    return saga.execute()

5、手动实现

Java(TCC模式)

// 库存服务接口
public interface StockService {
    @Transactional
    boolean tryReduce(Long productId, int count);
    
    @Transactional
    boolean confirmReduce(Long productId, int count);
    
    @Transactional
    boolean cancelReduce(Long productId, int count);
}

// 订单服务
public class OrderCoordinator {
    @Autowired
    private StockService stockService;
    @Autowired
    private OrderService orderService;

    public void createOrder(OrderDTO order) {
        // TCC Try 阶段
        if (!stockService.tryReduce(order.getProductId(), order.getCount())) {
            throw new RuntimeException("库存预留失败");
        }

        try {
            // 主业务操作
            orderService.create(order);
            
            // TCC Confirm 阶段
            stockService.confirmReduce(order.getProductId(), order.getCount());
        } catch (Exception e) {
            // TCC Cancel 阶段
            stockService.cancelReduce(order.getProductId(), order.getCount());
            throw e;
        }
    }
}

Python实现(消息最终一致)

import pika
import json
from django.db import transaction

# 生产者(订单服务)
def create_order(order_data):
    with transaction.atomic():
        # 1. 本地事务:创建订单
        order = Order.objects.create(**order_data)
        
        # 2. 发送事务消息
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        
        channel.queue_declare(queue='order_created')
        channel.basic_publish(
            exchange='',
            routing_key='order_created',
            body=json.dumps({
                'order_id': order.id,
                'product_id': order.product_id,
                'quantity': order.quantity
            }),
            properties=pika.BasicProperties(
                delivery_mode=2,  # 消息持久化
            )
        )
        connection.close()
    return order

# 消费者(库存服务)
def reduce_stock(ch, method, properties, body):
    data = json.loads(body)
    try:
        # 扣减库存
        with transaction.atomic():
            product = Product.objects.get(id=data['product_id'])
            if product.stock < data['quantity']:
                raise ValueError("库存不足")
            product.stock -= data['quantity']
            product.save()
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"处理失败: {e}")
        # 加入死信队列人工处理
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# 启动消费者
def start_consumer():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='order_created', durable=True)
    channel.basic_consume(queue='order_created', on_message_callback=reduce_stock)
    channel.start_consuming()

6、对比

方案

一致性

性能

复杂度

适用场景

2PC

强一致

中等

传统数据库集群

TCC

最终一致

高并发、短事务

Saga

最终一致

长事务、业务流程复杂

本地消息表

最终一致

中低并发、业务解耦

事务消息

最终一致

消息队列支持的场景

选型建议

  • 金融核心系统:TCC/2PC

  • 电商等高并发:Saga/事务消息

  • 对一致性要求不高:最终一致性方案

7、常见问题

  • 如何选择方案?

    • 根据业务场景的ACID要求、性能需求和团队技术栈决定

  • 事务悬挂问题怎么解决?

    • TCC模式需添加事务状态表记录操作状态

  • 如何监控分布式事务?

    • 使用分布式追踪系统(如SkyWalking, Jaeger)

  • 补偿失败如何处理?

    • 建立死信队列+人工干预机制

  • 跨语言系统如何协调?

    • 使用支持多语言的协调器(如DTM)或基于消息队列实现

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区