深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

在前面的三篇文章中,我们一起陆续地研究了AQS的底层原理,同时研究了AQS在不同场景下的三个应用工具类(ReentrantLock、CountDownLatch、Semaphore)的工作原理,之所以和大家一起分析它们,是因为这三个类是我们平时工作中应用最多的类了,其实在J.U.C中,还有好多直接或者间接通过AQS实现的工具类,比如读写锁ReentrantReadWriteLock、循环栅栏CyclicBarrier等。

本篇我们将一起对AQS的另一类的应用场景的实现类——阻塞队列(实现生产者/消费者模型的经典方法) 做一次深入浅出的分析。

老套路,从UML类图开始

深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

从上面的类图结构和源码的注释分析来看,我总结如下:

  • 第一,BlockingQueue是一个接口,它继承了Queue,Collection,Iterable几大接口的属性和方法。
  • 第二,BlockingQueue继承了Queue,并且进行了进一步的扩展,从而具备以下四种特性: (1)有些方法可抛出异常 (2)有些方法可返回特殊值,如NULL或者false,取决于各个方法 (3)有些方法能够无限期的阻塞当前线程,直到操作成功 (4)有些方法的阻塞操作可以设定最大的时间限制
  • 第三,BlockingQueue不支持添加空元素,当你试图通过add,put,offer方法添加null,会报错;但是null可以被用来标注为特殊返回值,代表一些操作失败了,比如poll等操作
  • 第四,BlockingQueue可能在容量上有限制,即在给定的时间内,它都可能具有剩余容量,超过该容量就不能继续添加了,除非方法是阻塞特性的(为啥?后面会说)。当你没有设置容量大小时,一般默认大小是Integer.MAX_VALUE。
  • 第五,BlockingQueue的实现是线程安全的。所有的有关队列操作的方法,它的实现其内部大多都是使用内部锁或者并发控制以原子方式实现的(如利用ReentrantLock等工具),但是除了一些批量操作则不一定是具备原子操作属性的,毕竟是批量嘛。
  • 第六,BlockingQueue从本质上来说,是不支持类似close或者shutdown这样子的操作的,以表示不能再进行任何操作。当然,如果业务实在需要,你也可以自己自定义实现。
  • 第七,BlockingQueue和其他的并发容器或者工具类一样,一个线程的入队操作是要发生在另一个线程的访问或者删除操作之前,是符合happen-before原则滴。
  • 第八,也是压轴,BlockingQueue的实现主要用于生产者/消费者队列,且还支持集合Collection的特性(所以队列支持类似remove之类的操作,但是这种操作往往性能不会很高,我们也不经常使用)。此外,BlockingQueue可以在生产者/消费者场景中支持多个生产者和多个消费者。

BlockingQueue方法介绍

从上面总结的第一和第二点我们知道,BlockingQueue的接口方法包含四种类型的:会抛异常的、返回特殊值的、会阻塞的和阻塞有超时的;但是按照操作结果大体可分为三种(官方区分的):插入型的、删除型的、读取型的。如下面的表格总结(其实也是源码的注释内容):

深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

下面我们就此表格说明下(我又多分了一个种类->读取删除型,这样更清晰些):

  • 常规操作 插入型: boolean add(e),向队列插入元素,成功返回true,如果队列容量不够了,则抛出IllegalStateException boolean offer(e),向队列插入元素,成功true,失败false,队列容量不够,不会抛出异常 删除型: boolean remove(o),从队列删除指定的元素o,如果队列里存在多个e,且o.equals(e),则所有e都删除,返回值boolean 读取型: E element(),读取队头元素数据,但是不删除元素,如果队列为空,则会抛出异常 E peek(),和element一样,但是队列为空,它就返回null 读取删除型: E poll(),读取并删除队头数据,如果队列为空,则返回空,否则返回头节点元素,即此方法既是取又是删
  • BlockingQueue扩展操作 (阻塞超时)(生产者/消费者实现的核心) 插入型: void put(e),阻塞插入,将元素插入队列,如果队列满了,则会阻塞线程等待,直到队列有空于的节点可供插入 boolean offer(e,timeout,unit),和上面put一样,但是等待有时间限制,到了超时的时间,就会退出 读取删除型: E take(),阻塞读取删除队头元素数据,如果队列为空,则会阻塞线程等待,直到队列有数据添加了,才会继续执行 E poll(timeout,unit),和上面take一样,但是阻塞等待也有时间的限制,超出时间就会退出

BlockingQueue实现类简介

直接上图:

深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

从图中可以知道:

  • 继承接口:BlockingDeque, TransferQueue
  • 实现类:ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque, PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue,DelayQueue,DelayedWorkQueue(线程池的)。

ArrayBlockingQueue && LinkedBlockingQueue 介绍

ArrayBlockingQueue是一个通过数组实现的有界阻塞队列。通过FIFO先进先出的方式操作元素,即队头的元素在队列中停留的时间最长,队尾则最短。新元素以尾插入的形式入队,读取将从队头开始。

ArrayBlockingQueue可以作为一个“有界缓冲区”,是实现生产者/消费者场景的重要手段。生产者生产元素放入队列,消费者从队列读取进行消费。队列满的时候,生产者就不能继续往里面放入元素,则必须阻塞等待;同样的,当队列为空时,消费者就取不到元素而阻塞等待。ArrayBlockingQueue一旦在创建时初始化了容量大小,就不会变化了(如果随时扩容/减少怎么可以阻塞等待呢)。

同时,ArrayBlockingQueue可以支持生产消费过程的是否公平。默认情况是非公平的,当然你也可以自己设置。至于公平和非公平的优缺点这里就不叙述了,看这里。

LinkedBlockingQueue从名字就可以看出,它是一个以链表形式实现的队列,它同样是个有界阻塞队列,可以在初始化的时候指定大小,如果不指定,则默认为Integer.MAX_VALUE。相比较ArrayBlockingQueue而言,LinkedBlockingQueue的吞吐量表现更好点,除了这个其他特性几乎和ArrayBlockingQueue差不多。

BlockingDeque及其实现LinkedBlockingDeque介绍

BlockingDeque继承于BlockingQueue,是一个阻塞有界双端队列,即队列的头和尾都可以进行插入,删除,读取操作。同样的,我贴出官方的表格:(细节就不再叙述,其实都差不多,从方法名字也可以看出来是干嘛的了)

深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

和BlockingQueue的主要区别就在于,BlockingDeque是两头都可以操作

LinkedBlockingDeque就是对BlockingDeque的具体实现,基于链表形式的实现。其具体实现的理念和LinkedBlockingQueue也差不多,也不再叙述了。

PriorityBlockingQueue介绍

PriorityBlockingQueue从名字来看,是一个跟优先级有关系的阻塞队列,准确的说,它是一个支持优先级的无界阻塞队列,其内部定义了Comparator属性,我们可以通过这个自定义元素的排列顺序而改变出队的顺序,从而实现优先级,观其构造器就可知一二:

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
}

由于是无界,所以使用时小心资源耗尽,发生OOM。

SynchronousQueue介绍

Synchronous从英文翻译来看很有意思,同步的队列。乍一看,不懂。。。。源码注释里写道:这是一个阻塞队列,它的每个插入操作必须等待另一个线程执行相应的删除操作,反之亦然。你无法查看同步队列,因为只有当你试图删除一个元素时,它才会出现;你不能插入一个元素(使用任何方法),除非另一个线程试图删除它;你不能迭代,因为没有东西可以迭代。队列的头部是第一个插入队列的线程试图添加到队列中的元素;如果没有这样的队列线程,那么就没有可以删除的元素,poll()将返回null。

感觉有点绕,简单来说,该队列无法对元素数据进行存储,无法查看队列里的情况,也无法进行元素数据的迭代,原因就是,一个线程插入完成,就会被另一个线程取走;一个线程取走了,又会再有一个线程继续插入。顾名思义,其实这个队列的数据,感觉像是同步的,插入了就会被取走,取走后就会被再次再次插入。(除非是非常非常巧合的情况下说不定会查看到队列的元素数据)

TransferQueue及其实现LinkedTransferQueue介绍

TransferQueue除了继承了BlockingQueue的方法之外,还补充了符合该类特性的方法->Transfer:转移传输的意思,具体哪些呢?如下图接口定义:

深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue

从源码贴图可知,主要多了上面几个方法:

  • boolean tryTransfer(E e)/ boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException:如果可能直接传输一个元素给正在等待的消费者,如果没有消费者,则返回false退出。
  • void transfer(E e):和上面一样,传输一个元素给一个正在等待的消费者。和上面的try不同的是,try是如果没有正在等待的消费者,就直接返回退出,而此方法则是等待,直到有消费者将之取走。
  • boolean hasWaitingConsumer():判断是否有等待的消费者,这个方法在不同的时刻调用,返回结果是不一样的,可能前1ms还是true,后1ms为false了。
  • int getWaitingConsumerCount():用来统计当前正在等待的消费者的数量,和上面的方法一样,也只是一个瞬时的数据,因为高并发时刻在变化。而且这个方法的返回速度实际上可能要比hasWaitingConsumer()慢得多(毕竟要遍历嘛)。此方法一般用来调试监控数据的。

