Advertisement

分布式事务(七)Seata TCC模式-Spring Cloud微服务添加 TCC 分布式事务

阅读量:

项目源码: https://gitee.com/benwang6/seata-samples

准备订单项目案例

新建 seata-tcc 工程

新建 Empty Project:

该工程被命名为 seata-tcc ,并被放置于 seata-samples 文件夹内 ,与 seata-at 工程一同归入。

导入订单项目,无事务版本

下载项目代码

  1. 访问 git 仓库 https://gitee.com/benwang6/seata-samples
  2. 访问项目标签

下载无事务版

解压到 seata-tcc 目录

压缩文件中的 7 个项目目录解压缩到 seata-tcc 目录:

导入项目

在 idea 中按两下 shift 键,搜索 add maven projects,打开 maven 工具:

然后选择 seata-tcc 工程目录下的 7 个项目的 pom.xml 导入:

order启动全局事务,添加“保存订单”分支事务

在订单项目中执行添加订单:

我们要添加以下 TCC 事务操作的代码:

Try - 第一阶段:锁定数据阶段,在订单表中执行直接插入操作以更新数据库记录,并将订单状态字段赋值为0(锁定状态)。

  • Confirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。
  • Cancel - 第二阶段,回滚事务,删除订单。

order-parent 添加 seata 依赖

打开 order-parent 中注释掉的 seata 依赖:


4.0.0

org.springframework.boot
spring-boot-starter-parent
2.3.2.RELEASE


cn.tedu
order-parent
1.0-SNAPSHOT
pom
order-parent

该系统的当前版本为$ {v}。 该druid组件的当前版本为$ {v}。 该seata组件的当前版本为$ {v}。 该alibaba-seata组件的当前版本为$ {v}. 该spring cloud组件的当前版本已定名为Hoxton.SR6。

(dependences)
(dependency)
(org.springframework.boot)
(sprint-boot-starter-base)
)
(dependency)
(org.springframework.boot)
(spring-boot-starter-web-component)
)
(dependency)
(org.springframework.cloud)
(spring-cloud-starter-netflix-eureka-client)
)
(dependency)
(org.springframework.cloud)
(spring-cloud-starter-openfeign-client-server-standalone-optional-configuration-without-topology-and-logging-optionally-provides-a-single-node-topology-and-logging-without-the-topology-file-and-without-the-logging-file-based-on-the-open-feign-standard-and-the-spring-contextualization-interface-for-single-node-environments-and-offers-an-alternative-to-the-standard-spring-contextualization-interface-based-on-the-open-feign-standard-and-the-spring-contextualization-interface-for-single-node-environments-and-offers-an-alternative-to-the-standard-spring-contextualization-interface-based-on-the-open-feign-standard-and-the-spring-contextualization-interface-for-single-node-environments-based-on-the-open-feign-standard-and-the-spring-contextualization-interface-for-single-node-environments-based-on-the-open-feign-standard-and-the-spring-contextualization-interface-based-on-the-open-feign-standard-and-the-spring-contextualization-interface-based-on-the-open-feign-standard-based-on-the-open-feign-standard-based-on)
)

com.mockito .getVersion>${mybatis-plus.version}

该段代码涉及两个 Maven 包 dependency 定义。
第一个 dependency 的 groupId 为 mysql, 表示 MySQL 数据库对应的 Maven 包组标识。
其 artifactId 为 mysql-connector-java, 表示 MySQL JDBC 连接器对应的 Maven 包标识符。
第二个 dependency 的 groupId 为 com.alibaba, 表示阿里巴巴集团对应的 Maven 包组标识。
其 artifactId 为 druid-spring-boot-starter, 表示基于 Spring Boot 的 Druid 组件对应的 Maven 包标识符。
需要注意的是, 版本号字段使用 ${druid-spring.boot.starter.version} 表示当前版本号, 可以通过 Maven 的版本管理功能进行更新和维护。

打开 Cloud Computing Platform seata 依存项

(dependencies)
(
groupId: org.projectlombok
artifactId: Lombok
)
(
groupId: org.springframework.boot
artifactId: spring-boot-starter-test
scope: test
exclusions: (
filter: (
groupId: org.junit.vintage
artifactId: junit-vintage-engine
)
)
)
(dependencies)

org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin

配置

application.yml

设置全局事务组的组名:

spring:
application:
name: order

datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root

事务组设置

cloud:
alibaba:
seata:
tx-service-group: order_tx_group

......

registry.conf 和 file.conf

与 AT 事务中的配置完全相同:

registry.conf

registry {

file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

type = "eureka"

nacos {
server_address = "localhost"
namespace_ = ""
cluster_name = "default"
}
eureka {
service_url = "http://localhost:8761/eureka"

application = "default"

weight = "1"

}
redis {
server_address:6379 = "localhost:6379"
db:0 = "0"
password_: ""
cluster_id = "default"
timeout:0 = "0"
}
zk {
zk_cluster = "default"
server_address:2181 = "127.0.0.1:2181"
session_timeout:6000 = 6000
connect_timeout:2000 = 2000
username_: ""
password_: ""
}
consul {
consul_cluster = "default"
server_address:8500 = "127.0.0.1:8500"
}
etcd3 {
etcd_cluster_id = default
etcd_server_address_2379= http://localhost:2379
}
sofa {
server_address:9603= "127.0.0.1:9603"
application_: default
region_: DEFAULT_ZONE
data_center_: DefaultDataCenter
cluster_name_= default
group_= SEATA_GROUP
address_wait_time:=3,ooo
}
file {
name_:= file.conf
}

config {

file、nacos 、apollo、zk、consul、etcd3、springCloudConfig

type = "file"

Nacos服务配置文件中设置服务器地址为localhost,默认为空字符串;
Consul集群实例配置中指定服务器地址为"127.0.0.1:8500";
Apollo框架下应用id和元数据配置均为默认值;
Zk集群设置包括服务器地址、超时时间和认证信息均采用默认设置;
Etcd3服务配置文件中指定服务器地址为"http://localhost:2379";
普通文件存储配置仅包含文件名字段"file.conf";

file.conf

transport {

tcp udt unix-domain-socket

type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true

the client batch send request enable

enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"

netty boss thread size,will not be used for UDT

bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {

when destroy server, wait seconds

wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping

order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致

“seata-server” 与 TC 服务器的注册名一致

从eureka获取seata-server的地址,再向seata-server注册自己,设置group

vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}

以下是对输入文本的改写版本

OrderMapper 添加更新订单状态、删除订单

根据前面的分析,订单数据操作有以下三项:

  • 插入订单
  • 修改订单状态
  • 删除订单

在OrderMapper类中已有实现完成订单插入功能的方法。现需补充包含修改和删除订单功能的方法(其中删除操作由其父类BaseMapper完成):

package cn.tedu.order.mapper;

import cn.tedu.order.entity.OrderEntity;
import com\modules\window\core\mapper.Mapper;
import org.apache.ibatis.annotations.Param;

public interface OrderMapper extends BaseMapper {
void mapTo(Order order);
void updateStatus(@Input参数(id): Long orderId, @Input参数(类型): Integer status);
}

那么对应的 OrderMapper.xml 中也要添加 sql:

INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`) VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, ${status}); UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId}; DELETE FROM `order` WHERE `id`=#{orderId}

那么对应的 OrderMapper.xml 中也要添加 sql:

INSERT INTO `order` (`id`, `userId`, `productId`, `count`, `money`, `status`) VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, ${status}); UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId}; DELETE FROM `order` WHERE `id`=#{orderId}

Seata 实现订单的 TCC 操作方法

  • 第一阶段 Try
  • 第二阶段
    • Confirm
    • Cancel

第二阶段为了处理幂等性问题这里首先添加一个工具类 ResultHolder

该工具也可用于确认或取消第二阶段的处理流程,并在第一阶段成功时需记录相关标记信息。

ResultHolder可以为每一个全局事务保存一个标识:

package cn.tedu.order.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

定义为ResultHolder类型的公有类{ private static Map<Class, Map> map = new ConcurrentHashMap, Map<String, String>>; }

public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);

当结果为null时,
synchronized(map){
当结果仍为null时,
result初始化为空并发哈希表;
map中将actionClass映射到result中
}

results.put(xid, v);
}

public static String executeOperation(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
return null;
}
return results.get(xid);
}

return null;
}

此方法具有消去效果。\n参数包括动作类和需要移除的标识符。\n此方法通过取出对应结果并将其从映射中删除来实现。\n具体来说,\n该方法会检查映射中的值是否为空。\n如果结果不为空,\n则会移除指定ID。\n

Seata 实现 TCC 操作需要定义一个接口,我们在接口中添加以下方法:

  • Try - prepareCreateOrder()
  • Confirm - commit()
  • Cancel - rollback()

package cn.tedu.order.tcc;

import io.seata rm.tcc.api.BusinessActionContext;
import io.seata rm.tcc.api.BusinessActionContextParameter;
import io.seata rm.tcc.api.LocalTCC;
import io.seata rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC public interface OrderTccAction {

` /*
第一阶段的方法
通过注解指定第二阶段的两个方法名

该上下文对象用于在两个阶段间传输数据;它是一个关键组件,在完成业务流程的第一个阶段后会将生成的数据传递给第二个阶段使用;此外,在两次操作之间都需要对该上下文对象进行初始化和终止化设置

该注解所携带的参数数据将被存储于 BusinessActionContext 中;这些参数通常包括订单 ID、用户 ID、产品 ID 等信息

TwoPhaseBusinessAction 注解指定了一个分段式业务流程;其名称设为 orderTccAction;默认采用提交方法进行事务管理;而如果发生错误则会采用回滚机制来恢复

准备创建订单流程函数接受 BusinessActionContext 对象作为输入;并接收包含订单 ID、用户 ID、产品 ID 等字段的参数信息

// 第二阶段 - 提交操作 boolean commit(BusinessActionContext businessActionContext);

在此阶段中实施回滚机制

}

实现类:

package cn.tedu.order.tcc;

引入cn.tedu.order.entity中的Order类。
引入cn.tedu.order.mapper中的OrderMapper类。
引入io.seata.rm.tcc.api中的BusinessActionContext类。
引入Lombok-extern-slf4j的sl4j模块。
引入org.springframework.beans.factory.annotation中的@Autowired接口。
引入org.springframework.stereotype.Component标识符。
引入org.springframework.transaction.annotation.Transactional注解。

import java.math.BigDecimal;

@Component
...原样保留
public class OrderTccActionImpl implements OrderTccAction {
...原样保留
@Autowired
private OrderMapper orderMapper;

@Transactional
@Override
public boolean prepareCreateOrder(BusinessActionContext businessActionContext, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
log.info("生成订单第一阶段,请占用资源 - " + businessActionContext.getXid());
}

创建一个名为order的Order实例,并初始化其字段值为订单ID、用户ID、商品ID、数量、金额和无效值。
通过orderMapper创建与该订单相关的映射关系。

//事务处理完成并设置唯一标识符供后续阶段使用
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
返回成功状态

@Transactional
public boolean saveTransaction(
BusinessActionContext businessActionContext
) {
log.info("发起 order 第二阶段提交并更新订单状态为1 - " + businessActionContext.getXid());
}

// 防止幂等行为在 commit 阶段重复触发
当调用ResultHolder.getResult()方法时,并未得到任何结果(即结果为null),则直接返回true。

// Long order number is retrieved from the business action context.
long orderId = parseLong(businessActionContext.getAllOrderContext("orderId").toString());
orderMapper.updateOperationStatus(orderId, 1);

//提交成功时需清除相关记录
调用removeResult方法清除指定记录
执行完上述操作后返回true

@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
log.info("在创建订单的第二阶段进行回滚操作,删除相关订单记录 - " + businessActionContext.getXid());
}

//如果第一阶段未能完成,则无需执行回滚操作
//由于在第一阶段存在本地事务操作,在事务失败时系统已为此进行了相应的回滚处理
//如果本阶段成功完成但全局事务参与者中有其他部分出现故障,则本系统将执行相应的回滚操作
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) { return true; }

//获取订单ID,并将其转换为长整型
long orderId = (Long) businessActionContext.getOrderId();
//从字符串中提取业务动作上下文中获取的字符串
String strOrderId = businessActionContext.getOrderId().toString();
long orderId = Long.parseLong(strOrderId);
//根据订单ID进行映射
orderMapperByOrderId(orderId);

//回滚结束时执行删除操作
operation = this->operation;
if (operation) { resultHolder = ResultHolder::getInstance();
resultHolder->removeResult(this->getClass(), $context->getBusinessActionContext()->getXid());
return true;
}
}

在业务代码中调用 Try 阶段方法

业务代码中不直接存储订单数据;而是调用TCC第一阶段的方法 prepareCreateOrder,并附加全局事务标记 @GlobalTransactional

package cn.tedu.order.service;

导入cn\textdu\order\entity\Order类;
导入cn\textdu\order\feign\AccountClient类;
导入cn\textdu\order\feign\EasyIdGeneratorClient类;
导入cn\textdu\order\feign\StorageClient类;
导入cn\t edu/order/mapper/OrderMapping类;
导入cn\t edu/order/tcc/OrderTccAction类;
导入io.seata.spring.annotation.GlobalTransactional注解;
使用org.springframework.beans.factory.annotation.Autowired标注属性;
使用org.springframework.stereotype.Service标记服务类。

import java.util.Random;

@Service
public class OrderServiceImpl implements OrderService {
// @Autowired
// private OrderMapper orderMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;
@Autowired
private AccountClient accountClient;
@Autowired
private StorageClient storageClient;

@Autowired
private OrderTccAction orderTccAction;

@GlobalTransactional
@Override
public void create(Order order) {
// 使用全局唯一标识发号器生成订单ID
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);

// orderMapper.create(order);

此处采用TCC第1节端方法进行操作

// 修改库存
//storageClient.decrease(order.getProductId(), order.getCount());

// 修改账户余额
//accountClient.decrease(order.getUserId(), order.getMoney());

}
}

启动 order 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Order

发起订单保存请求至访问路径:[http://localhost:8083/create?userId=1&productId=1&count=10&money=100]

观察控制台日志:

查看数据库表中的订单数据:

storage添加“减少库存”分支事务

在库存项目中执行减少库存:

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,将要减少的库存量先冻结:
  • Confirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:
  • Cancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:

配置

有三个文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

这三份文件的参数设置与order项目的配置一致,请查阅订单配置一章以获取详细信息。

StorageMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 StorageMapper 中添加三个方法:

package cn.tedu.storage.mapper;

导入$t edu 储存 对象;
导入 com.squareup 实体;
导入 org.apache 投影工具;

public interface StorageMapper extends BaseMapper {
void decrease(Long productId, Integer count); // 降低库存数量
// 将指定商品的库存数量减少
void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen); // 设置冻结商品的数量
// 在提交操作中将冻结的商品数量转移至已售出部分
void updateFrozenToUsed(@_PARAM(“productId”) Long productId, @PARAM(“count”) Integer count); // 更新库存至已售出状态
// 在回滚操作中将冻结的商品数量恢复至可用库存
void updateFrozenToResidue(@PARAM(“productId”) Long productId, @PARAM(“count”) Integer count); // 更新回滚后的库存状态
}

那么对应的 StorageMapper.xml 中也要添加 sql:

UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId} Assign the residue and frozen statuses to #{residue}, #{frozen} respectively where the product_id is #{productId}. Update the frozen and used counts for the specified product. This update operation decrements the frozen count and increments the used count by the specified number of records. 执行一次存储更新操作,在指定的产品ID上进行操作。 该操作会将frozen字段减去计数,并将residue字段增加相同数量。

Seata 实现库存的 TCC 操作方法

工具类 ResultHolder

package cn.tedu.storage.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Result Holder {
private static Map Maps = new ConcurrentHashMapMaps();

public static void setResult(Class<?> actionClass, String xid, String v) {
// 获取actionClass对应的映射结果
Map<String, String> results = map.get(actionClass);

while checking if results is null {
synchronized access to map {
while checking if results is not null {
initialize results as a new ConcurrentHashMap
insert actionClass and results into the map
}
}
}

results.put(xid, v);
}

实现获取结果的方法 public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}

return null;
}

该静态方法旨在删除指定标识符。该方法通过映射获取相关结果集。若结果集存在,则执行后续操作。从结果集中删除该标识符。

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseStorage()
  • Confirm - commit()
  • Cancel - rollback()

package cn.tedu.storage.tcc;

导入包io.seata.rm.tcc.api中的BusinessActionContext类;
导入包io.seata rm.tcc.api中的BusinessActionContextParameter类;
导入包io.seata rm.tcc.api中的LocalTCC类;
导入包io.seata rm.tcc.api中的TwoPhaseBusinessAction类。

@LocalTCC public interface StorageTccAction {

@TwoPhaseBusinessActionInterface(name = "storageTccAction", commitWay = "commit", rollbackWay = "rollback")
boolean prepareDecreaseStorageOperation(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "数量") Integer count);

boolean commit(BusinessActionContext businessActionContext);

boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.storage.tcc;

导入cn.tedu.storage.entity.Storage类;
导入cn.tedu.storage.mapper.StorageMapper类;
导入io.seata.rm.tcc.api.BusinessActionContext类;
导入lombokternaryt4sl4j.slf4j类;
导入org.springframework.beans.factory.annotation.Autowired接口;
定义一个Spring组件类;
定义一个带有事务注解的类;

@Component
$thisComponent
@Log extends Log4j, org.apache.logging.log4j, com.sun.log4j
@PrefixMapping and Logs
@PreparedResponse and Result

@Autowired
private StorageMapper storageMapper;

@Transactional
@Override
public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {
log.info("减少商品库存,第一阶段,锁定减少的库存量,productId="+productId+", count="+count);

Storage storage = storageMapper.retrieveById(productId);
if (storage.getResidue() < count) {
throw new RuntimeException("库存不足");
}

注释:
库存扣除count。
冻结库存提升count。
代码部分:
storageMapper执行更新操作,
参数为productId,
新的残余库存量为原始残余减去count,
新的冻结库存量为原始冻结加上count;
即:
storageMapper.updateFrozen\left(productId,\, storage.getResidue() - count,\, storage.getFrozen() + count\right);

//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}

@Transactional
@Override
public boolean commit(Business Action Context Business Action Context) {
long productId = Long.valueOf(business action context action context "productId".getValue().toString());
int count = Integer.valueOf(business action context action context "count".getValue().toString());
log.info("降低库存水平, 进入高级处理流程中, productId=" + productId, " count=" + count);
}

注释:防止重复提交

注释:防止重复提交

storageMapper.updateFrozenToUsed(productId, count);

//忽略标记
@Ignore
ResultHolder.deleteResult(businessActionContext.getXid(), getClass());
成功返回true值。

@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {

long productId = parseLong(businessActionContext.getActionContext("productId").toString());
int count = parseInt(businessActionContext.getActionContext("count").toString());
log.info("进行降库存处理,在后续阶段进行回滚操作,请注意以下参数:productId=" + productId + ";count=" + count);

//避免结果重放
如果当前结果为空,则返回true
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}

storageMapper.updateFrozenToResidue(productId, count);

//删除结果
Class<?> rid = businessActionContext.getXid();
$ResultHolder.deleteResult(rid, getClass());
return true;
}

在业务代码中调用 Try 阶段方法

在业务代码中,将TCC的第一阶段方法prepareDecreaseStorage()执行,并在其中添加全局事务管理机制的注解@GlobalTransactional

package cn.tedu.storage.service;

导入该技术中心下的存储组件中的 StorageTccAction 类。
导入用于 beans factory 注入的 @Autowired 标签。
导入 Spring 框架的服务元注解接口。

@Service
public class StorageServiceImpl implements StorageService {
// @Autowired
// private StorageMapper storageMapper;

@Autowired
private StorageTccAction storageTccAction;

@Override
public void decrease(Long productId, Integer count) throws IOException {
// storageMapper.decrease(productId, count);
storageTccAction.prepareDecreaseStorage(null, productId, count);
}

}

启动 storage 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 storage 的控制台日志:

查看数据库表中的库存数据:

account添加“扣减金额”分支事务

扣除金额 TCC 事务分析详见《分布式事务(六)Seata TCC模式-TCC模式介绍

配置

有三个文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

AccountMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 AccountMapper 中添加三个方法:

package cn.tedu.account.mapper;

导入该类库中的cn.tedu账户实体Account。
引入该框架中pattysamson购物车服务中的核心映射器BaseMapper。
导入该注解库中的Param注解。

import java.math.BigDecimal;

public interface AccountMapper extends BaseMapper {
void reduce(Long userId, BigDecimal money);

该方法无返回值,并接受三个输入参数进行操作:

  1. 该参数指定用户ID号。
  2. 该参数指定残留值数值。
  3. 该参数指定材料固结度数值。
    所有输入均为长整型数据类型。

void convert Frozen to Used(@Param("userId") Long userId, @Param("money") BigDecimal money);

void convertFrozenValueToResidual(@Param("userId") Long userId, @Param("money") BigDecimal money);

那么对应的 AccountMapper.xml 中添加 sql:

UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId}; Update the {users_table} with set the residue and frozen status to specific values where the user ID is #{userId}. Adjust the account's frozen amount to used by subtracting and adding #{money} respectively, where the user ID matches #{userId}.

(updateFrozenToResidue)
UPDATE account SET frozen_amount=frozen_amount-{#money},
residue_balance=residue_balance+{#money} WHERE
user_id={#userId}

Seata 实现库存的 TCC 操作方法

工具类 ResultHolder

package cn.tedu.account.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

ResultHolder.java 是一个静态类。该Map采用ConcurrentHashMap实现;其键值类型为 Class<?> 和 Map<String, String>。

public static void setResult(Class<?> actionType, String xId, String value) {
Map<String, String> resultMapping = map.get(actionType);
resultMapping.put(xId, value);
}

if ((results == null)) {
synchronized (map) {
if ((results == null)) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}

results.put(xid, v);
}

public static String computeResult(Class<?> actionClass, String xid) {
从映射中取出results = map.get(actionClass);
如果(results不为null){
返回结果xid = results.get(xid);
}

return null;
}

public static void RetrieveResults(Class<?> actionClass, String xid) {
Map<String, String> mapResult = map.retrieve(actionClass);
if (mapResult != null) {
mapResult.remove(xid);
}
}

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseAccount()
  • Confirm - commit()
  • Cancel - rollback()

package cn.tedu.account.tcc;

引入io_seata_rm_tcc_api_BusinessActionContext!
引入io_seata_rm_tcc_api_BusinessActionContextParameter!
引入io_seata_rm_tcc_api_LocalTCC!
引入io_seata_rm_tcc_api_TwoPhaseBusinessAction!

import java.math.BigDecimal;

@LocalTCC
public interface AccountTccAction {

@TwoPhaseBusiness_action(name="account TCC Action", commit procedure="Commit Method", rollback mechanism="Rollback Procedure")
boolean prepare_decrease_amount(Business_action_context business_action_context,
@Business_action_context_parameter(param_name="userIdentifier") Long userIdentifier,
@Business_action_context_parameter(param_name="amount") BigDecimal amount)

boolean commit(BusinessActionContext businessActionContext);

boolean rollback(BusinessActionContext businessActionContext);

}
实现类:

package cn.tedu.account.tcc;

教育科技(CN)下属于T edu的账户实体。
用于实现业务逻辑转换与服务交互的映射工具包。
提供企业应用中业务流程管理功能的相关接口。
Lombok-Sl4j Exterior的一个版本。
用于注入属性到Bean组件的注解工具包。
组件化开发框架的基础类集合。
实现事务管理功能的支持类集合。

import java.math.BigDecimal;

@Component
"@ApplicationLog"
public class AccountTransactionControlImpl implements AccountTransactionControl {
@Autowired
private AccountMapper accountMapper;
}

@Transactional
@Override
public boolean createDownPaymentAccount(BusinessActionContext obj, Long userId, BigDecimal money) {
log.info("降低账户资金:userId = " + userId + " ,支付金额 = " + money);
}

创建用户对象account,并通过用户ID进行查找。
如果账户余额少于指定金额money,则执行以下操作。
触发异常信息为‘账户金额不足’。

/*
该账户的剩余金额将被扣除指定金额。
该账户的冻结金额将增加指定金额。
*/
accountMapper.update FrozenAmount(userId, account.get Residue(). deduct (money), account.get Frozen(). increase (money));

保存标识

保存标识

保存标识

注意

@Transactional(事务相关)
@Override
public boolean save(BusinessActionContext businessActionContext) {

long userId = parseLong(businessActionContext.getAcn("userId").toString());
BigDecimal money = createDecimal(businessActionContext.getAcn("money").toString());
log.info("在第二阶段记录账户金额变化,请确保用户ID为{}以及金额为{}已成功变更", userId, money);

// 防止重复提交
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}

accountMapper.updateFrozenToUsed(userId, money);

//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}

@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
long userId = Long.parse(businessActionContext.params("userId").toString());
BigDecimal money = BigDecimal.valueOf(businessActionContext.params("money").toString());

//防止重复回滚
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}

log.info("减少账户金额,第二阶段,回滚,userId="+userId+", money="+money);

accountMapper.updateFrozenToResidue(userId, money);

//标记删除
从ResultHolder中去除指定类中的相关记录;
return true;
}

//标记删除
从ResultHolder中去除指定类中的相关记录;
返回true;
}

