多线程和并发篇

news/2025/2/22 17:48:09

多线程和并发篇

  • 创建一个对象时底层汇编指令实现步骤(cpu可能会进行指令重排序):
  • 一、二、三级缓存的实现:
  • 并发编程三要素:
  • 线程的五大状态:
  • 创建线程的三种方式:
  • 线程的特征和状态:
  • Thread.States枚举类中定义的线程状态
  • Thread类常用方法:
  • Synchronized关键字
  • 解决原子性+可见性+有序性问题的方法:
  • 单例模式的实现方式与线程安全问题:饿汉式 vs 懒汉式
  • 悲观锁vs乐观锁
  • 什么是CAS
  • CAS原理
  • 线程间的协作机制
  • 线程池
    • 使用线程池的好处
    • 线程池的构造
    • 线程池的运行原理
    • 线程池提交任务的两种方式execute()和submit()的区别:
    • 源码分析:
    • 线程池的关闭
    • Executors 构建线程池以及问题分析
    • 推荐创建线程池方式—自定义创建

创建一个对象时底层汇编指令实现步骤(cpu可能会进行指令重排序):

1、指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性。
2、并发编程正确执行需要保证其不管如何重排序,都得保证最终结果一致性
在这里插入图片描述

一、二、三级缓存的实现:

1、CPU缓存的容量比内存小的多但是交换速度却比内存要快得多
2、缓存的出现主要是为了解决CPU运算速度与内存读写速度不匹配的矛盾,因为CPU运算速度要比内存读写速度快很多,这样会使CPU花费很长时间等待数据到来或把数据写入内存。
3、CPU缓存可以分为一级缓存,二级缓存,部分高端CPU还具有三级缓存,每一级缓存中所储存的全部数据都是下一级缓存的一部分,这三种缓存的技术难度和制造成本是相对递减的,所以其容量也是相对递增的
4、当CPU要读取一个数据时,首先从一级缓存中查找,如果没有找到再从二级缓存中查找,如果还是没有就从三级缓存或内存中查找。
5、一般来说,每级缓存的命中率大概都在80%左右,也就是说全部数据量的80%都可以在一级缓存中找到,只剩下20%的总数据量才需要从二级缓存、三级缓存或内存中读取,由此可见一级缓存是整个CPU缓存架构中最为重要的部分。
一级缓存L1
1、一级缓存可分为一级指令缓存和一级数据缓存
二级缓存L2
1、二级缓存就是一级缓存的缓冲器
2、存储那些CPU处理时需要用到、一级缓存又无法存储的数据。
三级缓存L3
1、二级缓存的缓冲器,它们的容量递增,但单位制造成本却递减。

在这里插入图片描述

并发编程三要素:

原子性:一个或者多个操作要么全部执行成功要么全部执行失败。
有序性:程序执行顺序按照代码顺序先后执行,但是CPU可能会对指令进行重排序
可见性:当多个线程访问同一个变量时,如果一个线程修改了变量,其他线程立即获取最新的值。

线程的五大状态:

创建状态:当用new操作符创建一个线程的时候。
就绪状态:调用start方法,处于就绪状态的线程并不一定马上就执行run方法,还需要等待CPU的调度。
运行状态:CPU开始调度线程,并开始执行Run方法。
阻塞状态:线程的执行过程中可能因为一些原因进入阻塞状态,比如调用sleep方法,获取尝试得到一个锁等等。、
死亡状态:Run方法执行完或者执行中遇到异常

创建线程的三种方式:

1.继承Thread类
1.自定义一个类声明为Thread的子类。
2.重写run方法;
3.创建线程对象,调用start方法启动线程
4.Thread类本质也是实现了Runnable接口

java">class PrimeThread extends Thread{
	long minPrime;
	PrimeThread(long minPrime){
		this.minPrime = minPrime;
	}
	//实现run()方法
	@Override
	public void run(){
	//...
	}
public static void main(String[] args){
	//创建线程对象
	PrimeThread ph = new PrimeThread();
	ph.start();
 }  
}

线程开启不一定立即执行,CPU安排调度
2.实现Runnable接口
1.创建一个线程类,实现Runnable接口。
2.实现run方法
3.创建线程对象,通过线程对象来start开启线程(代理)

java">class PrimeThread implements Runnable{
	long minPrime;
	//构造函数
	PrimeThread(long minPrime){
		this.minPrime = minPrime;
	}
	//实现run()方法
	@Override
	public void run(){
	//...
	}
