JUC并发包

举报
小高先生 发表于 2022/06/05 22:19:26 2022/06/05
【摘要】 讲解了多线程实战的最后一章,JUC并发包

  这是多线程实战的最后一章内容,本章主要学习JUC并发工具包里的一些类以及java线程可见性关键字volatile

一.线程的ThreadLocal本地缓存对象

  为什么要学习这个呢?想一下之前学习的,多线程访问同一个共享变量时很容易出现并发问题,特别是多个县城对同一个变量进行写入的时候。那为了保证安全,一般使用同步措施保证安全性,ThreadLocal是除了加锁之外的一种保证安全的方法。

  创建一个变量之后,如果每个线程访问的都是自己的变量,这样就不会产生不安全的问题,如下图所示,线程一和线程二都有自己的内存来存放数据,这样就避免了不安全问题。

  如果创建了一个ThreadLocal类的变量,那么访问这个变量的每一个线程,都会有这个变量的本地副本,操作这个变量时,实际上操作的是本地内存中的变量,避免线程安全的问题。

  ThreadLocal对象包含了一个静态的内部类ThreadLocalmap,每次调用set方法时,就相当于往其内部的map中增加一条记录,key分别是各自的线程,value是各自的set方法传进去的值。

  案例代码如下:

  原版代码如下:结果出了问题,线程2设置的年龄没起作用,被线程1更改了

public class User {
	private int age;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}
	
}
import java.util.Random;

public class UserRunn1 implements Runnable{
	private User u;
	
	public UserRunn1(User u) {
		this.u = u;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println(Thread.currentThread().getName()+",进入run()方法");
		
	    Random r = new Random();
	    int age = r.nextInt(100);
	    
	    System.out.println(Thread.currentThread().getName()+"设置年龄为"+age);
	    u.setAge(age);
	    
	    System.out.println(Thread.currentThread().getName()+"设置的年龄之前是:"+u.getAge()+",内存地址是:"+u);
		
	    
	    try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	    System.out.println(Thread.currentThread().getName()+"设置的年龄之后是:"+u.getAge()+",内存地址是:"+u);
	    
	}
	
	
}

public class Test1 {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		User u = new User();
		UserRunn1 us = new UserRunn1(u);
		Thread t1 = new Thread(us,"线程1");
		Thread t2 = new Thread(us,"线程2");
		
		t1.start();
		t2.start();
	}

}

//发生了数据错误,这里两个线程共享User类变量u,线程2先设置年龄,然后是线程1设置,可是这两个是共享变量的
//因此两个线程修改的是同一个变量,没有同步,所以导致线程2设置的年龄会被线程1改变

  代码执行结果:

线程1,进入run()方法
线程2,进入run()方法
线程2设置年龄为29
线程1设置年龄为73
线程2设置的年龄之前是:29,内存地址是:com.huawei.ThreadLocal.User@434e2b1d
线程1设置的年龄之前是:73,内存地址是:com.huawei.ThreadLocal.User@434e2b1d
线程1设置的年龄之后是:73,内存地址是:com.huawei.ThreadLocal.User@434e2b1d
线程2设置的年龄之后是:73,内存地址是:com.huawei.ThreadLocal.User@434e2b1d


  修改之后的代码:

  修改之后我们发现,各个线程之间互不打扰,User类变量u是共享变量,在使用ThreadLocal之后,每个线程访问的是u的副本,所以每个线程修改变量的时候都是在访问各个线程中u的副本。

public class User {
	private int age;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}
	
}

import java.util.Random;

import com.huawei.ThreadLocal.User;

public class UserRunn implements Runnable{
	//声明线程的一个本地变量
	private ThreadLocal<User> userLocal = new ThreadLocal<User>();

	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println(Thread.currentThread().getName()+"进入run方法");
		User u = getUser();
        
		Random r = new Random();
		int age = r.nextInt(100);
		
		System.out.println(Thread.currentThread().getName()+"设置的年龄是"+age);
		u.setAge(age);
		