LinkedTransferQueue就是TransferQueue的具体实现,是基于链表形式的无界传输阻塞队列,符合FIFO特性,除了以上介绍的transfer特殊属性外,其他特性和其他实现类差不多。

DelayQueue介绍

DelayQueue:一个用来存放延迟元素的无界阻塞队列,这个队列不知道大家有没有使用过,它是实现延迟队列的重要手段之一。 其里面维护的元素数据是一个实现Delayed接口的自定义实现类的对象,这个类会实现一个方法【long getDelay(TimeUnit unit)】,如果该方法返回一个小于或等于0的值,就表示里面的元素已经到达设定的延迟时间,这时就会把元素放入到队列当中。具体原理我会在后面专门写一篇文章来阐述。

BlockingQueue介绍完毕

阻塞队列BlockingQueue及其所有实现类的简单介绍到此结束,下面我们将从上面的具体实现类选取ArrayBlockingQueue作为本篇的另一个主角,来看看具体的阻塞队列是如何实现的。

(PS:很重要的一点,注意每个队列的名字,有界和无界,使用无界队列的时候一定要小心OOM)

ArrayBlockingQueue源码分析

简单说明工作原理

从上面分析阻塞队列以及ArrayBlockingQueue的简单介绍来看,ArrayBlockingQueue最重要的功能就是,能够在操作元素(插入和读取删除)的时候阻塞。其工作原理就是,当ArrayBlockingQueue队列为空时,会阻塞消费者读取数据,直到队列有数据时唤醒消费者继续;当队列满的时候,会阻塞生产者进行插入数据,直到队列有剩余空间时,会唤醒生产者继续插入。

重要属性和构造器

在之前的篇章中,我们就已经知道,wait/notify是实现生产者/消费者的重要手段之一,是通过线程间的通信来完成的,那在我们J.U.C包里,会不会是通过Condition的等待队列机制实现的呢?我们一起来看下,上源码:

     /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

从这段代码其实我们就已经知道了上面问题的答案,确实是通过Condition的等待队列机制实现的。

  • 首先,ArrayBlockingQueue之所以叫Array,其内部其实维护了一个数组,items;
  • 其次,两个数组下标属性,存放下一个读取和插入的位置(注意:他们的默认值为0):takeIndex和putIndex;还有一个用来表示队列中元素个数的count;
  • 最后,一个主要用来并发控制的锁ReentrantLock;以及两个条件等待队列:notEmpty(用来阻塞消费者)和notFull(用来阻塞生产者)。

我们再来看看构造器,一共重载实现了三个构造器,如下:

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

从构造器可以看出,ArrayBlockingQueue公平/非公平都支持,默认使用非公平模式。初始化的时候,就已经将消费/生产两大条件等待队列初始化完成了

生产消费主要方法

生产方法put

 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        /**
        为了保证操作的原子性,上来先上锁
        */
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            /**
            当前队列元素实际个数count如果等于数组定义的大小的话,则说明队列里已经满了
            则会调用生产者等待队列的阻塞await方法,对当前消费者线程进行阻塞
            注意:这边是个while循环,跳出循环的唯一条件就是,count!=items.length,什么情况才会
            不相等呢?当然是队列中的元素被读取删除的时候
            */
            while (count == items.length)
                notFull.await();
            /**
            如果不满足上面的条件或者生产者被唤醒从阻塞状态回归时,将调用enqueue(e)方法进行入队操作。
            */
            enqueue(e);
        } finally {
            lock.unlock();
        }
}

private void enqueue(E x) {
    /**
    这部分的代码就很简单了
    首先,就是将当前需要入队的元素,放入putIndex下标对应的坑中
    其次,将putIndex执行+1操作,如果+1后等于数组的长度,说明下标走到尽头了,得重新开始,于是执行=0操作
    最后,以上操作都成功的话,将元素个数count执行+1;并且调用消费者等待队列,唤醒一个阻塞的消费者进行
    消费
    */
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
}

流程比较简单,看代码注释即可明白啦~~

消费方法take

public E take() throws InterruptedException {
        /**
        同样的,进入方法先上锁,保证原子性操作,其实也不仅仅是为了保证原子性
        我们在之前讲过Condition的原理的时候说过,调用其方法前必须先lock
        */
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            /**
            几乎和上面一样,首先是个while循环,条件是当前元素的个数是否为0
            如果为0,说明队列里没数据可以读取消费,则执行notEmpty.await()对消费者阻塞
            如果不为0或者从阻塞等待队列中被唤醒,则执行出队操作dequeue()
            */
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
}

private E dequeue() {
    /**
    首先,取出当前takeIndex下标对应的元素,并且置为空
    其次,takeIndex执行+1操作,并判断是否等于数组大小,判断和上面一样哈~
    最后,count进行-1操作,因为出去了一个元素;并且调用生产者继续放入元素
    */
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
}