public static void main(String[] args){
	//创建线程对象
	PrimeThread ph = new PrimeThread(143);
	new Thread(ph).start();
 }  
}

3.实现Callable接口
1.实现Callable接口,需要返回值类型
2.重写call方法,需要抛出异常
3.创建目标对象
4.创建执行服务:ExecutorService ser = Executors.newFixedThreadPool(1);
5.提交执行:Future result = ser.submit(t1);
6.获取结果:boolean r1 = result1.get()
7.关闭服务:ser.shutdownNow();

线程的特征和状态:

1) 不管是否并发,都有一个主线程的Thread对象;执行该程序时,java虚拟机会创建一个新的Thread并在该线程中执行main方法,这是非并发应用程序的唯一线程,也是并发应用程序中的第一个线程。
2) Java线程共享应用程序中所有资源,包括内存和打开的文件,但是需要使用同步,也就是加锁来避免数据竞争
3) Java线程的优先级,介于Thread.MAX_PRIORITY(10)Thread.MIN_PRIORITY(1)之间,默认是5,通常较高优先级的线程会先于较低优先级的线程之前执行,但不是绝对,CPU可能会进行指令重排
4) Java的两种线程:守护线程和非守护线程,我们常用的多线程都是非守护线程。
所有非守护线程执行完毕,JVM进程会自动退出,不会考虑守护线程运行情况
守护线程主要用于垃圾收集器或缓存管理器中,在start方法执行前通过isDaemon方法检查线程是否为守护线程,也可以通过setDaemon方法将某个线程指定为守护线程

Thread.States枚举类中定义的线程状态

New: Thread对象已经创建还未执行

Runnable: Thread对象已经在java虚拟机中运行

Blocked:Thread对象正在等待锁定

Waiting: Thread对象正在等待另一个线程的动作

Time_Waiting: Thread对象正在等待另一个线程的动作,但是有时间限制

Terminated: Thread对象已经完成了执行

getState方法可以获取Thread对象的状态,并且修改,但是在给定时间内一个线程只能有一个状态

Thread类常用方法:

1) 获取和设置Thread对象信息的方法:

getId:返回Thread对象的标识符,他是线程创建分配的一个正整数,整个生命周期唯一且不能修改。

getName/setName:获取或设置Thread对象的名称。

getPriority/setPriority:获取或设置Thread对象的优先级。

isDaemon/setDeamon:获取或者建立Threa对象的守护条件。

getState:返回Thread对象的状态。
2) Interrupt:中断目标线程,比如该线程处于sleep休眠中,使用它就可以直接唤醒,并给线程打上中断标记

3) Interrupted:判断目标线程是否被中断,并清除该线程中断标记。

4) Isinterrupted:判断目标线程是否被中断,但是不会清楚线程中断标记。

5) Sleep(long ms):线程执行暂停、休眠的时间。

6) Join:暂停线程执行(强制插队),直到调用该方法的线程执行结束为止,就是A B两个线程,B线程通过join方法切入到A线程中,那么A线程需要等待B线程执行完毕,他才能继续执行。
7)yield:让当前正在执行的线程暂停,但不阻塞,将线程从运行状态转为就绪状态;让CPU重新调度,礼让不一定会成功
8)isAlive:测试线程是否处于活动状态
9) currentThread:Thread类静态方法,返回实际执行该代码的Thread对象。

Synchronized关键字

1)、修饰一个代码块:被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象,修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁
2)、修饰一个方法:被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象
3)、修改一个静态的方法:其作用的范围是整个静态方法,作用的对象是这个类的所有对象
4)、修改一个类:其作用的范围是synchronized后面括号括起来的部分,作用主的对象是这个类的所有对象
5)、synchronized 关键字加到 static 静态方法和 synchronized(class) 代码块上都是对当前 Class 类上锁

解决原子性+可见性+有序性问题的方法:

原子性:
锁机制
synchronized:Java中的synchronized关键字可以用来保证代码块或方法的原子性。
ReentrantLock:Java中的ReentrantLock类提供了比synchronized更灵活的锁机制。
原子类
AtomicInteger、AtomicLong、AtomicBoolean等:Java中的java.util.concurrent.atomic包提供了一系列原子类,这些类的操作都是原子的。
可见性:
volatile关键字
volatile关键字可以确保变量的修改对所有线程是可见的。它防止了指令重排序,并保证了每次读取都是从主内存中读取最新的值
synchronized关键字
synchronized关键字不仅保证了原子性,也保证了可见性。当一个线程释放锁时,它会将所有修改的变量刷新到主内存中,而当另一个线程获取锁时,它会从主内存中读取最新的值
有序性
volatile关键字
volatile关键字可以防止指令重排序。当一个变量被声明为volatile时,对该变量的写操作会在后续的读操作之前完成。
synchronized关键字:
synchronized关键字可以保证代码块或方法内的操作是有序的。
内存屏障(Memory Barrier):
内存屏障是一种CPU指令,用于确保指令的执行顺序。Java中的volatile关键字和synchronized关键字都会插入内存屏障。
Atomic类:
Atomic类中的方法(如compareAndSet)也会插入内存屏障,以保证操作的有序性。
使用双检锁Double check lock;先进行一次判断,在synchronized上锁之后,再进行一次判断

单例模式的实现方式与线程安全问题:饿汉式 vs 懒汉式

1、单例模式是一种创建型设计模式
2、要求一个类在整个系统中只有一个实例,并且提供一个全局访问点。
3、它的核心思想是确保类只有一个实例,并且提供一个访问该实例的方式,全局都能共享这个实例
饿汉式单例(Eager Initialization)
优点
1、饿汉式单例在类加载时就会初始化实例,确保在任何情况下都能获取到唯一的实例。
2、因为类加载是线程安全的,所以这种方式天然地保证了线程安全问题。
缺点
1、无论是否使用这个实例,类的实例都会在程序启动时创建,这在某些情况下可能会导致资源浪费,尤其是在类的实例创建较为复杂或消耗大量资源时。
代码实现:

java">public class Singleton {
    // 在类加载时就创建单例对象
    private static final Singleton instance = new Singleton();
 
    // 私有化构造器,防止外部实例化
    private Singleton() {}
 
    // 提供公共的静态方法来访问实例
    public static Singleton getInstance() {
        return instance;
    }
}

懒汉式单例(Lazy Initialization)
优点
1、实例在第一次使用时才会被创建
2、懒汉式单例的优点是只有在需要的时候才会创建实例,可以避免不必要的资源浪费
缺点
1、懒汉式单例需要处理线程安全问题,因为多线程环境下,多个线程同时调用 getInstance() 方法时,可能会创建多个实例,从而破坏单例模式的设计。

java">public class Singleton {
    private static Singleton instance;
 
    // 私有化构造器,防止外部实例化
    private Singleton() {}
 
    // 提供公共的静态方法来获取实例,第一次访问时创建实例
    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}

懒汉式线程安全问题的解决方案
1、使用 synchronized 关键字(在需要上锁的地方直接加锁)
可以通过在 getInstance() 方法上添加 synchronized 关键字来解决线程安全问题。这样,在实例未创建时,只有一个线程能够执行实例创建的过程,其他线程会被阻塞。

java">public class Singleton {
    private static Singleton instance;
 
    private Singleton() {}
 
    public static synchronized Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}

这种方式虽然能解决线程安全问题,但每次获取实例时都需要进行同步操作,导致性能开销较大。
2、双重检查锁定(Double-Checked Locking)(先检查是否为空,在上锁,再进一步检查是否为空)

java">public class Singleton {
    private static volatile Singleton instance;
 
    private Singleton() {}
 
    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
            //再进一步检查是否为空,防止指令重排序,两个线程都拿到空值,进行资源篡改
                if (instance == null) {  
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

这种方式能有效避免每次调用时都进行同步,从而提高性能。但需要注意的是,instance 变量需要声明为 volatile,以确保线程间的可见性
在这里插入图片描述

3、使用静态内部类

java">public class Singleton {
    private Singleton() {}
 
    private static class SingletonHelper {
        private static final Singleton INSTANCE = new Singleton();
    }
 
    public static Singleton getInstance() {
        return SingletonHelper.INSTANCE;
    }
}

这种方式的优势在于:
1、只有在调用 getInstance() 时,SingletonHelper 类才会被加载,实例才会被创建。
2、类加载机制天然保证了线程安全,不需要显式的同步。
3、相较于其他懒汉式方法,它更简洁高效。

持有一把锁:把某个对象当成锁来使用,某个线程拿到对象的锁,对该对象有使用权。其他线程将被锁在外面阻塞等待,直到该线程释放对象锁,其他线程进行争夺该对象的锁
某对象持有锁的原理:给被锁定的对象做标记(在内部做标记位)

java">public static void main(){
	Object o = new Object();
	//System.out.println(ClassLayout.parseInstance(o).toPrintable());
	synchronized(o){
		System.out.println(ClassLayout.parseInstance(o).toPrintable());
	}
}

结果:
在这里插入图片描述
锁的状态
锁主要存在四种状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态
1、他们会随着竞争的激烈而逐渐升级。注意锁可以升级不可降级,这种策略是为了提高获得锁和释放锁的效率。
所有用户程序都是运行在用户态的, 但是有时候程序确实需要做一些内核态的事情, 例如从硬盘读取

悲观锁vs乐观锁

1、悲观锁
当认为数据被并发修改的几率比较大,需要在修改之前借助于数据库锁机制,先对数据进行加锁的思想被称为悲观锁如synchronized
1、效率方面,处理锁的操作会产生了额外的开销,而且增加了死锁的机会
2、当一个线程在处理某行数据的时候,其它线程只能等待
2、乐观锁
乐观锁思想认为,数据一般是不会造成冲突的。只有在提交数据的时候,才会对数据的冲突进行检测。
乐观锁实现不需要借助数据库的锁机制,只要就是两个步骤:冲突检测和数据更新
其中一种典型的是实现方法就是CAS(Compare and Swap),是一种无锁的原子操作
悲观锁与乐观锁的使用场景
1、乐观锁(CAS为例)适用于写比较少的情况下(多读场景)
2、即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。
3、悲观锁(synchronized为例)适用于写多的情况;
4、因为多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能

什么是CAS

什么是CAS
在 CAS 中,有这样三个值:
V:要更新的变量(var)
E:预期值(expected)(旧值)
N:新值(new)
比较并交换的过程如下:
1、判断 V 是否等于 E,
2、如果等于,将 V 的值设置为 N
3、如果不等,说明已经有其它线程更新了 V,于是当前线程放弃更新,什么都不做。

例:如果有一个多个线程共享的变量mm原本等于 5,我现在在线程 A 中,想把它设置为新的值 6;
我们使用 CAS 来做这个事情;

1)首先我们用 i 去与 5 对比,发现它等于 5,说明没有被其它线程改过,那我就把它设置为新的值 6,此次 CAS 成功,i的值被设置成了 6;
2)如果不等于 5,说明i被其它线程改过了(比如现在i的值为 2),那么我就什么也不做,此次 CAS 失败,i的值仍然为 2。

在这个例子中,i就是 V,5 就是 E,6 就是 N。
那有没有可能我在判断了i为 5 之后,正准备更新它的新值的时候,被其它线程更改了i的值呢?

不会的。因为 CAS 是一种原子操作,它是一种系统原语,是一条 CPU 的原子指令,从 CPU 层面已经保证它的原子性

CAS原理

在 Java 中,如果一个方法是 native 的,那 Java 就不负责具体实现它,而是交给底层的 JVM 使用 C 语言 或者 C++ 去实现。
CAS原理
在 Java 中,有一个Unsafe类, 它在sun.misc包中。它里面都是一些native方法,其中就有几个是关于 CAS 的:

java">boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);

Unsafe 对 CAS 的实现是通过 C++ 实现的,它的具体实现和操作系统、CPU 都有关系。Q完全3,’手段,但会存在ABA 问题、长时间自旋、多个共享变量的原子操作
ABA问题:一个值原来是 A,变成了 B,又变回了 A,此A已非从前的A,虽然长的一样,但是经过一系列操作,内部发生改变
解决ABA问题
1、通过一个顺序递增的version字段,设置版本号。根据版本号判断是否为之前的对象。AtomicStampedReference

java">static AtomicStampedReference<orderRef> orderRef = new  AtomicMarkableReference<>(new Order(),0);

2、不需要计数的话直接用标记true/false的marke。
AtomicMarkableReference

java">static AtomicMarkableReference<orderRef> = new  AtomicMarkableReference<>(new Order(),false);

解决长时间自旋问题
让 JVM 支持处理器提供的pause 指令
pause 指令能让自旋失败时 cpu 睡眠一小段时间再继续自旋,从而使得读操作的频率降低很多
解决多个共享变量的原子操作
当对一个共享变量执行操作时,CAS 能够保证该变量的原子性。
但是对于多个共享变量,CAS 就无法保证操作的原子性,这时通常有两种做法:
1)使用AtomicReference类保证对象之间的原子性,把多个变量放到一个对象里面进行 CAS 操作
2)使用锁。

线程间的协作机制

线程通信是一种等待唤醒机制。wait/notify机制
1,wait():表示线程一直等待,直到其他线程通知,会释放已持有的锁资源
2,wait(long timeout):指定等待的毫秒数
3,notify() 唤醒一个处于等待状态的线程
4,notifyAll()唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度

均是Object方法,都只能在同步方法或同步代码块中使用,否则会抛出异常:IIIegaIMonitorStateException

线程通信的经典模型:生产者消费者模式
生产者和消费者共享同一个资源,两者之间相互依赖,互为条件
1.对于生产者,没生产产品之前,要通知消费者等待。而生产了产品之后,又需要通知消费者消费。
2.对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费。
在生产者消费者问题中,仅有synchronized是不够的
synchronized可阻止并发更新同一个共享资源,实现了同步;
synchronized不能用来实现不同线程之间的消息传递

测试,生产者消费者模型–>利用缓区解决:

java">public class TestPC{
	public static void main(String[] args){
		SynContainer synContainer = new SynContainer();
		new Productor(synContainer ).start();
		new Consumer(synContainer).start();
	}
	
}
//生产者
class Productor extends Thread{
	//创建容器
	SynContainer sy;
	public Productor(SynContainer sy){
		this.sy = sy;
	}
	//生产
	@Override
	public void run(){
		for(int i = 0;i<100;i++){
			container.push(new Producer(i));
			System.out.println("生产了"+i+"个产品");
		}
	}
}
//消费者
class Consumer extends Thread{
	//创建容器
	SynContainer sy;
	public Productor(SynContainer sy){
		this.sy = sy;
	}
	//消费
	@Override
	public void run(){
		for(int i = 0;i<100;i++){
			System.out.println("消费了"+sy.pop().id+"个产品");
		}
	}

}
//产品
class Producer{
 int id;//产品编号
 public producer(int id){
		this.id = id;
}
}
//缓冲区
class SynContainer{
	//需要一个容器
	Producer[] producer = new Producer[10];
	//容器计数器
	int count = 0;
	//生产者放入产品
    public synchronized void push(Producer producer){
		//如果容器满了,就需要等待消费者消费产品
		if(count==producer.length){
		//通知消费者消费,生产者等待
			try{
				this.wait();
			}catch(InterruptedException e){
				e.prinStackTrace();
			}
		}
		//如果没有满,我们就需要丢入产品
		producer[count] = producer;
		count++;
		//可以通知消费者消费
		this.notifyAll();
	}
	//消费者消费产品
	public synchronized Producer pop(){
		//判断能否消费
		if(count==0){
			//等待生产者生产,消费者等待	
			try{
				this.wait();
			}catch(InterruptedException e){
				e.prinStackTrace();
			}
		}
		//如果可以消费
		count--;
		Producer pro = producer[count];
		//通知生产者生产
		this.notifyAll();
		return pro ;
	}
}

为什么wait和notify方法要在同步块中调用
wait()方法强制当前线程释放对象锁。这意味着在调用某对象的wait()方法之前,当前线程已经获得该对象的锁。因此,线程必须在某个对象的同步方法或同步代码块中才能调用该对象的wait()方法。
调用对象的notify()和notifyAll()方法之前,调用线程必须已经得到该对象的锁。因此,必须在某个对象的同步方法或同步代码块中才能调用该对象的notify()或notifyAll()方法。

  • 调用wait()方法的原因通常是,调用线程希望某个特殊的状态(或变量)被设置之后再继续执行
  • 调用notify()或notifyAll()方法的原因通常是,调用线程希望告诉其他等待中的线程:“特殊状态已经被设置”这个状态作为线程间通信的通道,它必须是一个可变的共享状态(或变量)。

线程池

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
线程池可以管理一堆线程,让线程执行完任务之后不进行销毁,而是继续去处理其它线程已经提交的任务。

使用线程池的好处

1、降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的构造

Java 主要是通过构建 ThreadPoolExecutor 来创建线程池的。
ThreadPoolExecutor 的构造方法:
在这里插入图片描述
corePoolSize:线程池中用来工作的核心线程数量
maximumPoolSize:最大线程数,线程池允许创建的最大线程数
keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
unit:keepAliveTime 的时间单位
workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中
threadFactory :线程池内部创建线程所用的工厂。
handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。
线程池的构造其实很简单,就是传入一堆参数,然后进行简单的赋值操作。

线程池的运行原理

1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
如果想要在执行之前创建好核心线程数,可以调用 prestartAllCoreThreads 方法来实现,默认是没有线程的。
在这里插入图片描述
2、当调用 execute() 方法添加一个任务时,线程池会做如下判断:

1)如果正在运行的线程数量小于 corePoolSize,那么直接通过 ThreadFactory 创建一个线程来执行这个任务;当任务执行完之后,线程不会退出,而是会去阻塞队列中获取任务

2)如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入阻塞队列;在阻塞队列的线程获取阻塞队列的任务运行
在这里插入图片描述

3)如果这时候阻塞队列满了,而且正在运行的线程数量小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
在这里插入图片描述
所以,就算阻塞队列中有任务,新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务执行从这可以看出,先提交的任务不一定先执行。

4)如果阻塞队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,此时就会执行拒绝策略,也就是构造线程池的时候,传入的 RejectedExecutionHandler 对象,来处理这个任务则线程池会抛出异常RejectExecutionException。
在这里插入图片描述
JDK 自带的 RejectedExecutionHandler 实现有 4 种

AbortPolicy:丢弃任务,抛出运行时异常
CallerRunsPolicy:由提交任务的线程来执行任务
DiscardPolicy:丢弃这个任务,但是不抛异常
DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务
线程池创建的时候,如果不指定拒绝策略就默认是 AbortPolicy 策略。
也可以自己实现 RejectedExecutionHandler 接口,比如将任务存在数据库或者缓存中,这样就可以从数据库或者缓存中获取被拒绝掉的任务了

3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

线程池提交任务的两种方式execute()和submit()的区别:

1)execute只能提交Runnable类型的任务,无返回值。同步提交
2)submit既可以提交Runnable类型的任务,也可以提交Callable类型的任务,会有一个类型为Future的返回值,但当任务类型为Runnable时,返回值为null。异步提交

源码分析:

execute方法添加一个任务

java">public void execute(Runnable command) {
    // 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
    if (command == null)
        throw new NullPointerException();

    // 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
    int c = ctl.get();

    // 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
    if (workerCountOf(c) < corePoolSize) {
        // 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
        // addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
        if (addWorker(command, true))
            return;
        // 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
        c = ctl.get();
    }

    // 2. 尝试将任务添加到任务队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查线程池的状态
        if (! isRunning(recheck) && remove(command))  // 如果线程池已经停止,从队列中移除任务
            reject(command);
        // 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
    else if (!addWorker(command, false))
        // 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
        reject(command);
}

workerCountOf(c)<corePoolSize:判断是否小于核心线程数,是的话就通过 addWorker 方法,addWorker 用来添加线程并执行任务

workQueue.offer(command):尝试往阻塞队列中添加任务。添加失败就会再次调用 addWorker 尝试添加非核心线程来执行任务;如果还是失败了,就会调用 reject(command)来拒绝这个任务

线程池中线程实现重复利用的原理
线程在线程池内部其实被封装成了一个 Worker 对象
创建线程来执行任务的execute方法,是通过 addWorker 方法。在创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部,然后调用 runWorker 方法来让线程执行任务
runWorker 方法

java">final void runWorker(Worker w) {
    // 获取当前工作线程
    Thread wt = Thread.currentThread();
    
    // 从 Worker 中取出第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    
    // 解锁 Worker(允许中断)
    w.unlock(); 
    
    boolean completedAbruptly = true;
    try {
        // 当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行
        while (task != null || (task = getTask()) != null) {
            // 锁定 Worker,确保在执行任务期间不会被其他线程干扰
            w.lock();
            
            // 如果线程池正在停止,并确保线程已经中断
            // 如果线程没有中断并且线程池已经达到停止状态,中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            
            try {
                // 在执行任务之前,可以插入一些自定义的操作
                beforeExecute(wt, task);
                
                Throwable thrown = null;
                try {
                    // 实际执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务后,可以插入一些自定义的操作
                    afterExecute(task, thrown);
                }
            } finally {
                // 清空任务,并更新完成任务的计数
                task = null;
                w.completedTasks++;
                // 解锁 Worker
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 工作线程退出的后续处理
        processWorkerExit(w, completedAbruptly);
    }
}

1、runWorker 内部使用了 while 死循环,当第一个任务执行完之后,会不断地通过 getTask 方法获取任务,只要能获取到任务,就会调用 run 方法继续执行任务,这就是线程能够复用的主要原因
2、但是如果从 getTask 获取不到方法的话,就会调用 finally 中的 processWorkerExit 方法,将线程退出。
3、每次在执行任务之前都会调用 Worker 的 lock 方法,执行完任务之后,会调用 unlock 方法,这样做的目的就可以通过 Woker 的加锁状态判断出当前线程是否正在执行任务
4、如果想知道线程是否正在执行任务,只需要调用 Woker 的 tryLock 方法,根据是否加锁成功就能判断,加锁成功说明当前线程没有加锁,也就没有执行任务了,(在调用 shutdown 方法关闭线程池的时候,就时用这种方式来判断线程有没有在执行任务,如果没有的话,会尝试打断没有执行任务的线程。)
getTask方法的实现:

java">private Runnable getTask() {
    // 标志,表示最后一个poll()操作是否超时
    boolean timedOut = false;

    // 无限循环,直到获取到任务或决定工作线程应该退出
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池状态是SHUTDOWN或更高(如STOP)并且任务队列为空,那么工作线程应该减少并退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 检查工作线程是否应当在没有任务执行时,经过keepAliveTime之后被终止
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果工作线程数超出最大线程数或者超出核心线程数且上一次poll()超时,并且队列为空或工作线程数大于1,
        // 则尝试减少工作线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根据timed标志,决定是无限期等待任务,还是等待keepAliveTime时间
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  // 指定时间内等待
                workQueue.take();  // 无限期等待
            if (r != null)  // 成功获取到任务
                return r;
            // 如果poll()超时,则设置timedOut标志
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果在等待任务时线程被中断,重置timedOut标志并重新尝试获取任务
            timedOut = false;
        }
    }
}

线程池的 5 种状态

java">private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

RUNNING:线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
SHUTDOWN:调用 shutdown 方法,线程池就会转换成 SHUTDOWN 状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中
STOP:调用 shutdownNow 方法,线程池就会转换成 STOP 状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程
TIDYING:SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态;线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态;线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。
线程池状态具体是存在 ctl 成员变量中的,ctl 中不仅存储了线程池的状态还存储了当前线程池中线程数的大小

java">private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

在这里插入图片描述

线程池的关闭

线程池提供了 shutdown 和 shutdownNow 两个方法来关闭线程池。

shutdown方法

java">/**
 * 启动一次顺序关闭,在这次关闭中,执行器不再接受新任务,但会继续处理队列中的已存在任务。
 * 当所有任务都完成后,线程池中的线程会逐渐退出。
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
    mainLock.lock(); // 加锁以确保独占访问

    try {
        checkShutdownAccess(); // 检查是否有关闭的权限
        advanceRunState(SHUTDOWN); // 将执行器的状态更新为SHUTDOWN
        interruptIdleWorkers(); // 中断所有闲置的工作线程
        onShutdown(); // ScheduledThreadPoolExecutor中的挂钩方法,可供子类重写以进行额外操作
    } finally {
        mainLock.unlock(); // 无论try块如何退出都要释放锁
    }

    tryTerminate(); // 如果条件允许,尝试终止执行器
}

就是将线程池的状态修改为 SHUTDOWN,然后尝试打断空闲的线程
shutdownNow 方法

java">/**
 * 尝试停止所有正在执行的任务,停止处理等待的任务,
 * 并返回等待处理的任务列表。
 *
 * @return 从未开始执行的任务列表
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks; // 用于存储未执行的任务的列表
    final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
    mainLock.lock(); // 加锁以确保独占访问

    try {
        checkShutdownAccess(); // 检查是否有关闭的权限
        advanceRunState(STOP); // 将执行器的状态更新为STOP
        interruptWorkers(); // 中断所有工作线程
        tasks = drainQueue(); // 清空队列并将结果放入任务列表中
    } finally {
        mainLock.unlock(); // 无论try块如何退出都要释放锁
    }

    tryTerminate(); // 如果条件允许,尝试终止执行器

    return tasks; // 返回队列中未被执行的任务列表
}

就是将线程池的状态修改为 STOP,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么 shutdownNow 不能执行剩余任务的原因。
线程池的监控
在项目中使用线程池的时候,一般需要对线程池进行监控,方便出问题的时候快速定位
线程池本身提供了一些方法来获取线程池的运行状态。
getCompletedTaskCount:已经执行完成的任务数量
getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过。
getActiveCount:获取正在执行任务的线程数据
getPoolSize:获取当前线程池中线程数量的大小
线程池的使用场景
在 Java 程序中,不建议单纯继承 Thread 或者实现 Runnable 接口来创建线程,这样会导致频繁创建及销毁线程,引发资源耗尽的风险。
使用线程池是一种更合理的选择,方便管理任务,同时实现线程的重复利用。

Executors 构建线程池以及问题分析

Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService
1)newFixedThreadPool固定线程数量的线程池:核心线程数与最大线程数相等

java">public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

2)单个线程数量的线程池

java">public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

3)接近无限大线程数量的线程池

java">public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

4)带定时调度功能的线程池

java">public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

虽然 JDK 提供了快速创建线程池的方法,但其实不推荐使用 Executors 来创建线程池,因为从上面构造线程池的代码可以看出,newFixedThreadPool 线程池由于使用了 LinkedBlockingQueue,队列的容量默认无限大,实际使用中出现任务过多时会导致内存溢出;newCachedThreadPool 线程池由于核心线程数无限大,当任务过多的时候会导致创建大量的线程,可能机器负载过高导致服务宕机。

推荐创建线程池方式—自定义创建

Executor中通过dafaultThreadFactory工厂创建线程,
自己创建线程池一般建议自定义线程工厂,构建线程的时候设置线程的名称,这样在查日志的时候就方便知道是哪个线程执行的代码。
dafaultThreadFactory源码
在这里插入图片描述
自定义类实现ThreadFactory,重写里边的方法,自己创建线程工厂
在这里插入图片描述
实际应用:
在商城订单中进行异步多线程订单请求处理,上千个用户订单发起访问,先扔到阻塞队列里,进行线程执行,同时线程返回用户信息告知用户正在处理请求。


http://www.niftyadmin.cn/n/5862596.html

相关文章

vue 判断一个属性值,如果是null或者空字符串或者是空格没有值的情况下,赋值为--

在 Vue 中&#xff0c;可以通过多种方式来判断一个属性值是否为 null、空字符串或者仅包含空格&#xff0c;如果满足这些条件则将其赋值为 --。下面分别介绍在模板和计算属性、方法中实现的具体做法。 1. 在模板中直接判断 如果只需要在模板中对属性值进行显示处理&#xff0c…

分布式光伏运维云平台:智能化运维,助力光伏电站高效运行

1光伏背景 行业背景--国家政策 发改能源〔2022〕206号文件指出&#xff1a;“在农村地区优先支持屋顶分布式光伏发电以及沼气发电等生物质能发电接入电网&#xff0c;电网企业等应当优先收购其发电量。”《国家能源局综合司关于报送整县&#xff08;市、区&#xff09;屋顶分…

无人机遥控器接口作用详解!

USB接口&#xff1a; 功能&#xff1a;USB接口是一种通用串行总线接口&#xff0c;用于连接外部设备&#xff0c;如手机、平板、电脑或充电设备。在无人机遥控器上&#xff0c;USB接口通常用于数据传输和充电。 应用&#xff1a;用户可以通过USB接口将遥控器与电脑连接&#…

rabbitMq创建队列和交换机不成功的问题(解决方案和排查问题思路)

问题背景: 1.SpringbootRabbitmq项目启动后不能自动创建交换机和队列 2.消费者和生产者是在2个不同微服务中 3.先启动生产者的模块, 启动成功, 但是交换机和队列没有创建, 然后启动消费者一直启动失败 4.生产者的微服务配置了交换机队列和绑定key的声明, 消费者直接监听了队…

Docker 部署AnythingLLM

两个指令搞定 1.下载镜像 docker pull mintplexlabs/anythingllm 2.运行容器 export STORAGE_LOCATION$HOME/anythingllm mkdir -p $STORAGE_LOCATION chmod -R 777 $STORAGE_LOCATION touch "$STORAGE_LOCATION/.env" docker run -d -p 3001:3001 \ --cap-add SY…

设计模式教程:中介者模式(Mediator Pattern)

中介者模式是一种行为型设计模式&#xff0c;它用于减少对象之间的直接依赖关系。通过引入一个中介者对象&#xff0c;所有对象的交互都通过中介者进行&#xff0c;而不是直接相互通信。这种模式的主要目的是减少对象之间的耦合&#xff0c;提升系统的灵活性和可维护性。 1. 定…

深入浅出机器学习:概念、算法与实践

目录 引言 机器学习的基本概念 什么是机器学习 机器学习的基本要素 机器学习的主要类型 监督学习&#xff08;Supervised Learning&#xff09; 无监督学习&#xff08;Unsupervised Learning&#xff09; 强化学习&#xff08;Reinforcement Learning&#xff09; 机器…

批量操作实现与优化

1、批量操作 方案设计 基本功能实现 /*** 批量添加题目和题库关联** param questionIdList 题目id列表* param questionBankId 题库id* param loginUser 登录用户*/OverrideTransactional(rollbackFor Exception.class)public void batchAddQuestionBankQuestion(List<Lon…