java并发工具类-队列

img

BlockingQueue

java.util.concurrent包中的Java BlockingQueue接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue

使用

一个BlockingQueue通常用于在线程上生成对象,另一个线程消耗对象。这是一个说明这个原则的图表:

img

生产线程将一直生成新对象并将它们插入队列,直到达到队列的容量上限。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生产线程。它将一直被阻塞,直到消费线程将一个对象从队列中取出。

消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出实例,那么消费线程将被阻塞,直到生产线程向队列放入一个对象。

方法

BlockingQueue有4组不同的方法用于插入,删除和检查队列中的元素。当不能立即执行所请求的操作时,每组方法的行为会不同。这是一个方法表:

抛出异常 返回特殊值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
删除 remove(o) poll() take() poll(timeout, timeunit)
访问 element() peek()
方法行为

这4种不同的行为意味着:

抛出异常

如果请求的操作现在无法完成,则抛出异常。

特殊值

如果请求的操作现在无法完成,则返回特殊值(一般为 true / false).

阻塞

如果请求的操作现在无法完成,则方法调用将阻塞,直到操作能够进行。

超时

如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)

注意

无法插入nullBlockingQueue中。如果你尝试插入nullBlockingQueue则会抛出一个NullPointerException异常。

你可以访问BlockingQueue内的所有元素,而不仅仅是开头和结尾的元素。例如,假设你已将一个对象入队等待处理,但你的应用程序决定取消它。你可以调用remove(o)这样的操作来删除队列中的特定对象。但是,这是个效率很低的操作,所以除非你真的需要,否则你不应该使用Collection中的这些方法。

示例

这是一个Java BlockingQueue示例。该示例使用实现BlockingQueue接口的ArrayBlockingQueue类。

BlockingQueueExample

首先, BlockingQueueExample类在不同的线程中启动ProducerConsumerProducer将一个字符串插入共享的BlockingQueue,而Consumer使用它们。

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}
Producer

这是Producer类。注意每次put()调用之间它都会睡一秒钟。这将导致Consumer阻塞,为了等待获取队列中的对象。

public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Consumer

这是Consumer类。它从队列中取出对象,然后将它们打印到System.out

public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
测试

下面是一个测试:

import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingExample {
    @Test
    public void test() throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread thread1 = new Thread(producer);
        Thread thread2 = new Thread(consumer);

        thread1.start();
        thread2.start();
        
        thread1.join();
        thread2.join();
    }

    private static class Producer implements Runnable {
        private BlockingQueue<String> queue;

        Producer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                queue.put("1");
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class Consumer implements Runnable {
        private BlockingQueue<String> queue = null;

        Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

BlockingDeque

java.util.concurrent中的BlockingDeque接口表示一个双向队列,它可以被线程安全的放入以及从中获取实例。在本文中,我将向你展示如何使用BlockingDeque

BlockingDeque类是一个Deque,它会阻塞尝试在deque中插入或删除的线程,以防它能够向队列中插入或删除元素。

deque是“Double Ended Queue”(双端队列)的缩写。因此,deque是一个队列,你可以从它的两端插入和获取元素。

用法

如果线程同时生成和使用同一队列的元素,则可以使用BlockingDeque。如果生成线程需要在队列的两端插入元素,并且消费线程需要从队列的两端移除元素,那么也可以使用它:

img

线程将生成元素并将它们插入队列的任一端。如果deque当前已满,则插入线程将被阻塞,直到删除线程将元素从双端队列中取出。如果deque当前为空,则将阻止删除线程,直到插入线程将元素插入到双端队列中。

方法

BlockingDeque有4组不同的方法用于插入,移除以及检查双端队列中的元素。如果不能立即执行所请求的操作,则每组方法的行为都不同。这是一个方法表:

抛出异常 返回特殊值 阻塞 超时
Insert addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
Remove removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
Examine getFirst(o) peekFirst(o)
抛出异常 返回特殊值 阻塞 超时
Insert addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
Remove removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
Examine getLast(o) peekLast(o)

这与BlockingQueue类似,只多了一组方法。

继承自 BlockingQueue

BlockingDeque接口扩展BlockingQueue接口。这意味着你可以使用BlockingDeque作为BlockingQueue。如果这样做,各种插入方法会将元素添加到双端队列的末尾,而删除方法将从双端队列的开头删除元素,即BlockingQueue接口的插入和删除方法。

下面是一个表格,对应了BlockingQueueBlockingDeque的方法:

BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
remove() removeFirst()
poll() x 2 pollFirst() x 2
take() takeFirst()
element() getFirst()
peek() peekFirst()

示例

这是一个如何使用BlockingDeque方法的小代码示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();

ArrayBlockingQueue

ArrayBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

ArrayBlockingQueue是一个有界的阻塞队列,它将元素存储在数组内部。有界意味着它无法存储无限量的元素,它可以同时存储的元素数量有一个上限。你需要在实例化时设置上限,之后无法更改,所以它和ArrayList有些区别,不要因为它们的名称相似而将它们的功能混杂。

ArrayBlockingQueue内部是以FIFO(先入先出)次序来存储元素的。队列的头部是在队列中存活时间最长的元素,而队列的尾部是在队列中存活时间最短的元素。

示例

以下是实例化和使用ArrayBlockingQueue的例子:

BlockingQueue queue = new ArrayBlockingQueue(1024);

queue.put("1");

Object object = queue.take();

这是一个使用Java 泛型的BlockingQueue例子:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

queue.put("1");

String string = queue.take();

源码

成员变量

ArrayBlockingQueue中使用了这几个成员变量来保证操作,其实内部使用了一个循环数组,其中takeIndex和putIndex其实相当于队列的头部和尾部。

/** 使用数组保存元素 */
    final Object[] items;

    /** 下一个take,poll,peek或remove方法调用时访问此下标的元素 */
    int takeIndex;

    /** 下一个put, offer, 或add方法调用时访问此下标的元素 */
    int putIndex;

    /**队列中的元素数量 */
    int count;

    /** 保护所有操作的主锁 */
    final ReentrantLock lock;

    /** 获取元素的等待条件 */
    private final Condition notEmpty;

    /** 放置元素的等待条件 */
    private final Condition notFull;
构造函数

构造函数如下:

/**
 * 使用一个固定的数值和默认的访问规则创建,默认是使用非公平锁
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

/**
 * 使用一个固定的数值和指定的访问规则创建
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

/**
 *使用一个固定的数值和指定的访问规则创建,并将给定集合中的元素
 * 增加到队列中,增加的顺序是指定的集合迭代器的遍历顺序
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] items = this.items;
        int i = 0;
        try {
            for (E e : c)
                items[i++] = Objects.requireNonNull(e);
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
增加操作
public boolean add(E e) {
    return super.add(e);
}

public boolean add(E e) {
    // 内部重用offer方法
    if (offer(e))
        return true;
    // 如果增加失败,抛出异常指示队列已满
    else
        throw new IllegalStateException("Queue full");
}

-------------------------------------------------------------------------

public boolean offer(E e) {
    // 检查是否是否为null,如果是抛出NPE异常
    Objects.requireNonNull(e);
    // 加锁。  此处使用final的原因是将成员变量赋值为局部变量,
    // 然后使用此变量就不需要经过两次访问,即先访问this,再
    // 访问lock,轻微提升程序性能,后面此种方法的使用也是一样。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列满了,返回false
        if (count == items.length)
            return false;
        // 否则,加入队列
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public static <T> T requireNonNull(T obj) {
    if (obj == null)
        throw new NullPointerException();
    return obj;
}

private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;

    final Object[] items = this.items;
    // 插入元素
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    // 随机通知一个等待的线程
    notEmpty.signal();
}

-------------------------------------------------------------------------

// 阻塞方法
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列已经,在notFull上阻塞自己等待通知
        // 关于等待-通知机制已经说过很多次,此处不再多说
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

-------------------------------------------------------------------------

// 超时方法
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    Objects.requireNonNull(e);
    // 计算超时时间,转换为纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列已满,超时等待,如果时间用完,返回false
        while (count == items.length) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}
删除操作
// 删除指定元素
public boolean remove(Object o) {
    if (o == null) return false;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列中存在元素
        if (count > 0) {
            final Object[] items = this.items;
            // 注意此处精彩的循环使用,因为内部是一个循环数组
            for (int i = takeIndex, end = putIndex,
                     to = (i < end) ? end : items.length;
                 ; i = 0, to = end) {
                for (; i < to; i++)
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                if (to == end) break;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}

void removeAt(final int removeIndex) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;

    final Object[] items = this.items;
    // 如果删除的是头元素,只需修改头元素下标即可
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        // 此处是为了保持迭代器与队列的一致性
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        for (int i = removeIndex, putIndex = this.putIndex;;) {
            int pred = i;
            if (++i == items.length) i = 0;
            // 如果已经移到了最后一个元素,跳出循环
            if (i == putIndex) {
                items[pred] = null;
                this.putIndex = pred;
                break;
            }
            // 将元素前移一位
            items[pred] = items[i];
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

-------------------------------------------------------------------------

public E remove() {
    // 重用poll方法,如果队列为空,抛出异常
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

-------------------------------------------------------------------------

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;

    final Object[] items = this.items;
    // 获取头元素,因为使用Object[]保存,所以要进行类型转换
    // 因为只能增加指定类型的元素,所以可以确保类型转换一定
    // 会成功,抑制此非受检警告
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

-------------------------------------------------------------------------

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

-------------------------------------------------------------------------

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

阻塞方法以及超时方法和增加操作一样,此处不多做讲解。

访问操作
// element()方法在AbstractQueue<E>类中,ArrayBlockingQueue继承自此类
public E element() {
    // 重用peek方法,如果队列为空抛出异常
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

-------------------------------------------------------------------------

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}
辅助方法

部分方法逻辑简单,有兴趣自己查看即可。

public void clear() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k;
        // 如果队列中存在元素,清空队列
        if ((k = count) > 0) {
            circularClear(items, takeIndex, putIndex);
            takeIndex = putIndex;
            count = 0;
            // 使迭代器保持一致
            if (itrs != null)
                itrs.queueIsEmpty();
            // 如果有线程等待插入元素,唤醒
            for (; k > 0 && lock.hasWaiters(notFull); k--)
                notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}

// 将存在的元素全部置为null即可,等待 gc回收它们,此时等于清空了队列。
private static void circularClear(Object[] items, int i, int end) {
    // assert 0 <= i && i < items.length;
    // assert 0 <= end && end < items.length;

    for (int to = (i < end) ? end : items.length;
         ; i = 0, to = end) {
        for (; i < to; i++) items[i] = null;
        if (to == end) break;
    }
}

-------------------------------------------------------------------------

public int drainTo(Collection<? super E> c) {
    // 重用drainTo(Collection<? super E> c, int maxElements)方法
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    // 如果指定的集合是自己,抛出异常,符合BlockingQueue接口文档中的定义
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取需要转移的元素数量
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            // 通过直接访问数组,比重复调用poll()方法再增加性能会高很多
            while (i < n) {
                @SuppressWarnings("unchecked")
                E e = (E) items[take];
                c.add(e);
                items[take] = null;
                if (++take == items.length) take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            // 做一些处理工作
            if (i > 0) {
                count -= i;
                takeIndex = take;
                if (itrs != null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                for (; i > 0 && lock.hasWaiters(notFull); i--)
                    notFull.signal();
            }
        }
    } finally {
        lock.unlock();
    }
}
核心要点
  1. 内部使用了一个循环数组
  2. 是一个有界数组,提供了容量后无法被更改
  3. 可以指定锁的公平性

DelayQueue

DelayQueue类实现BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

DelayQueue内部阻止元素直到某个延迟到期,元素必须实现接口java.util.concurrent.Delayed。以下是java.util.concurrent.Delayed接口:

public interface Delayed extends Comparable<Delayed< {

    public long getDelay(TimeUnit timeUnit);

}

getDelay()方法返回的值应该是在释放此元素之前剩余的延迟。如果返回0或负值,则延迟将被视为已过期,并且在DelayQueue调用下一个take()等操作时释放。

传递给getDelay()方法的TimeUnit实例是一个Enum,它说明了延迟的时间单位。TimeUnit枚举有以下值:

DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS

Delayed接口继承了java.lang.Comparable接口,这意味着Delayed对象可以被相互比较。这可能是在DelayQueue内部用于排序队列中的元素,因此它们能够按到期时间排序释放。

示例

以下是使用DelayQueue的示例:

public class DelayQueueExample {

    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();
        Delayed element1 = new DelayedElement();
        queue.put(element1);
        Delayed element2 = queue.take();
    }
}

DelayedElement是我创建的Delayed接口的实现。它不是java.util.concurrent包的一部分。你必须创建自己的Delayed接口实现才能使用DelayQueue类。

源码

整体介绍

DelayQueue类的泛型定义中可以看出,此类只能储存继承自Delayed接口的元素,内部使用一个优先级队列对元素进行排序。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 等待队列的头节点,可以视作一个缓存
    // 当一个线程成为leader,它只会等待指定延迟的时间,但
    // 其他线程会一直等到。所以leader线程在获取到元素后
    // 一定要释放其他线程,除非其他线程临时成为leader
    private Thread leader;

    /**
     * 当队列头部的一个新元素可获得(即超时到期)或者一个新线程成为leader,唤醒此等待条件上的线程
     */
    private final Condition available = lock.newCondition();
构造函数

只有两个构造方法,一个是默认构造方法,一个是给定一个集合,并将其中元素增加到等待队列中。

public DelayQueue() {}

/**
 * Creates a {@code DelayQueue} initially containing the elements of the
 * given collection of {@link Delayed} instances.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}
增加操作
public boolean add(E e) {
    // 重用offer方法
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 将元素增加到优先级队列中
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public void put(E e) {
    // 因为是无界队列,所以插入不会被阻塞。超时方法同理
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}
删除操作
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

// 提取并删除第一个元素,如果队列为空返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取第一个元素
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0)
            ? null
            : q.poll();
    } finally {
        lock.unlock();
    }
}

/**
 * 提取并删除队列的第一个元素,如果队列为空则等待 
 * 直到有可获得的元素
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果队列为空,阻塞
            if (first == null)
                available.await();
            else {
                // 获取头元素的等待延迟
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 如果已经有线程在等待获取头元素,那么阻塞自己
                if (leader != null)
                    available.await();
                // 否则,自己就是leader,等待给定延迟
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果成功获取到元素并且队列不为空,唤醒其他线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue,
 * or the specified wait time expires.
 *
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element with
 *         an expired delay becomes available
 * @throws InterruptedException {@inheritDoc}
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果队列为空,超时等待
            if (first == null) {
                if (nanos <= 0L)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // 如果延迟还未到期,而指定的超时已到期,那么返回null
                if (nanos <= 0L)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
访问操作
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 委托给优先级队列获取
        return q.peek();
    } finally {
        lock.unlock();
    }
}
其他操作
public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (E first;
             n < maxElements
                 && (first = q.peek()) != null
                 && first.getDelay(NANOSECONDS) <= 0;) {
            // 增加到集合中
            c.add(first);   // In this order, in case add() throws.
            // 从队列中删除此元素
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}
迭代器

迭代器使用数组保存队列中的元素,当创建一个迭代器时,使用toArray()方法将当前队列转换为数组,所以此迭代器不一定会和内部的优先级队列保持一致。迭代器除了提供访问操作外,只提供了一个删除操作,这个删除操作保证不会出现不一致的情况。

public Iterator<E> iterator() {
    return new Itr(toArray());
}

/**
 * Snapshot iterator that works off  of underlying q array.
 */
private class Itr implements Iterator<E> {
    final Object[] array; // Array of all elements
    int cursor;           // index of next element to return
    int lastRet;          // index of last element, or -1 if no such

    Itr(Object[] array) {
        lastRet = -1;
        this.array = array;
    }

    public boolean hasNext() {
        return cursor < array.length;
    }

    @SuppressWarnings("unchecked")
    public E next() {
        if (cursor >= array.length)
            throw new NoSuchElementException();
        return (E)array[lastRet = cursor++];
    }

    public void remove() {
        if (lastRet < 0)
            throw new IllegalStateException();
        removeEQ(array[lastRet]);
        lastRet = -1;
    }
}

void removeEQ(Object o) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 获取优先级队列的迭代器,然后执行删除操作
        for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
            if (o == it.next()) {
                it.remove();
                break;
            }
        }
    } finally {
        lock.unlock();
    }
}
核心要点
  1. 使用此队列时,元素必须要实现Delayed接口
  2. 当已经有一个线程等待获取队列头元素时,其他也想要获取元素的线程就会进行等待阻塞状态
  3. 迭代器不和内部的优先级队列保持一致性
  4. 迭代器的remove()方法与内部的优先级队列保持一致性

LinkedBlockingQueue

LinkedBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

LinkedBlockingQueue在内部将元素存储在链接结构(链接节点)中。如果需要,该链接结构可以具有一个上限。如果未指定上限,则使用Integer.MAX_VALUE作为上限。

LinkedBlockingQueue内部将元素以FIFO(先入先出)次序存储。队列的头部是已在队列中的时间最长的元素,队列的尾部是已在队列中的时间最短的元素。

示例

以下是如何实例化和使用LinkedBlockingQueue

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

源码

整体介绍

LinkedBlockingQueue内部使用了一个单向链表,同时它提供了两个锁,一个用于获取并删除元素,一个用于增加元素。count字段使用原子变量,避免修改它时需要同时获取两个锁。

static class Node<E> {
    E item;

    /**
     * 下面中的一个:
     * - 真实的后继节点
     * - 这个节点本身,此时原后继节点现在是head.next,即第一个元素
     * - null, 意味没有后继节点,此节点是队列最后一个节点
     */
    Node<E> next;

    Node(E x) { item = x; }
}

private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
增加操作

注意进行增加操作时,只对putLock加锁,如果还对takeLock也进行加锁,那么就会影响性能。同时,为了弥补此方法带来的后果,count使用原子变量,进行CAS更新,防止数据不一致。

