面试官:Java 多线程怎么做事务控制?一半人答不上来。。(多线程事务如何控制)

  本篇文章为你整理了面试官:Java 多线程怎么做事务控制?一半人答不上来。。(多线程事务如何控制)的详细内容,包含有java多线程项目实战 多线程事务如何控制 java多线程经典案例 java多线程业务场景 面试官:Java 多线程怎么做事务控制?一半人答不上来。。,希望能帮助你了解 面试官:Java 多线程怎么做事务控制?一半人答不上来。。。

  分享Java技术,高并发编程,分布式技术,架构设计,Java面试题,算法,行业动态,程序人生等。

  
项目代码基于:MySql 数据,开发框架为:SpringBoot、Mybatis

  开发语言为:Java8

  公司业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。

  具体操作如下:

  一、循环操作的代码

  先写一个最简单的for循环代码,看看耗时情况怎么样。

  

/***

 

   * 一条一条依次对50000条数据进行更新操作

   * 耗时:2m27s,1m54s

  @Test

  void updateStudent() {

   List Student allStudents = studentMapper.getAll();

   allStudents.forEach(s - {

   //更新教师信息

   String teacher = s.getTeacher();

   String newTeacher = "TNO_" + new Random().nextInt(100);

   s.setTeacher(newTeacher);

   studentMapper.update(s);

  

 

  循环修改整体耗时约 1分54秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,我们先对代码中添加手动事务控制,看查询效率怎样。

  最新面试题整理:https://www.javastack.cn/mst/

  二、使用手动事务的操作代码

  修改后的代码如下:

  

@Autowired

 

  private DataSourceTransactionManager dataSourceTransactionManager;

  @Autowired

  private TransactionDefinition transactionDefinition;

   * 由于希望更新操作 一次性完成,需要手动控制添加事务

   * 耗时:24s

   * 从测试结果可以看出,添加事务后插入数据的效率有明显的提升

  @Test

  void updateStudentWithTrans() {

   List Student allStudents = studentMapper.getAll();

   TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

   try {

   allStudents.forEach(s - {

   //更新教师信息

   String teacher = s.getTeacher();

   String newTeacher = "TNO_" + new Random().nextInt(100);

   s.setTeacher(newTeacher);

   studentMapper.update(s);

   dataSourceTransactionManager.commit(transactionStatus);

   } catch (Throwable e) {

   dataSourceTransactionManager.rollback(transactionStatus);

   throw e;

  

 

  添加手动事务操控制后,整体耗时约 24秒,这相对于自动事务提交的代码,快了约5倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。

  三、尝试多线程进行数据修改

  添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。

  先添加一个Service将批量修改操作整合一下,具体代码如下:

  StudentServiceImpl.java

  

@Service

 

  public class StudentServiceImpl implements StudentService {

   @Autowired

   private StudentMapper studentMapper;

   @Autowired

   private DataSourceTransactionManager dataSourceTransactionManager;

   @Autowired

   private TransactionDefinition transactionDefinition;

   @Override

   public void updateStudents(List Student students, CountDownLatch threadLatch) {

   TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

   System.out.println("子线程:" + Thread.currentThread().getName());

   try {

   students.forEach(s - {

   // 更新教师信息

   // String teacher = s.getTeacher();

   String newTeacher = "TNO_" + new Random().nextInt(100);

   s.setTeacher(newTeacher);

   studentMapper.update(s);

   dataSourceTransactionManager.commit(transactionStatus);

   threadLatch.countDown();

   } catch (Throwable e) {

   e.printStackTrace();

   dataSourceTransactionManager.rollback(transactionStatus);

  

 

  批量测试代码,我们采用了多线程进行提交,修改后测试代码如下:

  

@Autowired

 

  private DataSourceTransactionManager dataSourceTransactionManager;

  @Autowired

  private TransactionDefinition transactionDefinition;

  @Autowired

  private StudentService studentService;

   * 对用户而言,27s 任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度

   * 预计创建10个线程,每个线程进行5000条数据修改操作

   * 耗时统计

   * 1 线程数:1 耗时:25s

   * 2 线程数:2 耗时:14s

   * 3 线程数:5 耗时:15s

   * 4 线程数:10 耗时:15s

   * 5 线程数:100 耗时:15s

   * 6 线程数:200 耗时:15s

   * 7 线程数:500 耗时:17s

   * 8 线程数:1000 耗时:19s

   * 8 线程数:2000 耗时:23s

   * 8 线程数:5000 耗时:29s

  @Test

  void updateStudentWithThreads() {

   //查询总数据

   List Student allStudents = studentMapper.getAll();

   // 线程数量

   final Integer threadCount = 100;

   //每个线程处理的数据量

   final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

   // 创建多线程处理任务

   ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);

   CountDownLatch threadLatchs = new CountDownLatch(threadCount);

   for (int i = 0; i threadCount; i++) {

   // 每个线程处理的数据

   List Student threadDatas = allStudents.stream()

   .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());

   studentThreadPool.execute(() - {

   studentService.updateStudents(threadDatas, threadLatchs);

   try {

   // 倒计时锁设置超时时间 30s

   threadLatchs.await(30, TimeUnit.SECONDS);

   } catch (Throwable e) {

   e.printStackTrace();

   System.out.println("主线程完成");

  

 

  多线程提交修改时,我们尝试了不同线程数对提交速度的影响,具体可以看下面表格,

  多线程修改50000条数据时 不同线程数耗时对比(秒)

  根据表格,我们线程数增大提交速度并非一直增大,在当前情况下约在2-5个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。

  另外,MySQL 系列面试题和答案全部整理好了,微信搜索Java技术栈,在后台发送:面试,可以在线阅读。

  四、基于两个CountDownLatch控制多线程事务提交

  由于多线程提交时,每个线程事务时单独的,无法保证一致性,我们尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务,

  这里我们使用两个 CountDownLatch 来控制主线程与子线程事务提交,并设置了超时时间为 30 秒。我们对代码进行了一点修改:

  

@Override

 

  public void updateStudentsThread(List Student students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {

   TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

   System.out.println("子线程:" + Thread.currentThread().getName());

   try {

   students.forEach(s - {

   // 更新教师信息

   // String teacher = s.getTeacher();

   String newTeacher = "TNO_" + new Random().nextInt(100);

   s.setTeacher(newTeacher);

   studentMapper.update(s);

   } catch (Throwable e) {

   taskStatus.setIsError();

   } finally {

   threadLatch.countDown(); // 切换到主线程执行

   try {

   mainLatch.await(); //等待主线程执行

   } catch (Throwable e) {

   taskStatus.setIsError();

   // 判断是否有错误,如有错误 就回滚事务

   if (taskStatus.getIsError()) {

   dataSourceTransactionManager.rollback(transactionStatus);

   } else {

   dataSourceTransactionManager.commit(transactionStatus);

   * 由于每个线程都是单独的事务,需要添加对线程事务的统一控制

   * 我们这边使用两个 CountDownLatch 对子线程的事务进行控制

  @Test

  void updateStudentWithThreadsAndTrans() {

   //查询总数据

   List Student allStudents = studentMapper.getAll();

   // 线程数量

   final Integer threadCount = 4;

   //每个线程处理的数据量

   final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

   // 创建多线程处理任务

   ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);

   CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量

   CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交

   StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程任务是否有错误

   for (int i = 0; i threadCount; i++) {

   // 每个线程处理的数据

   List Student threadDatas = allStudents.stream()

   .skip(i * dataPartionLength).limit(dataPartionLength)

   .collect(Collectors.toList());

   studentThreadPool.execute(() - {

   studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);

   try {

   // 倒计时锁设置超时时间 30s

   boolean await = threadLatchs.await(30, TimeUnit.SECONDS);

   if (!await) { // 等待超时,事务回滚

   taskStatus.setIsError();

   } catch (Throwable e) {

   e.printStackTrace();

   taskStatus.setIsError();

   mainLatch.countDown(); // 切换到子线程执行

   studentThreadPool.shutdown(); //关闭线程池

   System.out.println("主线程完成");

  

 

  本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过10个时,执行时就报错。具体错误内容如下:

  

Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.

 

   at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)

   at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)

   at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)

   at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)

   at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)

   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

   at java.lang.Thread.run(Thread.java:748)

  Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.

   at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)

   at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)

   at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)

   at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)

   at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)

   ... 7 more

  

 

  错误的大致意思时,不能为数据库事务打开 jdbc Connection,连接在30s的时候超时了。由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。

  看错误日志中错误的来源是 HikariPool ,我们来重新配置一下这个连接池的参数,将最大连接数修改为100,具体配置如下:

  

# 连接池中允许的最小连接数。缺省值:10

 

  spring.datasource.hikari.minimum-idle=10

  # 连接池中允许的最大连接数。缺省值:10

  spring.datasource.hikari.maximum-pool-size=100

  # 自动提交

  spring.datasource.hikari.auto-commit=true

  # 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟

  spring.datasource.hikari.idle-timeout=30000

  # 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒

  spring.datasource.hikari.max-lifetime=1800000

  # 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒

  

 

  再次执行测试发现没有报错,修改线程数为20又执行了一下,同样执行成功了。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Java 系列面试题和答案,非常齐全。

  五、基于TransactionStatus集合来控制多线程事务提交

  在同事推荐下我们使用事务集合来进行多线程事务控制,主要代码如下

  

@Service

 

  public class StudentsTransactionThread {

   @Autowired

   private StudentMapper studentMapper;

   @Autowired

   private StudentService studentService;

   @Autowired

   private PlatformTransactionManager transactionManager;

   List TransactionStatus transactionStatuses = Collections.synchronizedList(new ArrayList TransactionStatus

   @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})

   public void updateStudentWithThreadsAndTrans() throws InterruptedException {

   //查询总数据

   List Student allStudents = studentMapper.getAll();

   // 线程数量

   final Integer threadCount = 2;

   //每个线程处理的数据量

   final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

   // 创建多线程处理任务

   ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);

   CountDownLatch threadLatchs = new CountDownLatch(threadCount);

   AtomicBoolean isError = new AtomicBoolean(false);

   try {

   for (int i = 0; i threadCount; i++) {

   // 每个线程处理的数据

   List Student threadDatas = allStudents.stream()

   .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());

   studentThreadPool.execute(() - {

   try {

   try {

   studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);

   } catch (Throwable e) {

   e.printStackTrace();

   isError.set(true);

   }finally {

   threadLatchs.countDown();

   } catch (Exception e) {

   e.printStackTrace();

   isError.set(true);

   // 倒计时锁设置超时时间 30s

   boolean await = threadLatchs.await(30, TimeUnit.SECONDS);

   // 判断是否超时

   if (!await) {

   isError.set(true);

   } catch (Throwable e) {

   e.printStackTrace();

   isError.set(true);

   if (!transactionStatuses.isEmpty()) {

   if (isError.get()) {

   transactionStatuses.forEach(s - transactionManager.rollback(s));

   } else {

   transactionStatuses.forEach(s - transactionManager.commit(s));

   System.out.println("主线程完成");

  @Override

  @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})

  public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List TransactionStatus transactionStatuses, List Student students) {

   // 使用这种方式将事务状态都放在同一个事务里面

   DefaultTransactionDefinition def = new DefaultTransactionDefinition();

   def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。

   TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态

   transactionStatuses.add(status);

   students.forEach(s - {

   // 更新教师信息

   // String teacher = s.getTeacher();

   String newTeacher = "TNO_" + new Random().nextInt(100);

   s.setTeacher(newTeacher);

   studentMapper.update(s);

   System.out.println("子线程:" + Thread.currentThread().getName());

  

 

  由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用Jdbc连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量,

  六、使用union连接多个select实现批量update

  有些情况写不支持,批量update,但支持insert 多条数据,这个时候可尝试将需要更新的数据拼接成多条select 语句,然后使用union 连接起来,再使用update 关联这个数据进行update,具体代码演示如下:

  

update student,(

 

   (select 1 as id,teacher_A as teacher) union

   (select 2 as id,teacher_A as teacher) union

   (select 3 as id,teacher_A as teacher) union

   (select 4 as id,teacher_A as teacher)

   /* ....more data ... */

   ) as new_teacher

   student.teacher=new_teacher.teacher

  where

   student.id=new_teacher.id

  

 

  这种方式在Mysql 数据库没有配置 allowMultiQueries=true 也可以实现批量更新。

  对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率

  多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在2-5个线程时操作时间最快。

  对于多线程阻塞事务提交时,线程数量不能过多。

  如果能有办法实现批量更新那是最好

  以上就是面试官:Java 多线程怎么做事务控制?一半人答不上来。。(多线程事务如何控制)的详细内容,想要了解更多 面试官:Java 多线程怎么做事务控制?一半人答不上来。。的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: