炸锅了,Java 多线程批量操作,居然有人不做事务控制?

举报
民工哥 发表于 2022/07/15 23:44:40 2022/07/15
【摘要】 点击下方“Java编程鸭”关注并标星 更多精彩 第一时间直达 来源:blog.csdn.net/qq273766764/article/ details/119972911 项目代码基于:MySql 数据,开发框架为:SpringBoot、Mybatis 开发语言为:Java8 项目代码:https://gite...

点击下方“Java编程鸭”关注并标星

更多精彩 第一时间直达

来源:blog.csdn.net/qq273766764/article/

details/119972911

项目代码基于:MySql 数据,开发框架为:SpringBoot、Mybatis

开发语言为:Java8

项目代码:https://gitee.com/john273766764/springboot-mybatis-threads

文章目录

  • 前言

  • 循环操作的代码

  • 使用手动事务的操作代码

  • 尝试多线程进行数据修改

  • 基于两个CountDownLatch控制多线程事务提交

  • 基于TransactionStatus集合来控制多线程事务提交

  • 使用union连接多个select实现批量update

  • 总结


前言

公司业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。

具体操作如下:

一、循环操作的代码

先写一个最简单的for循环代码,看看耗时情况怎么样。


   
  1. /***
  2.  * 一条一条依次对50000条数据进行更新操作
  3.  * 耗时:2m27s,1m54s
  4.  */
  5. @Test
  6. void updateStudent() {
  7.     List<Student> allStudents = studentMapper.getAll();
  8.     allStudents.forEach(s -> {
  9.         //更新教师信息
  10.         String teacher = s.getTeacher();
  11.         String newTeacher = "TNO_" + new Random().nextInt(100);
  12.         s.setTeacher(newTeacher);
  13.         studentMapper.update(s);
  14.     });
  15. }

循环修改整体耗时约 1分54秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,我们先对代码中添加手动事务控制,看查询效率怎样。

二、使用手动事务的操作代码

修改后的代码如下:


   
  1. @Autowired
  2. private DataSourceTransactionManager dataSourceTransactionManager;
  3. @Autowired
  4. private TransactionDefinition transactionDefinition;
  5. /**
  6.  * 由于希望更新操作 一次性完成,需要手动控制添加事务
  7.  * 耗时:24s
  8.  * 从测试结果可以看出,添加事务后插入数据的效率有明显的提升
  9.  */
  10. @Test
  11. void updateStudentWithTrans() {
  12.     List<Student> allStudents = studentMapper.getAll();
  13.     TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  14.     try {
  15.         allStudents.forEach(s -> {
  16.             //更新教师信息
  17.             String teacher = s.getTeacher();
  18.             String newTeacher = "TNO_" + new Random().nextInt(100);
  19.             s.setTeacher(newTeacher);
  20.             studentMapper.update(s);
  21.         });
  22.         dataSourceTransactionManager.commit(transactionStatus);
  23.     } catch (Throwable e) {
  24.         dataSourceTransactionManager.rollback(transactionStatus);
  25.         throw e;
  26.     }
  27. }

添加手动事务操控制后,整体耗时约 24秒,这相对于自动事务提交的代码,快了约5倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。

三、尝试多线程进行数据修改

添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。

先添加一个Service将批量修改操作整合一下,具体代码如下:

StudentServiceImpl.java

   
  1. @Service
  2. public class StudentServiceImpl implements StudentService {
  3.     @Autowired
  4.     private StudentMapper studentMapper;
  5.  
  6.     @Autowired
  7.     private DataSourceTransactionManager dataSourceTransactionManager;
  8.  
  9.     @Autowired
  10.     private TransactionDefinition transactionDefinition;
  11.  
  12.     @Override
  13.     public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
  14.         TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  15.         System.out.println("子线程:" + Thread.currentThread().getName());
  16.         try {
  17.             students.forEach(s -> {
  18.                 // 更新教师信息
  19.                 // String teacher = s.getTeacher();
  20.                 String newTeacher = "TNO_" + new Random().nextInt(100);
  21.                 s.setTeacher(newTeacher);
  22.                 studentMapper.update(s);
  23.             });
  24.             dataSourceTransactionManager.commit(transactionStatus);
  25.             threadLatch.countDown();
  26.         } catch (Throwable e) {
  27.             e.printStackTrace();
  28.             dataSourceTransactionManager.rollback(transactionStatus);
  29.         }
  30.     }
  31. }

