Spring Boot + ActiveMQ + Mybatis + mysql
基于MQ + 事件表 实现分布式事务解决方案
什么是分布式事务就不啰嗦了,我了解的也不多
就是rpc调用导致的事务问题,区别于本地事务
比如:
用户注册后,即送皮肤
用户注册是操作在DB1,送皮肤是操作在DB2
不是简单的本地事务
分布式事务保证操作DB1、 DB2都成功
我们引入事件表 + MQ 解决分布式事务
什么是事件表,就是记录用户注册/送皮肤这个事件的状态
用户注册成功后,也要向用户事件表(userdb 的 event 表)插入一条对应的事件数据,并置当前状态为NEW
然后定时任务1扫描用户事件表,取出NEW的数据进行送皮肤操作,
把送皮肤的消息发送到MQ,发送后把该事件的状态置为PUBLISHED
处理送皮肤的服务接收到消息后,向奖励事件表(rewarddb 的 event 表)插入一条数据,并把状态置为PUBLISHED
然后定时任务2扫描奖励事件表,取出PUBLISHED的数据进行操作(向reward表写入奖励),并把状态置为PROCESSED
建表sql
-- Schema for the user side (data source db1 connects to userdb).
create database userdb;
use userdb;
-- Registered users; rows are inserted by UserDao.insert.
CREATE TABLE `user` (
`id` bigint(20) unsigned primary key auto_increment COMMENT '自增长,主键',
`user_name` varchar(100) DEFAULT NULL COMMENT '用户名'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- User-side event table: one row per registration event.
-- `process` is written as 'NEW' by UserService.newUser and moved to 'PUBLISHED'
-- by UserService.executeEvent after the message has been sent to the queue.
-- `content` holds the JSON-serialized reward payload.
CREATE TABLE `event` (
`id` bigint(20) unsigned primary key auto_increment COMMENT '自增长,主键',
`type` varchar(30) DEFAULT NULL COMMENT '事件类型',
`process` varchar(30) DEFAULT NULL COMMENT '表示事件进行到了哪个环节',
`content` text COMMENT '事件包含的内容',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
-- Schema for the reward side (data source db2 connects to rewarddb).
create database rewarddb;
use rewarddb;
-- Granted skins; rows are inserted by RewardDao.newReward.
-- NOTE(review): user_id is a signed bigint while userdb.user.id is unsigned — confirm this is intended.
CREATE TABLE `reward` (
`id` bigint(20) unsigned primary key auto_increment COMMENT '自增长,主键',
`user_id` bigint(20) not null default 0,
`skin` varchar(100) DEFAULT '龙瞎' COMMENT '皮肤'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Reward-side event table: one row per message consumed from the queue.
-- `process` is written as 'PUBLISHED' by RewardListener and moved to 'PROCESSED'
-- by RewardService.executeEvent once the reward row has been inserted.
CREATE TABLE `event` (
`id` bigint(20) unsigned primary key auto_increment COMMENT '自增长,主键',
`type` varchar(30) DEFAULT NULL COMMENT '事件类型',
`process` varchar(30) DEFAULT NULL COMMENT '表示事件进行到了哪个环节',
`content` text COMMENT '事件包含的内容',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
创建Spring Boot项目:
项目整体结构:
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>pers.wmx</groupId>
<artifactId>activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- fastjson: JSON (de)serialization of the event payload.
     Releases before 1.2.83 are affected by published autoType deserialization
     vulnerabilities, and this project parses message-queue payloads with it,
     so the patched 1.2.x release is used. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
## 数据源配置
#spring.datasource.url=jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8
#spring.datasource.username=root
#spring.datasource.password=1997
#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# 配置第一个数据源
spring.datasource.hikari.db1.jdbc-url=jdbc:mysql://localhost:3306/userdb?useUnicode=true&characterEncoding=utf8&useSSL=true
spring.datasource.hikari.db1.username=root
spring.datasource.hikari.db1.password=1997
spring.datasource.hikari.db1.driver-class-name=com.mysql.cj.jdbc.Driver
# 配置第二个数据源
spring.datasource.hikari.db2.jdbc-url=jdbc:mysql://localhost:3306/rewarddb?useUnicode=true&characterEncoding=utf8&useSSL=true
spring.datasource.hikari.db2.username=root
spring.datasource.hikari.db2.password=1997
spring.datasource.hikari.db2.driver-class-name=com.mysql.cj.jdbc.Driver
## Mybatis 配置
#mybatis.typeAliasesPackage=pers.wmx.activemq
#mybatis.configuration.mapUnderscoreToCamelCase=true
spring.activemq.broker-url=tcp://ip:61616
#true 表示使用内置的MQ,false则连接服务器
spring.activemq.in-memory=false
数据源1配置类:
package pers.wmx.activemq.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
 * Wiring for the first data source (properties under
 * {@code spring.datasource.hikari.db1}).
 *
 * <p>Mappers found in {@code pers.wmx.activemq.db1.dao} are bound to the
 * {@code db1SqlSessionTemplate} bean defined here. Every bean in this class is
 * marked {@link Primary}, so it is the default candidate when an injection
 * point does not name a specific bean.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Configuration
@MapperScan(basePackages = "pers.wmx.activemq.db1.dao", sqlSessionTemplateRef = "db1SqlSessionTemplate")
public class DataSource1Config {

    /**
     * Creates the data source; its settings are bound from the
     * {@code spring.datasource.hikari.db1} property prefix.
     */
    @Primary
    @Bean(name = "db1DataSource")
    @ConfigurationProperties(prefix = "spring.datasource.hikari.db1")
    public DataSource testDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Creates the MyBatis session factory on top of {@code db1DataSource}.
     * No mapper XML locations are registered here.
     */
    @Primary
    @Bean(name = "db1SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("db1DataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }

    /**
     * Creates the transaction manager for {@code db1DataSource}.
     */
    @Primary
    @Bean(name = "db1TransactionManager")
    public DataSourceTransactionManager testTransactionManager(@Qualifier("db1DataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * Creates the session template that the db1 mappers are bound to.
     */
    @Primary
    @Bean(name = "db1SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("db1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
数据源2配置类:
package pers.wmx.activemq.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
 * Wiring for the second data source (properties under
 * {@code spring.datasource.hikari.db2}).
 *
 * <p>Mappers found in {@code pers.wmx.activemq.db2.dao} are bound to the
 * {@code db2SqlSessionTemplate} bean defined here. None of these beans is
 * primary, so they must be referenced by name.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Configuration
@MapperScan(basePackages = "pers.wmx.activemq.db2.dao", sqlSessionTemplateRef = "db2SqlSessionTemplate")
public class DataSource2Config {

    /**
     * Creates the data source; its settings are bound from the
     * {@code spring.datasource.hikari.db2} property prefix.
     */
    @Bean(name = "db2DataSource")
    @ConfigurationProperties(prefix = "spring.datasource.hikari.db2")
    public DataSource testDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Creates the MyBatis session factory on top of {@code db2DataSource}.
     * No mapper XML locations are registered here.
     */
    @Bean(name = "db2SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("db2DataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }

    /**
     * Creates the transaction manager for {@code db2DataSource}.
     */
    @Bean(name = "db2TransactionManager")
    public DataSourceTransactionManager testTransactionManager(@Qualifier("db2DataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * Creates the session template that the db2 mappers are bound to.
     */
    @Bean(name = "db2SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("db2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
实体类:
package pers.wmx.activemq.bean;
import lombok.Data;
/**
 * User record; UserDao inserts it into the {@code user} table.
 * Accessors are generated by Lombok's {@code @Data}.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Data
public class User {
// Primary key; UserDao.insert writes the generated key back into this field.
// NOTE(review): the column is bigint unsigned while this is an Integer — confirm the range is sufficient.
private Integer id;
// Stored in the user_name column.
private String userName;
}
package pers.wmx.activemq.bean;
import lombok.Data;
/**
 * Reward (skin) granted to a user. It is serialized to JSON as the content of
 * an event and later inserted into the {@code reward} table by RewardDao.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Reward {
// Primary key of the reward row.
private Integer id;
// Id of the user who receives the skin (user_id column).
private Integer userId;
// Name of the granted skin.
private String skin;
}
package pers.wmx.activemq.bean;
import lombok.Data;
import java.util.Date;
/**
 * Row of an {@code event} table; the same shape is used for the user-side and
 * the reward-side event tables. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Event {
// Primary key of the event row.
private Integer id;
// Event type; the services store EventType values here.
private String type;
// Processing stage; the services store EventProcess values here.
private String process;
// Event payload; UserService stores a JSON-serialized Reward here.
private String content;
// Creation / last-update timestamps.
// NOTE(review): the DAO select statements do not read these columns, so they presumably stay null on loaded events — confirm.
private Date createTime;
private Date updateTime;
}
常量类:
package pers.wmx.activemq.constant;
/**
 * Processing stage of an event row, stored in the {@code process} column.
 */
public enum EventProcess {
    NEW("NEW", "新建"),
    PUBLISHED("PUBLISHED", "已发布"),
    PROCESSED("PROCESSED", "已处理");

    /** Value persisted in the database. */
    private final String value;
    /** Human-readable description. */
    private final String desc;

    EventProcess(String code, String description) {
        value = code;
        desc = description;
    }

    /** Returns the human-readable description. */
    public String getDesc() {
        return this.desc;
    }

    /** Returns the value persisted in the database. */
    public String getValue() {
        return this.value;
    }
}
package pers.wmx.activemq.constant;
/**
 * Kind of business event recorded in an event table, stored in the
 * {@code type} column.
 */
public enum EventType {
    NEW_USER("NEW_USER", "新增用户"),
    NEW_REWARD("NEW_REWARD", "新增奖励");

    /** Value persisted in the database. */
    private final String value;
    /** Human-readable description. */
    private final String desc;

    EventType(String code, String description) {
        value = code;
        desc = description;
    }

    /** Returns the human-readable description. */
    public String getDesc() {
        return this.desc;
    }

    /** Returns the value persisted in the database. */
    public String getValue() {
        return this.value;
    }
}
package pers.wmx.activemq.constant;
import java.util.ArrayList;
import java.util.List;
/**皮肤列表
* @author wmx
* @date 2019-09-26
*/
public class RewardConst {
public static List<String> skinList = new ArrayList<>();
static{
skinList.add("猩红之月 压缩");
skinList.add("泳池派对 德莱文");
skinList.add("未来战士 伊泽瑞尔");
skinList.add("死兆星 嘉文四世");
skinList.add("蔷薇绅士 杰斯");
skinList.add("死兆星 锤石");
}
}
先看下dao层:
package pers.wmx.activemq.db1.dao;
import org.apache.ibatis.annotations.*;
import org.springframework.beans.factory.annotation.Qualifier;
import pers.wmx.activemq.bean.User;
/**
 * MyBatis mapper for the {@code user} table.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Mapper
@Qualifier("db1SqlSessionTemplate")
public interface UserDao {

    /**
     * Inserts a user row and writes the generated primary key back into
     * {@code user.id}.
     *
     * @param user the user to store; only {@code userName} is written
     * @return the value MyBatis reports for the insert statement
     */
    @Options(keyColumn = "id", keyProperty = "user.id", useGeneratedKeys = true)
    @Insert("insert into user(user_name) values(#{user.userName})")
    int insert(@Param("user") User user);
}
package pers.wmx.activemq.db1.dao;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.beans.factory.annotation.Qualifier;
import pers.wmx.activemq.bean.Event;
import java.util.List;
/**
 * MyBatis mapper for the user-side {@code event} table.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Qualifier("db1SqlSessionTemplate")
public interface UserEventDao {

    /**
     * Inserts an event row from the given event's type, process and content.
     *
     * @param event the event to store
     * @return the value MyBatis reports for the insert statement
     */
    @Insert("insert into event(type, process, content)values(#{type},#{process},#{content})")
    int newEvent(Event event);

    /**
     * Loads every event whose process column is {@code 'NEW'}.
     *
     * @return the matching events with id, type, process and content populated
     */
    @Select({"<script>",
            "select id, type, process, content",
            "from event",
            "where process = 'NEW' ",
            "</script>"})
    List<Event> getNewEventList();

    /**
     * Sets the process column of the row identified by {@code event.id} to
     * {@code event.process}.
     *
     * @param event carries the row id and the new process value
     */
    @Update({"<script>",
            "update event",
            "set process = #{process}",
            "where id = #{id}",
            "</script>"})
    void updateProcess(Event event);
}
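值得注意的是,我们配置的是双数据源,即2个db
Mapper 使用哪个数据源,是由数据源配置类上的 @MapperScan(basePackages = ..., sqlSessionTemplateRef = ...) 按包决定的
接口上的 @Qualifier("db1SqlSessionTemplate") 用来标明它对应我们之前在数据源配置类中定义的 SqlSessionTemplate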
package pers.wmx.activemq.db2.dao;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;
import pers.wmx.activemq.bean.Reward;
/**
 * MyBatis mapper for the {@code reward} table.
 *
 * <p>This mapper lives in the {@code db2.dao} package, which DataSource2Config
 * binds to {@code db2SqlSessionTemplate}; the qualifier below names that same
 * template (it previously named the db1 template, a copy-paste slip).
 *
 * @author wmx
 * @date 2019-09-26
 */
@Repository
@Qualifier("db2SqlSessionTemplate")
public interface RewardDao {

    /**
     * Inserts a reward row and writes the generated primary key back into
     * {@code reward.id}.
     *
     * <p>The key property is addressed through the {@code @Param} name
     * ({@code reward.id}), the same form UserDao uses.
     *
     * @param reward the reward to store; {@code userId} and {@code skin} are written
     */
    @Insert("insert into reward(user_id,skin) " +
            "values(#{reward.userId},#{reward.skin})")
    @Options(useGeneratedKeys = true, keyProperty = "reward.id", keyColumn = "id")
    void newReward(@Param("reward") Reward reward);
}
package pers.wmx.activemq.db2.dao;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;
import pers.wmx.activemq.bean.Event;
import java.util.List;
/**
 * MyBatis mapper for the reward-side {@code event} table.
 *
 * <p>This mapper lives in the {@code db2.dao} package, which DataSource2Config
 * binds to {@code db2SqlSessionTemplate}; the qualifier below names that same
 * template (it previously named the db1 template, a copy-paste slip).
 *
 * @author wmx
 * @date 2019-09-26
 */
@Repository
@Qualifier("db2SqlSessionTemplate")
public interface RewardEventDao {

    /**
     * Inserts an event row from the given event's type, process and content.
     *
     * @param event the event to store
     * @return the value MyBatis reports for the insert statement
     */
    @Insert("insert into event(type, process, content)" +
            "values(#{type},#{process},#{content})")
    int insert(Event event);

    /**
     * Loads every event whose process column is {@code 'PUBLISHED'}, i.e. the
     * reward events that still have to be executed.
     *
     * @return the matching events with id, type, process and content populated
     */
    @Select({
            "<script>",
            "select id, type, process, content",
            "from event",
            "where process = 'PUBLISHED' ",
            "</script>"
    })
    List<Event> getNewEventList();

    /**
     * Sets the process column of the row identified by {@code event.id} to
     * {@code event.process}.
     *
     * @param event carries the row id and the new process value
     */
    @Update({
            "<script>",
            "update event",
            "set process = #{process}",
            "where id = #{id}",
            "</script>"
    })
    void updateProcess(Event event);
}
再看下service层
package pers.wmx.activemq.service;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import pers.wmx.activemq.bean.Event;
import pers.wmx.activemq.bean.Reward;
import pers.wmx.activemq.bean.User;
import pers.wmx.activemq.constant.EventProcess;
import pers.wmx.activemq.constant.EventType;
import pers.wmx.activemq.constant.RewardConst;
import pers.wmx.activemq.db1.dao.UserDao;
import pers.wmx.activemq.db1.dao.UserEventDao;
import java.util.List;
import java.util.Random;
/**
 * User-side half of the event-table flow: registers users, records a NEW
 * event for each registration and publishes pending events to the
 * {@code doReward} queue.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Service
public class UserService {
    @Autowired
    UserDao userDao;
    @Autowired
    UserEventDao userEventDao;
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Stores a new user and, in the same local transaction, a NEW event whose
     * content is the JSON form of the reward to grant.
     *
     * <p>A skin is picked at random from {@link RewardConst#skinList}. When
     * that list is null or empty no skin is set on the reward; the random
     * index is only computed after this check, because
     * {@code Random.nextInt(0)} throws.
     *
     * @param userName name of the user to register
     */
    @Transactional
    public void newUser(String userName) {
        // 1. Save the user; the generated key is written back into user.id.
        User user = new User();
        user.setUserName(userName);
        userDao.insert(user);
        Integer userId = user.getId();

        // 2. Build the reward payload carried by the event.
        Reward reward = new Reward();
        reward.setUserId(userId);
        List<String> skins = RewardConst.skinList;
        if (skins != null && !skins.isEmpty()) {
            int index = new Random().nextInt(skins.size());
            reward.setSkin(skins.get(index));
        }

        // 3. Record the event with the reward serialized as JSON in its content.
        Event event = new Event();
        event.setType(EventType.NEW_USER.getValue());
        event.setProcess(EventProcess.NEW.getValue());
        event.setContent(JSON.toJSONString(reward));
        userEventDao.newEvent(event);
    }

    /**
     * Returns the user events that are still in the NEW stage.
     *
     * @return events loaded by {@code UserEventDao.getNewEventList()}
     */
    public List<Event> getNewEventList() {
        return userEventDao.getNewEventList();
    }

    /**
     * Publishes one pending event: sends its content to the {@code doReward}
     * queue and then marks the event PUBLISHED.
     *
     * <p>Events that are null, not in the NEW stage, or not of type NEW_USER
     * are ignored. The message is sent before the status update, so if the
     * update fails the row stays NEW and is sent again on a later scan.
     *
     * @param event the event to publish; may be null
     */
    public void executeEvent(Event event) {
        if (event == null) {
            return;
        }
        boolean isPending = EventProcess.NEW.getValue().equals(event.getProcess());
        boolean isNewUser = EventType.NEW_USER.getValue().equals(event.getType());
        if (isPending && isNewUser) {
            String messageContent = event.getContent();
            System.out.println("发送消息");
            jmsMessagingTemplate.convertAndSend("doReward", messageContent);
            event.setProcess(EventProcess.PUBLISHED.getValue());
            userEventDao.updateProcess(event);
        }
    }
}
package pers.wmx.activemq.service;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import pers.wmx.activemq.bean.Event;
import pers.wmx.activemq.bean.Reward;
import pers.wmx.activemq.constant.EventProcess;
import pers.wmx.activemq.constant.EventType;
import pers.wmx.activemq.db2.dao.RewardDao;
import pers.wmx.activemq.db2.dao.RewardEventDao;
import java.util.List;
/**
 * Reward-side half of the event-table flow: records incoming reward events
 * and turns PUBLISHED events into rows of the {@code reward} table.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Service
public class RewardService {
    @Autowired
    RewardEventDao rewardEventDao;
    @Autowired
    RewardDao rewardDao;

    /**
     * Stores a reward event; a null event is ignored.
     *
     * @param event the event to insert; may be null
     */
    public void newEvent(Event event) {
        if (event != null) {
            rewardEventDao.insert(event);
        }
    }

    /**
     * Returns the reward events that are in the PUBLISHED stage.
     *
     * @return events loaded by {@code RewardEventDao.getNewEventList()}
     */
    public List<Event> getPublishedEventList() {
        return rewardEventDao.getNewEventList();
    }

    /**
     * Executes one reward event: parses the reward from the event content,
     * inserts it and marks the event PROCESSED.
     *
     * <p>Events that are null, not in the PUBLISHED stage, or not of type
     * NEW_REWARD are ignored. The insert and the status update run in one
     * transaction on the second data source, so a failure of the status
     * update rolls the reward back instead of leaving a granted reward whose
     * event stays PUBLISHED and would be executed again on the next scan.
     * The manager is named explicitly because the primary one belongs to db1.
     *
     * @param event the event to execute; may be null
     */
    @Transactional(transactionManager = "db2TransactionManager")
    public void executeEvent(Event event) {
        if (event == null) {
            return;
        }
        boolean isPublished = EventProcess.PUBLISHED.getValue().equals(event.getProcess());
        boolean isNewReward = EventType.NEW_REWARD.getValue().equals(event.getType());
        if (isPublished && isNewReward) {
            Reward reward = JSON.parseObject(event.getContent(), Reward.class);
            rewardDao.newReward(reward);
            event.setProcess(EventProcess.PROCESSED.getValue());
            rewardEventDao.updateProcess(event);
        }
    }
}
定时任务部分:
注意定时任务的一个坑,需要在启动类加一个注解@EnableScheduling
否则定时任务不执行
package pers.wmx.activemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point. {@code @EnableScheduling} is required here;
 * without it the {@code @Scheduled} methods of the task classes never run.
 */
@SpringBootApplication
@EnableScheduling
public class ActivemqApplication {
/**
 * Starts the Spring Boot application.
 *
 * @param args command-line arguments passed through to Spring Boot
 */
public static void main(String[] args) {
SpringApplication.run(ActivemqApplication.class, args);
}
}
扫用户事件表定时任务:
package pers.wmx.activemq.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import pers.wmx.activemq.bean.Event;
import pers.wmx.activemq.service.UserService;
import java.util.List;
/**
 * Scheduled poller for the user-side event table.
 */
@Component
public class UserScheduled {
    @Autowired
    private UserService userService;

    /**
     * Runs at a fixed rate of 3000 ms: loads the events returned by
     * {@code UserService.getNewEventList()} and hands each one to
     * {@code UserService.executeEvent}.
     */
    @Scheduled(fixedRate = 3000)
    public void executeEvent() {
        System.out.println("执行定时任务 user ...");
        List<Event> pending = userService.getNewEventList();
        if (CollectionUtils.isEmpty(pending)) {
            System.out.println("待处理的事件总数:0");
            return;
        }
        System.out.println("新建用户的事件记录总数:" + pending.size());
        for (Event pendingEvent : pending) {
            userService.executeEvent(pendingEvent);
        }
    }
}
扫奖励事件表定时任务:
package pers.wmx.activemq.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import pers.wmx.activemq.bean.Event;
import pers.wmx.activemq.service.RewardService;
import java.util.List;
/**
 * Scheduled poller for the reward-side event table.
 */
@Component
public class RewardScheduled {
    @Autowired
    private RewardService rewardService;

    /**
     * Triggered by the cron expression below: loads the events returned by
     * {@code RewardService.getPublishedEventList()} and hands each one to
     * {@code RewardService.executeEvent}.
     */
    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() {
        System.out.println("执行定时任务 reward ...");
        List<Event> published = rewardService.getPublishedEventList();
        if (CollectionUtils.isEmpty(published)) {
            System.out.println("待处理的事件总数:0");
            return;
        }
        System.out.println("已发布的奖励事件记录总数:" + published.size());
        for (Event publishedEvent : published) {
            rewardService.executeEvent(publishedEvent);
        }
    }
}
cron表达式见 http://cron.qqe2.com/
消息消费者:
package pers.wmx.activemq.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import pers.wmx.activemq.bean.Event;
import pers.wmx.activemq.constant.EventProcess;
import pers.wmx.activemq.constant.EventType;
import pers.wmx.activemq.service.RewardService;
/**
 * JMS consumer for the {@code doReward} destination.
 *
 * @author wmx
 * @date 2019-09-26
 */
@Component
public class RewardListener {
    @Autowired
    RewardService rewardService;

    /**
     * Logs the received message and, unless it is empty, records it as a
     * NEW_REWARD event in the PUBLISHED stage with the message as content.
     *
     * @param msg the message body received from the queue
     */
    @JmsListener(destination = "doReward")
    public void readMessage(String msg) {
        System.out.println("msg = " + msg);
        if (StringUtils.isEmpty(msg)) {
            return;
        }
        // Record the incoming message as a reward event.
        Event rewardEvent = new Event();
        rewardEvent.setContent(msg);
        rewardEvent.setProcess(EventProcess.PUBLISHED.getValue());
        rewardEvent.setType(EventType.NEW_REWARD.getValue());
        rewardService.newEvent(rewardEvent);
    }
}
单元测试:
// Registers a user named "haha" through UserService.newUser. The test makes no
// assertions; the outcome is checked by inspecting the tables afterwards.
@Test
public void testNewUser(){
userService.newUser("haha");
}
执行单元测试
user 表
user对应的事件表
reward表
reward对应的事件表
现在,我们再重新看这个图,是不是就比较清晰了
1,先写用户表,再写事件表( NEW),事件表内容content就是 用户ID + 奖励皮肤
2,定时任务扫事件表,取出NEW状态下的数据,把content发送到activeMQ,并把状态置为PUBLISHED
3,消费者接收消息后,写事件表,content就是从activeMQ中读到的消息,即用户ID + 奖励皮肤,状态为PUBLISHED
4,定时任务扫奖励事件表,取出状态为PUBLISHED的数据,然后再reward表新加一条数据,再把事件表中对应的数据状态置为PROCESSED
整个过程完成
这样就保证了两个数据源之间数据的最终一致性了
今天涉及的内容包括SpringBoot,mybatis,mysql,activeMQ,定时任务,多数据源配置,事件表
还是很有收获的
有些小问题不注意就会调不通或者出错,比如定时任务不执行,多数据源不起作用等等
参考:
《分布式消息中间件实践》
http://bbs.itheima.com/thread-452900-1-1.html
https://blog.csdn.net/qq_34297563/article/details/90445684
转载请注明:汪明鑫的个人博客 » 消息队列 分布式事务方案
说点什么
您将是第一位评论人!