同步模式之保护性暂停

举报
yd_249383650 发表于 2023/06/28 15:43:19 2023/06/28
【摘要】 ​ 目录定义基本实现 带超时版 GuardedObject多任务版 GuardedObject总结定义即 Guarded Suspension,用在一个线程等待另一个线程的执行结果要点有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)JDK 中,join 的实现、Future 的实...

 


目录

定义

基本实现

 带超时版 GuardedObject

多任务版 GuardedObject

总结



定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

编辑

基本实现

public class GuardedObject{
   private  Object response;
   private  final  Object lock=new Object();
   public Object get(){
       synchronized (lock){
           //条件不满足进行等待
           while (response==null){
               try {
                   lock.wait();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           return response;
       }
   }

   public void complete(Object response){
       synchronized (lock){
           this.response=response;
           lock.notifyAll();
       }
   }

}
    public static void main(String[] args) {
        ThreadText GuardedObject= new GuardedObject();
        new Thread(()->{
            String response="结果";
            System.out.println("正在加载");
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            GuardedObject.complete(response);
        }).start();
        System.out.println("wait....");
        //主线程进行等待
        Object o = GuardedObject.get();
        System.out.println("输出结果===》"+o);
    }

编辑


 带超时版 GuardedObject

public class GuardedObject {
   private  Object response;
   private  final  Object lock=new Object();
   public Object get(Long millis){
       synchronized (lock){
           //记录最初时间
           long begin=System.currentTimeMillis();
           //已经经历时间
           long timePassed=0;
           //条件不满足进行等待
           while (response==null){
               long waitTime=millis-timePassed;
               if(waitTime<=0){
                   break;
               }
               try {
                   lock.wait();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               timePassed=  System.currentTimeMillis()- begin;
           }
           return response;
       }
   }

   public void complete(Object response){
       synchronized (lock){
           this.response=response;
           lock.notifyAll();
       }
   }

}

测试代码

    public static void main(String[] args) {
        GuardedObject GuardedObject= new GuardedObject();
        new Thread(()->{
            String response="结果";
            System.out.println("正在加载");
            GuardedObject.complete(null);
            try { Thread.sleep(102); } catch (InterruptedException e) { e.printStackTrace(); }
            GuardedObject.complete(null);
            try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }
            GuardedObject.complete(response);
        }).start();
        System.out.println("wait....");
        //主线程进行等待
        Object o = GuardedObject.get(100);
        System.out.println("输出结果===》"+o);
    
    }

编辑


多任务版 GuardedObject

  图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右 侧的 t1,t3,t5 就好比邮递员

  如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

编辑

新增 id 用来标识 Guarded Object

public class GuardedObject {
    //标识GuardedObject
    private int id;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    private Object response;

    //获取结果
    public Object get(long timeout) {
        synchronized (this) {
            //开始时间
            long begin = System.currentTimeMillis();
            //经历的时间
            long passedTime = 0;
            while (response == null) {
                long waitTime = timeout - passedTime;
                if (waitTime <= 0) {
                    break;
                }
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求得经历的时间
                passedTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    //产生结果
    public void complete(Object response) {
        synchronized (this) {
            //给结果进行赋值
            this.response = response;
            this.notifyAll();
        }
    }

    public GuardedObject(int id) {
        this.id = id;
    }

  
}

中间解耦类

class MailBoxes {
    private static Map<Integer, GuardedObject> boxes = new HashMap<>();
    private static int id = 1;

    //产生唯一id
    private static synchronized int generateId() {
        return id++;
    }

    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }


    public static GuardedObject createGuardedObject() {
        GuardedObject guardedObject = new GuardedObject(generateId());
        boxes.put(guardedObject.getId(), guardedObject);
        return guardedObject;
    }

    public static Set<Integer> getIds(){
        return boxes.keySet();
    }

}

业务相关类

class  People extends  Thread{
    public void run() {
        GuardedObject guardedObject = MailBoxes.createGuardedObject();
        System.out.println("开始收信id==>"+guardedObject.getId());
        Object mail = guardedObject.get(5000);
        System.out.println("收到信id==>"+guardedObject.getId()+" 内容==>"+mail);

    }
}


class Postman extends Thread{
    private  int id;
    private  String mail;
    public Postman(int id,String mail){
        this.id=id;
        this.mail=mail;
    }

    public void run() {
        GuardedObject guardedObject = MailBoxes.getGuardedObject(id);
        System.out.println("送信id==>"+id+" 内容==>"+mail);
        guardedObject.complete(mail);
    }
}

测试代码

  public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new People().start();
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
        }
        
        for (Integer id:MailBoxes.getIds()){
            new Postman(id,"内容"+id).start();
          
        }
    }

 测试结果:

开始收信id==>1
开始收信id==>2
开始收信id==>3
送信id==>1 内容==>内容1
送信id==>2 内容==>内容2
送信id==>3 内容==>内容3
收到信id==>1 内容==>内容1
收到信id==>2 内容==>内容2
收到信id==>3 内容==>内容3

总结

保护性暂停(Guarded Suspension)是一种线程间的同步机制,它解决了等待-通知模式中的等待超时和虚假唤醒问题。在保护性暂停模式中,一个线程在等待某个特定条件的满足时,会通过循环的方式不断检查这个条件,同时在条件不满足时通过wait()方法来释放占用的锁,并进入等待状态;当条件被满足时,相应的其他线程会通过notify()或notifyAll()方法来唤醒正在等待的线程。

具体来说,保护性暂停包含以下几个方面:

  1. 条件判断:在保护性暂停中,线程在等待前会先进行一次条件判断,以确定是否需要进入等待状态,从而避免不必要的等待和唤醒。通常情况下,在条件不满足时线程会通过wait()方法进入等待状态,而在条件满足时则继续执行。

  2. 执行顺序:在保护性暂停中,线程之间的执行顺序是不可控的。例如,在一个生产者-消费者模型中,当生产者线程唤醒消费者线程时,不能保证其立即执行,也不能保证消费者线程的执行顺序。

  3. 同步机制:在保护性暂停中,需要使用同步机制来确保线程之间的可见性和互斥性。通常情况下,使用synchronized关键字来保护共享资源,在保证线程之间的可见性和互斥性的同时,避免了死锁和饥饿等问题。

  4. 等待超时:为了避免线程一直等待而导致程序不响应,保护性暂停通常会使用等待超时机制。即在等待一定时间后,如果条件仍然不满足,则主动放弃等待并返回一个默认值,从而避免阻塞线程。

总体来说,保护性暂停是一种有效的线程同步机制,它可以在多线程环境下保证数据的正确性和程序的健壮性。但是,在使用保护性暂停时需要注意线程之间的协作和同步问题,特别是在条件判断和等待超时等方面。




【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。