1.数据库表导入
- 在测试分布式事务之前,我们需要先设计数据库,以及准备测试数据。
- 新建数据库,命名为:seata
创建表(导入 Sql)
/*
Navicat MySQL Data Transfer
Source Server : Mysql
Source Server Version : 80019
Source Host : localhost:3306
Source Database : seata
Target Server Type : MYSQL
Target Server Version : 80019
File Encoding : 65001
Date: 2020-05-21 16:22:26
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for account_tbl
-- ----------------------------
DROP TABLE IF EXISTS account_tbl;
CREATE TABLE account_tbl (
id int NOT NULL AUTO_INCREMENT,
user_id varchar(255) DEFAULT NULL,
money int DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of account_tbl
-- ----------------------------
INSERT INTO account_tbl VALUES ('1', 'DQCGM_USER_1', '10000');
INSERT INTO account_tbl VALUES ('2', 'DQCGM_USER_2', '10000');
-- ----------------------------
-- Table structure for order_tbl
-- ----------------------------
DROP TABLE IF EXISTS order_tbl;
CREATE TABLE order_tbl (
id int NOT NULL AUTO_INCREMENT,
user_id varchar(255) DEFAULT NULL,
commodity_code varchar(255) DEFAULT NULL,
count int DEFAULT '0',
money int DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of order_tbl
-- ----------------------------
-- ----------------------------
-- Table structure for storage_tbl
-- ----------------------------
DROP TABLE IF EXISTS storage_tbl;
CREATE TABLE storage_tbl (
id int NOT NULL AUTO_INCREMENT,
commodity_code varchar(255) DEFAULT NULL,
count int DEFAULT '0',
PRIMARY KEY (id),
UNIQUE KEY commodity_code (commodity_code)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of storage_tbl
-- ----------------------------
INSERT INTO storage_tbl VALUES ('1', 'HUAWEI_0001', '10');
INSERT INTO storage_tbl VALUES ('2', 'XIAOMI_002', '10');
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS undo_log;
CREATE TABLE undo_log (
id bigint NOT NULL AUTO_INCREMENT,
branch_id bigint NOT NULL,
xid varchar(100) NOT NULL,
context varchar(128) NOT NULL,
rollback_info longblob NOT NULL,
log_status int NOT NULL,
log_created datetime NOT NULL,
log_modified datetime NOT NULL,
ext varchar(100) DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid,branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of undo_log
-- ----------------------------
- 表有如下:
- account :用户的账号表
- Order:订单表;
- Stoage:商品的库存表;
- undo_log:回滚事务表,SEATA AT 模式需要 UNDO_LOG 表。
2.模型对象和 Mapper 对象生成
使用 IDEA 连接数据库:
成功后,如图所示:
执行代码的生成:
提示:若大家没有安装 mybatis 的代码生成插件,还请自行安装,或者手动编译
Account_tbl:
Order_tbl:
Storage_tbl:
3.storage-service 代码的完善
3.1 接口设计
- 在 storage-service 里面,主要完成对库存的扣减
- 新建一个接口:
- 命名为:StorageService,代码如下:
代码如下:
public interface StorageService {
/**
* 扣减商品的库存
* @param commodityCode
* 商品的编码
* @param count
* 扣减商品的数量
*/
void deduct(String commodityCode, int count);
}
3.2 实现该接口
名称为:impl.StorageService,代码的实现如下:
@Service
public class StorageServiceImpl implements StorageService {
private static Logger logger = LoggerFactory.getLogger(StorageServiceImpl.class);
@Autowired
private StorageTblMapper storageTblMapper;
@Override
public void deduct(String commodityCode, int count) {
logger.info("开始扣减库存,商品编码:{},数量:{}", commodityCode, count);
StorageTbl storageTbl = storageTblMapper.selectOne(new LambdaQueryWrapper<StorageTbl>().eq(StorageTbl::getCommodityCode, commodityCode));
int idleCount = storageTbl.getCount() - count;
if (idleCount < 0) {
throw new RuntimeException("库存不足");
}
storageTbl.setCount(idleCount);
storageTblMapper.updateById(storageTbl);
logger.info("库存扣减成功,商品编码:{},剩余数量:{}", commodityCode, idleCount);
}
}
3.3 使用 Restful 暴露此接口
添加一个 Controller
代码如下:
@RestController
public class StorageController {
private static Logger logger = LoggerFactory.getLogger(StorageController.class) ;
@Autowired
private StorageService storageService ;
/**
* 扣减商品的库存
* @param commodityCode 商品的编码
* @param count 商品的数量
* @return
*/
@GetMapping("/deduct/{commodityCode}/{count}")
public ResponseEntity<Void> deduct(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") Integer count){
logger.info("Account Service ... xid: " + RootContext.getXID());
// 开始扣减库存
storageService.deduct(commodityCode , count);
return ResponseEntity.ok().build() ;
}
}
3.4 添加配置文件
内容如下:
server:
port: 18084
spring:
application:
name: storage-service
cloud:
alibaba:
seata:
tx-service-group: storage-service
nacos:
discovery:
server-addr: localhost:8848
datasource:
name: storageDataSource
type: com.alibaba.druid.pool.DruidDataSource
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC
druid:
max-active: 20
min-idle: 2
initial-size: 2
seata:
service:
vgroup-mapping:
account-service: default
grouplist:
default: 127.0.0.1:8091
disable-global-transaction: false
enabled: true
mybatis-plus:
mapper-locations: classpath:/mapper/*.xml
3.5 添加启动类
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.dqcgm.mapper")
public class StorageServiceApplication {
public static void main(String[] args) {
SpringApplication.run(StorageServiceApplication.class ,args) ;
}
}
3.6 启动项目测试
启动项目后,打印该日志,说明连接 seata-server 成功。
4.account-service 代码的完善
4.1 接口设计
- 在 account-service 里面,主要完成对用户余扣减
- 新建一个接口:
命名为:AccountService,代码如下:
代码如下:
public interface AccountService {
/**
* 从用户的账号扣减金额
* @param userId
* 用户的 Id
* @param money
* 金额
*/
void debit(String userId, int money);
}
4.2 实现该接口
名称为:impl.StorageService,代码的实现如下:
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountTblMapper accountTblMapper;
private static Logger logger = LoggerFactory.getLogger(AccountServiceImpl.class);
@Override
public void debit(String userId, int money) {
logger.info("准备扣减用户:{} 余额,扣减的数目为:{}", userId, money);
AccountTbl accountTbl = accountTblMapper.selectOne(new LambdaQueryWrapper<AccountTbl>().eq(AccountTbl::getUserId, userId));
int idleMoney = accountTbl.getMoney() - money;
if (idleMoney < 0) {
throw new RuntimeException("用户余额不足");
}
accountTbl.setMoney(idleMoney);
accountTblMapper.updateById(accountTbl);
logger.info("扣减用户{}金额成功,剩余金额为{}", userId, money);
}
}
4.3 使用 Restful 暴露此接口
添加一个 Controller
- 名称为:
- 代码如下:
@RestController
public class AccountController {
@Autowired
private AccountService accountService ;
private static Logger logger = LoggerFactory.getLogger(AccountController.class) ;
@GetMapping("/debit/{userId}/{money}")
public ResponseEntity<Void> debit( @PathVariable("userId") String userId, @PathVariable("money") Integer money){
logger.info("Account Service ... xid: " + RootContext.getXID());
// 开始扣减余额
accountService.debit(userId , money);
return ResponseEntity.ok().build() ;
}
}
4.4 添加配置文件
在 resource 目录里面新建配置文件:
内容如下:
server:
port: 18085
spring:
application:
name: account-service
cloud:
alibaba:
seata:
tx-service-group: account-service
nacos:
discovery:
server-addr: localhost:8848
datasource:
type: com.alibaba.druid.pool.DruidDataSource
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC
druid:
max-active: 20
min-idle: 2
initial-size: 2
seata:
service:
vgroup-mapping:
account-service: default
grouplist:
default: 127.0.0.1:8091
disable-global-transaction: false
enabled: true
mybatis-plus:
mapper-locations: classpath:/mapper/*.xml
4.5 添加启动类
命名为 AccountServiceApplication ,代码如下:
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.dqcgm.mapper")
public class AccoutServiceApplication {
public static void main(String[] args) {
SpringApplication.run(AccoutServiceApplication.class ,args) ;
}
}
4.6 启动项目测试
启动项目后,打印该日志,说明连接 seata-server 成功
5.order-service 代码的完善
5.1 接口设计
- 在 order-service 里面,主要完成保存用户订单的操作。
- 新建一个接口:
命名为:OrderService,代码如下:
代码如下:
public interface OrderService {
/**
* 创建一个订单
* @param userId 用户 id
* @param commodityCode 商品的编号
* @param orderCount 商品的数量
* @return OrderTbl
*/
OrderTbl create(String userId, String commodityCode, int orderCount) ;
}
5.2 实现该接口
名称为:impl.OrderService,代码的实现如下:
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderTblMapper orderTblMapper;
@Autowired
private AccountService accountService;
private static Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
@Override public OrderTbl create(String userId, String commodityCode, int orderCount){
logger.info("准备为{}创建一个订单,商品编号为{},数量为{}", userId,commodityCode, orderCount);
// 1 计算总金额
int orderMoney = calculate(commodityCode, orderCount);
accountService.debit(userId, orderMoney);
OrderTbl order = new OrderTbl();
order.setUserId(userId);
order.setCommodityCode(commodityCode);
order.setCount(orderCount);
order.setMoney(orderMoney);
orderTblMapper.insert(order);
// INSERT INTO orders ...
return order;
}
private int calculate(String commodityCode, int orderCount) {
// 我们现在没有商品的表,在此我们把商品的价格定死
int prodPrice = 0 ;
if("HUAWEI_0001".equals(commodityCode)){
// 华为时 100
prodPrice = 100;
}else if ("XIAOMI_002".equals(commodityCode)){
// 小米时 200
prodPrice = 200 ;
}else {
prodPrice = 1000 ; // 其他为 1000
}
return orderCount * prodPrice ;
}
}
5.3 远程调用 account-service 的实现
创建一个 AccountService 的类,该类里面主要完成对 accout-servic 的远程调用
名称为:
//实现对账号服务的远程调用
@Service
public class AccountService {
private static Logger logger = LoggerFactory.getLogger(AccountService.class) ;
//1.ribbon 的方式
@Autowired
private RestTemplate restTemplate ;
//2.feign 的方式
public void debit(String userId, int orderMoney) {
ResponseEntity<Void> entity = restTemplate.getForEntity( "http://accout-service/debit/{userId}/{orderMoney}", Void.class, userId, orderMoney );
if(entity.getStatusCode()== HttpStatus.OK){
logger.info("扣减用户{}金额成功,本次扣减的数目为{}",userId,orderMoney);
return ;
}
logger.info("扣减用户{}金额失败",userId);
throw new RuntimeException("扣减金额失败") ;
}
}
我们在此使用的时 Ribbon 做远程调用,下一博文我也会测试 Feign
5.4 Ribbon 集成
创建一个配置类:
代码如下:
@Configuration
public class HttpUtilConfig {
@LoadBalanced
@Bean
public RestTemplate restTemplate(){
return new RestTemplate() ;
}
}
5.5 使用 Restful 暴露此接口
添加一个 Controller
命名为:
代码如下:
@RestController
public class OrderController {
private static Logger logger = LoggerFactory.getLogger(OrderController.class) ;
@Autowired
private OrderService orderService ;
/**
* 创建订单
* @param userId
* 用户 Id
* @param commodityCode
* 商品的编号
* @param orderCount
* 商品的数量
* @return
*/
@GetMapping("/create/{userId}/{commodityCode}/{orderCount}")
public ResponseEntity<Void> create(@PathVariable("userId") String userId, @PathVariable("commodityCode") String commodityCode, @PathVariable("orderCount") int orderCount){
logger.info("Order Service ... xid: " + RootContext.getXID());
orderService.create(userId, commodityCode, orderCount) ;
return ResponseEntity.ok().build() ;
}
}
5.6 添加配置文件
- 在 resource 目录里面新建配置文件:
- 命名为:application.yml
- 内容如下:
server:
port: 18086
spring:
application:
name: order-service
cloud:
alibaba:
seata:
tx-service-group: order-service
nacos:
discovery:
server-addr: localhost:8848
datasource:
name: orderDataSource
type: com.alibaba.druid.pool.DruidDataSource
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC
druid:
max-active: 20
min-idle: 2
initial-size: 2
seata:
service:
vgroup-mapping:
order-service: default
grouplist:
default: 127.0.0.1:8091
disable-global-transaction: false
enabled: true
mybatis-plus:
mapper-locations: classpath:/mapper/*.xml
5.7 添加启动类
- 命名为:OrderServiceApplication
- 代码如下:
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.dqcgm.mapper")
public class OrderServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrderServiceApplication.class ,args) ;
}
}
5.8 启动项目测试
启动项目后,打印该日志,说明连接 seata-server 成功。
6.business-service 代码的完善
6.1 接口设计
在 business-service 里面,主要完成下单的逻辑,包含 2 个主要的步骤,就是对库存服务和订单服务的远程调用
- 新建一个接口:
命名为:com.dqcgm.service.BusinessService
代码如下:
public interface BusinessService {
/**
* 采购/下单的过程
* @param userId
* 用户的 Id
* @param commodityCode
* 商品的编码
* @param orderCount
* 商品的数量
*/
void purchase(String userId, String commodityCode, int orderCount) ;
}
6.2 实现该接口
名称为:impl.BusinessServiceImpl,代码的实现如下:
@Service
public class BusinessServiceImpl implements BusinessService {
private static Logger logger = LoggerFactory.getLogger(BusinessServiceImpl.class) ;
@Autowired
private StorageService storageService;
@Autowired
private OrderService orderService;
@Override
public void purchase(String userId, String commodityCode, int orderCount) {
logger.info("准备下单,用户:{},商品:{},数量: {}",userId,commodityCode,orderCount);
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount) ;
logger.info("下单完成");
}
}
6.3 远程调用 storage-service 的实现
创建一个 StorageService 的类,该类里面主要完成对 storage-servic 的远程调用
名称为:
@Service
public class StorageService {
private static Logger logger = LoggerFactory.getLogger(StorageService.class) ;
//1.采用 Ribbon 的形式
@Autowired
private RestTemplate restTemplate ;
//2.采用 Feign 的形式
public void deduct(String commodityCode, int orderCount) {
ResponseEntity<Void> entity = restTemplate. getForEntity( "http://storage-service/debut/{commodityCode}/{orderCount}", Void.class, commodityCode, orderCount );
if (entity.getStatusCode()== HttpStatus.OK){
logger.info("扣减库存成功,商品编号为{},本次扣减的数量为{}",commodityCode,orderCount);
return;
}
throw new RuntimeException("扣减库存失败") ;
}
}
我们在此使用的时 Ribbon 做远程调用,下一博文我也会测试 Feign 。
6.4 远程调用 order-service 的实现
新建一个类:
代码如下:
@Service
public class OrderService {
private static Logger logger = LoggerFactory.getLogger(StorageService.class) ;
//1.采用 Ribbon 的形式
@Autowired
private RestTemplate restTemplate ;
//2.采用 Feign 的形式
public void create(String userId, String commodityCode, int orderCount) {
ResponseEntity<Void> entity = restTemplate. getForEntity( "http://order-service/create/{userId}/{commodityCode}/{orderCount}", Void.class, userId , commodityCode, orderCount );
if (entity.getStatusCode()== HttpStatus.OK){
logger.info("订单创建成功,用户为{} ,商品编号为{},本次扣减的数量为{}",userId , commodityCode,orderCount);
return;
}
throw new RuntimeException("订单创建失败") ;
}
}
6.5 集成 Ribbon
添加一个 HttpUtilConfig 的配置类:
代码如下:
@Configuration
public class HttpUtilConfig {
@LoadBalanced
@Bean
public RestTemplate restTemplate(){
return new RestTemplate() ;
}
}
6.6 添加配置文件
在 resource 目录里面新建配置文件:
命名为:application.yml
内容如下:
server:
port: 18087
spring:
application:
name: business-service
cloud:
alibaba:
seata:
tx-service-group: business-service
nacos:
discovery:
server-addr: localhost:8848
seata:
service:
vgroup-mapping:
business-service: default
grouplist:
default: 127.0.0.1:8091
disable-global-transaction: false
enabled: true
6.7 添加启动类
命名为:BusinessServiceApplication
代码如下:
@SpringBootApplication
@EnableDiscoveryClient
public class BusinessServiceApplication {
public static void main(String[] args) {
SpringApplication.run(BusinessServiceApplication.class ,args) ;
}
}
6.8 暴露下单接口
继续改造启动类:
@SpringBootApplication
@EnableDiscoveryClient
@RestController
public class BusinessServiceApplication {
@Autowired
private BusinessService businessService ;
public static void main(String[] args) {
SpringApplication.run(BusinessServiceApplication.class ,args) ;
}
/**
* 开始下单
* @param userId
* 用户的 Id
* @param commodityCode
* 商品的编号
* @param orderCount
* 商品的数量
* @return
*/
@GetMapping("/purchase/{userId}/{commodityCode}/{orderCount}")
public ResponseEntity<Void> purchase( @PathVariable("userId") String userId, @PathVariable("commodityCode")String commodityCode, @PathVariable("orderCount")Integer orderCount){
businessService.purchase(userId,commodityCode,orderCount);
return ResponseEntity.ok().build() ;
}
}
6.9 启动项目测试
启动项目后,打印该日志,说明连接 seata-server 成功
7.总体的调用流程如下
都启动完成后:
Nacos-Server:
8.正常下单测试
在浏览器里面访问:
http://localhost:18087/purchase/DQCGM_USER_1/HUAWEI_0001/1
- 代表 DQCGM_USER_1购买 HUAWEI_0001 产品 1 件
- 数据库里面:
- Accout_tbl 里面,DQCGM_USER_1 用户的金额减少 100;
Storage_tbl 里面,HUAWEI_0001 的库存减少了 1;
Order_Tbl 里面,创建了一条订单记录;
说明,此时远程调用时正常的。
9.分布式事务的演示
我们演示如图的异常:
我们可以发现,远程调用共有 3 处
9.1 在 accout-service 服务扣减余额触发异常
9.2 重启 accout-service
9.3 还原数据库里面的数据
Account_Tbl:
Storage_Tbl:
9.4 重新下单测试
http://localhost:18087/purchase/DQCGM_USER_1/HUAWEI_0001/1
数据库的数据:
Account_Tbl:
Storage_Tbl:
我们发现,分布式事务产生了,accout-service 内部的异常,导致 accout_tbl 表数据回滚了。但是,在 storage_tbl :位于 stoage-service 的事务却没有回滚
10.使用 Seata 解决分布式问题
10.1 改造 accout-service 里面的 AccountServiceImpl
当用户的 ID 为:DQCGM_USER_2 时,我们抛出异常,当为其他用户时,我们正常的下单
10.2 改造 BusinessServiceImpl
添加一个注解,看他是否能解决分布式事务的问题
10.3 重启测试
- 重启 accout-service,business-service 测试
- 使用 DQCGM_USER_1 正常的下单测试:
- Stoage_tbl:库存正常
- Accout_Tbl:余额正常
- 使用 DQCGM_USER_2 下单测试:
- 发现发生异常后,
- stoage_tbl 里面的没有发生改变,数据正常
- Accout_tbl 里面的数据也没有发生改变,数据正常
- 分布式事务测试成功了