JAVA阻塞队列实现

img

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

队列的特点是:先进先出(FIFO)

BlockingQueue的方法

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 remove() peek() 不可用 不可用
  1. 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  2. 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  3. 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  4. 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

Java里的阻塞队列

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,遵循FIFO原则。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,遵循FIFO原则,默认和最大长度为Integer.MAX_VALUE。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的支持延时无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

有界:有初始化最大长度,达到最大程度继续添加要莫阻塞,要莫抛出异常

无界:没有初始化最大长度,能够一直添加,不会阻塞或抛出异常,一直到OOM。

因为阻塞队列实现都差不多,我们就拿ArrayBlockingQueue来看下实现

ArrayBlockingQueue结构

img

阻塞队列的实现都差不多,我们就拿ArrayBlockingQueue 来举例

   //底层数据结构
	private final E[] items;
//用来为下一个take/poll/remove的索引(出队)
   private int takeIndex;
//用来为下一个put/offer/add的索引(入队)
   private int putIndex;
//队列中元素的个数
   private int count;

//定义的可重入锁
   final ReentrantLock lock;

   //非空的条件
   private final Condition notEmpty;

   //非满的条件
   private final Condition notFull;

构造方法


 /**
    * 创造一个队列,指定队列容量,默认模式为非公平模式
    * @param capacity <1会抛异常
    */
   public ArrayBlockingQueue(int capacity) {
       this(capacity, false);
   }


/**
    * ArrayBlockingQueue 的构造方法
    *
    * @param capacity 初始化大小 默认Integer
    * @param fair     是否使用公平锁
    */
   public ArrayBlockingQueue(int capacity, boolean fair) {
       //指定大小<=0 抛出异常
       if (capacity <= 0) {
           throw new IllegalArgumentException();
       }
       //初始化数组的大小
       this.items = new Object[capacity];
       //创建可重入锁
       lock = new ReentrantLock(fair);
       //创建非空条件
       notEmpty = lock.newCondition();
       //创建非满条件
       notFull = lock.newCondition();
   }

入队

offer不阻塞添加

在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

  /**
 * 在队尾插入一个元素,
 * 如果队列没满,立即返回true;
 * 如果队列满了,立即返回false
 * 注意:该方法通常优于add(),因为add()失败直接抛异常
 */
public boolean offer(E e) {
    //检查非空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //队列满了
        if (count == items.length) {
            return false;
            //队列没有满
        } else {
            //入队
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

offer等待超时阻塞添加

/**
 * 在队尾插入一个元素,如果数组已满,则进入等待,直到出现以下三种情况:
 *  1、被唤醒
 *  2、等待时间超时
 *  3、当前线程被中断
 * @param e 需要添加的元素
 * @param timeout 超时时间
 * @param unit 时间单位
 * @return
 * @throws InterruptedException
 */
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    //检查非空
    checkNotNull(e);
    //计算等待时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    //可中断锁加锁
    lock.lockInterruptibly();
    try {
        //队列满
        while (count == items.length) {
            //如果超时返回fasle
            if (nanos <= 0)
                return false;
             /*
             * 进行等待:
             * 在这个过程中可能发生三件事:
             * 1、被唤醒-->继续当前这个for(;;)循环
             * 2、超时-->继续当前这个for(;;)循环
             * 3、被中断-->之后直接执行catch部分的代码
             */
            nanos = notFull.awaitNanos(nanos);
        }
        //入队
        enqueue(e);
        return true;
    } finally {
        //解锁
        lock.unlock();
    }
}

put阻塞添加

在队尾插入一个元素,如果队列满了,一直阻塞,直到数组不满了或者线程被中断

/**
 * 在队尾插入一个元素
 * 如果队列满了,一直阻塞,直到数组不满了或者线程被中断
 */
public void put(E e) throws InterruptedException {
    //检查非空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //可中断锁-加锁
    lock.lockInterruptibly();
    try {
        //队列满了 阻塞
        while (count == items.length) {
            notFull.await();
        }
        //入队
        enqueue(e);
    } finally {
        //解锁
        lock.unlock();
    }
}

这里使用的lock.lockInterruptibly() ,当前线程如果调用了Thread.interrupt()方法,那么lockInterruptible()判断的Thread.interrupted()聚会成立,就会抛出异常,其实就是线程中断,该方法就抛出异常。

enqueue入队操作

/**
   * 入队操作
   *
   * @param x 需要入队的袁旭
   */
  private void enqueue(E x) {
      // assert lock.getHoldCount() == 1;
      // assert items[putIndex] == null;
      final Object[] items = this.items;
      //putIndex 默认为队列数据的长度
      items[putIndex] = x;
      //队列满了重置为0 从头开始
      if (++putIndex == items.length) {
          putIndex = 0;
      }
      //统计数字+1
      count++;
      //非空的条件阻塞的线程唤醒
      notEmpty.signal();
  }

队列没满items[putIndex] = data;达到数组长度重置putIndex,达到环形队列目的

出队

poll非阻塞出队

如果没有元素,直接返回null;如果有元素,将队头元素置null,但是要注意队头是随时变化的,并非一直是items[0]。

/**
    * 出队
    * @return
    */
   public E poll() {
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           //队列为空返回努力了,否则出队操作
           return (count == 0) ? null : dequeue();
       } finally {
           lock.unlock();
       }
   }