为了提升性能,在增加元素成功后,如果队列还没有满,那么便唤醒其他因队列满而被阻塞的插入线程。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 注意即使count没有被锁保护,它依然可以被用作等待条件
        // 判定。因为此时count只会被减少(putLock已加锁),如果容量
        // 改变,会被唤醒。count在其他地方的使用也与此相似。

        // 队列已满,阻塞自己
        while (count.get() == capacity) {
            notFull.await();
        }
        // 插入队列中
        enqueue(node);
        // CAS更新count值
        c = count.getAndIncrement();
        // 如果队列没满,唤醒其他等待插入的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列原来是空队列,唤醒等待提取元素的线程
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 先加锁,才能调用对应Condtion的signal()方法
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列已满,返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 等待-超时机制
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
删除操作

删除操作与增加操作一样。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 当队列为空,阻塞自己
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 将头节点出队
        x = dequeue();
        c = count.getAndDecrement();
        // 如果队列还有元素,唤醒其他等待提取元素的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果原本队列是满的,唤醒增加线程,因为现在元素已经被取出,队列不满
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;

    // 头节点为空,其中不存储元素
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

// 删除一个指定元素
public boolean remove(Object o) {
    if (o == null) return false;
    // 将两个锁全部加锁
    fullyLock();
    try {
        for (Node<E> pred = head, p = pred.next;
             p != null;
             pred = p, p = p.next) {
            if (o.equals(p.item)) {
                // 从队列中移除此节点
                unlink(p, pred);
                return true;
            }
        }
        return false;
    } finally {
        // 释放全部两个锁
        fullyUnlock();
    }
}

void unlink(Node<E> p, Node<E> pred) {
    // assert putLock.isHeldByCurrentThread();
    // assert takeLock.isHeldByCurrentThread();
    // p.next没有被设置为null,为了保证迭代器遍历到p时继续工作,
    // 保证弱一致性
    p.item = null;
    pred.next = p.next;
    if (last == p)
        last = pred;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
访问操作
public E peek() {
    // 队列为空,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 返回第一个元素
        return (count.get() > 0) ? head.next.item : null;
    } finally {
        takeLock.unlock();
    }
}
其他操作
public void clear() {
    fullyLock();
    try {
        for (Node<E> p, h = head; (p = h.next) != null; h = p) {
            // 使得next指向自己
            h.next = h;
            // 解除对元素实体的引用
            p.item = null;
        }
        head = last;
        // assert head.item == null && head.next == null;
        // 如果原来队列是满的,唤醒等待的插入线程
        if (count.getAndSet(0) == capacity)
            notFull.signal();
    } finally {
        fullyUnlock();
    }
}


public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 获取当前队列中的元素数量
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
        Node<E> h = head;
        int i = 0;
        try {
            // 将n个元素加入到指定集合中
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                // assert h.item == null;
                head = h;
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}
迭代器

LinkedBlockingQueue的迭代器与DelayQueue的不同,DelayQueue的迭代器与原组件没有任何的一致性,而LinkedBlockingQueue的迭代器与内部的链表保持了弱一致性。

注意它的next()方法,它会跳过内容为null的节点,回忆前面删除操作中的remove(Object)方法,他没有修改节点的next字段,如果修改了,迭代器就会无法正常工作,而为了保证一致性,迭代器也需要跳过这个空节点。

而它的forEachRemaining(Consumer<? super E> action)方法是分批次进行处理的,每批64个元素,如果数量小于64,那就使用此数量。

private class Itr implements Iterator<E> {
    private Node<E> next;           // 持有nextItem的节点
    private E nextItem;             // 下一个进行处理的元素
    private Node<E> lastRet;        // 上一个返回的元素,即当前正在使用的
    private Node<E> ancestor;       // Helps unlink lastRet on remove()

    Itr() {
        fullyLock();
        try {
            // 保存第一个元素
            if ((next = head.next) != null)
                nextItem = next.item;
        } finally {
            fullyUnlock();
        }
    }

    public boolean hasNext() {
        return next != null;
    }