		System.out.println(Thread.currentThread().getName()+"设置的年龄之前是:"+u.getAge()+",内存地址是:"+u);
		
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"设置的年龄之后是:"+u.getAge()+",内存地址是:"+u);
	}
	
	public User getUser() {
		User u = userLocal.get();//使用get()方法,想要从ThreadLocal中获取User对象,但是现在得到的肯定是空
		//因为我们还没set(),也就是没有往ThreadLocal的map中添加记录
		if(u == null) {
			u = new User();//先创建出User对象
			userLocal.set(u);//把User对象当成value传入set(),这就是往map中添加了记录
		}
		
		return u;
	}
	
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		UserRunn ur = new UserRunn();
		Thread t1 = new Thread(ur,"线程1");
		Thread t2 = new Thread(ur,"线程2");
		
		t1.start();
		t2.start();
	}

}
//要验证一下能不能通过各个线程key取出各自传入set()的值
//各个线程之间互不打扰,各个线程访问的都是各自存入的变量

二.线程的volatile关键字

  用volatile修饰变量,告知线程访问该对象时要从共享内存中获取,而且对他的改变必须同步改变回共享内存,能保证所有线程访问变量的可见性。它的作用是保证变量在多个线程的可见性,但是不能保证原子性。不能替代synchronized的同步功能

  案例代码如下:

  原始代码如下:输出结果只有线程开始执行任务,却不会输出线程结束执行任务。原因如下:每个线程都有自己的内存,比如在这个案例中,子线程和主线程都会访问flag这个变量,子线程会将变量拷贝到自己的内存中,主线程对其改变,但还没来得及写入共享内存中,所以子线程没有收到改变。

  要怎么解决呢,就可以在flag前面加上volatile,这样flag就被强行存储在共享内存中,主线程修改后会同步刷新共享内存,这样子线程从共享内存就会得到最新的值。

  但是有个问题,Thread.sleep(2000)去掉后,就正常运行了,很奇怪

public class UserThread extends Thread{
	private static boolean flag =true;

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName()+"线程开始执行任务");
		while(flag) {
			
		}
		System.out.println(Thread.currentThread().getName()+"线程结束执行任务");
	}
	
	public void setFlag(boolean flag) {
		this.flag = flag;
	}
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		UserThread ut = new UserThread();
		ut.start();
		
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		ut.setFlag(false);
	}

}

  volatile不能代替synchronized,案例代码如下:

  原始版:输出结果应该是1000,但是每次输出都是小于1000,原因是什么?是因为volatile不能保证原子性,多个线程访问共享资源时,出现了数据安全问题。比如线程1先执行run()方法,执行n++,有可能n刚加完还没来得及存储呢,另一个线程就开始执行任务,那他在执行n++时,加的是原来的值,这样就相当于两个线程执行n++,实际相当于只执行了一次。这样就会导致输出结果变小。

