【动图图解队列】LinkedBlockingQueue篇

前文回顾:
【动图图解队列】ArrayBlockingQueue篇


面试官:好了,聊完了ArrayBlockingQueue,我们接着说说LinkedBlockingQueue

Hydra:还真是不给人喘口气的机会,LinkedBlockingQueue是一个基于链表的阻塞队列,内部是由节点Node构成,每个被加入队列的元素都会被封装成下面的Node节点,并且节点中有指向下一个元素的指针:

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

LinkedBlockingQueue中的关键属性有下面这些:

private final int capacity;//队列容量
private final AtomicInteger count = new AtomicInteger();//队列中元素数量
transient Node<E> head;//头节点
private transient Node<E> last;//尾节点
//出队锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队的等待条件对象
private final Condition notEmpty = takeLock.newCondition();
//入队锁
private final ReentrantLock putLock = new ReentrantLock();
//入队的等待条件对象
private final Condition notFull = putLock.newCondition();

构造函数分为指定队列长度和不指定队列长度两种,不指定时队列最大长度是int的最大值。当然了,你要是真存这么多的元素,很有可能会引起内存溢出:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}    

还有另一种在初始化时就可以将集合作为参数传入的构造方法,实现非常好理解,只是循环调用了后面会讲到的enqueue入队方法,这里暂且略过。

LinkedBlockingQueue中,队列的头节点head是不存元素的,它的itemnullnext指向的元素才是真正的第一个元素,它也有两个用于阻塞等待的Condition条件对象。与之前的ArrayBlockingQueue不同,这里出队和入队使用了不同的锁takeLockputLock。队列的结构是这样的:

面试官:为什么要使用两把锁,之前ArrayBlockingQueue使用一把锁,不是也可以保证线程的安全么?

Hydra:使用两把锁,可以保证元素的插入和删除并不互斥,从而能够同时进行,达到提高吞吐量的的效果

面试官:嗯,那还是老规矩,先说插入方法是怎么实现的吧

