JAVA 多线程同步工具类总结

举报
ShaderJoy 发表于 2021/12/31 23:43:58 2021/12/31
【摘要】 CountDownLatch 闭锁:可以延迟线程的进度,直到锁到达终止状态。闭锁的作用相当于一扇门,在锁到达终止状态之前这扇门一直是关闭的。当锁到达终止状态时,允许所有线程通 过。CountDownLatch 有一个初始值,通过调用 countDown 可以减少该值,一直到 0 时到达终止状态。   FutureTask 用于执行一个...

CountDownLatch 闭锁:可以延迟线程的进度,直到锁到达终止状态。闭锁的作用相当于一扇门,在锁到达终止状态之前这扇门一直是关闭的。当锁到达终止状态时,允许所有线程通 过。CountDownLatch 有一个初始值,通过调用 countDown 可以减少该值,一直到 0 时到达终止状态。

  FutureTask 用于执行一个可返回结果的长任务,任务在单独的线程中执行,其他线程可以用 get 方法取任务结果,如果任务尚未完成,线程在 get 上阻塞。

  Semaphore 用于 控制 同时访问某资源 ,或 同时执行某操作的线程数目 。信号量有一个初始值即可以分配的信号量总数目。线程任务开始前先调用 acquire 取得信号量,任务结束后调用 release 释放信号量。在 acquire 是如果没有可用信号量,线程将阻塞在 acquire 上,直到其他线程释放一个信号量。

  CyclicBarrier 栅栏用于多个线程多次迭代时进行同步,在一轮任务中,任何线程完成任务后都在 barrier 上等待,直到所有其他线程也完成任务,然后一起释放,同时进入下一轮迭代。

CountDownLatch 的例子


  
  1. import java.util.concurrent.CountDownLatch;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class DemoOfLatch
  4. {
  5. // 利用闭锁 CountDownLatch 控制主线程和子线程的同步
  6. public static void main(String[] args)
  7. {
  8. int numberOfThread = 5;
  9. final CountDownLatch startLatch = new CountDownLatch(1); // 用于控制子线程开始
  10. final CountDownLatch stopLatch = new CountDownLatch(numberOfThread); // 用于子线程计数
  11. final AtomicInteger count = new AtomicInteger(0); // 用于分配子线程唯一标识
  12. System.out.println("Main thread start…");
  13. for (int i = 0; i < numberOfThread; i++)
  14. {
  15. Thread thread = new Thread(new Runnable()
  16. {
  17. @Override
  18. public void run()
  19. {
  20. int tid = count.getAndIncrement();
  21. try
  22. {
  23. // 等代主线程打开启动信号
  24. startLatch.await();
  25. System.out.printf("Thread %d started…%n", tid);
  26. int duration = (int) (Math.random() * 5000);
  27. Thread.sleep(duration);
  28. } catch (InterruptedException e)
  29. {
  30. e.printStackTrace();
  31. Thread.currentThread().interrupt();
  32. } finally
  33. {
  34. System.out.printf("Thread %d stoped…%n", tid);
  35. // 线程终止前减少线程计数
  36. stopLatch.countDown();
  37. }
  38. }
  39. });
  40. thread.start();
  41. }
  42. // 在放行子线程之前做点什么别的事情
  43. System.out
  44. .println("Main thread do preparation work for child threads…");
  45. try
  46. {
  47. Thread.sleep(2000);
  48. } catch (InterruptedException e)
  49. {
  50. e.printStackTrace();
  51. }
  52. // 打开闭锁放行所有子线程
  53. System.out.println("Main thread let child threads go…");
  54. startLatch.countDown();
  55. try
  56. {
  57. // 等待子线程计数降为 0 即所有子线程执行完毕
  58. System.out.println("Main thread wait for all child threads…");
  59. stopLatch.await();
  60. } catch (InterruptedException e)
  61. {
  62. e.printStackTrace();
  63. }
  64. System.out.println("Main thread exit…");
  65. }
  66. }
FutureTask 的例子


  
  1. import java.util.concurrent.Callable;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.FutureTask;
  4. public class DemoOfFutureTask
  5. {
  6. public static void main(String[] args)
  7. {
  8. // 创建一个 Future Task 用于并发执行长任务
  9. final FutureTask<Movie> future = new FutureTask<Movie>(
  10. new Callable<Movie>()
  11. {
  12. @Override
  13. public Movie call() throws Exception
  14. {
  15. System.out.println("Future task started…");
  16. Thread.sleep(5000);
  17. System.out.println("Future task finished…");
  18. return new Movie("2012", "Unknown");
  19. }
  20. });
  21. // 在子线程中启动任务
  22. Thread thread = new Thread(future);
  23. thread.start();
  24. // 主线程干点别的事情
  25. System.out.println("Now let's do sth eles…");
  26. try
  27. {
  28. Thread.sleep(1000);
  29. } catch (InterruptedException e1)
  30. {
  31. e1.printStackTrace();
  32. }
  33. // 主线程开始取结果
  34. System.out.println("Now wait for result of future task…");
  35. try
  36. {
  37. Movie res = future.get();
  38. System.out.printf("Result from task is name=%s, actor=%s",
  39. res.name, res.actor);
  40. } catch (InterruptedException e)
  41. {
  42. e.printStackTrace();
  43. } catch (ExecutionException e)
  44. {
  45. e.printStackTrace();
  46. }
  47. }
  48. public static class Movie
  49. {
  50. final public String name;
  51. final public String actor;
  52. public Movie(String name, String actor)
  53. {
  54. this.name = name;
  55. this.actor = actor;
  56. }
  57. }
  58. }