public class ThreadVolatile extends Thread{
	private volatile static int n;
	
	
	@Override
	public void run() {
		for(int i=0;i<10;i++) {
			n++;
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}


	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Thread[] threads = new Thread[100];
		for(int i=0;i<threads.length;i++) {
			threads[i] = new ThreadVolatile();
		}
		for(int i=0;i<threads.length;i++) {
			threads[i].start();
		}
		for(int i=0;i<threads.length;i++) {
			try {
				threads[i].join();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		System.out.println("n="+ThreadVolatile.n);
	}

}

  修改,利用synchronized同步

public class ThreadVolatile extends Thread{
	private static int n = 0;
	public static synchronized void inc() {
		n++;
	}
	
	@Override
	public void run() {
		for(int i=0;i<10;i++) {
			inc();
			try {
				sleep(3);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}


	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Thread[] threads = new Thread[100];
		for(int i=0;i<threads.length;i++) {
			threads[i] = new ThreadVolatile();
		}
		for(int i=0;i<threads.length;i++) {
			threads[i].start();
		}
		for(int i=0;i<threads.length;i++) {
			try {
				threads[i].join();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		System.out.println("n="+ThreadVolatile.n);
	}

}

三.线程池的作用和应用

  这节代码都是没学过的

四.线程的同步工具类CountDownLatch

  作用是在一组正在执行任务的线程执行结束之前,允许一个或多个线程一直等到。CountDownLatch有两个主要方法,分别是countDown()方法以及await()方法。在创建CountDownLatch类的时候,要传入int参数,这个参数是计数器的初始值,也就是允许几个线程执行任务。线程中的run()方法执行结束后,就要调用countDown()方法,每调用一次计数器减1。await()方法是阻塞线程的作用,也就是让这个await()所在的线程一直等着计数器参数个数的线程执行完任务它才能执行。只要计数器值大于0,那await()就会一直阻塞线程,当计数器等于0的时候,await()停止阻塞,让这个线程执行任务,这个现象只出现一次,计数无法被重置

  直接看代码,案例一。

  原版代码:

  

public class UserThread1 extends Thread{

	private int sum1 = 0;
	


	@Override
	public void run() {
		for(int i = 0;i<=100;i++) {
			sum1+=i;
		}

	}
	
	public int getSum1() {
		return this.sum1;
	}
}

public class UserThread2 extends Thread{
	private int sum2 = 0;

	@Override
	public void run() {
		for(int i=101;i<=200;i++) {
			sum2+=i;
		}
	}
	
	public int getSum2() {
		return this.sum2;
	}
}

public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		UserThread1 ut1 = new UserThread1();
		UserThread2 ut2 = new UserThread2();
		
		ut1.start();
		ut2.start();
		
		System.out.println(ut1.getSum1()+ut2.getSum2());
		//这样输出是0,因为子线程还没执行就执行主线程,所以输出是0
                //要让主线程先阻塞
	}

}

  修改后的代码:使用CountDownLatch类

import java.util.concurrent.CountDownLatch;

public class UserThread1 extends Thread{

	private CountDownLatch cd;
	private int sum1 = 0;
	
    public UserThread1(CountDownLatch cd) {
    	this.cd = cd;
    }

	@Override
	public void run() {
		for(int i = 0;i<=100;i++) {
			sum1+=i;
		}
		cd.countDown();
	}
	
	public int getSum1() {
		return this.sum1;
	}
}

import java.util.concurrent.CountDownLatch;

public class UserThread2 extends Thread{
	private CountDownLatch cd;
	private int sum2 = 0;
    
	public UserThread2(CountDownLatch cd) {
		this.cd = cd;
	}
	@Override
	public void run() {
		for(int i=101;i<=200;i++) {
			sum2+=i;
		}
		cd.countDown();
	}
	
	public int getSum2() {
		return this.sum2;
	}
}

import java.util.concurrent.CountDownLatch;

public class Test {

	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		CountDownLatch cd = new CountDownLatch(2);//2就是指执行任务的两个线程
		UserThread1 ut1 = new UserThread1(cd);
		UserThread2 ut2 = new UserThread2(cd);
		
		ut1.start();
		ut2.start();
		
		cd.await();//阻塞主线程
		
		System.out.println("计算两个线程sum和"+(ut1.getSum1()+ut2.getSum2()));
		
	}

}

五.线程同步工具类CyclicBarrier

  这个类的作用就是让一组线程互相等待,直到到达某个公共屏障点。假如两个线程执行任务,一个先完成,一个后完成,如果我想让他俩互相等待同时完成,那就要用到这个类。举个例子:让所有线程等待完成之后,再去执行下一部分。 比如约某些朋友去餐厅吃饭,可能有人早到,有人晚到,但是餐厅规定必须所有人到齐才能进,这里的人就是各个线程,餐厅就是CyclicBarrier

  案例代码如下,先看原版代码:

public class TimeCount {
	private int count1;
	private int count2;
	private int sum;
	public int getCount1() {
		return count1;
	}
	public void setCount1(int count1) {
		this.count1 = count1;
	}
	public int getCount2() {
		return count2;
	}
	public void setCount2(int count2) {
		this.count2 = count2;
	}
	
	public void sum(int sum) {
		this.sum =sum;
	}
	
	public int getSum() {
		return this.count1+this.count2;
	}
}
public class UserRunn1 extends Thread{
	private TimeCount tc;
	private String name;
	
	
	public UserRunn1(String name,TimeCount tc) {
		this.name = name;
		this.tc = tc;
	}
	@Override
	public void run() {
		if(name.equals("爬虫")) {
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tc.setCount1(2000);
		}else if(name.equals("发邮件")) {
			try {
				Thread.sleep(4000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tc.setCount2(4000);
		}
		
		System.out.println("---"+name+"---end");
	}
	
	
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		TimeCount tc = new TimeCount();
		UserRunn1 us1 = new UserRunn1("爬虫",tc);
		UserRunn1 us2 = new UserRunn1("发邮件",tc);
		
		us1.start();
		us2.start();
		
		//这样正常输出是先输出爬虫再输出发邮件
	}

}

  修改后的代码:

public class TimeCount {
	private int count1;
	private int count2;
	private int sum;
	public int getCount1() {
		return count1;
	}
	public void setCount1(int count1) {
		this.count1 = count1;
	}
	public int getCount2() {
		return count2;
	}
	public void setCount2(int count2) {
		this.count2 = count2;
	}
	
	public void sum(int sum) {
		this.sum =sum;
	}
	
	public int getSum() {
		return this.count1+this.count2;
	}
}
public class UserRunn1 extends Thread{
	private TimeCount tc;
	private String name;
	private CyclicBarrier c;
	
	public UserRunn1(String name,TimeCount tc,CyclicBarrier c) {
		this.name = name;
		this.tc = tc;
                this.c = c;
	}
	@Override
	public void run() {
		if(name.equals("爬虫")) {
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tc.setCount1(2000);
		}else if(name.equals("发邮件")) {
			try {
				Thread.sleep(4000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tc.setCount2(4000);
		}
		
                c.await();
		System.out.println("---"+name+"---end");
	}
	
	
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
                CyclicBarrier c = new CyclicBarrier(2);
		TimeCount tc = new TimeCount();
		UserRunn1 us1 = new UserRunn1("爬虫",tc);
		UserRunn1 us2 = new UserRunn1("发邮件",tc);
		
		us1.start();
		us2.start();
		
		//这样是等两个线程都执行完之后才会输出
	}

}

最后一种写法,可以在CyclicBarrier中再添加一个参数,匿名构造类

public class TimeCount {
	private int count1;
	private int count2;
	private int sum;
	public int getCount1() {
		return count1;
	}
	public void setCount1(int count1) {
		this.count1 = count1;
	}
	public int getCount2() {
		return count2;
	}
	public void setCount2(int count2) {
		this.count2 = count2;
	}
	
	public void sum(int sum) {
		this.sum =sum;
	}
	
	public int getSum() {
		return this.count1+this.count2;
	}
}
public class UserRunn1 extends Thread{
	private TimeCount tc;
	private String name;
	private CyclicBarrier c;
	
	public UserRunn1(String name,TimeCount tc,CyclicBarrier c) {
		this.name = name;
		this.tc = tc;
                this.c = c;
	}
	@Override
	public void run() {
		if(name.equals("爬虫")) {
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tc.setCount1(2000);
		}else if(name.equals("发邮件")) {
			try {
				Thread.sleep(4000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tc.setCount2(4000);
		}
		
                c.await();
		System.out.println("---"+name+"---end");
	}
	
	
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
                
		TimeCount tc = new TimeCount();
                CyclicBarrier c = new CyclicBarrier(2,new Runnable() {//这个CyclicBarrier还可以有一个参数,意思是
			//这是最后一个线程要执行的任务

			@Override
			public void run() {
				// TODO Auto-generated method stub
				int sum = tc.getSum();
				System.out.println("总耗时为:"+sum);
			}
			
		});
		UserRunn1 us1 = new UserRunn1("爬虫",tc);
		UserRunn1 us2 = new UserRunn1("发邮件",tc);
		
		us1.start();
		us2.start();
		
		//这样是等两个线程都执行完之后才会输出
	}

}

六.线程的同步工具类Semaphore

  这个类可以控制同时访问某些特定资源的线程数量,协调各个线程,以保证合理使用资源。举个例子,停车场门口的车牌,进入一辆车,车位减1,离开一辆车,车位加1,车位为0时,不能再进入了。SemaPhore是一个计数信号量,本质是一个共享锁,通过state变量来实现共享,每次调用acquire()方法,state值减一,当调用release()方法时,state值加一。当state变量为0时,其他线程阻塞,进入等待队列。常用于限流

  案例代码如下:

  原始代码:创建5个线程,就是进入停车场5辆车,停车场没有数量限制

public class Address {
	public void autoCar() {
		System.out.println(Thread.currentThread().getName()+"进入停车场");
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"离开停车场");
	}
}

public class Car extends Thread{
	private Address address;
	
	public Car(Address address) {
		this.address = address;
	}
	@Override
	public void run() {
		this.address.autoCar();
	}
	
	
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Address address = new Address();
		for(int i = 0;i<5;i++) {//创建5个线程
			new  Car(address).start();
		}
	}

}

//一下子进入5辆车,没啥限制

  升级:对停车场停车数量进行限制,一次只能进入两辆。利用Semaphore加锁,对共享资源的线程数量进行限制,state参数值就是允许一次执行任务的线程数量,线程执行run()方法时要先执行aqurie()方法,state就会减一,执行完任务后要执行release()方法,state加一。state为0的时候,其他项执行任务的线程就要等待。

import java.util.concurrent.Semaphore;

public class Address {
	private Semaphore sm;
	private int num;
	public Address(int num) {
		this.num = num;
		sm = new Semaphore(num);
	}
	
	public void autoCar() {
		try {
			sm.acquire();//加锁,相当于汽车进入停车场之后,车位数量减一,state变量减一
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"进入停车场");
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"离开停车场");
		//释放,相当于汽车离开停车场之后,车位数量加一,state变量加一
		sm.release();
	}
}

public class Car extends Thread{
	private Address address;
	
	public Car(Address address) {
		this.address = address;
	}
	@Override
	public void run() {
		this.address.autoCar();
	}
	
	
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Address address = new Address(2);
		for(int i = 0;i<6;i++) {//创建5个线程
			new  Car(address).start();
		}
	}

}

//一下子只能进入2辆车,有数量限制了

七.线程的交换类Exchanger

