Java 中线程之间如何进行通信?

举报
猿java 发表于 2024/11/27 06:47:42 2024/11/27
【摘要】 嗨,你好呀,我是猿java在 Java中,线程是执行的最小单元,那么线程之间是如何通信的呢?这篇文章我们一起来分析 5种常用的方式。使用 wait()、notify() 和 notifyAll()使用 BlockingQueueExchanger使用 Locks 和 Condition使用 Semaphore 1. 使用 wait()、notify() 和 notifyAll()Java的 ...

嗨,你好呀,我是猿java

在 Java中,线程是执行的最小单元,那么线程之间是如何通信的呢?这篇文章我们一起来分析 5种常用的方式。

  1. 使用 wait()notify()notifyAll()
  2. 使用 BlockingQueue
  3. Exchanger
  4. 使用 LocksCondition
  5. 使用 Semaphore

1. 使用 wait()notify()notifyAll()

Java的 Object 类提供了 wait()notify()notifyAll() 方法,这些方法可以用来实现线程之间的通信,这些方法必须在同步块或同步方法中调用。

  • wait():使当前线程进入等待状态,直到其他线程调用 notify()notifyAll()
  • notify():唤醒在该对象监视器上等待的单个线程。
  • notifyAll():唤醒在该对象监视器上等待的所有线程。

thread-communicate-1.png

示例代码

class SharedResource {
    private int data;
    private boolean hasData = false;

    public synchronized void produce(int value) throws InterruptedException {
        while (hasData) {
            wait();
        }
        this.data = value;
        hasData = true;
        notify();
    }

    public synchronized int consume() throws InterruptedException {
        while (!hasData) {
            wait();
        }
        hasData = false;
        notify();
        return data;
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    resource.produce(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int data = resource.consume();
                    System.out.println("Consumed: " + data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

2. 使用 BlockingQueue

BlockingQueue 是Java中一个强大的接口,提供了线程安全的队列操作,并且可以在生产者-消费者模式中使用。BlockingQueue 不需要显式地使用同步机制,它内部已经处理好了线程同步问题。

thread-communicate-2.png

示例代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int data = queue.take();
                    System.out.println("Consumed: " + data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

3. 使用 LocksCondition

Java提供了 java.util.concurrent.locks 包,其中包含了 Lock 接口和 Condition 接口。Condition 提供了类似于 wait()notify()notifyAll() 的方法,但它们与 Lock 对象一起使用,提供了更灵活的线程通信机制。

thread-communicate-3.png

示例代码

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedResourceWithLock {
    private int data;
    private boolean hasData = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (hasData) {
                condition.await();
            }
            this.data = value;
            hasData = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (!hasData) {
                condition.await();
            }
            hasData = false;
            condition.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }
}

public class LockConditionExample {
    public static void main(String[] args) {
        SharedResourceWithLock resource = new SharedResourceWithLock();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    resource.produce(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int data = resource.consume();
                    System.out.println("Consumed: " + data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

4. 使用 Exchanger

Exchanger 是一个用于线程间交换数据的同步点。两个线程可以在此同步点交换数据,Exchangerexchange() 方法用于在两个线程之间交换数据。

thread-communicate-4.png

示例代码

import java.util.concurrent.Exchanger;

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<Integer> exchanger = new Exchanger<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("Produced: " + i);
                    exchanger.exchange(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int data = exchanger.exchange(null);
                    System.out.println("Consumed: " + data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

5. 使用 Semaphore

Semaphore 是一个计数信号量,通常用于限制对某些资源的访问。它可以用于控制线程访问共享资源的数量,这在某些情况下也可以用作线程间通信的机制。

thread-communicate-5.png

示例代码

import java.util.concurrent.Semaphore;

class SemaphoreSharedResource {
    private int data;
    private Semaphore semaphore = new Semaphore(1);

    public void produce(int value) throws InterruptedException {
        semaphore.acquire();
        try {
            this.data = value;
            System.out.println("Produced: " + value);
        } finally {
            semaphore.release();
        }
    }

    public int consume() throws InterruptedException {
        semaphore.acquire();
        try {
            System.out.println("Consumed: " + data);
            return data;
        } finally {
            semaphore.release();
        }
    }
}

public class SemaphoreExample {
    public static void main(String[] args) {
        SemaphoreSharedResource resource = new SemaphoreSharedResource();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    resource.produce(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    resource.consume();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

结论

本文,我们分析了 Java线程通信的5种常见方式:

  • wait()/notify() 是一种低级别的同步机制,适合需要精细控制的场合;
  • BlockingQueueExchanger 提供了更高层次的抽象,简化了线程间的数据交换;
  • LocksCondition 提供了更灵活的锁机制,适合复杂的同步场景;
  • Semaphore 则用于控制资源访问。

在实际应用中,需要选择哪种方式取决于具体的应用场景和需求。如何你有好的通信方式,欢迎评论区留言。

交流学习

最后,把猿哥的座右铭送给你:投资自己才是最大的财富。 如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。