BlockQueue
阻塞队列(BlockingQueue)是一个支持俩个附加操作的队列.这俩个附加操作是:在队列为空时获取元素的线程会等待队列变为非空.当队列满时,粗糙农户元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产存放元素的容器,而消费者也只从容器里拿元素.
使用场景:多线程并发处理,线程池
队列FIFO先进先出,一端写入一端取出
写入如果队列满了就必须阻塞等待 如果队列是空的必须阻塞等待生产
注意:bblockingqueue不接受null值 视图添加一个null元素时会抛出异常
blockingqueue可以限定容量的超过给定容量时是无法添加的
JDK中的七个队列
- ArrayBlockingQueue(常用):基于数组有界阻塞队列
- LinkedBlockingQueue(常用):基于链表的有界阻塞队列,大小为Integer最大值
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:一个不存储元素的阻塞队列
- LinkTransferQueue:一个由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:一个由链表组成的双向阻塞队列
方法类型 | 失败抛出异常 | 特殊值( 有返回值) | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
插入 | add | offer | put | offer |
移除 | remove | poll | take | poll |
判断队列首 | element | peek | - | - |
失败抛异常
/**
* 抛出异常
*/
public static void test1(){
//指定队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
//add添加成功返回true
System.out.println(arrayBlockingQueue.add("1"));
System.out.println(arrayBlockingQueue.add("2"));
System.out.println(arrayBlockingQueue.add("3"));
//查看队首的元素是谁 1
System.out.println(arrayBlockingQueue.element());
//超过队列大小 add会抛出异常 Queue full
// System.out.println(arrayBlockingQueue.add("4"));
//remove取出一个元素 返回取出的值 如果队列为空 remove会抛出异常
// NoSuchElementException
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
}
返回值不抛异常
public static void test1(){
//队列的大小
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
//offer 添加一个元素 返回一个boolean值 成功返回true失败返回true
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
System.out.println("----------------");
//检测队首元素
System.out.println(blockingQueue.peek());
//poll 取出一个元素 返回一个元素 队列为空时 取出null
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
一直阻塞到有空位为止
public static void test1(){
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
try {
//put添加元素 没有返回值 满了一直阻塞
//队列大小为二 第三个元素放不进去 阻塞两秒过后就会结束
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//取出元素 空了一直阻塞 返回值取出的元素
System.out.println(blockingQueue.take());;
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
边界
- 容量的大小,分为有界和无界两种。
- 无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
- 但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
应用场景
BlockingQueue是线程安全的,可以解决线程的安全问题
因为阻塞队列是线程安全的,所以生产者和消费者可以是多线程的,不会发生线程安全问题
生产者消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的现成问题,这也就意味着,考虑锁等线程安全问题的重任从开发者赚到了队列上,降低了开发的难度和工作量
队列还能起到隔离作用.生产者不需要关注消费者逻辑.实现了具体任务与执行任务类之间的解耦,提高了安全性
队列详解
PriorityBlockingQueue
priorityBlockqueue是一个无界队列没有限制,在内存允许的情况下可以无限添加元素,它又是具有优先级的队列,是通过构造函数传入的对象来判断,传入的对象必须实现comparable接口
队列的元素默认情况下元素采用自然顺序升序排列,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
DelayQueue
DelayQueue是一个通过PriorityBlockingQueue实现延迟获取元素的无界阻塞队列,其中添加进该队列的元素必须实现Delayed接口(指定延迟时间),而且只有在延迟期满后才能从中提取元素
应用场景
- 缓存系统的设计:可以用DelayQueue保存元素的有效期,使用线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了
- 定时任务调度,使用DelayQueue保存当天将执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的
DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。
leader/followers模式
- 由若干个线程(一般组成线程池)用来处理大量的事件
- 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠
- 假如有时间需要处理,领导者就会从追随者中指定一个新的领导者,自己去处理事件
- 唤醒的追随者作为新的领导者等待事件的发生
- 处理时间的线程处理完播后,就会成为追随者中的一员直到被唤醒成为领导者
- 假如需要处理的事情太多,而线程数量不够(能够动态创建的另当别论),啧有的事件可能会得不到处理
所有线程会有三种身份中的一种leader和follower,以及一个干活中的状态proccesser.它的基本原则就是永远最多只有一个leader.而所有follower都在等待成为loeader.线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
package test5;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Author: suiyi
* @Date: 2022/7/20 10:47
*/
class MyDelay<T> implements Delayed {
long delayTime; // 延迟时间
long expire; // 过期时间
T data;
public MyDelay(long delayTime, T t) {
this.delayTime = delayTime;
// 过期时间 = 当前时间 + 延迟时间
this.expire = System.currentTimeMillis() + delayTime;
data = t;
}
/**
* 剩余时间 = 到期时间 - 当前时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 优先级规则:两个任务比较,时间短的优先执行
*/
@Override
public int compareTo(Delayed o) {
long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (int) f;
}
@Override
public String toString() {
return "delayTime=" + delayTime +
", expire=" + expire +
", data=" + data;
}
static BlockingQueue<Delayed> queue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
queue.add(new MyDelay<String>(8000, "第一次添加任务"));
queue.add(new MyDelay<String>(3000, "第二次添加任务"));
queue.add(new MyDelay<String>(5000, "第三次添加任务"));
while (!queue.isEmpty()) {
Delayed delayed = queue.take();
System.out.println(delayed);
}
}
}
DelayQueue其实采用了装饰器模式,在对PriorityQueue进行包装下增加了延时时间获取元素的功能,其主要特点归纳如下:
DelayQueue是一个无界阻塞队列,队列内部使用PriorityQueue来实现。
进入队列的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从中提取元素;
该队列头部是延迟期满后保存时间最长的Delayed元素;
如果没有延迟未过期元素,且队列没有头部,并且poll将返回null;
当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,表示该元素已过期;
无法使用poll或take移除未到期的元素,也不会将这些元素作为正常元素对待;例如:size方法返回到期和未到期元素的计数之和。
此队列不允许使用null元素。
SynchronousQueue
SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。
也就是说SynchronousQueue的每一次insert操作,必须等待其他线性的remove操作。而每一个remove操作也必须等待其他线程的insert操作。
这种特性可以让我们想起了Exchanger。和Exchanger不同的是,使用SynchronousQueue可以在两个线程中传递同一个对象。一个线程放对象,另外一个线程取对象。