Semaphore 的例子:


  
  1. import java.util.concurrent.Semaphore;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class DemoOfSemaphore
  4. {
  5. /**
  6. * @param args
  7. */
  8. public static void main(String[] args)
  9. {
  10. final int numOfThread = 5;
  11. final AtomicInteger count = new AtomicInteger(0); // 用于分配唯一线程标识
  12. final Semaphore semaphore = new Semaphore(numOfThread); // 用于控制并发线程数目
  13. for (int i = 0; i < 10; i++)
  14. {
  15. Thread thread = new Thread(new Runnable()
  16. {
  17. @Override
  18. public void run()
  19. {
  20. int tid = count.getAndIncrement();
  21. try
  22. {// 等待直到取得信号量
  23. System.out
  24. .printf("Thread %d wait on semaphore…%n", tid);
  25. semaphore.acquire();
  26. // 取得信号量之后做点事情
  27. System.out.printf("Thread %d get semaphore…%n", tid);
  28. int duration = (int) (Math.random() * 5000);
  29. Thread.sleep(duration);
  30. } catch (InterruptedException e)
  31. {
  32. e.printStackTrace();
  33. } finally
  34. {
  35. // 做完后释放信号量
  36. System.out
  37. .printf("Thread %d release semaphore…%n", tid);
  38. semaphore.release();
  39. }
  40. }
  41. });
  42. thread.start();
  43. }
  44. }
  45. }

CyclicBarrier 的例子:


  
  1. import java.util.concurrent.BrokenBarrierException;
  2. import java.util.concurrent.CyclicBarrier;
  3. public class DemoOfBarrier
  4. {
  5. public static void main(String[] args)
  6. {
  7. final int numOfThread = 2;
  8. final int numOfIteration = 2;
  9. // 创建一个用于线程同步的 Barrier 对象
  10. final CyclicBarrier barrier = new CyclicBarrier(numOfThread,
  11. new Runnable()
  12. {
  13. // 当所有线程到达 Barrier 后会执行这个任务
  14. // 任务在第一个 到达 Barrier 的线程中执行
  15. @Override
  16. public void run()
  17. {
  18. long tid = Thread.currentThread().getId();
  19. // 当所有线程完成一轮迭代之后做点清除/准备/提交工作
  20. System.out.printf(
  21. "[%d] - All threads arrived barrier…%n", tid);
  22. try
  23. {
  24. Thread.sleep(2000);
  25. } catch (InterruptedException e)
  26. {
  27. e.printStackTrace();
  28. }
  29. System.out.printf("[%d] - Clear work done…%n", tid);
  30. }
  31. });
  32. // 创建并启动多个线程,他们在 Barrier 上同步
  33. for (int i = 0; i < numOfThread; i++)
  34. {
  35. Thread thread = new Thread(new Runnable()
  36. {
  37. @Override
  38. public void run()
  39. {
  40. long tid = Thread.currentThread().getId();
  41. for (int k = 0; k < numOfIteration; k++)
  42. {
  43. try
  44. {
  45. // 线程进行一轮迭代,做点事情
  46. System.out.printf("Thread %d start its work…%n",
  47. tid);
  48. long duration = (int) (Math.random() * 5000);
  49. Thread.sleep(duration);
  50. // 做完迭代后等待其他线程完成迭代
  51. System.out.printf("Thread %d wait on barrier…%n",
  52. tid);
  53. int num = barrier.await();
  54. // 显示完成的顺序
  55. System.out.printf(
  56. "Thread %d pass barrier with order=%d…%n",
  57. tid, num);
  58. } catch (InterruptedException e)
  59. {
  60. e.printStackTrace();
  61. Thread.currentThread().interrupt();
  62. } catch (BrokenBarrierException e)
  63. {
  64. e.printStackTrace();
  65. }
  66. }
  67. }
  68. });
  69. thread.start();
  70. }
  71. }
  72. }






文章来源: panda1234lee.blog.csdn.net,作者:panda1234lee,版权归原作者所有,如需转载,请联系作者。

原文链接:panda1234lee.blog.csdn.net/article/details/8535238

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。