poll 等待超时阻塞出队

从对头删除一个元素,如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:

  • 被唤醒
  • 等待时间超时
  • 当前线程被中断
/**
     * 等待超时出队
     * @param timeout 超时时间
     * @param unit 单位
     * @return 出队的元素
     * @throws InterruptedException
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        //计算等待时间
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //可中断锁
        lock.lockInterruptibly();
        try {
            //队列为空
            while (count == 0) {
                //等待时间到了还未没有元素返回null
                if (nanos <= 0) {
                    return null;
                }
                /*
                 * 进行等待:
                 * 在这个过程中可能发生三件事:
                 * 1、被唤醒-->继续当前这个for(;;)循环
                 * 2、超时-->继续当前这个for(;;)循环
                 * 3、被中断-->之后直接执行catch部分的代码
                 */
                nanos = notEmpty.awaitNanos(nanos);
            }
            //出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

take阻塞移除

/**
    * 阻塞移除操作
    * @return 返回移除的元素
    * @throws InterruptedException
    */
   public E take() throws InterruptedException {
       final ReentrantLock lock = this.lock;
       //可中断锁
       lock.lockInterruptibly();
       try {
           //如果元素为空就阻塞
           while (count == 0) {
               //非空阻塞
               notEmpty.await();
           }
           //出队操作
           return dequeue();
       } finally {
           //解锁
           lock.unlock();
       }
   }

dequeue出队操作