批量测试代码,我们采用了多线程进行提交,修改后测试代码如下:


   
  1. @Autowired
  2. private DataSourceTransactionManager dataSourceTransactionManager;
  3. @Autowired
  4. private TransactionDefinition transactionDefinition;
  5. @Autowired
  6. private StudentService studentService;
  7. /**
  8.  * 对用户而言,27s 任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度
  9.  * 预计创建10个线程,每个线程进行5000条数据修改操作
  10.  * 耗时统计
  11.  * 1 线程数:1      耗时:25s
  12.  * 2 线程数:2      耗时:14s
  13.  * 3 线程数:5      耗时:15s
  14.  * 4 线程数:10     耗时:15s
  15.  * 5 线程数:100    耗时:15s
  16.  * 6 线程数:200    耗时:15s
  17.  * 7 线程数:500    耗时:17s
  18.  * 8 线程数:1000    耗时:19s
  19.  * 8 线程数:2000    耗时:23s
  20.  * 8 线程数:5000    耗时:29s
  21.  */
  22. @Test
  23. void updateStudentWithThreads() {
  24.     //查询总数据
  25.     List<Student> allStudents = studentMapper.getAll();
  26.     // 线程数量
  27.     final Integer threadCount = 100;
  28.     //每个线程处理的数据量
  29.     final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
  30.     // 创建多线程处理任务
  31.     ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  32.     CountDownLatch threadLatchs = new CountDownLatch(threadCount);
  33.     for (int i = 0; i < threadCount; i++) {
  34.         // 每个线程处理的数据
  35.         List<Student> threadDatas = allStudents.stream()
  36.                 .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
  37.         studentThreadPool.execute(() -> {
  38.             studentService.updateStudents(threadDatas, threadLatchs);
  39.         });
  40.     }
  41.     try {
  42.         // 倒计时锁设置超时时间 30s
  43.         threadLatchs.await(30, TimeUnit.SECONDS);
  44.     } catch (Throwable e) {
  45.         e.printStackTrace();
  46.     }
  47.     System.out.println("主线程完成");
  48. }

多线程提交修改时,我们尝试了不同线程数对提交速度的影响,具体可以看下面表格,

f51cefdb0a1121695b8586decb7f0668.png 多线程修改50000条数据时 不同线程数耗时对比(秒)

根据表格,我们线程数增大提交速度并非一直增大,在当前情况下约在2-5个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。

四、基于两个CountDownLatch控制多线程事务提交

由于多线程提交时,每个线程事务时单独的,无法保证一致性,我们尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务,

