1、是什么
在分布式系统中,跨越多个节点(服务、数据库、消息队列等)的事务操作,需要保证这些操作要么全部成功,要么全部失败,以维护数据的一致性。
核心特性:
原子性(Atomicity):所有操作要么全部成功,要么全部回滚
一致性(Consistency):事务完成后系统状态保持一致
隔离性(Isolation):并发事务互不干扰
持久性(Durability):事务结果永久保存
在分布式系统中,传统的ACID事务难以实现,需要特殊机制处理
2、为什么要用
随着微服务架构的普及,业务操作常涉及多个服务,典型场景:
电商下单:扣库存 → 创建订单 → 支付
银行转账:A账户扣款 → B账户加款
酒店预订:锁定房源 → 创建订单 → 积分扣除
不用分布式事务的风险:
数据不一致:部分服务成功部分失败
系统可靠性降低:错误难以恢复
业务逻辑混乱:需手动处理部分成功的情况
3、解决方案
两阶段提交(2PC)
阶段1(Prepare):协调者询问参与者是否可提交
阶段2(Commit/Rollback):根据参与者反馈决定提交或回滚
缺点:同步阻塞、单点故障
TCC(Try-Confirm-Cancel)
Try:预留资源(如冻结库存)
Confirm:确认执行业务(如扣减库存)
Cancel:取消操作(如解冻库存)
优点:高性能、无阻塞
缺点:业务侵入性强
基于消息最终一致性
使用消息队列(如RabbitMQ、Kafka)保证事务最终一致
流程:
执行本地事务
发送事务消息
消费者处理消息
优点:松耦合、高吞吐
缺点:实现复杂
Saga模式
长事务拆分为多个本地事务
每个事务有对应的补偿操作
类型:
协同式:每个服务触发下一个操作
编排式:协调器控制流程
4、项目集成
Java(使用Seata)
<!-- pom.xml 添加依赖 -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.5.2</version>
</dependency>
// 应用启动类
@SpringBootApplication
@EnableFeignClients
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
// 业务服务
@Service
public class OrderService {
@GlobalTransactional // 开启全局事务
public void createOrder(OrderDTO orderDTO) {
// 1. 扣减库存
stockFeignClient.reduce(orderDTO.getProductId());
// 2. 创建订单
orderDAO.create(orderDTO);
// 3. 扣减余额
accountFeignClient.reduce(orderDTO.getUserId(), orderDTO.getAmount());
}
}
Python(使用Saga模式)
# 安装依赖
# pip install saga-pattern
from saga_pattern import Saga, SagaBuilder
def create_order_saga(data):
saga = (
SagaBuilder()
.step("reduce_stock")
.invoke(lambda: stock_service.reduce(data['product_id']))
.with_compensation(lambda: stock_service.compensate(data['product_id']))
.step("create_order")
.invoke(lambda: order_service.create(data))
.with_compensation(lambda: order_service.cancel(data['order_id']))
.step("reduce_balance")
.invoke(lambda: account_service.reduce(data['user_id'], data['amount']))
.with_compensation(lambda: account_service.compensate(data['user_id'], data['amount']))
.build()
)
return saga.execute()
5、手动实现
Java(TCC模式)
// 库存服务接口
public interface StockService {
@Transactional
boolean tryReduce(Long productId, int count);
@Transactional
boolean confirmReduce(Long productId, int count);
@Transactional
boolean cancelReduce(Long productId, int count);
}
// 订单服务
public class OrderCoordinator {
@Autowired
private StockService stockService;
@Autowired
private OrderService orderService;
public void createOrder(OrderDTO order) {
// TCC Try 阶段
if (!stockService.tryReduce(order.getProductId(), order.getCount())) {
throw new RuntimeException("库存预留失败");
}
try {
// 主业务操作
orderService.create(order);
// TCC Confirm 阶段
stockService.confirmReduce(order.getProductId(), order.getCount());
} catch (Exception e) {
// TCC Cancel 阶段
stockService.cancelReduce(order.getProductId(), order.getCount());
throw e;
}
}
}
Python实现(消息最终一致)
import pika
import json
from django.db import transaction
# 生产者(订单服务)
def create_order(order_data):
with transaction.atomic():
# 1. 本地事务:创建订单
order = Order.objects.create(**order_data)
# 2. 发送事务消息
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_created')
channel.basic_publish(
exchange='',
routing_key='order_created',
body=json.dumps({
'order_id': order.id,
'product_id': order.product_id,
'quantity': order.quantity
}),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)
connection.close()
return order
# 消费者(库存服务)
def reduce_stock(ch, method, properties, body):
data = json.loads(body)
try:
# 扣减库存
with transaction.atomic():
product = Product.objects.get(id=data['product_id'])
if product.stock < data['quantity']:
raise ValueError("库存不足")
product.stock -= data['quantity']
product.save()
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"处理失败: {e}")
# 加入死信队列人工处理
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# 启动消费者
def start_consumer():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_created', durable=True)
channel.basic_consume(queue='order_created', on_message_callback=reduce_stock)
channel.start_consuming()
6、对比
选型建议:
金融核心系统:TCC/2PC
电商等高并发:Saga/事务消息
对一致性要求不高:最终一致性方案
7、常见问题
如何选择方案?
根据业务场景的ACID要求、性能需求和团队技术栈决定
事务悬挂问题怎么解决?
TCC模式需添加事务状态表记录操作状态
如何监控分布式事务?
使用分布式追踪系统(如SkyWalking, Jaeger)
补偿失败如何处理?
建立死信队列+人工干预机制
跨语言系统如何协调?
使用支持多语言的协调器(如DTM)或基于消息队列实现
评论区