  Exchanger(交换者)用于线程协助的工具类,用于进行线程之间交换数据。它会提供一个同步点,当线程到了同步点,那两个线程就会交换数据。

  两个线程通过Exchange()进行交换数据

  案例代码如下:

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test extends Thread{
	private static final Exchanger<String> exgr = new Exchanger<String>();
	private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
	
	public static void main(String[] args) {

		//现在要做一件事,想让上面的线程拿到银行流水B,让下面的线程拿到银行流水B
		threadPool.execute(new Runnable() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				String A = "银行流水A";
				String B;
				try {
					B = exgr.exchange(A);//这个A意思是在这个线程中,把A的数据传进去,等待对面来接收。并且用B来接收对面传过来数据
					System.out.println(Thread.currentThread().getName()+B);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
			
		});
		
		threadPool.execute(new Runnable() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				String B = "银行流水B";
				String A;
				try {
					A = exgr.exchange(B);//把B的数据传进去,并且用A接收对面传来的数据
					System.out.println(Thread.currentThread().getName()+A);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
			
		});
		
		threadPool.shutdown();
	}

}

八.Fork/Join机制

  Fork/Join框架是Java7提供的一个用于并行执行任务的框架,将一个大任务分解为多个小任务,最终汇总每个小任务的执行结果后得到大任务的结果。这里面有一个思想,叫做分治法,意思是将一个规模大的问题分解为多个规模小的问题,然后分别解决每个小问题,将每个小问题结果合并就得到了大问题的结果了

  需要两个类,ForkJoinTask和ForkJoinPool ,RecursiveTask继承了ForkJoinTask类,专门用于有返回机制的任务, ForkJoinTask需要通过ForkJoinPool执行

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

import com.huawei.ForkJoin.ContTask;

public class ContTest extends RecursiveTask<Integer>{
	private int start;
	private int end;
	
	//计算任务量的阈值
	private static final int TASKSIZE = 2;//假如我想计算1-100的和,100-1=99,99大于阈值2,所以要拆分任务
	//拆分成两个任务,1加到50和51加到100,但是这两个任务也是大于阈值,所以要继续拆分,直到阈值小于等于2.
	
	private int count = 0;
	
	public ContTest(int start,int end) {
		this.start = start;
		this.end = end;
	}
	
	//重写compute方法,任务执行的主要计算
	@Override
	protected Integer compute() {
		int sum = 0;
		System.out.println("开启线程进行计算"+count++);
		//和与之进行比较
		boolean flag = (this.end-this.start)/2<=TASKSIZE;
		if(flag) {
			//没必要拆分
			for(int i=start;i<=end;i++) {
				sum+=i;
			}
		}else {
			//要进行拆分计算
			System.out.println("这个任务徐要进行拆分计算"+Thread.currentThread().getName());
			//任务大于阈值,拆分成两个任务
			int middle = (start+end)/2;
			ContTest ct1 = new ContTest(start,middle);
			ContTest ct2 = new ContTest(middle+1,end);//就是一个递归
			//开启计算分布式任务
			invokeAll(ct1,ct2);
			//要拿到执行结果
			//可以阻塞一下,用join方法,并且用一个变量接收返回值
			Integer taskSum1 = ct1.join();
			Integer taskSum2 = ct2.join();
			
			sum = taskSum1+ taskSum2;
			
			
		}
		return sum;
	}


	public static void main(String[] args) {
		
		//分布式计算的池
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		//初始化设置任务
		ContTask contTask = new ContTask(1,100);
		//分布式计算任务,提交任务
		ForkJoinTask forkJoinTask = forkJoinPool.submit(contTask);
		
		//最后得到计算结果
		try {
			System.out.println(forkJoinTask.get());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

}

九.线程的组合案例购票

  案例代码如下:

  这个代码我有一个问题,在Tickets类中,不睡眠就会出现共享资源的问题,这是为什么呢,为什么一定要睡眠?

public class Tickets {
	private int allowance;//票数
	
	public Tickets(int allowance) {
		this.allowance = allowance;
	}
	
	public int getAllowance() {
		return allowance;
	}

	public void setAllowance(int allowance) {
		this.allowance = allowance;
	}

	public void byTickets(int num) {
		synchronized(this) {
			int before = this.allowance;
			int after = before - num;//剩余票数
			
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
			setAllowance(after);
		}
		
	}
}

public class CustomerRunnable implements Runnable{
	private Tickets tickets;
	
	public CustomerRunnable(Tickets tickets) {
		this.tickets = tickets;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		tickets.byTickets(1);
		System.out.println(Thread.currentThread().getName()+"购票成功,余票为:"+tickets.getAllowance());
	}
	
}

public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Tickets tickets = new Tickets(100);
		CustomerRunnable[] cr = new CustomerRunnable[100];
		Thread[] thread = new Thread[100];
		
		//创建100个CustomerRunnable对象
		for(int i = 0;i<cr.length;i++) {
			cr[i] = new CustomerRunnable(tickets);
		}
		for(int i = 0;i<thread.length;i++) {
			thread[i] = new Thread(cr[i]);
			thread[i].start();
		}
	}

}

十.线程的组合案例购物

  案例代码如下:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Shop {
	private Semaphore semaphore;
	public Shop(int num) {
		//实例化信号量,限流,每次可以有num个线程获得许可
		semaphore = new Semaphore(num);
	}
	//用户抢购
	public void userShopping(String name) throws InterruptedException {
		boolean flag = false;
		
		//怎么限流抢购,什么时候抢购
		//调用tryAcquire()方法,flag表示什么时候用户能抢购,意思是在规定时间内,这里设置的是1s
		//也就是在1s之内,线程来抢夺这num个执行任务的机会,抢到的线程返回true,每抢到一次,num-1
		flag = this.semaphore.tryAcquire(1, TimeUnit.SECONDS);
		if(flag) {
			System.out.println(name + ",抢购成功");
			Thread.sleep(1000);//这个地方一定要睡眠,因为我们设置的是必须在1s内抢到资源的线程才算抢购成功,如果我们不设置
			//就会出现一种情况,假如有20个用户来抢购,不设置睡眠就会导致这20个用户都能抢到了。
			//所以我们要延时,不能让所有线程都抢到。要让一些线程抢夺名额的时候已经超过了1s,这样他们必不可能抢到。
			//5个抢到后,1s中还剩一点末尾,还会有几个线程抢到,剩下的就抢不到了
			semaphore.release();//注意要释放num,释放一次num+1
		}else {
			System.out.println(name + "对不起");
		}
	}
}
public class User extends Thread{
	private String name;
	private Shop shop;
	
	public User(String name,Shop shop) {
		this.name = name;
		this.shop = shop;
	}

	@Override
	public void run() {
		try {
			shop.userShopping(name);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	
}

public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Shop shop = new Shop(5);
		for(int i=0;i<20;i++) {
			new User("张"+i,shop).start();
		}
	}

}

十一.线程的锁的synchronized和Lock、volatile区别

  这么一看这三个都是有关防止并发线程访问共享资源出现错误的功能。

  首先看一下synchronized和volatile的区别:

  • volatile关键字解决的是变量在多个线程间的可见性,简单来说就是将一个变量设置成共享变量,可以被多个线程共同访问,并且这些线程要从共享内存中找到这个变量。任何一个线程对这个变量进行修改,其他线程都会知道,并且其他线程再要对共享变量进行修改,都是基于之前的改变之后再进行改变。synchronize关键字解决的是多个线程间访问共享资源的安全性问题
  • volatile关键字修饰的变量,synchronized修饰方法和代码块
  • 多线程访问volatile变量不会发生阻塞,而访问synchronized会发生阻塞
  • volatile可以保证变量在多个线程间的可见性,但不能保证原子性;而synchronized能够保证原子性,也可以间接保证可见性,因为他会将私有内存和共有内存中的数据进行同步

  再看一下synchronized和Lock之间的区别:

  • Lock是一个接口,而synchronized是一个关键字,synchronized是java中内置的语言
  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会发生死锁;而Lock在发生异常时,如果没有使用unlock()主动解锁,则很有可能死锁,因此要注意使用Lock时,要在finally块中释放锁
  • Lock可以提高多个线程进行读操作的效率(读写锁)

十二.线程的读写分离机制

  ReadWriteLock,顾名思义,读写锁,维护一对相关的锁:“读取锁”和“写入锁”,一个用于读取,一个用于写入。分别介绍每个锁的特点。

  读取锁:用于只读操作,是共享锁,能被多个线程获取

  写入锁:用于写入操作,是独占锁,写入操作只能被一个线程锁获取

  不能同时存在读取锁和写入锁!可以读/读,但不能读/写和写/写

  案例代码如下:

public class MyCount {
	private String id;
	private int cash;
	
	public MyCount(String id,int cash) {
		this.id = id;
		this.cash = cash;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public int getCash() {
		System.out.println(Thread.currentThread().getName()+"getCash cash"+cash);
		return cash;
	}

	public void setCash(int cash) {
		System.out.println(Thread.currentThread().getName()+"setCash cash"+cash);
		this.cash = cash;
	}
	
	
}
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class User extends Thread{
	private String name;
	private MyCount myCount;
	private ReadWriteLock myLock;//执行操作所需的锁对象
	
	public User(String name,MyCount myCount) {
		this.myCount = myCount;
		this.name = name;
		this.myLock = new ReentrantReadWriteLock();
	}
	//读取锁
	public void getCash() {
		new Thread() {
			public void run() {
				myLock.readLock().lock();
				System.out.println(Thread.currentThread().getName()+"getCash start");
				myCount.getCash();
				try {
					Thread.sleep(1);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName()+"getCash end");
				myLock.readLock().unlock();
			}
		}.start();;
	}
	
	//写入锁
	public void setCash(int cash) {
		new Thread() {
			public void run() {
				myLock.writeLock().lock();
				System.out.println(Thread.currentThread().getName()+"setCash start");
				myCount.setCash(cash);
				try {
					Thread.sleep(1);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName()+"setCash end");
				myLock.writeLock().unlock();
			}
		}.start();
	}
}
public class Test {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		//创建账户
		MyCount myCount = new MyCount("111",10000);
		//创建用户,并制定账户
		User user = new User("小高",myCount);
		
		//分别启动3个线程用来读取和写入
		for(int i = 0;i<3;i++) {
			user.getCash();
			user.setCash((i+1)*1000);
		}
	}
//根据运行结果可以发现,在读取的时候,有些线程还没有执行完读取任务,会有另一个线程也来执行读取任务了,这是因为
//读取锁是共享的,而写入锁就不会出现这种问题,因为写入锁是独占锁
}



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。