这里我们使用两个 CountDownLatch 来控制主线程与子线程事务提交,并设置了超时时间为 30 秒。我们对代码进行了一点修改:


   
  1. @Override
  2. public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
  3.     TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  4.     System.out.println("子线程:" + Thread.currentThread().getName());
  5.     try {
  6.         students.forEach(s -> {
  7.             // 更新教师信息
  8.             // String teacher = s.getTeacher();
  9.             String newTeacher = "TNO_" + new Random().nextInt(100);
  10.             s.setTeacher(newTeacher);
  11.             studentMapper.update(s);
  12.         });
  13.     } catch (Throwable e) {
  14.         taskStatus.setIsError();
  15.     } finally {
  16.         threadLatch.countDown(); // 切换到主线程执行
  17.     }
  18.     try {
  19.         mainLatch.await();  //等待主线程执行
  20.     } catch (Throwable e) {
  21.         taskStatus.setIsError();
  22.     }
  23.     // 判断是否有错误,如有错误 就回滚事务
  24.     if (taskStatus.getIsError()) {
  25.         dataSourceTransactionManager.rollback(transactionStatus);
  26.     } else {
  27.         dataSourceTransactionManager.commit(transactionStatus);
  28.     }
  29. }

   
  1. /**
  2.  * 由于每个线程都是单独的事务,需要添加对线程事务的统一控制
  3.  * 我们这边使用两个 CountDownLatch 对子线程的事务进行控制
  4.  */
  5. @Test
  6. void updateStudentWithThreadsAndTrans() {
  7.     //查询总数据
  8.     List<Student> allStudents = studentMapper.getAll();
  9.     // 线程数量
  10.     final Integer threadCount = 4;
  11.     //每个线程处理的数据量
  12.     final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
  13.     // 创建多线程处理任务
  14.     ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  15.     CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
  16.     CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
  17.     StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程任务是否有错误
  18.     for (int i = 0; i < threadCount; i++) {
  19.         // 每个线程处理的数据
  20.         List<Student> threadDatas = allStudents.stream()
  21.                 .skip(i * dataPartionLength).limit(dataPartionLength)
  22.                 .collect(Collectors.toList());
  23.         studentThreadPool.execute(() -> {
  24.             studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
  25.         });
  26.     }
  27.     try {
  28.         // 倒计时锁设置超时时间 30s
  29.         boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
  30.         if (!await) { // 等待超时,事务回滚
  31.             taskStatus.setIsError();
  32.         }
  33.     } catch (Throwable e) {
  34.         e.printStackTrace();
  35.         taskStatus.setIsError();
  36.     }
  37.     mainLatch.countDown(); // 切换到子线程执行
  38.     studentThreadPool.shutdown(); //关闭线程池
  39.     System.out.println("主线程完成");
  40. }

本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过10个时,执行时就报错。具体错误内容如下:


   
  1. Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
  2.  at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
  3.  at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
  4.  at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
  5.  at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
  6.  at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
  7.  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  8.  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  9.  at java.lang.Thread.run(Thread.java:748)
  10. Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
  11.  at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
  12.  at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
  13.  at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
  14.  at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
  15.  at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
  16.  ... 7 more

错误的大致意思时,不能为数据库事务打开 jdbc Connection,连接在30s的时候超时了。由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。

看错误日志中错误的来源是 HikariPool ,我们来重新配置一下这个连接池的参数,将最大连接数修改为100,具体配置如下:


   
  1. # 连接池中允许的最小连接数。缺省值:10
  2. spring.datasource.hikari.minimum-idle=10
  3. # 连接池中允许的最大连接数。缺省值:10
  4. spring.datasource.hikari.maximum-pool-size=100
  5. # 自动提交
  6. spring.datasource.hikari.auto-commit=true
  7. # 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟
  8. spring.datasource.hikari.idle-timeout=30000
  9. # 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30
  10. spring.datasource.hikari.max-lifetime=1800000
  11. # 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30

再次执行测试发现没有报错,修改线程数为20又执行了一下,同样执行成功了。

五、基于TransactionStatus集合来控制多线程事务提交