Hydra:这次就不提父类AbstractQueueadd方法了,反正它调用的也是子类的插入方法offer,我们就直接来看offer方法的源码:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;//队列中元素个数
    if (count.get() == capacity)//已满
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //并发情况,再次判断队列是否已满
        if (count.get() < capacity) {
            enqueue(node);
            //注意这里获取的是未添加元素前的对列长度
            c = count.getAndIncrement();
            if (c + 1 < capacity)//未满
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

offer方法中,首先判断队列是否已满,未满情况下将元素封装成Node对象,尝试获取插入锁,在获取锁后会再进行一次队列已满判断,如果已满则直接释放锁。在持有锁且队列未满的情况下,调用enqueue入队方法。

enqueue方法的实现也非常的简单,将当前尾节点的next指针指向新节点,再把last指向新节点:

private void enqueue(Node<E> node) {
    last = last.next = node;
}

画一张图,方便你理解:

在完成入队后,判断队列是否已满,如果未满则调用notFull.signal(),唤醒等待将元素插入队列的线程。

面试官:我记得在ArrayBlockingQueue里插入元素后,是调用的notEmpty.signal(),怎么这里还不一样了?

Hydra:说到这,就不得不再提一下使用两把锁来分别控制插入和获取元素的好处了。在ArrayBlockingQueue中,使用了同一把锁对入队和出队进行控制,那么如果在插入元素后再唤醒插入线程,那么很有可能等待获取元素的线程就一直得不到唤醒,造成等待时间过长。

而在LinkedBlockingQueue中,分别使用了入队锁putLock和出队锁takeLock,插入线程和获取线程是不会互斥的。所以插入线程可以在这里不断的唤醒其他的插入线程,而无需担心是否会使获取线程等待时间过长,从而在一定程度上提高了吞吐量。当然了,因为offer方法是非阻塞的,并不会挂起阻塞线程,所以这里唤醒的是阻塞插入的put方法的线程。

面试官:那接着往下看,为什么要在c等于0的情况下才去唤醒notEmpty中的等待获取元素的线程?

Hydra:其实获取元素的方法和上面插入元素的方法是一个模式的,只要有一个获取线程在执行方法,那么就会不断的通过notEmpty.signal()唤醒其他的获取线程。只有当c等于0时,才证明之前队列中已经没有元素,这时候获取线程才可能会被阻塞,在这个时候才需要被唤醒。上面的这些可以用一张图来说明:

[offer.gif]

由于我们之前说过,队列中的head节点可以认为是不存储数据的标志性节点,所以可以简单的认为出队时直接取出第二个节点,当然这个过程不是非常的严谨,我会在后面讲解出队的过程中再进行补充说明。

面试官:那么阻塞方法put和它有什么区别?

Hydra:putoffer方法整体思路一致,不同的是加锁是使用的是可被中断的方式,并且当队列中元素已满时,将线程加入notFull等待队列中进行等待,代码中体现在:

while (count.get() == capacity) {
    notFull.await();
}

这个过程体现在上面那张图的notFull等待队列中的元素上,就不重复说明了。另外,和put方法比较类似的,还有一个携带等待时间参数的offer方法,可以进行有限时间内的阻塞添加,当超时后放弃插入元素,我们只看和offer方法不同部分的代码:

public boolean offer(E e, long timeout, TimeUnit unit){
    ...
    long nanos = unit.toNanos(timeout);//转换为纳秒
    ...
    while (count.get() == capacity) {
        if (nanos <= 0)
            return false;
        nanos = notFull.awaitNanos(nanos);
    }
    enqueue(new Node<E>(e));    
    ...
}

awaitNanos方法在await方法的基础上,增加了超时跳出的机制,会在循环中计算是否到达预设的超时时间。如果在到达超时时间前被唤醒,那么会返回超时时间减去已经消耗的时间。无论是被其他线程唤醒返回,还是到达指定的超时时间返回,只要方法返回值小于等于0,那么就认为它已经超时,最终直接返回false结束。

[pic]

面试官:费这么大顿功夫才把插入讲明白,我先喝口水,你接着说获取元素方法

Hydra:……那先看非阻塞的poll方法

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)//队列为空
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {//队列非空
            x = dequeue();
            //出队前队列长队
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

出队的逻辑和入队的非常相似,当队列非空时就执行dequeue进行出队操作,完成出队后如果队列仍然非空,那么唤醒等待队列中挂起的获取元素的线程。并且当出队前的元素数量等于队列长度时,在出队后唤醒等待队列上的添加线程。

出队方法dequeue的源码如下:

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

之前提到过,头节点head并不存储数据,它的下一个节点才是真正意义上的第一个节点。在出队操作中,先得到头节点的下一个节点first节点,将当前头节点的next指针指向自己,代码中有一个简单的注释是help gc,个人理解这里是为了降低gc中的引用计数,方便它更早被回收。之后再将新的头节点指向first,并返回清空为null前的内容。使用图来表示是这样的:

面试官:(看看手表)take方法的整体逻辑也差不多,能简单概括一下吗

Hydra:阻塞方法take方法和poll的思路基本一致,是一个可以被中断的阻塞获取方法,在队列为空时,会挂起当前线程,将它添加到条件对象notEmpty的等待队列中,等待其他线程唤醒。

面试官:再给你一句话的时间,总结一下它和ArrayBlockingQueue的异同,我要下班回家了

Hydra:好吧,我总结一下,有下面几点:

  • 队列长度不同,ArrayBlockingQueue创建时需指定长度并且不可修改,而LinkedBlockingQueue可以指定也可以不指定长度
  • 存储方式不同,ArrayBlockingQueue使用数组,而LinkedBlockingQueue使用Node节点的链表
  • ArrayBlockingQueue使用一把锁来控制元素的插入和移除,而LinkedBlockingQueue将入队锁和出队锁分离,提高了并发性能
  • ArrayBlockingQueue采用数组存储元素,因此在插入和移除过程中不需要生成额外对象,LinkedBlockingQueue会生成新的Node节点,对gc会有影响

面试官:明天上午9点,老地方,我们再聊聊别的

Hydra:……

未完待续

作者Hydra,一个参与过风云卫星项目的后端程序员,现任世界500强企业后端技术Leader,资深后端开发。
只想与你有趣、深入、直接地聊聊技术。
个人公众号『码农参上』,一个热爱分享的公众号。欢迎小伙伴们关注,第一时间获取推送。

#面试##学习路径##面试题目#
全部评论
未完待续...什么时候更新下一个啊!
点赞 回复
分享
发布于 2022-01-12 16:00

相关推荐

点赞 1 评论
分享
牛客网
牛客企业服务