JAVA阻塞队列实现
JAVA阻塞队列实现
什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
队列的特点是:先进先出(FIFO)
BlockingQueue的方法
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | remove() | peek() | 不可用 | 不可用 |
- 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
Java里的阻塞队列
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,遵循FIFO原则。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,遵循FIFO原则,默认和最大长度为Integer.MAX_VALUE。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的支持延时无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
有界:有初始化最大长度,达到最大程度继续添加要莫阻塞,要莫抛出异常
无界:没有初始化最大长度,能够一直添加,不会阻塞或抛出异常,一直到OOM。
因为阻塞队列实现都差不多,我们就拿ArrayBlockingQueue来看下实现
ArrayBlockingQueue结构
阻塞队列的实现都差不多,我们就拿ArrayBlockingQueue 来举例
//底层数据结构
private final E[] items;
//用来为下一个take/poll/remove的索引(出队)
private int takeIndex;
//用来为下一个put/offer/add的索引(入队)
private int putIndex;
//队列中元素的个数
private int count;
//定义的可重入锁
final ReentrantLock lock;
//非空的条件
private final Condition notEmpty;
//非满的条件
private final Condition notFull;
构造方法
/**
* 创造一个队列,指定队列容量,默认模式为非公平模式
* @param capacity <1会抛异常
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* ArrayBlockingQueue 的构造方法
*
* @param capacity 初始化大小 默认Integer
* @param fair 是否使用公平锁
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
//指定大小<=0 抛出异常
if (capacity <= 0) {
throw new IllegalArgumentException();
}
//初始化数组的大小
this.items = new Object[capacity];
//创建可重入锁
lock = new ReentrantLock(fair);
//创建非空条件
notEmpty = lock.newCondition();
//创建非满条件
notFull = lock.newCondition();
}
入队
offer不阻塞添加
在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false
/**
* 在队尾插入一个元素,
* 如果队列没满,立即返回true;
* 如果队列满了,立即返回false
* 注意:该方法通常优于add(),因为add()失败直接抛异常
*/
public boolean offer(E e) {
//检查非空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//队列满了
if (count == items.length) {
return false;
//队列没有满
} else {
//入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
offer等待超时阻塞添加
/**
* 在队尾插入一个元素,如果数组已满,则进入等待,直到出现以下三种情况:
* 1、被唤醒
* 2、等待时间超时
* 3、当前线程被中断
* @param e 需要添加的元素
* @param timeout 超时时间
* @param unit 时间单位
* @return
* @throws InterruptedException
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//检查非空
checkNotNull(e);
//计算等待时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中断锁加锁
lock.lockInterruptibly();
try {
//队列满
while (count == items.length) {
//如果超时返回fasle
if (nanos <= 0)
return false;
/*
* 进行等待:
* 在这个过程中可能发生三件事:
* 1、被唤醒-->继续当前这个for(;;)循环
* 2、超时-->继续当前这个for(;;)循环
* 3、被中断-->之后直接执行catch部分的代码
*/
nanos = notFull.awaitNanos(nanos);
}
//入队
enqueue(e);
return true;
} finally {
//解锁
lock.unlock();
}
}
put阻塞添加
在队尾插入一个元素,如果队列满了,一直阻塞,直到数组不满了或者线程被中断
/**
* 在队尾插入一个元素
* 如果队列满了,一直阻塞,直到数组不满了或者线程被中断
*/
public void put(E e) throws InterruptedException {
//检查非空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可中断锁-加锁
lock.lockInterruptibly();
try {
//队列满了 阻塞
while (count == items.length) {
notFull.await();
}
//入队
enqueue(e);
} finally {
//解锁
lock.unlock();
}
}
这里使用的lock.lockInterruptibly() ,当前线程如果调用了Thread.interrupt()方法,那么lockInterruptible()判断的Thread.interrupted()聚会成立,就会抛出异常,其实就是线程中断,该方法就抛出异常。
enqueue入队操作
/**
* 入队操作
*
* @param x 需要入队的袁旭
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//putIndex 默认为队列数据的长度
items[putIndex] = x;
//队列满了重置为0 从头开始
if (++putIndex == items.length) {
putIndex = 0;
}
//统计数字+1
count++;
//非空的条件阻塞的线程唤醒
notEmpty.signal();
}
队列没满items[putIndex] = data;达到数组长度重置putIndex,达到环形队列目的
出队
poll非阻塞出队
如果没有元素,直接返回null;如果有元素,将队头元素置null,但是要注意队头是随时变化的,并非一直是items[0]。
/**
* 出队
* @return
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列为空返回努力了,否则出队操作
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
poll 等待超时阻塞出队
从对头删除一个元素,如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
- 被唤醒
- 等待时间超时
- 当前线程被中断
/**
* 等待超时出队
* @param timeout 超时时间
* @param unit 单位
* @return 出队的元素
* @throws InterruptedException
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//计算等待时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
//队列为空
while (count == 0) {
//等待时间到了还未没有元素返回null
if (nanos <= 0) {
return null;
}
/*
* 进行等待:
* 在这个过程中可能发生三件事:
* 1、被唤醒-->继续当前这个for(;;)循环
* 2、超时-->继续当前这个for(;;)循环
* 3、被中断-->之后直接执行catch部分的代码
*/
nanos = notEmpty.awaitNanos(nanos);
}
//出队
return dequeue();
} finally {
lock.unlock();
}
}
take阻塞移除
/**
* 阻塞移除操作
* @return 返回移除的元素
* @throws InterruptedException
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
//如果元素为空就阻塞
while (count == 0) {
//非空阻塞
notEmpty.await();
}
//出队操作
return dequeue();
} finally {
//解锁
lock.unlock();
}
}
dequeue出队操作
/**
* 出队操作
* @return 返回出队的元素
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
//获取第takeIndex个元素
E x = (E) items[takeIndex];
//删除元素,让GC进行回收
items[takeIndex] = null;
//takeIndex+1 如果移除到最后一个元素 重置为0 从头开始
if (++takeIndex == items.length) {
takeIndex = 0;
}
//统计长度-1
count--;
if (itrs != null) {
//元素
itrs.elementDequeued();
}
//队列不满了唤醒非满线程
notFull.signal();
return x;
}
使用场景
延时队列 DelayQueue
在我们的业务中通常会有一些需求是这样的
- 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
- 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
- 缓存系统,如果key到期了取出来删除
那么这类业务我们可以总结出一个特点:需要延迟工作。
由此的情况,就是我们的DelayQueue应用需求的产生。
看一个简单的例子
public class DelayedTask implements Delayed {
public DelayedTask(int delayedTime, TimeUnit unit, String message) {
this.delayedTime = delayedTime;
//计算到期时间
this.expireTime = System.currentTimeMillis() + (delayedTime > 0 ? unit.toMillis(delayedTime) : 0);
this.message = message;
}
//延时时长
private int delayedTime;
/**
* 到期时间
*/
private long expireTime;
/**
* 消息
*/
private String message;
/**
* 获取队列需要演示获取水煎
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return expireTime - System.currentTimeMillis();
}
/**
* 对比,将延时比较小的放在前面
* @param other
* @return
*/
@Override
public int compareTo(Delayed other) {
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
@Override
public String toString() {
return "出队,延时:"+delayedTime+",消息:"+message;
}
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
ExecutorService executorService = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> delayedTasks = new DelayQueue<DelayedTask>();
//生产者
executorService.submit(() -> {
while (true) {
//生成随机消息
String randomStr = RandomStringUtils.randomNumeric(10);
//生成随机数
int randomTime = random.nextInt(10);
DelayedTask task = new DelayedTask(randomTime, TimeUnit.SECONDS, randomStr);
//入队
delayedTasks.add(task);
System.out.println("入队,消息:" + randomStr + "延时:" + randomTime + "秒");
Thread.sleep(1000);
}
});
//消费者
executorService.submit(() -> {
while (true) {
DelayedTask task = delayedTasks.take();
System.out.println(task);
}
});
//显示时间进度
executorService.submit(() -> {
float time = 0F;
while (true) {
System.out.println(time+"秒");
Thread.sleep(500);
time += 0.5;
}
});
executorService.shutdown();
}
}
输出
0.0秒
入队,消息:8675326967延时:5秒
0.5秒
入队,消息:8861554454延时:0秒
出队,延时:0,消息:8861554454
1.0秒
1.5秒
入队,消息:9123579697延时:1秒
2.0秒
2.5秒
出队,延时:1,消息:9123579697
入队,消息:5909478713延时:6秒
3.0秒
3.5秒
入队,消息:6287328130延时:0秒
出队,延时:0,消息:6287328130
4.0秒
4.5秒
出队,延时:5,消息:8675326967
入队,消息:4056656965延时:7秒
5.0秒
5.5秒
入队,消息:8250385270延时:9秒
6.0秒
6.5秒
入队,消息:1949026689延时:1秒
7.0秒
7.5秒
出队,延时:1,消息:1949026689
入队,消息:2952840210延时:9秒
8.0秒
8.5秒
总结
- ArrayBlockingQueue是有界的阻塞队列,不接受null
- 底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列
- 所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的。
- 阻塞的场景:1获取lock锁,2进入和取出还要满足condition 满了或者空了都等待出队和加入唤醒,ArrayBlockingQueue我们主要是put和take真正用到的阻塞方法(条件不满足)。
- 成员cout /putIndex、takeIndex是共享的,所以一些查询方法size、peek、toString、方法也是加上锁保证线程安全,但没有了并发损失了性能。
- remove(Object obj) 返回了第一个equals的Object
三种入队对比
- offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false–>不阻塞
- put(E e):如果队列满了,一直阻塞,直到数组不满了或者线程被中断–>阻塞
- offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:–>阻塞
- 被唤醒
- 等待时间超时
- 当前线程被中断
三种出对对比
- poll():如果没有元素,直接返回null;如果有元素,出队
- take():如果队列空了,一直阻塞,直到数组不为空或者线程被中断–>阻塞
- poll(long timeout, TimeUnit unit):如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
- 被唤醒
- 等待时间超时
- 当前线程被中断
等待通知模式
这里面要理解等待/通知模式
阻塞队列使用了等待/通知的设计模式
标准范式
等待方
//等待方
public void wait() {
lock.lock();
try {
while (条件) {
condition.await();
}
//todo 业务代码
} finally {
lock.unlock();
}
}
通知方
public void notify() {
//todo 改变数据
condition.signal();
}
等待超时模式
标准范式
public Integer wait(long time, TimeUnit timeUnit) {
//获取到期时间
long duration = timeUnit.toMillis(time);
lock.lock();
try {
while (duration > 0) {
duration += System.currentTimeMillis();
condition.await(time, timeUnit);
duration -= System.currentTimeMillis();
}
//todo 业务代码
return null;
} finally {
lock.unlock();
}
}