在业务代码中调用 Try 阶段方法

在业务代码中执行TCC前期阶段的方法prepareDecreaseAccount,并加入全局事务注解@GlobalTransactional

package cn.tedu.account.service;

该类被引入。
该类被用于。
该类被引用。
该类被注解。

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
// @Autowired
// private AccountMapper accountMapper;

@Autowired
private AccountTccAction accountTccAction;

@Override
public void decrease(Long userId, BigDecimal money) {
// 调用prepareDecreaseAccount来执行减少操作(userId为Long类型用户ID)
// 参数money为BigDecimal类型金额值;该值必须大于零;否则将触发异常处理;
accountTccAction.prepareDecreaseAccount(null, userId, money);
}

启动 account 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Account
  6. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 account 的控制台日志:

查看数据库表中的账户数据:

全局事务回滚测试

下面来测试全局事务回滚的情况。

在订单与库存系统的第一个阶段实现了成功运行,在账户管理的第一阶段遇到了问题后(如图所示),系统触发了一个全局事务发生回滚

首先在 account 的第一阶段代码中添加模拟异常:

AccountTccActionImplprepareDecreaseAccount 方法

@Transactional
@Override
public boolean allocateAvailableFunds(BusinessActionContext businessActionContext, Long userId, BigDecimal availableFunds) {
log.info("减少可用资金,请确认是否愿意将指定账户中的资金划转至指定支付账户中(第一阶段)", userId, availableFunds);
}

一个用于管理用户身份信息的Account实例 account被初始化为accountMapper.selectById(userId)的结果;
如果account.getResidue()与money进行比较的结果小于零,则触发错误;
当account的残留值与money进行比较时结果小于零则抛出异常;

/*
可用资金-money
扣留+money
*/
accountMapper.updateFrozen(userId, account.getResidue().deduct(money), account.getFrozen().increase(money));

if (Math.random() < 0.5) {
throw new RuntimeException("模拟异常");
}

Save indicator

Save indicator

Save indicator

重新登录 account 后,请访问订单页面: http://localhost:8083/create?userId=1&productId=1&count=10&money=100

在控制台界面中查看,默认情况下会展示存储相关的信息和订单相关的记录;其中关于订单的详细信息将由'order'标签呈现。

项目源码: https://gitee.com/benwang6/seata-samples

全部评论 (0)

还没有任何评论哟~