不寻常的同步队列 SynchronousQueue

在这里插入图片描述

从不浪费时间的人,没有工夫抱怨时间不够。

——杰弗

Q:简单介绍下SynchronousQueue?

  • 同样是阻塞队列的家族成员。
    在这里插入图片描述

每个插入操作必须等待另一个线程进行相应删除操作的阻塞队列,反之亦然。
就如同下图中的场景:
在这里插入图片描述
SynchronousQueue 内部没有任何一个容量。

  • 无法执行 peek,因为仅当尝试删除元素时,元素才存在
  • 无法执行插入元素(使用任何方法),除非另一个线程试图将其删除
  • 无法执行迭代,因为没有东西能给你迭代

队首是第一个排队的插入线程试图添加到队列中的元素;如果没有这样的排队线程,则没有元素可用于删除,并且poll() 方法将返回null。 对于其他Collection接口的实现方法(例如contains),SynchronousQueue就是个空容器。 此队列不支持null元素。

SynchronousQueue 类似于CSP和Ada中使用的集合通道。它们非常适合切换设计,在该设计中,在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务。

此类支持可选的公平性策略,用于排序正在等待的生产者和消费者线程。 默认情况下,不保证此排序。 但是,将公平性设置为true构造的队列将授予线程FIFO顺序的访问权限。

此类及其迭代器实现Collection和Iterator接口的所有可选方法。此类也是Java集合框架一员。产自 JDK1.5。

Q:讲讲它底层的数据结构?

> 这种必问面试题,我自然是准备了~

众所周知,SynchronousQueue 支持公平和非公平两种策略。为了支持它们,底层其实有两种数据结构 - 队列与栈,都是通过链表来实现的。

  • 让我们通过 UML 图总览全局,可极大方便地观察包含哪些内部类数据结构

    Transferer

    > 栈和队列共享的内部API。

抽象类 Transferer 为 TransferStack栈和TransferQueue队列的公共类。

  • 其定义了转移数据的公共操作transfer,用来统一put和take。在这里插入图片描述

参数:

  • E
    若非空,该条目将被消费者处理
    若为空,请求转让收益由生产者提供的条目
  • timed
    如果此操作应超时
  • nanos
    超时时间,以纳秒
  • 返回值:
    如果非空,提供或接收的条目; 如果为null,该操作失败,原因是超时或中断 - 调用者可以区分其中发生这些通过检查Thread.interrupted

由TransferStack和TransferQueue具体实现

TransferQueue

  • 队首和队尾
    在这里插入图片描述
    TransferQueue队列中的节点类
static final class QNode {
     // 队列中的下一个元素
     volatile QNode next;          
     // 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面CAS'ed to or from null
     volatile Object item;         
     volatile Thread waiter;       // 控制 park/unpark
     // 是否有数据,队列元素的类型标识,入队时有数据值为true,出队时无数据值为false
     final boolean isData;
     ...
}     

TransferStack

/* Modes for SNodes, ORed together in node fields */
/** 表示消费者 */
static final int REQUEST    = 0;
/** 表示生产者 */
static final int DATA       = 1;
/**  处于真正的匹配状态 */
static final int FULFILLING = 2;

TransferStack 栈中的节点

static final class SNode {
    // 栈顶的下一个节点
    volatile SNode next;
    // 匹配阻塞的栈元素能否被唤醒 
    volatile SNode match;    
    // to control park/unpark   
    volatile Thread waiter;  
    // 数据; 对于 REQUESTs为null
    Object item;               
    int mode;
    // Note: item and mode fields 不需要被 volatile修饰,因为他们永远是被先写,再被读,而非 volatile/原子 操作。
  • 栈顶,初始为 null
    在这里插入图片描述

    Q:那你说说如何操作元素的吧?

    首先注意到该属性

    Transferer

  • 只在构造方法中设置。
  • 此处可以看出源码中默认采用非公平策略。即使用栈结构。在这里插入图片描述

在没有进一步复杂的序列化的情况下不能将其声明为final。 由于每个public方法最多只能访问一次,因此在这里使用volatile而不是final不会明显降低性能。
在这里插入图片描述

让我们看看

put 方法

在这里插入图片描述

take 方法在这里插入图片描述

所以其真正实现都是在各个数据结构中具体的 transfer 方法中。那么我们就直接看

transfer(队列)

  • 先看队列结构中的实现
    在这里插入图片描述

基本算法是循环尝试执行以下两个操作之一:

  1. 如果队列明显为空或持有相同模式的节点,请尝试将节点添加到waiter队列中,等待被实现(或取消)并返回匹配项
  2. 如果队列显然包含等待项,并且此调用是互补模式,请尝试通过对等待节点的CAS'ing item字段进行排队并将其出队,然后返回匹配项来实现

在每种情况下,一路检查并尝试帮助其他停滞/缓慢的线程推进头和尾。

循环以空检查开始,以避免看到未初始化的头或尾值。 这在当前的SynchronousQueue中永远不会发生,但是如果调用者持有对传输者volatile/final引用,则可能会发生这种情况。 无论如何,这里的检查是因为将空检查放在循环的顶部,通常比隐式散布检查要快。

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    // 判断e是否为数据.e为空时,出队操作false,入队操作true
    boolean isData = (e != null);
    // 死循环
    for (;;) {
        // 队尾
        QNode t = tail;
        // 队首
        QNode h = head;
        // 队首/尾为空,未初始化
        if (t == null || h == null)         // 看见了未初始化值
            continue;                       // 自旋
        // 1. h==t:队列为空
        // 2. 上次和本次是同样操作,即都是入队或出队. 只判断尾部是因为不可能存在头节点和尾结点不一样的情况
        if (h == t || t.isData == isData) { // 队列空或者模式相同
            QNode tn = t.next;
            // 判断队尾是否已被其它线程修改
            if (t != tail)                  // inconsistent read
                // 是,则重新循环
                continue;
            // 判断tn是否非空
            if (tn != null) {               // lagging tail
                // tn非空,则说明其它线程已添加了节点,尝试更新队尾
                advanceTail(t, tn);
                // 重新循环
                continue;
            }
            // 设置超时时间 && 超时时间≤0
            if (timed && nanos <= 0)        // 无法再等待
                return null;
            // 判断s是否为null    
            if (s == null)
                // 是,则初始化节点s
                s = new QNode(e, isData);
            // CAS将s添加到队尾的next   
            if (!t.casNext(null, s))        // 链接失败
                // 则重新循环 
                continue;
            // 尝试更新队尾,队尾此时为s
            advanceTail(t, s);              // swing tail an

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java源码模拟面试解析指南 文章被收录于专栏

<p> “挨踢”业行情日益严峻,企业招聘门槛愈来愈高,大厂hc更是少之又少,而Java技术面试普遍对基础知识的掌握考察特别深,大多数同学突击所看的 Java 面试基础知识点根本达不到面试官近乎挑剔的要求。 本专刊针对如今的校招及社招痛点,深入解析 JDK 的核心源码,探究 JDK 的设计精髓及最佳实践,同时以模拟面试的场景切入,让同学们在阅读过程中也能轻松掌握面试技巧。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p>

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务