同样的,代码注释里,已经分析清楚了~~至于其他的一些操作方法小伙伴们可以自己去看哈~也比较简单。

ArrayBlockingQueue简单总结

到此,我们想要了解的都理清楚了,简单总结下:

  • 首先,ArrayBlockingQueue内部维护了一个一维数组,两个下标(删/插)都是从0开始,加到数组大小之后又从零开始,这样保证了先进先出的特性。(小伙伴们稍微动动笔画个图就知道了)
  • 其次,ArrayBlockingQueue准备了一把锁,用来保证各个操作的原子性,同时这把锁既可以公平又可以非公平
  • 最后,ArrayBlockingQueue准备了两个阻塞等待队列,一个给生产者,一个给消费者,当ArrayBlockingQueue放不下了,就会阻塞生产者,直到消费者消费后;当ArrayBlockingQueue没有数据了,就会阻止消费者消费,直到生产者插入数据。

有了之前AQS的基础,理解阻塞队列ArrayBlockingQueue的原理是不是轻松加愉快呢~当然我们后面也会选一些其他的实现类来分析的,有的可能就比ArrayBlockingQueue复杂多了。

简单用法案例

下面我们写个简单的示例,来加深下对阻塞队列ArrayBlockingQueue的理解。

场景描述:一个队列大小为5的阻塞队列,多个生产者往队列放入随机号,一个消费者进行读取消费。

public class ArrayBlockingQueueDemo {
	public static void main(String[] args) {
		ArrayBlockingQueue queue = new ArrayBlockingQueue<>(5, true);
                //先把消费者搞起来
		Thread consumer = new Thread(new Runnable() {
                    @Override
                    public void run() {
			try {
				for (;;) {
					String value = queue.take();
					System.out.println("消费者取出了值" + value);
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
                }, "consumer");
		consumer.start();
                //循环启动10个生产者
		for (int i = 0; i < 10; i++) {
                    Thread producer = new Thread(new Runnable() {
			@Override
			public void run() {
                            try {
				String value = UUID.randomUUID().toString();
				System.out.println(Thread.currentThread().getName() + "放入值:" + value);
					queue.put(value);
                            } catch (InterruptedException e) {
				e.printStackTrace();
                            }
			}
                    }, "producer" + i);
                    producer.start();
		}
	}
}


/*运行效果
producer1放入值:e06ee09f-46c0-4c33-92b3-53b4300fc1af
producer5放入值:03ed5819-6b51-4ef9-bb93-5f048ead0a1e
producer4放入值:9988b80b-7218-4b95-bbe0-e6d8ec00ba38
消费者取出了值e06ee09f-46c0-4c33-92b3-53b4300fc1af
producer3放入值:01fc2d1c-6fc5-4968-b5ef-dde07639bde4
producer8放入值:410903e5-4e4d-4310-a84d-b87f94f4a797
producer0放入值:3323dda4-baf4-4b27-b574-95cda9ce0b14
producer7放入值:13fc6650-69ff-433e-bdd3-4e726d41abdf
producer2放入值:e398813a-8b50-4221-8a13-845ae91a453a
producer6放入值:9696211c-f2bf-466a-bebc-b55de4608108
producer9放入值:f106cffa-351e-4d4e-ad1a-6986f7136c3a
消费者取出了值03ed5819-6b51-4ef9-bb93-5f048ead0a1e
消费者取出了值9988b80b-7218-4b95-bbe0-e6d8ec00ba38
消费者取出了值01fc2d1c-6fc5-4968-b5ef-dde07639bde4
消费者取出了值410903e5-4e4d-4310-a84d-b87f94f4a797
消费者取出了值3323dda4-baf4-4b27-b574-95cda9ce0b14
消费者取出了值13fc6650-69ff-433e-bdd3-4e726d41abdf
消费者取出了值e398813a-8b50-4221-8a13-845ae91a453a
消费者取出了值9696211c-f2bf-466a-bebc-b55de4608108
消费者取出了值f106cffa-351e-4d4e-ad1a-6986f7136c3a

当然效果每次运行都会不一样的,你可能会有疑问,不是生产一个消费一个吗?
你错了,我们模拟的是多个生产者,只有一个消费者,当消费者在消费打印的时候,说不定已经生产好几个了。
请注意,这是一个高并发场景。
*/

由此我们看出,只有生产者生产放入数据后,消费者才会进行读取消费

到此,本篇到此结束啦~你学会了吗?~~


作者:会飞的鱼2022
链接:https://juejin.cn/post/7117889476709318669

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章