spring data jpa 多数据源,springboot jpa多数据源
00-1010前言问题背景XA事务方案XADataSource、XA协议数据源XAConnectionXAResource引入Atomikos依赖创建JTA事务管理器MysqlXA事务行为链事务方案结论
00-1010多数据源的事务处理是老生常谈,跨两个数据源的事务管理可以看作是分布式事务的范畴。多个数据源的事务在同一个JVM中处理。经典的处理方案是JTA(基于XA协议建模的java标准事务抽象)XA(XA事务协议),常见的JTA实现框架有Atomikos、Bitronix、Narayana等。Spring用组件封装了这些框架。除了共享XA事务方案,本文提供了一种新的多数据源事务解决思路和视角。
00-1010在解决mysql字段脱敏时,结合哈丁-JDBC的脱敏组件功能,为了sql兼容和最小的应用改造,博主给出了多数据源融合的字段脱敏解决方案(只对哈丁-JDBC脱敏代理数据源采取包含脱敏字段表的操作)。这种解决方案解决了问题,但同时也带来了新的问题。数据源的事务是独立的。正如我在这篇文章中提到的,《JPA项目多数据源模式整合sharding-jdbc实现数据脱敏》,在spring的上下文中,每个数据源对应一个独立的事务管理器,事务管理器的默认数据源是业务本身的数据源。因此,当业务需要加密时,需要将@Transactional注释中的事务管理器名称指定为脱敏对应的事务管理器名称。简单的业务场景没有问题,但是一般的业务场景总有一个事务覆盖两个数据源。此时,无法指定哪个事务管理器。因此,这里需要一个具有多个数据源的事务管理器。
00-1010XA协议采用2PC(两阶段提交)来管理分布式事务。XA接口为资源管理器和事务管理器之间的通信提供了一个标准接口。在JDBC的XA事务相关api抽象中,相关接口定义如下
00-1010公共接口XA数据源扩展公共数据源{/* * *尝试使用给定的用户名和密码建立物理数据库连接。返回的连接可以在分布式事务中使用*/x connection getx connection()throwssqlexception;//省略非关键方法,如getLogWriter}
00-1010公共接口xa connection extends pooled connection {/* * *检索一个{@code XAResource}对象,事务管理器将使用此对象管理此{@code XAConnection}对象在分布式事务中的事务行为*/javax . Transaction . xa . xa resource getxa resource()抛出sqlexception}
00-1010公共接口xa resource {/* * commit xid */void commit(xid,boolean one phase)指定的全局事务抛出xaexception/* * *结束代表事务分支执行的工作。资源管理器将XA资源从指定的事务分支中分离出来,并让事务完成。*/void end(Xid xid,int flags)抛出XAException/* * *通知事务管理器忽略此xid事务分支*/void forget(XIDXID)抛出XaException/* * *确定是否是同一个资源管理器*/boolean issame RM(xa resource xares)抛出xaexception/* * *指定xid事务准备阶段*/intprepare (xidxid)引发xaexception/* * *从资源管理器获取准备好的事务分支列表。事务管理器在恢复过程中调用此方法,*以获取当前处于准备状态或初步完成状态的事务分支列表。*/Xid[] recover(int flag)引发XAException/* * *通知资源管理器回滚代表事务分支完成的工作。*/无效角色
lback(Xid xid) throws XAException; /** * 代表xid中指定的事务分支开始工作。 */ void start(Xid xid, int flags) throws XAException; //省略非关键方法}相比较普通的事务管理,JDBC的XA协议管理多了一个XAResource资源管理器,XA事务相关的行为(开启、准备、提交、回滚、结束)都由这个资源管理器来控制,这些都是框架内部的行为,体现在开发层面提供的数据源也变成了XADataSource。而JTA的抽象里,定义了UserTransaction、TransactionManager。想要使用JTA事务,必须先实现这两个接口。所以,如果我们要使用JTA+XA控制多数据源的事务,在sprign boot里以Atomikos为例,
引入Atomikos依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId></dependency>
spring boot已经帮我们把XA事务管理器自动装载类定义好了,如:
创建JTA事务管理器
@Configuration(proxyBeanMethods = false)@EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })@ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })@ConditionalOnMissingBean(PlatformTransactionManager.class)class AtomikosJtaConfiguration {@Bean(initMethod = "init", destroyMethod = "shutdownWait")@ConditionalOnMissingBean(UserTransactionService.class)UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,JtaProperties jtaProperties) {Properties properties = new Properties();if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId());}properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties));properties.putAll(atomikosProperties.asProperties());return new UserTransactionServiceImp(properties);}@Bean(initMethod = "init", destroyMethod = "close")@ConditionalOnMissingBean(TransactionManager.class)UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {UserTransactionManager manager = new UserTransactionManager();manager.setStartupTransactionService(false);manager.setForceShutdown(true);return manager;}@Bean@ConditionalOnMissingBean(XADataSourceWrapper.class)AtomikosXADataSourceWrapper xaDataSourceWrapper() {return new AtomikosXADataSourceWrapper();}@BeanJtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,ObjectProvidertransactionManagerCustomizers) {JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));return jtaTransactionManager;}}
显然,想要使用XA事务,除了需要提供UserTransaction、TransactionManager的实现。还必须要有一个XADataSource,而sharding-jdbc代理的数据源是DataSource的,我们需要将XADataSource包装成普通的DataSource,spring已经提供了一个AtomikosXADataSourceWrapper的XA数据源包装器,而且在AtomikosJtaConfiguration里已经注册到Spring上下文中,所以我们在自定义数据源时可以直接注入包装器实例,然后,因为是JPA环境,所以在创建EntityManagerFactory实例时,需要指定JPA的事务管理类型为JTA,综上,普通的业务默认数据源配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */@Configuration@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})public class DataSourceConfiguration{ @Primary @Bean public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception { MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build(); return wrapper.wrapDataSource(dataSource); } @Primary @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("default") .jta(true) .build(); } @Bean @Primary public EntityManager entityManager(EntityManagerFactory entityManagerFactory){ //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); }}
sharding-jdbc加密数据源和普通业务数据源其实是同一个数据源,只是走加解密逻辑的数据源需要被sharding-jdbc的加密组件代理一层,加上了加解密的处理逻辑。所以配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */@Configuration@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})public class EncryptDataSourceConfiguration { @Bean public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException { return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps()); } @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("encryptPersistenceUnit") .jta(true) .build(); } @Bean public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){ //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); }}
遇到问题1、
Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.
解决问题:默认AtomikosXADataSourceWrapper包装器初始化的数据源连接池最大为1,所以需要添加配置参数如:
spring.jta.atomikos.datasource.max-pool-size=20
遇到问题2、
XAER_INVAL: Invalid arguments (or unsupported command)
解决问题:这个是mysql实现XA的bug,仅当您在同一事务中多次访问同一MySQL数据库时,才会发生此问题,在mysql连接url加上如下参数即可,如:
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true
Mysql XA事务行为
在这个场景中,虽然是多数据源,但是底层链接的是同一个mysql数据库,所以XA事务行为为,从第一个执行的sql开始(并不是JTA事务begin阶段),生成xid并XA START事务,然后XA END。第二个数据源的sql执行时会判断是否同一个mysql资源,如果是同一个则用刚生成的xid重新XA START RESUME,然后XA END,最终虽然在应用层是两个DataSource,其实最后只会调用XA COMMIT一次。mysql驱动实现的XAResource的start如下:
public void start(Xid xid, int flags) throws XAException { StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH); commandBuf.append("XA START "); appendXid(commandBuf, xid); switch (flags) { case TMJOIN: commandBuf.append(" JOIN"); break; case TMRESUME: commandBuf.append(" RESUME"); break; case TMNOFLAGS: // no-op break; default: throw new XAException(XAException.XAER_INVAL); } dispatchCommand(commandBuf.toString()); this.underlyingConnection.setInGlobalTx(true); }
第一次sql执行时,flags=0,走的TMNOFLAGS逻辑,第二次sql执行时,flags=134217728,走的TMRESUME,重新开启事务的逻辑。以上是Mysql XA的真实事务逻辑,但是博主研究下来发现,msyql xa并不支持XA START RESUME这种语句,而且有很多限制《Mysql XA交易限制》,所以在mysql数据库使用XA事务时,最好了解下mysql xa的缺陷
链式事务方案
链式事务不是我首创的叫法,在spring-data-common项目的Transaction包下,已经有一个默认实现ChainedTransactionManager,前文中《深入理解spring的@Transactional工作原理》已经分析了Spring的事务抽象,由PlatformTransactionManager(事务管理器)、TransactionStatus(事务状态)、TransactionDefinition(事务定义)等形态组成,ChainedTransactionManager也是实现了PlatformTransactionManager和TransactionStatus。实现原理也很简单,在ChainedTransactionManager内部维护了事务管理器的集合,通过代理编排真实的事务管理器,在事务开启、提交、回滚时,都分别操作集合里的事务。以达到对多个事务的统一管理。这个方案比较简陋,而且有缺陷,在提交阶段,如果异常不是发生在第一个数据源,那么会存在之前的提交不会回滚,所以在使用ChainedTransactionManager时,尽量把出问题可能性比较大的事务管理器放链的后面(开启事务、提交事务顺序相反)。这里只是抛出了一种新的多数据源事务管理的思路,能用XA尽量用XA管理。
普通的业务默认数据源配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */@Configuration@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})public class DataSourceConfiguration{ @Primary @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties){ return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); } @Primary @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("default") .build(); } @Bean @Primary public EntityManager entityManager(EntityManagerFactory entityManagerFactory){ //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } @Primary @Bean public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){ JpaTransactionManager txManager = new JpaTransactionManager(); txManager.setEntityManagerFactory(entityManagerFactory); return txManager; }}
sharding-jdbc加密数据源配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */@Configuration@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})public class EncryptDataSourceConfiguration { @Bean public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException { return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps()); } @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("encryptPersistenceUnit") .build(); } @Bean public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){ //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } @Bean public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException { JpaTransactionManager encryptTransactionManager = new JpaTransactionManager(); encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory()); //使用链式事务管理器包装真正的transactionManager、txManager事务 ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager); return chainedTransactionManager; }}
使用这种方案,在涉及到多数据源的业务时,需要指定使用哪个事务管理器,如:
@PersistenceContext(unitName = "encryptPersistenceUnit") private EntityManager entityManager; @PersistenceContext private EntityManager manager; @Transactional(transactionManager = "chainedTransactionManager") public AccountModel save(AccountDTO dto){ AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto); entityManager.persist(accountModel); entityManager.flush(); AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto); manager.persist(accountMode2); manager.flush(); return accountModel; }
结语
综上,对于JPA的多数据源分布式事务处理,JTA的事务管理器经过spring boot的封装已经可以开箱即用了。重点在JPA环境下,需要指定EntityManagerFactory的事务使用JTA事务。另本文分享了一种链式事务编排的方式也可以应用在这种场景,但是特殊的场景下不能保证事务的完整性,所以博主推荐使用JtaTransactionManager,有符合的场景也可以试试ChainedTransactionManager。
以上就是JPA多数据源分布式事务处理方案的详细内容,更多关于JPA多数据源分布式事务处理的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。