    public E next() {
        Node<E> p;
        if ((p = next) == null)
            throw new NoSuchElementException();
        lastRet = p;
        E x = nextItem;
        fullyLock();
        try {
            E e = null;
            // 注意此处,遇到空节点会跳过去访问下一个节点
            for (p = p.next; p != null && (e = p.item) == null; )
                p = succ(p);
            next = p;
            nextItem = e;
        } finally {
            fullyUnlock();
        }
        return x;
    }
    
    Node<E> succ(Node<E> p) {
        // 正常出队的元素next字段会指向自己
        if (p == (p = p.next))
            p = head.next;
        return p;
    }
    
    public void forEachRemaining(Consumer<? super E> action) {
        // A variant of forEachFrom
        Objects.requireNonNull(action);
        Node<E> p;
        if ((p = next) == null) return;
        lastRet = p;
        next = null;
        final int batchSize = 64;
        Object[] es = null;
        int n, len = 1;
        do {
            fullyLock();
            try {
                if (es == null) {
                    p = p.next;
                    // 获取真正存在的元素的数量,如果多于64,分批进行,一批为64个
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == batchSize)
                            break;
                    es = new Object[len];
                    es[0] = nextItem;
                    nextItem = null;
                    n = 1;
                } else
                    n = 0;
                // n为1的使用只因为p=p.next,经过此步后p已经不是首元素,
                // 而是第二个元素。而后面批次的插入直接从0开始即可
                // 将元素放入数组中
                for (; p != null && n < len; p = succ(p))
                    if ((es[n] = p.item) != null) {
                        lastRet = p;
                        n++;
                    }
            } finally {
                fullyUnlock();
            }
            // 分别调用accept方法
            for (int i = 0; i < n; i++) {
                @SuppressWarnings("unchecked") E e = (E) es[i];
                action.accept(e);
            }
        } while (n > 0 && p != null);
    }

    public void remove() {
        // 获取当前元素
        Node<E> p = lastRet;
        if (p == null)
            throw new IllegalStateException();
        lastRet = null;
        fullyLock();
        try {
            if (p.item != null) {
                if (ancestor == null)
                    ancestor = head;
                // 获取p的前驱结点
                ancestor = findPred(p, ancestor);
                // 从链表中删除结点p
                unlink(p, ancestor);
            }
        } finally {
            fullyUnlock();
        }
    }
}
核心要点
  1. 内部使用一个单向链表,以FIFO顺序存储
  2. 可以在链表两头同时进行操作,所以使用两个锁分别保护
  3. 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;提取线程同理。
  4. 迭代器与单向链表保持弱一致性,调用remove(T)方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
  5. 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作

PriorityBlockingQueue

PriorityBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

PriorityBlockingQueue是一个无限的并发队列。它使用与java.util.PriorityQueue类相同的排序规则。你不能将null插入此队列。

插入java.util.PriorityQueue的所有元素必须实现java.lang.Comparable接口。因此,元素根据你在Comparable 中的实现进行优先级排序。

注意,对于具有相同优先级的元素(compare()== 0),不会强制执行任何特定行为。

另请注意,如果你从PriorityBlockingQueue得到一个IteratorIterator不保证按优先级顺序迭代元素。

示例

以下是使用PriorityBlockingQueue的示例:

BlockingQueue<String> queue = new PriorityBlockingQueue<String>();

//String implements java.lang.Comparable
queue.put("Value");

String value = queue.take();

源码

PriorityBlockingQueue内部使用了一个以数组为基础的二叉堆,所有的公共操作使用一个锁来进行保护。当对数组进行扩容时,放弃主锁,使用一个简单的自旋锁进行扩容,这样做是为了让扩容和提取元素同步进行。

成员变量
// 默认初始化大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
 * 数组可分配的最大容量。
 * 一些虚拟机在数组中分配了对象头,尝试分配更大的容量
 * 可能会导致OOM,请求的数组容量超过了允许的上限。
 */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
 * 优先级队列使用Comparator进行排序,或者通过元素的自然顺序,
 * 即实现了Comparable接口。如果没有比较器:对于在堆中的每个结点n, 
 * 以及它的后代 d,n <= d。
 */
private transient Object[] queue;

// 队列元素数量
private transient int size;

// 比较器,为null代表使用自然顺序排序
private transient Comparator<? super E> comparator;

private final ReentrantLock lock;

private final Condition notEmpty;

/**
 * 分配时的自旋锁,通过CAS获得
 */
private transient volatile int allocationSpinLock;

/**
 * 只为序列化操作使用
 */
private PriorityQueue<E> q;
构造方法
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

/**
 * 如果给定的集合是 SortedSet或者 PriorityQueue, 这个优先级
 * 队列根据同样的顺序排序。
 */
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true 如果不知道二叉堆的顺序
    boolean screen = true;  // true 如果必须检查null

    // 针对 SortedSet和 PriorityQueue处理
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }

    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[],  it.
    if (a.getClass() != Object[].class)
        a = Arrays.Of(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        heapify();
}
增加操作
// 因为PriorityBlockingQueue本身拒绝插入null,所以offer也需要抛出NPE,
// 复用offer方法即可
public boolean add(E e) {
    return offer(e);
}

// 此队列是无界的,不会返回false
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        // 将元素插入二叉堆中
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 唤醒等待获取元素的线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 获取自旋锁
    if (allocationSpinLock == 0 &&
        ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            // 计算新容量。如果当前容量很小,那么直接扩容一倍多一点,
            // 因为此时容量可能会迅速增长,否则扩容50%即可
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 如果新容量大于最大容量,那么计算当前最小容量(+1),
            // 如果依然大于最大容量,抛出OOM
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    // CAS竞争自旋锁失败,调度此线程
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 加锁,拷贝元素
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.array(array, 0, newArray, 0, oldCap);
    }
}