/**
 * 出队操作
 * @return 返回出队的元素
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    //获取第takeIndex个元素
    E x = (E) items[takeIndex];
    //删除元素,让GC进行回收
    items[takeIndex] = null;
    //takeIndex+1 如果移除到最后一个元素 重置为0 从头开始
    if (++takeIndex == items.length) {
        takeIndex = 0;
    }
    //统计长度-1
    count--;
    if (itrs != null) {
        //元素
        itrs.elementDequeued();
    }
    //队列不满了唤醒非满线程
    notFull.signal();
    return x;
}

使用场景

延时队列 DelayQueue

在我们的业务中通常会有一些需求是这样的

  • 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
  • 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
  • 缓存系统,如果key到期了取出来删除

那么这类业务我们可以总结出一个特点:需要延迟工作。
由此的情况,就是我们的DelayQueue应用需求的产生。

看一个简单的例子

public class DelayedTask implements Delayed {

    public DelayedTask(int delayedTime, TimeUnit unit, String message) {
        this.delayedTime = delayedTime;
        //计算到期时间
        this.expireTime = System.currentTimeMillis() + (delayedTime > 0 ? unit.toMillis(delayedTime) : 0);
        this.message = message;
    }
    //延时时长
    private int delayedTime;

    /**
     * 到期时间
     */
    private long expireTime;
    /**
     * 消息
     */
    private String message;

    /**
     * 获取队列需要演示获取水煎
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return expireTime - System.currentTimeMillis();
    }

    /**
     * 对比,将延时比较小的放在前面
     * @param other
     * @return
     */
    @Override
    public int compareTo(Delayed other) {
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    @Override
    public String toString() {
        return "出队,延时:"+delayedTime+",消息:"+message;
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        ExecutorService executorService = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> delayedTasks = new DelayQueue<DelayedTask>();
        //生产者
        executorService.submit(() -> {
            while (true) {
                //生成随机消息
                String randomStr = RandomStringUtils.randomNumeric(10);
                //生成随机数
                int randomTime = random.nextInt(10);
                DelayedTask task = new DelayedTask(randomTime, TimeUnit.SECONDS, randomStr);
                //入队
                delayedTasks.add(task);
                System.out.println("入队,消息:" + randomStr + "延时:" + randomTime + "秒");
                Thread.sleep(1000);
            }
        });
        //消费者
        executorService.submit(() -> {
            while (true) {
                DelayedTask task = delayedTasks.take();
                System.out.println(task);
            }
        });
        //显示时间进度
        executorService.submit(() -> {
            float time = 0F;
            while (true) {
                System.out.println(time+"秒");
                Thread.sleep(500);
                time += 0.5;
            }
        });

        executorService.shutdown();
    }
}

输出

0.0秒
入队,消息:8675326967延时:5秒
0.5秒
入队,消息:8861554454延时:0秒
出队,延时:0,消息:8861554454
1.0秒
1.5秒
入队,消息:9123579697延时:1秒
2.0秒
2.5秒
出队,延时:1,消息:9123579697
入队,消息:5909478713延时:6秒
3.0秒
3.5秒
入队,消息:6287328130延时:0秒
出队,延时:0,消息:6287328130
4.0秒
4.5秒
出队,延时:5,消息:8675326967
入队,消息:4056656965延时:7秒
5.0秒
5.5秒
入队,消息:8250385270延时:9秒
6.0秒
6.5秒
入队,消息:1949026689延时:1秒
7.0秒
7.5秒
出队,延时:1,消息:1949026689
入队,消息:2952840210延时:9秒
8.0秒
8.5秒

总结

  • ArrayBlockingQueue是有界的阻塞队列,不接受null
  • 底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列
  • 所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的。
  • 阻塞的场景:1获取lock锁,2进入和取出还要满足condition 满了或者空了都等待出队和加入唤醒,ArrayBlockingQueue我们主要是put和take真正用到的阻塞方法(条件不满足)。
  • 成员cout /putIndex、takeIndex是共享的,所以一些查询方法size、peek、toString、方法也是加上锁保证线程安全,但没有了并发损失了性能。
  • remove(Object obj) 返回了第一个equals的Object

三种入队对比

  • offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false–>不阻塞
  • put(E e):如果队列满了,一直阻塞,直到数组不满了或者线程被中断–>阻塞
  • offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:–>阻塞
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

三种出对对比

  • poll():如果没有元素,直接返回null;如果有元素,出队
  • take():如果队列空了,一直阻塞,直到数组不为空或者线程被中断–>阻塞
  • poll(long timeout, TimeUnit unit):如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

等待通知模式

这里面要理解等待/通知模式

阻塞队列使用了等待/通知的设计模式

标准范式

等待方

 //等待方
public void wait() {
    lock.lock();
    try {
        while (条件) {
            condition.await();
        }
        //todo 业务代码
    } finally {
        lock.unlock();
    }
}

通知方

public void notify() {
    //todo 改变数据
    condition.signal();
}

等待超时模式

标准范式
public Integer wait(long time, TimeUnit timeUnit) {
    //获取到期时间
    long duration = timeUnit.toMillis(time);
    lock.lock();
    try {
        while (duration > 0) {
            duration += System.currentTimeMillis();
            condition.await(time, timeUnit);
            duration -= System.currentTimeMillis();
        }
        //todo 业务代码
        return null;
    } finally {
        lock.unlock();
    }
    }