BlockQueue

阻塞队列(BlockingQueue)是一个支持俩个附加操作的队列.这俩个附加操作是:在队列为空时获取元素的线程会等待队列变为非空.当队列满时,粗糙农户元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产存放元素的容器,而消费者也只从容器里拿元素.

使用场景:多线程并发处理,线程池

队列FIFO先进先出,一端写入一端取出

写入如果队列满了就必须阻塞等待 如果队列是空的必须阻塞等待生产

注意:bblockingqueue不接受null值 视图添加一个null元素时会抛出异常

blockingqueue可以限定容量的超过给定容量时是无法添加的

JDK中的七个队列

  1. ArrayBlockingQueue(常用):基于数组有界阻塞队列
  2. LinkedBlockingQueue(常用):基于链表的有界阻塞队列,大小为Integer最大值
  3. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
  4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  5. SynchronousQueue:一个不存储元素的阻塞队列
  6. LinkTransferQueue:一个由链表结构组成的无界阻塞队列
  7. LinkedBlockingDeque:一个由链表组成的双向阻塞队列
方法类型失败抛出异常特殊值( 有返回值)阻塞阻塞特定时间
插入addofferputoffer
移除removepolltakepoll
判断队列首elementpeek--

失败抛异常

/**
     * 抛出异常
     */
    public static void test1(){
        //指定队列大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        //add添加成功返回true
        System.out.println(arrayBlockingQueue.add("1"));
        System.out.println(arrayBlockingQueue.add("2"));
        System.out.println(arrayBlockingQueue.add("3"));
        //查看队首的元素是谁 1
        System.out.println(arrayBlockingQueue.element());
        //超过队列大小 add会抛出异常  Queue full
//        System.out.println(arrayBlockingQueue.add("4"));
        //remove取出一个元素  返回取出的值   如果队列为空  remove会抛出异常
        // NoSuchElementException
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
    }

返回值不抛异常

public static void test1(){
        //队列的大小
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
        //offer  添加一个元素  返回一个boolean值   成功返回true失败返回true
        System.out.println(blockingQueue.offer(1));
        System.out.println(blockingQueue.offer(2));
        System.out.println(blockingQueue.offer(3));
        System.out.println("----------------");
        //检测队首元素
        System.out.println(blockingQueue.peek());
        //poll  取出一个元素  返回一个元素    队列为空时 取出null
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.peek());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }

一直阻塞到有空位为止

 public static void test1(){
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);

        try {
            //put添加元素 没有返回值 满了一直阻塞
            //队列大小为二   第三个元素放不进去   阻塞两秒过后就会结束
            blockingQueue.put("1");
            blockingQueue.put("2");
            blockingQueue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            //取出元素  空了一直阻塞  返回值取出的元素
            System.out.println(blockingQueue.take());;
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

边界

  • 容量的大小,分为有界和无界两种。
  • 无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
  • 但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

应用场景

BlockingQueue是线程安全的,可以解决线程的安全问题

因为阻塞队列是线程安全的,所以生产者和消费者可以是多线程的,不会发生线程安全问题

生产者消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的现成问题,这也就意味着,考虑锁等线程安全问题的重任从开发者赚到了队列上,降低了开发的难度和工作量

队列还能起到隔离作用.生产者不需要关注消费者逻辑.实现了具体任务与执行任务类之间的解耦,提高了安全性

队列详解

PriorityBlockingQueue

priorityBlockqueue是一个无界队列没有限制,在内存允许的情况下可以无限添加元素,它又是具有优先级的队列,是通过构造函数传入的对象来判断,传入的对象必须实现comparable接口

队列的元素默认情况下元素采用自然顺序升序排列,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

DelayQueue

DelayQueue是一个通过PriorityBlockingQueue实现延迟获取元素的无界阻塞队列,其中添加进该队列的元素必须实现Delayed接口(指定延迟时间),而且只有在延迟期满后才能从中提取元素

应用场景

  1. 缓存系统的设计:可以用DelayQueue保存元素的有效期,使用线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了
  2. 定时任务调度,使用DelayQueue保存当天将执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的

DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

leader/followers模式

  • 由若干个线程(一般组成线程池)用来处理大量的事件
  • 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠
  • 假如有时间需要处理,领导者就会从追随者中指定一个新的领导者,自己去处理事件
  • 唤醒的追随者作为新的领导者等待事件的发生
  • 处理时间的线程处理完播后,就会成为追随者中的一员直到被唤醒成为领导者
  • 假如需要处理的事情太多,而线程数量不够(能够动态创建的另当别论),啧有的事件可能会得不到处理

所有线程会有三种身份中的一种leader和follower,以及一个干活中的状态proccesser.它的基本原则就是永远最多只有一个leader.而所有follower都在等待成为loeader.线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

package test5;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Author: suiyi
 * @Date: 2022/7/20 10:47
 */
class MyDelay<T> implements Delayed {


    long delayTime; // 延迟时间
    long expire; // 过期时间
    T data;

    public MyDelay(long delayTime, T t) {
        this.delayTime = delayTime;
        // 过期时间 = 当前时间 + 延迟时间
        this.expire = System.currentTimeMillis() + delayTime;
        data = t;
    }

    /**
     * 剩余时间 = 到期时间 - 当前时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 优先级规则:两个任务比较,时间短的优先执行
     */
    @Override
    public int compareTo(Delayed o) {
        long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) f;
    }


    @Override
    public String toString() {
        return "delayTime=" + delayTime +
                ", expire=" + expire +
                ", data=" + data;
    }
    static BlockingQueue<Delayed> queue = new DelayQueue();

    public static void main(String[] args) throws InterruptedException {
        queue.add(new MyDelay<String>(8000, "第一次添加任务"));
        queue.add(new MyDelay<String>(3000, "第二次添加任务"));
        queue.add(new MyDelay<String>(5000, "第三次添加任务"));

        while (!queue.isEmpty()) {
            Delayed delayed = queue.take();
            System.out.println(delayed);
        }
    }
}

DelayQueue其实采用了装饰器模式,在对PriorityQueue进行包装下增加了延时时间获取元素的功能,其主要特点归纳如下:

DelayQueue是一个无界阻塞队列,队列内部使用PriorityQueue来实现。

进入队列的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从中提取元素;

该队列头部是延迟期满后保存时间最长的Delayed元素;

如果没有延迟未过期元素,且队列没有头部,并且poll将返回null;

当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,表示该元素已过期;

无法使用poll或take移除未到期的元素,也不会将这些元素作为正常元素对待;例如:size方法返回到期和未到期元素的计数之和。

此队列不允许使用null元素。

SynchronousQueue

SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。

也就是说SynchronousQueue的每一次insert操作,必须等待其他线性的remove操作。而每一个remove操作也必须等待其他线程的insert操作。

这种特性可以让我们想起了Exchanger。和Exchanger不同的是,使用SynchronousQueue可以在两个线程中传递同一个对象。一个线程放对象,另外一个线程取对象。

Last modification:August 8, 2023
如果觉得我的文章对你有用,请随意赞赏