// 此队列是无界的,所以永远不会被阻塞,复用offer方法即可
public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}
删除操作
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    int n = size - 1;
    // 队列为空返回null
    if (n < 0)
        return null;
    else {
        // 获取头元素
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        // 整理二叉堆
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}
访问操作
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (size == 0) ? null : (E) queue[0];
    } finally {
        lock.unlock();
    }
}
迭代器

PriorityBlockingQueue类中的迭代器和DelayQueue中的迭代器一样,都不会与原组件保证一致性。

// 调用toArray()方法获取当前的二叉堆
public Iterator<E> iterator() {
    return new Itr(toArray());
}

/**
 * Snapshot iterator that works off  of underlying q array.
 */
final class Itr implements Iterator<E> {
    final Object[] array; // Array of all elements
    int cursor;           // index of next element to return
    int lastRet;          // index of last element, or -1 if no such

    Itr(Object[] array) {
        lastRet = -1;
        this.array = array;
    }

    public boolean hasNext() {
        return cursor < array.length;
    }

    public E next() {
        if (cursor >= array.length)
            throw new NoSuchElementException();
        return (E)array[lastRet = cursor++];
    }

    public void remove() {
        if (lastRet < 0)
            throw new IllegalStateException();
        removeEQ(array[lastRet]);
        lastRet = -1;
    }
}
核心要点
  1. 必须提供要Comparator接口或者队列元素实现Comparable接口。
  2. 可以同时进行扩容和提取元素的操作,不过只能有一个线程进行扩容
  3. 数组大小小于64时,进行双倍容量的扩展,否则扩容1.5倍
  4. 使用迭代器访问元素的顺序不会按指定的比较器顺序
  5. 迭代器不会与原数组保持一致性

LinkedBlockingDeque

LinkedBlockingDeque类实现了BlockingDeque接口。阅读BlockingDeque文本以获取有关的更多信息。

Deque来自“双端队列” 这个词。Deque是一个队列,你可以在插入和删除队列两端的元素。

LinkedBlockingDeque是一个Deque,如果一个线程试图从中获取一个元素,而队列空的,不管线程从哪一端试图获取元素,都会被阻塞。

示例

以下是实例化和使用LinkedBlockingDeque的例子:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();

源码

整体介绍

LinkedBlockingDequeLinkedBlockingQueue的实现大体上类似,区别在于LinkedBlockingDeque提供的操作更多。并且LinkedBlockingQueue内置两个锁分别用于put和take操作,而LinkedBlockingDeque只使用一个锁控制所有操作。因为队列能够同时在头尾进行put和take操作,所以使用两个锁也需要将两个锁同时加锁才能保证操作的同步性,不如只使用一个锁的性能好

同步节点相比LinkedBlockingQueue多了一个prev字段。

static final class Node<E> {
    E item;

    Node<E> prev;

    Node<E> next;

    Node(E x) {
        item = x;
    }
}
增加操作

增加操作相比LinkedBlockingQueue只能在队列尾部增加,它能在队列的头尾两端都进行增加操作。

public void addFirst(E e) {
    // 复用offer方法
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}

public void addLast(E e) {
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
    if (e == null) throw new NullPointerException();
    // 构造节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到队列头部
        return linkFirst(node);
    } finally {
        lock.unlock();
    }
}

private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();

    // 如果队列已满,返回false
    if (count >= capacity)
        return false;
    // 获取头节点,将自己的 next字段指向头节点,然后设置自己为头节点
    Node<E> f = first;
    node.next = f;
    first = node;
    // 如果队列为空,尾节点也指向自己
    if (last == null)
        last = node;
    else
        f.prev = node;
    ++count;
    // 唤醒等待获取元素的线程
    notEmpty.signal();
    return true;
}

public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到队列尾部
        return linkLast(node);
    } finally {
        lock.unlock();
    }
}