在同事推荐下我们使用事务集合来进行多线程事务控制,主要代码如下


   
  1. @Service
  2. public class StudentsTransactionThread {
  3.  
  4.     @Autowired
  5.     private StudentMapper studentMapper;
  6.     @Autowired
  7.     private StudentService studentService;
  8.     @Autowired
  9.     private PlatformTransactionManager transactionManager;
  10.  
  11.     List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
  12.  
  13.     @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
  14.     public void updateStudentWithThreadsAndTrans() throws InterruptedException {
  15.  
  16.         //查询总数据
  17.         List<Student> allStudents = studentMapper.getAll();
  18.  
  19.         // 线程数量
  20.         final Integer threadCount = 2;
  21.  
  22.         //每个线程处理的数据量
  23.         final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
  24.  
  25.         // 创建多线程处理任务
  26.         ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  27.         CountDownLatch threadLatchs = new CountDownLatch(threadCount);
  28.         AtomicBoolean isError = new AtomicBoolean(false);
  29.         try {
  30.             for (int i = 0; i < threadCount; i++) {
  31.                 // 每个线程处理的数据
  32.                 List<Student> threadDatas = allStudents.stream()
  33.                         .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
  34.                 studentThreadPool.execute(() -> {
  35.                     try {
  36.                         try {
  37.                             studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
  38.                         } catch (Throwable e) {
  39.                             e.printStackTrace();
  40.                             isError.set(true);
  41.                         }finally {
  42.                             threadLatchs.countDown();
  43.                         }
  44.                     } catch (Exception e) {
  45.                         e.printStackTrace();
  46.                         isError.set(true);
  47.                     }
  48.                 });
  49.             }
  50.  
  51.             // 倒计时锁设置超时时间 30s
  52.             boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
  53.             // 判断是否超时
  54.             if (!await) {
  55.                 isError.set(true);
  56.             }
  57.         } catch (Throwable e) {
  58.             e.printStackTrace();
  59.             isError.set(true);
  60.         }
  61.  
  62.         if (!transactionStatuses.isEmpty()) {
  63.             if (isError.get()) {
  64.                 transactionStatuses.forEach(s -> transactionManager.rollback(s));
  65.             } else {
  66.                 transactionStatuses.forEach(s -> transactionManager.commit(s));
  67.             }
  68.         }
  69.  
  70.         System.out.println("主线程完成");
  71.     }
  72. }

   
  1. @Override
  2. @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
  3. public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
  4.     // 使用这种方式将事务状态都放在同一个事务里面
  5.     DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  6.     def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
  7.     TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
  8.     transactionStatuses.add(status);
  9.     students.forEach(s -> {
  10.         // 更新教师信息
  11.         // String teacher = s.getTeacher();
  12.         String newTeacher = "TNO_" + new Random().nextInt(100);
  13.         s.setTeacher(newTeacher);
  14.         studentMapper.update(s);
  15.     });
  16.     System.out.println("子线程:" + Thread.currentThread().getName());
  17. }

由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用Jdbc连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量,

六、使用union连接多个select实现批量update

有些情况写不支持,批量update,但支持insert 多条数据,这个时候可尝试将需要更新的数据拼接成多条select 语句,然后使用union 连接起来,再使用update 关联这个数据进行update,具体代码演示如下:


   
  1. update student,(
  2.  (select  1 as id,'teacher_A' as teacher) union
  3.  (select  2 as id,'teacher_A' as teacher) union
  4.  (select  3 as id,'teacher_A' as teacher) union
  5.  (select  4 as id,'teacher_A' as teacher)
  6.     /* ....more data ... */
  7.     ) as new_teacher
  8. set
  9.  student.teacher=new_teacher.teacher
  10. where
  11.  student.id=new_teacher.id

这种方式在Mysql 数据库没有配置 allowMultiQueries=true 也可以实现批量更新。

总结

  • 对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率

  • 多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在2-5个线程时操作时间最快。

  • 对于多线程阻塞事务提交时,线程数量不能过多。

  • 如果能有办法实现批量更新那是最好

END


   
  1. 看完本文有收获?请转发分享给更多人
  2. 关注「Java编程鸭」,提升Java技能
  3. 关注Java编程鸭微信公众号,后台回复:码农大礼包 可以获取最新整理的技术资料一份。涵盖Java 框架学习、架构师学习等!
  4. 文章有帮助的话,在看,转发吧。
  5. 谢谢支持哟 (*^__^*)

文章来源: mingongge.blog.csdn.net,作者:民工哥,版权归原作者所有,如需转载,请联系作者。

原文链接:mingongge.blog.csdn.net/article/details/125795491

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。