private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    
    // 如果队列已满,返回false
    if (count >= capacity)
        return false;
    // 将自己设置为尾节点
    Node<E> l = last;
    node.prev = l;
    last = node;
    // 如果队列为空,头节点也指向自己
    if (first == null)
        first = node;
    else
        l.next = node;
    ++count;
    // 唤醒等待获取元素的线程
    notEmpty.signal();
    return true;
}

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列已满,等待
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列已满,等待
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public boolean offerFirst(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    // 计算超时时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列已满,超时等待
        while (!linkFirst(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public boolean offerLast(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkLast(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
删除操作
public E removeFirst() {
    // 复用poll操作
    E x = pollFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E removeLast() {
    E x = pollLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取头节点的值,并删除它
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();

    // 如果队列为空,返回null
    Node<E> f = first;
    if (f == null)
        return null;
    // 重置头节点
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    --count;
    // 唤醒等待插入的线程
    notFull.signal();
    return item;
}

public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}

private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    // 队列为空,返回null
    if (l == null)
        return null;
    // 更新尾节点
    Node<E> p = l.prev;
    E item = l.item;
    l.item = null;
    l.prev = l; // help GC
    last = p;
    if (p == null)
        first = null;
    else
        p.next = null;
    --count;
    notFull.signal();
    return item;
}

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // 如果队列为空,等待
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // 如果队列为空,等待
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

public E pollLast(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkLast()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}
访问操作
public E getFirst() {
    // 复用peek方法
    E x = peekFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E getLast() {
    E x = peekLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E peekFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列不为空,返回头元素
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}

public E peekLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列不为空,返回尾元素
        return (last == null) ? null : last.item;
    } finally {
        lock.unlock();
    }
}
BlockingQueue 方法

由于BlockingDeque继承自BlockingQueue接口,所以需要实现BlockingQueue中的方法,具体只需要复用前面提到的方法即可。

public boolean add(E e) {
    addLast(e);
    return true;
}

public boolean offer(E e) {
    return offerLast(e);
}

public void put(E e) throws InterruptedException {
    putLast(e);
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    return offerLast(e, timeout, unit);
}

public E remove() {
    return removeFirst();
}

public E poll() {
    return pollFirst();
}

public E take() throws InterruptedException {
    return takeFirst();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    return pollFirst(timeout, unit);
}

public E element() {
    return getFirst();
}

public E peek() {
    return peekFirst();
}
核心要点
  1. 内部使用一个双向链表
  2. 可以在链表两头同时进行put和take操作,只能使用一个锁
  3. 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;take线程同理。
  4. 迭代器与内部的双向链表保持弱一致性,调用remove(T)方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
  5. 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作
  6. forEach(Consumer<? super E> action)removeIfremoveAllretainAll都是64个元素为一批进行操作

SynchronousQueue

SynchronousQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

SynchronousQueue是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

将这个类称为队列有点夸大其词。这更像是一个点。

源码

SynchronousQueue的内部实现了两个类,一个是TransferStack类,使用LIFO顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用FIFI顺序存储元素,这个类用于公平模式。这两个类继承自”Nonblocking Concurrent Objects with Condition Synchronization”算法,此算法是由W. N. Scherer III 和 M. L. Scott提出的,关于此算法的理论内容在这个网站中:http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html。两个类的性能差不多,FIFO通常用于在竞争下支持更高的吞吐量,而LIFO在一般的应用中保证更高的线程局部性。

队列(或者栈)的节点在任何时间要么是”date”模式 —— 通过put操作提供的元素的模式,要么是”request”模式 —— 通过take操作取出元素的模式,要么为空。还有一个模式是”fulfill”模式,当队列有一个data节点时,请求从队列中获取一个元素就会构造一个”fulfill”模式的节点,反之亦然。这个类最有趣的特性在于任何操作都能够计算出现在队列头节点处于什么模式,然后根据它进行操作而无需使用锁。

队列和栈都继承了抽象类Transferer,这个类只定义了一个方法transfer,此方法可以既可以执行put也可以执行take操作。这两个操作被统一到了一个方法中,因为在dual数据结构中,put和take操作是对称的,所以相近的所有结点都可以被结合。使用transfer方法是从长远来看的,它相比分为两个几乎重复的部分来说更加容易理解。

队列和栈数据结构在概念上有许多相似性,但是在真正的实现细节上却几乎没有什么相似的地方。为了简单起见,它们保持清晰,这样在以后它们能以不同的方法扩展。

SynchronousQueue中使用的队列和栈的算法和”Nonblocking Concurrent Objects with Condition Synchronization”算法相比是不同的版本,包括对取消的处理。主要的差别如下:

  1. 最初的算法使用了位标记指针,但是此类在结点中使用了模式位,这导致了很多深入的改变。

  2. SynchronousQueue必须阻塞线程,直到变为fulfilled模式。

  3. 支持取消操作,通过超时和中断方式,包括清除被取消的结点/线程,以避免无法进行垃圾回收和无用的内存消耗。

阻塞主要通过LockSupportpark/unpark方法完成,除了下一个结点将要变为fulfilled模式的情况,这时会在多处理器机器中使用自旋等待。在非常忙碌的SynchronousQueue中,自旋可以显着改变吞吐量。而在不忙碌的情况下,自旋的次数就会变的足够小,不会影响性能。

清除操作在队列和栈中以不同的方式完成。对于队列,当结点被取消时,我们总是可以在O(1)时间立刻删除它。但是如果它被固定在队尾,它就必须等待直到其他取消操作完成。对于栈来说,我们需要以O(n)时间遍历来确保能够删除这个结点,不过这个操作可以和其他访问栈的线程同时进行。

队列和栈的父类为Transferer。它只定义了一个通用方法。

abstract static class Transferer<E> {
    /**
     * 执行put或者take操作/
     * 如果参数e非空,这个元素将被交给一个消费线程;如果为null,
     * 则请求返回一个被生产者提交的元素。
     * 如果返回的结果非空,那么元素被提交了或被接受了;如果为null,
     * 这个操作可能因为超时或者中断失败了。调用者可以通过检查
     * Thread.interrupted来区分到底是因为什么元素失败。
     */
    abstract E transfer(E e, boolean timed, long nanos);
}
TransferStack

这个类继承自Scherer-Scott的 dual stack 算法,但不完全相同,它使用结点而不是位标记指针。

static final class TransferStack<E> extends Transferer<E> {

    /* Modes for SNodes, ORed together in node fields */
    /** 表示一个未满足的消费者 */
    static final int REQUEST    = 0;
    /** 表示一个未满足的生产者 */
    static final int DATA       = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;

    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // 栈中的下一个结点
        volatile SNode match;       // 匹配此结点的结点
        volatile Thread waiter;     // 控制 park/unpark
        Object item;                // 数据
        int mode;
核心算法 transfer

使用put操作时参数e不为空,而使用take操作时参数e为null,而timednanos指定是否使用超时。

  1. 如果头节点为空或者已经包含了相同模式的结点,那么尝试将结点
    增加到栈中并且等待匹配。如果被取消,返回null
  2. 如果头节点是一个模式不同的结点,尝试将一个fulfilling结点加入到栈中,匹配相应的等待结点,然后一起从栈中弹出,并且返回匹配的元素。匹配和弹出操作可能无法进行,由于其他线程正在执行操作3
  3. 如果栈顶已经有了一个fulfilling结点,帮助它完成它的匹配和弹出操作,然后继续。
E transfer(E e, boolean timed, long nanos) {
    /*
     * 基础算法,循环尝试下面三种操作中的一个:
     *
     * 1. 如果头节点为空或者已经包含了相同模式的结点,尝试将结点
     *    增加到栈中并且等待匹配。如果被取消,返回null
     *
     * 2. 如果头节点是一个模式不同的结点,尝试将一个`fulfilling`结点加入
     *    到栈中,匹配相应的等待结点,然后一起从栈中弹出,
     *    并且返回匹配的元素。匹配和弹出操作可能无法进行,
     *    由于其他线程正在执行操作3
     *
     * 3. 如果栈顶已经有了一个`fulfilling`结点,帮助它完成
     *    它的匹配和弹出操作,然后继续。
     */

    SNode s = null; // constructed/reused as needed
    // 传入参数为null代表请求获取一个元素,否则表示插入元素
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        // 如果头节点为空或者和当前模式相同
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置超时时间为 0,立刻返回
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            // 构造一个结点并且设为头节点
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 等待满足
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // 检查头节点是否为FULFILLIING
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // 更新头节点为自己
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 循环直到匹配成功
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        // 帮助满足的结点匹配
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

当一个结点插入到栈中,它要么能和其他结点匹配然后一起出栈,否则就需要等待一个匹配的结点到来。在等待的过程中,一般使用自旋等待代替阻塞(在多处理器环境下),因为很有可能会有相应结点到来。如果自旋结束还没有匹配,那么就设置waiter然后阻塞自己,在阻塞自己之前还会再检查至少一次是否有匹配的结点。

如果等待的过程中由于超时到期或者中断,那么需要取消此节点,方法是将match字段指向自己,然后返回。

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = shouldSpin(s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0) {
            Thread.onSpinWait();
            spins = shouldSpin(s) ? (spins - 1) : 0;
        }
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

使用TransferStack即SynchronousQueue的非公平模式时,先put再take结点变化如下(注意DATA节点是插入线程构造的,而REQUEST是提取元素的线程的模式,此节点在构造时会变为FULFILLING节点,此处依然使用REQUEST以指代是take线程):

img

如果先take再put时,插入线程则会构建一个模式为[11]的结点,而11 & FULFILLING != 0, 所以isFulfilling(h.mode)方法会返回true。

img

清除

在最坏的情况我们需要遍历整个栈来删除节点s。如果有多个线程并发调用clean方法,我们不会知道其他线程可能已经删除了此节点。

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    /*
     * At worst we may need to traverse entire stack to unlink
     * s. If there are multiple concurrent calls to clean, we
     * might not see s if another thread has already removed
     * it. But we can stop when we see any node known to
     * follow s. We use s.next unless it too is cancelled, in
     * which case we try the node one past. We don't check any
     * further because we don't want to doubly traverse just to
     * find sentinel.
     */

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // 删除头部被取消的节点
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // 移除中间的节点
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}
TransferQueue
static final class TransferQueue<E> extends Transferer<E> {
    /*
     * This extends Scherer-Scott dual queue algorithm, differing,
     * among other ways, by using modes within nodes rather than
     * marked pointers. The algorithm is a little simpler than
     * that for stacks because fulfillers do not need explicit
     * nodes, and matching is done by CAS'ing QNode.item field
     * from non-null to null (for put) or vice versa (for take).
     */

    /** Node class for TransferQueue. */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;
transfer方法
  1. 如果队列为空或者头节点模式和自己的模式相同,尝试将自己增加到队列的等待者中,等待被满足或者被取消
  2. 如果队列包含了在等待的节点,并且本次调用是与之模式匹配的调用,尝试通过CAS修改等待节点item字段然后将其出队
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 如果队列为空或者模式与头节点相同
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 如果有其他线程修改了tail,进入下一循环重读
            if (t != tail)                  // inconsistent read
                continue;
            // 如果有其他线程修改了tail,尝试cas更新尾节点,进入下一循环重读
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            // 超时返回
            if (timed && nanos <= 0L)       // can't wait
                return null;
            // 构建一个新节点
            if (s == null)
                s = new QNode(e, isData);
            // 尝试CAS设置尾节点的next字段指向自己
            // 如果失败,重试
            if (!t.casNext(null, s))        // failed to link in
                continue;
      
            // cas设置当前节点为尾节点
            advanceTail(t, s);              // swing tail and wait
            // 等待匹配的节点
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果被取消,删除自己,返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            // 如果此节点没有被模式匹配的线程出队
            // 那么自己进行出队操作
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            // 数据不一致,重读
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled     m已经匹配成功了
                x == m ||                   // m cancelled             m被取消了
                !m.casItem(x, e)) {         // lost CAS                CAS竞争失败
                // 上面三个条件无论哪一个满足,都证明m已经失效无用了,
                // 需要将其出队
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            // 成功匹配,依然需要将节点出队
            advanceHead(h, m);              // successfully fulfilled
            // 唤醒匹配节点,如果它被阻塞了
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        // item被修改后返回
        // 如果put操作在此等待,item会被更新为null
        // 如果take操作再次等待,item会由null变为一个值
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

使用TransferQueue即公平模式插入节点,队列的变化如下:

img

img

注意匹配的时候item的变化。

public operations

SynchronousQueue类的公共操作都是依赖于transfer方法完成的,注意不同的方法调用transfer方法时提供的参数。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

------------------------------------------------------------------

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return transferer.transfer(null, true, 0);
}
核心要点
  1. 可以指定锁的公平性
  2. 队列内部不会存储元素,所以尽量避免使用add,offer此类立即返回的方法,除非有特殊需求