集合面试题
ArrayList和LinkedList有什么区别?
首先,他们的底层数据结构不同,ArrayList底层是基于数组实现的,LinkedList底层是基于链表实现的
由于底层数据结构不同,他们所适用的场景也不同,ArrayList更适合随机查找,LinkedList更适合删除和添加,查询、添加、删除的时间复杂度不同
另外ArrayList和LinkedList都实现了List接口,但是LinkedList还额外实现了Deque接口,所以LinkedList还可以当做队列来使用。
ArrayList的sublist修改是否影响list本身?
方法实现
// fromIndex: 集合开始的索引,toIndex:集合结束的索引,左开右闭
public List<E> subList(int fromIndex, int toIndex) {
// 边界校验
subListRangeCheck(fromIndex, toIndex, size);
// subList 返回是一个视图
return new SubList(this, 0, fromIndex, toIndex);
}
// ArrayList 的内部类,这个类中单独定义了 set、get、size、add、remove 等方法
private class SubList extends AbstractList<E> implements RandomAccess {
private final AbstractList<E> parent; // parent的具体实现类是 ArrayList
private final int parentOffset;
private final int offset;
int size;
SubList(AbstractList<E> parent,int offset, int fromIndex, int toIndex) {
this.parent = parent;
this.parentOffset = fromIndex;
this.offset = offset + fromIndex;
this.size = toIndex - fromIndex;
this.modCount = ArrayList.this.modCount;
}
public E set(int index, E e) {
rangeCheck(index);
checkForComodification();
E oldValue = ArrayList.this.elementData(offset + index);
ArrayList.this.elementData[offset + index] = e;
return oldValue;
}
public E get(int index) {
rangeCheck(index);
checkForComodification();
return ArrayList.this.elementData(offset + index);
}
public int size() {
checkForComodification();
return this.size;
}
public void add(int index, E e) {
rangeCheckForAdd(index);
checkForComodification();
// 添加直接调用父类的添加元素的方法
parent.add(parentOffset + index, e);
// subList 添加的元素后,会同步父集合的modCount 修改到 subList的modCount,
this.modCount = parent.modCount;
this.size++;
}
public E remove(int index) {
rangeCheck(index);
checkForComodification();
E result = parent.remove(parentOffset + index);
this.modCount = parent.modCount;
this.size--;
return result;
}
private void checkForComodification() {
if (ArrayList.this.modCount != this.modCount)
throw new ConcurrentModificationException();
}
}
subList 可以做集合的任何操作
调用该方法后的生成的新的集合的操作都会对原集合有影响,在subList集合后面添加元素,添加的第一个元素的位置就是上述toIndex的值,而原始集合中toIndex的元素往后移动。其add方法调用过程:
add(element) --> AbstractList.add(e) --> SubList.add(index, e) --> parent.add(index + parentOffset, e) --> ArrayList.add(newIndex, e)
List 的 subList 方法并没有创建一个新的 List,而是使用了 原 List 的视图,这个视图使用内部类 SubList 表示;不能把 subList 方法返回的 List 强制转换成 ArrayList 等类,因为他 们之间没有继承关系;
视图和原 List 的修改还需要注意几点,尤其是他们之间的相互影响:
- 对 父 (sourceList) 子 (subList)List 做 的 非 结 构 性 修 改(non-structural changes),都会影响到彼此;
- 对
子List
做结构性修改,操作同样会反映到父List
上;子List的 add 是直接调用父集合的add方法来添加的元素的:
public void add(int index, E e) {
rangeCheckForAdd(index);
checkForComodification();
parent.add(parentOffset + index, e);
this.modCount = parent.modCount;
this.size++;
}
对父List
做结构性修改(增加、删除),均会导致子List
的遍历、增加、删除抛出异常 ConcurrentModificationException;因为其迭代的时候会对比父List的modCount
和子集合的modCount
:
private void checkForComodification() {
// ArrayList.this.modCount 表示父List的 modCount,this.modCount表示 子List的modCount
if (ArrayList.this.modCount != this.modCount)
throw new ConcurrentModificationException();
}
SynchronizedList、Vector有什么区别?
- SynchronizedList 是java.util.Collections的静态内部类;Vector是java.util包中的一个类;
- 使用add方法时,扩容机制不一样;
- SynchronizedList有很好的扩展和兼容功能,可以将所有的List的子类转成线程安全的类;
- 使用SynchronizedList的时候,进行遍历时需要手动进行同步处理;
- SynchronizedList可以指定锁的对象
Arrays.asList(T…args)获得的List特点?
- 其返回的List是Arrays的一个内部类,是原来数组的视图,不支持增删操作;
- 如果需要对其进行操作的话,可以通过ArrayList的构造器将其转为ArrayList;
Iterator和ListIterator区别?
- 都是用于遍历集合的,Iterator可以用于遍历Set、List;ListIterator只可用于List;
- ListIterator实现的Iterator接口;
- ListIterator可向前和向后遍历;Iterator只可向后遍历;
ArrayList是怎么扩容的?
ArrayList内部使用数组存储元素,当数组长度不够时进行扩容,每次加一半的空间,ArrayList不会进行缩容;
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
// 新容量为旧容量的1.5倍
int newCapacity = oldCapacity + (oldCapacity >> 1);
// 如果新容量发现比需要的容量还小,则以需要的容量为准
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
// 如果新容量已经超过最大容量了,则使用最大容量
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// 以新容量拷贝出来一个新数组
elementData = Arrays.copyOf(elementData, newCapacity);
}
ArrayList插入、删除、查询元素的时间复杂度各是多少?
ArrayList支持随机访问,通过索引访问元素极快,时间复杂度为O(1);
public E get(int index) {
// 检查是否越界
rangeCheck(index);
// 返回数组index位置的元素
return elementData(index);
}
如果根据比较对象的话时间复杂度就是O(n)
public int indexOf(Object o) {
if (o == null) {
for (int i = 0; i < size; i++)
if (elementData[i]==null)
return i;
} else {
for (int i = 0; i < size; i++)
if (o.equals(elementData[i]))
return i;
}
return -1;
}
ArrayList添加元素到尾部极快,平均时间复杂度为O(1);
public boolean add(E e) {
// 检查是否需要扩容
ensureCapacityInternal(size + 1); // Increments modCount!!
// 把元素插入到最后一位
elementData[size++] = e;
return true;
}
ArrayList添加元素到中间比较慢,因为要搬移元素,平均时间复杂度为O(n);
public void add(int index, E element) {
// 检查是否越界
rangeCheckForAdd(index);
// 检查是否需要扩容
ensureCapacityInternal(size + 1); // Increments modCount!!
// 将index及其之后的元素往后挪一位,则index位置处就空出来了
System.arraycopy(elementData, index, elementData, index + 1,
size - index);
// 将元素插入到index的位置
elementData[index] = element;
// 大小加
size++;
}
ArrayList从尾部删除元素极快,时间复杂度为O(1);
ArrayList从中间删除元素比较慢,因为要搬移元素,平均时间复杂度为O(n);
public boolean remove(Object o) {
if (o == null) {
// 遍历整个数组,找到元素第一次出现的位置,并将其快速删除
for (int index = 0; index < size; index++)
// 如果要删除的元素为null,则以null进行比较,使用==
if (elementData[index] == null) {
fastRemove(index);
return true;
}
} else {
// 遍历整个数组,找到元素第一次出现的位置,并将其快速删除
for (int index = 0; index < size; index++)
// 如果要删除的元素不为null,则进行比较,使用equals()方法
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
return false;
}
private void fastRemove(int index) {
// 少了一个越界的检查
modCount++;
// 如果index不是最后一位,则将index之后的元素往前挪一位
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
// 将最后一个元素删除,帮助GC
elementData[--size] = null; // clear to let GC do its work
}
怎么求两个集合的并集、交集、差集?
ArrayList支持求并集,调用addAll(Collection<? extends E> c)方法即可;
public boolean addAll(Collection<? extends E> c) {
// 将集合c转为数组
Object[] a = c.toArray();
int numNew = a.length;
// 检查是否需要扩容
ensureCapacityInternal(size + numNew); // Increments modCount
// 将c中元素全部拷贝到数组的最后
System.arraycopy(a, 0, elementData, size, numNew);
// 大小增加c的大小
size += numNew;
// 如果c不为空就返回true,否则返回false
return numNew != 0;
}
ArrayList支持求交集,调用retainAll(Collection<? extends E> c)方法即可;
ArrayList支持求单向差集,调用removeAll(Collection<? extends E> c)方法即可;
/**
* 批量删除元素
* complement为true表示删除c中不包含的元素
* complement为false表示删除c中包含的元素
*/
private boolean batchRemove(Collection<?> c, boolean complement) {
final Object[] elementData = this.elementData;
// 使用读写两个指针同时遍历数组
// 读指针每次自增1,写指针放入元素的时候才加1
// 这样不需要额外的空间,只需要在原有的数组上操作就可以了
int r = 0, w = 0;
boolean modified = false;
try {
// 遍历整个数组,如果c中包含该元素,则把该元素放到写指针的位置(以complement为准)
for (; r < size; r++)
if (c.contains(elementData[r]) == complement)
elementData[w++] = elementData[r];
} finally {
// 正常来说r最后是等于size的,除非c.contains()抛出了异常
if (r != size) {
System.arraycopy(elementData, r,
elementData, w,
size - r);
w += size - r;
}
/**
* 原有的:a:1 3 4
* 比较的:b:1 3 2
*/
if (w != size) {
// 将写指针之后的元素置为空,帮助GC
for (int i = w; i < size; i++)
elementData[i] = null;
modCount += size - w;
// 新大小等于写指针的位置(因为每写一次写指针就加1,所以新大小正好等于写指针的位置)
size = w;
modified = true;
}
}
return modified;
}
ArrayList是怎么实现序列化和反序列化的?
序列化
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException{
// 防止序列化期间被修改
int expectedModCount = modCount;
// 写出非transient非static属性(会写出size属性)
s.defaultWriteObject();
// 写出元素个数
s.writeInt(size);
// 依次写出元素
for (int i=0; i<size; i++) {
s.writeObject(elementData[i]);
}
// 如果有修改,抛出异常
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
}
反序列化
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
// 声明为空数组
elementData = EMPTY_ELEMENTDATA;
// 读入非transient非static属性(会读取size属性)
s.defaultReadObject();
// 读入元素个数,没什么用,只是因为写出的时候写了size属性,读的时候也要按顺序来读
s.readInt(); // ignored
if (size > 0) {
// 计算容量
ensureCapacityInternal(size);
Object[] a = elementData;
// // 依次读取元素到数组中
for (int i=0; i<size; i++) {
a[i] = s.readObject();
}
}
}
查看writeObject()方法可知,先调用s.defaultWriteObject()方法,再把size写入到流中,再把元素一个一个的写入到流中。
一般地,只要实现了Serializable接口即可自动序列化,writeObject()和readObject()是为了自己控制序列化的方式,这两个方法必须声明为private,在java.io.ObjectStreamClass#getPrivateMethod()方法中通过反射获取到writeObject()这个方法。
在ArrayList的writeObject()方法中先调用了s.defaultWriteObject()方法,这个方法是写入非static非transient的属性,在ArrayList中也就是size属性。同样地,在readObject()方法中先调用了s.defaultReadObject()方法解析出了size属性。
elementData定义为transient的优势,自己根据size序列化真实的元素,而不是根据数组的长度序列化元素,减少了空间占用。
集合的方法toArray()有什么问题?
public class ArrayTest {
public static void main(String[] args) {
Father[] fathers = new Son[]{};
// 打印结果为class [Lcom.coolcoding.code.Son;
System.out.println(fathers.getClass());
List<String> strList = new MyList();
// 打印结果为class [Ljava.lang.String;
System.out.println(strList.toArray().getClass());
}
}
class Father {}
class Son extends Father {}
class MyList extends ArrayList<String> {
/**
* 子类重写父类的方法,返回值可以不一样
* 但这里只能用数组类型,换成Object就不行
* 应该算是java本身的bug
*/
@Override
public String[] toArray() {
// 为了方便举例直接写死
return new String[]{"1", "2", "3"};
}
}
什么是fail-fast?
定义
“快速失败”也就是fail-fast,它是Java集合的一种错误检测机制。当多个线程对集合进行结构上的改变的操作时,有可能会产生fail-fast机制。记住是有可能,而不是一定。例如:假设存在两个线程(线程1、线程2),线程1通过Iterator在遍历集合A中的元素,在某个时候线程2修改了集合A的结构(是结构上面的修改,而不是简单的修改集合元素的内容),那么这个时候程序就会抛出 ConcurrentModificationException 异常,从而产生fail-fast机制。
例子
public class FailFastTest {
private static List<Integer> list = new ArrayList<>();
private static class threadOne extends Thread{
public void run() {
Iterator<Integer> iterator = list.iterator();
while(iterator.hasNext()){
int i = iterator.next();
System.out.println("ThreadOne 遍历:" + i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class threadTwo extends Thread{
public void run(){
int i = 0 ;
while(i < 6){
System.out.println("ThreadTwo run:" + i);
if(i == 3){
list.remove(i);
}
i++;
}
}
}
public static void main(String[] args) {
for(int i = 0 ; i < 10;i++){
list.add(i);
}
new threadOne().start();
new threadTwo().start();
}
}
运行结果:
通过上面的示例和讲解,我初步知道fail-fast产生的原因就在于程序在对 collection 进行迭代时,某个线程对该 collection 在结构上对其做了修改,这时迭代器就会抛出 ConcurrentModificationException 异常信息,从而产生 fail-fast。
要了解fail-fast机制,我们首先要对ConcurrentModificationException 异常有所了解。当方法检测到对象的并发修改,但不允许这种修改时就抛出该异常。同时需要注意的是,该异常不会始终指出对象已经由不同线程并发修改,如果单线程违反了规则,同样也有可能会抛出改异常。
诚然,迭代器的快速失败行为无法得到保证,它不能保证一定会出现该错误,但是快速失败操作会尽最大努力抛出ConcurrentModificationException异常,所以因此,为提高此类操作的正确性而编写一个依赖于此异常的程序是错误的做法,正确做法是:ConcurrentModificationException 应该仅用于检测 bug。下面我将以ArrayList为例进一步分析fail-fast产生的原因。
fail-fast解决办法
- 方案一: 在遍历过程中所有涉及到改变modCount值得地方全部加上synchronized或者直接使用Collections.synchronizedList,这样就可以解决。但是不推荐,因为增删造成的同步锁可能会阻塞遍历操作。
- 方案二: 使用CopyOnWriteArrayList来替换ArrayList。推荐使用该方案。
LinkedList是单链表还是双链表实现的?
LinkedList是一个以双链表实现的List;
LinkedList除了作为List还有什么用处?
LinkedList还是一个双端队列,具有队列、双端队列、栈的特性;
看一下它继承了哪些接口
LinkedList插入、删除、查询元素的时间复杂度各是多少?
LinkedList在队列首尾添加、删除元素非常高效,时间复杂度为O(1);
// 从队列首添加元素
private void linkFirst(E e) {
// 从队列首添加元素
final Node<E> f = first;
// 创建新节点,新节点的next是首节点
final Node<E> newNode = new Node<>(null, e, f);
// 让新节点作为新的首节点
first = newNode;
// 判断是不是第一个添加的元素
// 如果是就把last也置为新节点
// 否则把原首节点的prev指针置为新节点
if (f == null)
last = newNode;
else
f.prev = newNode;
size++;
modCount++;
}
// 从队尾添加元素
void linkLast(E e) {
// 队列尾节点
final Node<E> l = last;
// 创建新节点,新节点的prev是尾节点
final Node<E> newNode = new Node<>(l, e, null);
// 让新节点成为新的尾节点
last = newNode;
// 判断是不是第一个添加的元素
// 如果是就把first也置为新节点
// 否则把原尾节点的next指针置为新节点
if (l == null)
first = newNode;
else
l.next = newNode;
size++;
modCount++;
}
// 删除首节点
private E unlinkFirst(Node<E> f) {
// 首节点的元素值
final E element = f.item;
// 首节点的next指针
final Node<E> next = f.next;
// 添加首节点的内容,协助GC
f.item = null;
f.next = null; // help GC
// 把首节点的next作为新的首节点
first = next;
// 如果只有一个元素,删除了,把last也置为空
// 否则把next的前置指针置为空
if (next == null)
last = null;
else
next.prev = null;
// 元素个数减1
size--;
// 修改次数加1
modCount++;
// 返回删除的元素
return element;
}
// 删除尾节点
private E unlinkLast(Node<E> l) {
// 尾节点的元素值
final E element = l.item;
// 尾节点的前置指针
final Node<E> prev = l.prev;
// 清空尾节点的内容,协助GC
l.item = null;
l.prev = null; // help GC
// 让前置节点成为新的尾节点
last = prev;
// 如果只有一个元素,删除了把first置为空
// 否则把前置节点的next置为空
if (prev == null)
first = null;
else
prev.next = null;
// 元素个数减1
size--;
// 修改次数加1
modCount++;
// 返回删除的元素
return element;
}
LinkedList在中间添加、删除元素比较低效,时间复杂度为O(n);
中间添加
// 在节点succ之前添加元素
void linkBefore(E e, Node<E> succ) {
// succ是待添加节点的后继节点
// 找到待添加节点的前置节点
final Node<E> pred = succ.prev;
// 在其前置节点和后继节点之间创建一个新节点
final Node<E> newNode = new Node<>(pred, e, succ);
// 修改后继节点的前置指针指向新节点
succ.prev = newNode;
// 判断前置节点是否为空
// 如果为空,说明是第一个添加的元素,修改first指针
// 否则修改前置节点的next为新节点
if (pred == null)
first = newNode;
else
pred.next = newNode;
// 修改元素个数
size++;
// 修改次数加1
modCount++;
}
// 寻找index位置的节点
Node<E> node(int index) {
// 因为是双链表
// 所以根据index是在前半段还是后半段决定从前遍历还是从后遍历
// 这样index在后半段的时候可以少遍历一半的元素
if (index < (size >> 1)) {
// 如果是在前半段
// 就从前遍历
Node<E> x = first;
for (int i = 0; i < index; i++)
x = x.next;
return x;
} else {
// 如果是在后半段
// 就从后遍历
Node<E> x = last;
for (int i = size - 1; i > index; i--)
x = x.prev;
return x;
}
}
// 在指定index位置处添加元素
public void add(int index, E element) {
// 判断是否越界
checkPositionIndex(index);
// 如果index是在队列尾节点之后的一个位置
// 把新节点直接添加到尾节点之后
// 否则调用linkBefore()方法在中间添加节点
if (index == size)
linkLast(element);
else
linkBefore(element, node(index));
}
在中间删除或者删除指定节点x
// 删除指定节点x
E unlink(Node<E> x) {
// x的元素值
final E element = x.item;
// x的前置节点
final Node<E> next = x.next;
// x的后置节点
final Node<E> prev = x.prev;
// 如果前置节点为空
// 说明是首节点,让first指向x的后置节点
// 否则修改前置节点的next为x的后置节点
if (prev == null) {
first = next;
} else {
prev.next = next;
x.prev = null;
}
// 如果后置节点为空
// 说明是尾节点,让last指向x的前置节点
// 否则修改后置节点的prev为x的前置节点
if (next == null) {
last = prev;
} else {
next.prev = prev;
x.next = null;
}
// 清空x的元素值,协助GC
x.item = null;
// 元素个数减1
size--;
// 修改次数加1
modCount++;
// 返回删除的元素
return element;
}
// remove的时候如果没有元素抛出异常
public E removeFirst() {
final Node<E> f = first;
if (f == null)
throw new NoSuchElementException();
return unlinkFirst(f);
}
// remove的时候如果没有元素抛出异常
public E removeLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
return unlinkLast(l);
}
// poll的时候如果没有元素返回null
public E pollFirst() {
final Node<E> f = first;
return (f == null) ? null : unlinkFirst(f);
}
// poll的时候如果没有元素返回null
public E pollLast() {
final Node<E> l = last;
return (l == null) ? null : unlinkLast(l);
}
// 删除中间节点
public E remove(int index) {
// 检查是否越界
checkElementIndex(index);
// 删除指定index位置的节点
return unlink(node(index));
}
什么是随机访问?
java集合类中元素的访问分为随机访问和顺序访问。随机访问一般是通过index下标访问,行为类似数组的访问。而顺序访问类似于链表的访问,通常为迭代器遍历。
以List接口及其实例为例。ArrayList是典型的随机访问型,而LinkedList则是顺序访问型。List接口既定义了下标访问方法又定义了迭代器方法。所以其实例既可使用下标随机访问也可以使用迭代器进行遍历。但这两种方式的性能差异很明显。
随机访问是说你可以随意访问该数据结构中的任意一个节点,假设该数据结构有10个节点,你可以随意访问第1个到第10个节点。
对于列表而言,如果其存在10个节点,如果你要访问第5个节点,你只能从列表的头或者尾,依次遍历相邻的每一个节点;
RandomAccess接口
JDK中的RandomAccess接口是一个标记接口,它并未定义方法。其目的是用于指示实现类具有随机访问特性,在遍历时使用下标访问较迭代器更快。如果
for(int i = 0, n = list.size(); i < n; i++)
list.get(i);
的运行比
for(Interator i = list.iterator();i.hasNext();)
i.next();
快,则应实现RandomAccess接口。
哪些集合支持随机访问?他们都有哪些共性?
ArrayList、HashMap、TreeMap和HashTable类提供对元素的随机访问。
CopyOnWriteArrayList是怎么保证并发安全的?
CopyOnWriteArrayList使用ReentrantLock重入锁加锁,保证线程安全;
/** 用于修改时加锁 */
final transient ReentrantLock lock = new ReentrantLock();
/** 真正存储元素的地方,只能通过getArray()/setArray()访问 */
private transient volatile Object[] array;
CopyOnWriteArrayList的实现采用了什么思想?
CopyOnWriteArrayList采用读写分离的思想,读操作不加锁,写操作加锁,且写操作占用较大内存空间,所以适用于读多写少的场合;它不存在扩容的概念,每次写操作都要复制一个副本,在副本的基础上修改后改变Array引用。CopyOnWriteArrayList中写操作需要大面积复制数组,所以性能肯定很差。
CopyOnWriteArrayList是不是强一致性的?
不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性一致性要求;
CopyOnWriteArrayListaddIfAbsent(E e)了解吗?
添加一个元素如果这个元素不存在于集合中。
public boolean addIfAbsent(E e) {
// 获取元素数组, 取名为快照
Object[] snapshot = getArray();
// 检查如果元素不存在,直接返回false
// 如果存在再调用addIfAbsent()方法添加元素
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 重新获取旧数组
Object[] current = getArray();
int len = current.length;
// 如果快照与刚获取的数组不一致
// 说明有修改
if (snapshot != current) {
// 重新检查元素是否在刚获取的数组里
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
// 到这个方法里面了, 说明元素不在快照里面
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
// 拷贝一份n+1的数组
Object[] newElements = Arrays.copyOf(current, len + 1);
// 将元素放在最后一位
newElements[len] = e;
setArray(newElements);
return true;
} finally {
// 释放锁
lock.unlock();
}
}
(1)检查这个元素是否存在于数组快照中;
(2)如果存在直接返回false,如果不存在调用addIfAbsent(E e, Object[] snapshot)处理;
(3)加锁;
(4)如果当前数组不等于传入的快照,说明有修改,检查待添加的元素是否存在于当前数组中,如果存在直接返回false;
(5)拷贝一个新数组,长度等于原数组长度加1,并把原数组元素拷贝到新数组中;
(6)把新元素添加到数组最后一位;
(7)把新数组赋值给当前对象的array属性,覆盖原数组;
(8)解锁;
CopyOnWriteArrayList适用于什么样的场景?
CopyOnWriteArrayList采用读写分离的思想,读操作不加锁,写操作加锁,且写操作占用较大内存空间,所以适用于读多写少的场合,比如缓存;不过这类慎用 ,因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障。
CopyOnWriteArrayList插入、删除、查询元素的时间复杂度各是多少?
CopyOnWriteArrayList的写操作都要先拷贝一份新数组,在新数组中做修改,修改完了再用新数组替换老数组,所以空间复杂度是O(n),性能比较低下;
CopyOnWriteArrayList的读操作支持随机访问,时间复杂度为O(1);
添加一个元素到末尾。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取旧数组
Object[] elements = getArray();
int len = elements.length;
// 将旧数组元素拷贝到新数组中
// 新数组大小是旧数组大小加1
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 将元素放在最后一位
newElements[len] = e;
setArray(newElements);
return true;
} finally {
// 释放锁
lock.unlock();
}
}
添加一个元素在指定索引处
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取旧数组
Object[] elements = getArray();
int len = elements.length;
// 检查是否越界, 可以等于len
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
// 如果插入的位置是最后一位
// 那么拷贝一个n+1的数组, 其前n个元素与旧数组一致
newElements = Arrays.copyOf(elements, len + 1);
else {
// 如果插入的位置不是最后一位
// 那么新建一个n+1的数组
newElements = new Object[len + 1];
// 拷贝旧数组前index的元素到新数组中
System.arraycopy(elements, 0, newElements, 0, index);
// 将index及其之后的元素往后挪一位拷贝到新数组中
// 这样正好index位置是空出来的
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
// 将元素放置在index处
newElements[index] = element;
setArray(newElements);
} finally {
// 释放锁
lock.unlock();
}
}
删除指定索引位置的元素
public E remove(int index) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取旧数组
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
// 如果移除的是最后一位
// 那么直接拷贝一份n-1的新数组, 最后一位就自动删除了
setArray(Arrays.copyOf(elements, len - 1));
else {
// 如果移除的不是最后一位
// 那么新建一个n-1的新数组
Object[] newElements = new Object[len - 1];
// 将前index的元素拷贝到新数组中
System.arraycopy(elements, 0, newElements, 0, index);
// 将index后面(不包含)的元素往前挪一位
// 这样正好把index位置覆盖掉了, 相当于删除了
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
// 释放锁
lock.unlock();
}
}
获取指定索引的元素
public E get(int index) {
// 获取元素不需要加锁
// 直接返回index位置的元素
// 这里是没有做越界检查的, 因为数组本身会做越界检查
return get(getArray(), index);
}
final Object[] getArray() {
return array;
}
private E get(Object[] a, int index) {
return (E) a[index];
}
CopyOnWriteArrayList为什么没有size属性?
因为每次修改都是拷贝一份正好可以存储目标个数元素的数组,所以不需要size属性了,数组的长度就是集合的大小,而不像ArrayList数组的长度实际是要大于集合的大小的。
比如,add(E e)操作,先拷贝一份n+1个元素的数组,再把新元素放到新数组的最后一位,这时新数组的长度为len+1了,也就是集合的size了。
比较古老的集合Vector和Stack有什么缺陷?
Vector:线程安全的动态数组
Stack:继承Vector,基于动态数组实现的一个线程安全的栈;
Vector与ArrayList基本是一致的,不同的是Vector是线程安全的,会在可能出现线程安全的方法前面加上synchronized关键字;
Vector:随机访问速度快,插入和移除性能较差(数组的特点);支持null元素;有顺序;元素可以重复;线程安全;
Stack:后进先出,实现了一些栈基本操作的方法(其实并不是只能后进先出,因为继承自Vector,可以有很多操作,从某种意义上来讲,不是一个栈);
所以基本上来说vector 因为线程安全的实现方法比较粗暴效率较低。
什么是散列表?
散列表(Hash table,也叫哈希表),是根据关键码值(Key value)而直接进行访问的数据结构。也就是说,它通过把关键码值映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做散列函数,存放记录的数组叫做散列表。
给定表M,存在函数f(key),对任意给定的关键字值key,代入函数后若能得到包含该关键字的记录在表中的地址,则称表M为哈希(Hash)表,函数f(key)为哈希(Hash) 函数。
hash就是找到一种数据内容和数据存放地址之间的映射关系。
常见的散列函数?
- 直接定址法:直接以关键字k或者k加上某个常数(k+c)作为哈希地址;
- 数字分析法:提取关键字中取值比较均匀的数字作为哈希地址;
- 除留余数法:用关键字k除以某个不大于哈希表长度m的数p,将所得余数作为哈希表地址;
- 分段叠加法:按照哈希表地址位数将关键字分成位数相等的几部分,其中最后一部分可以比较短。然后将这几部分相加,舍弃最高进位后的结果就是该关键字的哈希地址;
- 平方取中法:如果关键字各个部分分布都不均匀的话,可以先求出它的平方值,然后按照需求去中间几位作为哈希表地址;
- 伪随机数法:采用一个伪随机数作为哈希函数
碰撞解决方案?
衡量一个哈希函数的好坏的重要指标就是发生碰撞的概率以及发生碰撞的解决方案。任何哈希函数基本无法彻底避免碰撞。
常见解决碰撞的方法有以下几种:
- 开发定址法:就是一旦发生了冲突,就去寻找下一个空的散列地址,只要散列表足够大,空的散列地址总能找到,并将记录存入;
- 链地址法:将哈希表的每个单元作为链表的头结点,所有哈希地址为 i 的元素构成一个同义词链表。即发生冲突时就把该关键字链在以该单元为头结点的链表尾部;
- 再哈希法:当哈希地址发生冲突用其他的函数计算另一个哈希地址,直到冲突不在产生为止;
- 建立公共溢出区:将哈希表分为基本表和溢出表两部分,发生冲突的元素都放入溢出表中。
怎么实现一个散列表?
java中HashMap实现方式的演进?
JDK1.7中底层是数组+链表,JDK1.8中底层是数组+链表+红黑树,加红黑树的目的是提高HashMap插入和查询整体效率JDK1.7中链表插入使用的是头插法,1.8中链表插入使用的是尾插法,因为1.8中插入key和value时需要判断链表元素个数,所以需要遍历链表统计链表元素个数,所以正好就直接使用尾插法JDK1.7中哈希算法比较复杂,存在各种右移与异或运算,JDK1.8中进行了简化,因为复杂的哈希算法的目的就是提高散列性,来提供HashMap的整体效率,而1.8中新增了红黑树,所以可以适当的简化哈希算法,节省CPU资源。
HashMap的容量有什么特点?
默认情况下,当我们设置HashMap的初始化容量时,实际上HashMap会采用第一个大于该数值的2的幂作为初始化容量。HashMap默认容量为16,每次超过阀值时,按照两倍大小进行自动扩容,所以容量总是 2^N 次方。并且,底层的 table
数组是延迟初始化,在首次添加 key-value 键值对才进行初始化。在HashMap中,哈希桶数组table的长度length大小必须为2的n次方(一定是合数),这是一种非常规的设计,常规的设计是把桶的大小设计为素数。相对来说素数导致冲突的概率要小于合数,具体证明可以参考http://note.csdn.net/liuqiyao_01/article/details/14475159,Hashtable初始化桶大小为11,就是桶大小设计为素数的应用(Hashtable扩容后不能保证还是素数)。HashMap采用这种非常规设计,主要是为了在取模和扩容时做优化,同时为了减少冲突,HashMap定位哈希桶索引位置时,也加入了高位参与运算的过程。
HashMap如何确定哈希桶数组索引位置?
不管增加、删除、查找键值对,定位到哈希桶数组的位置都是很关键的第一步。前面说过HashMap的数据结构是数组和链表的结合,所以我们当然希望这个HashMap里面的元素位置尽量分布均匀些,尽量使得每个位置上的元素数量只有一个,那么当我们用hash算法求得这个位置的时候,马上就可以知道对应位置的元素就是我们要的,不用遍历链表,大大优化了查询的效率。HashMap定位数组索引位置,直接决定了hash方法的离散性能。先看看源码的实现(方法一+方法二):
方法一:
static final int hash(Object key) { //jdk1.8 & jdk1.7
int h;
// h = key.hashCode() 为第一步 取hashCode值
// h ^ (h >>> 16) 为第二步 高位参与运算
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
方法二:
static int indexFor(int h, int length) { //jdk1.7的源码,jdk1.8没有这个方法,但是实现原理一样的
return h & (length-1); //第三步 取模运算
}
这里的Hash算法本质上就是三步:取key的hashCode值、高位运算、取模运算。
对于任意给定的对象,只要它的hashCode()返回值相同,那么程序调用方法一所计算得到的Hash码值总是相同的。我们首先想到的就是把hash值对数组长度取模运算,这样一来,元素的分布相对来说是比较均匀的。但是,模运算的消耗还是比较大的,在HashMap中是这样做的:调用方法二来计算该对象应该保存在table数组的哪个索引处。
这个方法非常巧妙,它通过h & (table.length -1)来得到该对象的保存位,而HashMap底层数组的长度总是2的n次方,这是HashMap在速度上的优化。当length总是2的n次方时,h& (length-1)运算等价于对length取模,也就是h%length,但是&比%具有更高的效率。
在JDK1.8的实现中,优化了高位运算的算法,通过hashCode()的高16位异或低16位实现的:(h = k.hashCode()) ^ (h >>> 16),主要是从速度、功效、质量来考虑的,这么做可以在数组table的length比较小的时候,也能保证考虑到高低Bit都参与到Hash的计算中,同时不会有太大的开销。
下面举例说明下,n为table的长度。
HashMap是怎么进行扩容的?
扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素。当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶。
我们分析下resize的源码,鉴于JDK1.8融入了红黑树,较复杂,为了便于理解我们仍然使用JDK1.7的代码,好理解一些,本质上区别不大,具体区别后文再说。
void resize(int newCapacity) { //传入新的容量
Entry[] oldTable = table; //引用扩容前的Entry数组
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) { //扩容前的数组大小如果已经达到最大(2^30)了
threshold = Integer.MAX_VALUE; //修改阈值为int的最大值(2^31-1),这样以后就不会扩容了
return;
}
Entry[] newTable = new Entry[newCapacity]; //初始化一个新的Entry数组
transfer(newTable); //!!将数据转移到新的Entry数组里
table = newTable; //HashMap的table属性引用新的Entry数组
threshold = (int)(newCapacity * loadFactor);//修改阈值
}
这里就是使用一个容量更大的数组来代替已有的容量小的数组,transfer()方法将原有Entry数组的元素拷贝到新的Entry数组里。
void transfer(Entry[] newTable) {
Entry[] src = table; //src引用了旧的Entry数组
int newCapacity = newTable.length;
for (int j = 0; j < src.length; j++) { //遍历旧的Entry数组
Entry<K,V> e = src[j]; //取得旧Entry数组的每个元素
if (e != null) {
src[j] = null;//释放旧Entry数组的对象引用(for循环后,旧的Entry数组不再引用任何对象)
do {
Entry<K,V> next = e.next;
int i = indexFor(e.hash, newCapacity); //!!重新计算每个元素在数组中的位置
e.next = newTable[i]; //标记[1]
newTable[i] = e; //将元素放在数组上
e = next; //访问下一个Entry链上的元素
} while (e != null);
}
}
}
newTable[i]的引用赋给了e.next,也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置;这样先放在一个索引上的元素终会被放到Entry链的尾部(如果发生了hash冲突的话),这一点和Jdk1.8有区别,下文详解。在旧数组中同一条Entry链上的元素,通过重新计算索引位置后,有可能被放到了新数组的不同位置上。
下面举个例子说明下扩容过程。假设了我们的hash算法就是简单的用key mod 一下表的大小(也就是数组的长度)。其中的哈希桶数组table的size=2, 所以key = 3、7、5,put顺序依次为 5、7、3。在mod 2以后都冲突在table[1]这里了。这里假设负载因子 loadFactor=1,即当键值对的实际大小size 大于 table的实际大小时进行扩容。接下来的三个步骤是哈希桶数组 resize成4,然后所有的Node重新rehash的过程。
下面我们讲解下JDK1.8做了哪些优化。经过观测可以发现,我们使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。看下图可以明白这句话的意思,n为table的长度,图(a)表示扩容前的key1和key2两种key确定索引位置的示例,图(b)表示扩容后key1和key2两种key确定索引位置的示例,其中hash1是key1对应的哈希与高位运算结果。
元素在重新计算hash之后,因为n变为2倍,那么n-1的mask范围在高位多1bit(红色),因此新的index就会发生这样的变化:
因此,我们在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”,可以看看下图为16扩充为32的resize示意图:
这个设计确实非常的巧妙,既省去了重新计算hash值的时间,而且同时,由于新增的1bit是0还是1可以认为是随机的,因此resize的过程,均匀的把之前的冲突的节点分散到新的bucket了。这一块就是JDK1.8新增的优化点。有一点注意区别,JDK1.7中rehash的时候,旧链表迁移新链表的时候,如果在新表的数组索引位置相同,则链表元素会倒置,但是从上图可以看出,JDK1.8不会倒置。JDK1.8扩容步骤为:
(1)如果使用是默认构造方法,则第一次插入元素时初始化为默认值,容量为16,扩容门槛为12;
(2)如果使用的是非默认构造方法,则第一次插入元素时初始化容量等于扩容门槛,扩容门槛在构造方法里等于传入容量向上最近的2的n次方;
(3)如果旧容量大于0,则新容量等于旧容量的2倍,但不超过最大容量2的30次方,新扩容门槛为旧扩容门槛的2倍;
(4)创建一个新容量的桶;
(5)搬移元素,原链表分化成两个链表,低位链表存储在原来桶的位置,高位链表搬移到原来桶的位置加旧容量的位置;
final Node<K, V>[] resize() {
// 旧数组
Node<K, V>[] oldTab = table;
// 旧容量
int oldCap = (oldTab == null) ? 0 : oldTab.length;
// 旧扩容门槛
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
// 如果旧容量达到了最大容量,则不再进行扩容
threshold = Integer.MAX_VALUE;
return oldTab;
} else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
// 如果旧容量的两倍小于最大容量并且旧容量大于默认初始容量(16),则容量扩大为两部,扩容门槛也扩大为两倍
newThr = oldThr << 1; // double threshold
} else if (oldThr > 0) // initial capacity was placed in threshold
// 使用非默认构造方法创建的map,第一次插入元素会走到这里
// 如果旧容量为0且旧扩容门槛大于0,则把新容量赋值为旧门槛
newCap = oldThr;
else { // zero initial threshold signifies using defaults
// 调用默认构造方法创建的map,第一次插入元素会走到这里
// 如果旧容量旧扩容门槛都是0,说明还未初始化过,则初始化容量为默认容量,扩容门槛为默认容量*默认装载因子
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int) (DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
// 如果新扩容门槛为0,则计算为容量*装载因子,但不能超过最大容量
float ft = (float) newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float) MAXIMUM_CAPACITY ?
(int) ft : Integer.MAX_VALUE);
}
// 赋值扩容门槛为新门槛
threshold = newThr;
// 新建一个新容量的数组
@SuppressWarnings({"rawtypes", "unchecked"})
Node<K, V>[] newTab = (Node<K, V>[]) new Node[newCap];
// 把桶赋值为新数组
table = newTab;
// 如果旧数组不为空,则搬移元素
if (oldTab != null) {
// 遍历旧数组
for (int j = 0; j < oldCap; ++j) {
Node<K, V> e;
// 如果桶中第一个元素不为空,赋值给e
if ((e = oldTab[j]) != null) {
// 清空旧桶,便于GC回收
oldTab[j] = null;
// 如果这个桶中只有一个元素,则计算它在新桶中的位置并把它搬移到新桶中
// 因为每次都扩容两倍,所以这里的第一个元素搬移到新桶的时候新桶肯定还没有元素
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
// 如果第一个元素是树节点,则把这颗树打散成两颗树插入到新桶中去
((TreeNode<K, V>) e).split(this, newTab, j, oldCap);
else { // preserve order
// 如果这个链表不止一个元素且不是一颗树
// 则分化成两个链表插入到新的桶中去
// 比如,假如原来容量为4,3、7、11、15这四个元素都在三号桶中
// 现在扩容到8,则3和11还是在三号桶,7和15要搬移到七号桶中去
// 也就是分化成了两个链表
Node<K, V> loHead = null, loTail = null;
Node<K, V> hiHead = null, hiTail = null;
Node<K, V> next;
do {
next = e.next;
// (e.hash & oldCap) == 0的元素放在低位链表中
// 比如,3 & 4 == 0
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
} else {
// (e.hash & oldCap) != 0的元素放在高位链表中
// 比如,7 & 4 != 0
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 遍历完成分化成两个链表了
// 低位链表在新桶中的位置与旧桶一样(即3和11还在三号桶中)
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 高位链表在新桶中的位置正好是原来的位置加上旧容量(即7和15搬移到七号桶了)
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
HashMap的put方法?
先说HashMap的Put方法的大体流程:
1根据Key通过哈希算法与与运算得出数组下标
2如果数组下标位置元素为空,则将key和value封装为Entry对象(JDK1.7中是Entry对象,JDK1.8中是Node对象)并放入该位置
3如果数组下标位置元素不为空,则要分情况讨论
a如果是JDK1.7,则先判断是否需要扩容,如果要扩容就进行扩容,如果不用扩容就生成Entry对象,并使用头插法添加到当前位置的链表中
b如果是JDK1.8,则会先判断当前位置上的Node的类型,看是红黑树Node,还是链表Node
ⅰ如果是红黑树Node,则将key和value封装为一个红黑树节点并添加到红黑树中去,在这个过程中会判断红黑树中是否存在当前key,如果存在则更新value
ⅱ如果此位置上的Node对象是链表节点,则将key和value封装为一个链表Node并通过尾插法插入到链表的最后位置去,因为是尾插法,所以需要遍历链表,在遍历链表的过程中会判断是否存在当前key,如果存在则更新value,当遍历完链表后,将新链表Node插入到链表中,插入到链表后,会看当前链表的节点个数,如果大于等于8,那么则会将该链表转成红黑树
ⅲ将key和value封装为Node插入到链表或红黑树中后,再判断是否需要进行扩容,如果需要就扩容,如果不需要就结束PUT方法
代码如下:
public V put(K key, V value) {
// 调用hash(key)计算出key的hash值
return putVal(hash(key), key, value, false, true);
}
static final int hash(Object key) {
int h;
// 如果key为null,则hash值为0,否则调用key的hashCode()方法
// 并让高16位与整个hash异或,这样做是为了使计算出的hash更分散
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K, V>[] tab;
Node<K, V> p;
int n, i;
// 如果桶的数量为0,则初始化
if ((tab = table) == null || (n = tab.length) == 0)
// 调用resize()初始化
n = (tab = resize()).length;
// (n - 1) & hash 计算元素在哪个桶中
// 如果这个桶中还没有元素,则把这个元素放在桶中的第一个位置
if ((p = tab[i = (n - 1) & hash]) == null)
// 新建一个节点放在桶中
tab[i] = newNode(hash, key, value, null);
else {
// 如果桶中已经有元素存在了
Node<K, V> e;
K k;
// 如果桶中第一个元素的key与待插入元素的key相同,保存到e中用于后续修改value值
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
// 如果第一个元素是树节点,则调用树节点的putTreeVal插入元素
e = ((TreeNode<K, V>) p).putTreeVal(this, tab, hash, key, value);
else {
// 遍历这个桶对应的链表,binCount用于存储链表中元素的个数
for (int binCount = 0; ; ++binCount) {
// 如果链表遍历完了都没有找到相同key的元素,说明该key对应的元素不存在,则在链表最后插入一个新节点
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
// 如果插入新节点后链表长度大于8,则判断是否需要树化,因为第一个元素没有加到binCount中,所以这里-1
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
// 如果待插入的key在链表中找到了,则退出循环
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 如果找到了对应key的元素
if (e != null) { // existing mapping for key
// 记录下旧值
V oldValue = e.value;
// 判断是否需要替换旧值
if (!onlyIfAbsent || oldValue == null)
// 替换旧值为新值
e.value = value;
// 在节点被访问后做点什么事,在LinkedHashMap中用到
afterNodeAccess(e);
// 返回旧值
return oldValue;
}
}
// 到这里了说明没有找到元素
// 修改次数加1
++modCount;
// 元素数量加1,判断是否需要扩容
if (++size > threshold)
// 扩容
resize();
// 在节点插入后做点什么事,在LinkedHashMap中用到
afterNodeInsertion(evict);
// 没找到元素返回null
return null;
}
HashMap中的元素是否是有序的?
HashMap中的元素是无序的
HashMap何时进行树化?何时进行反树化?
如果冲突的节点数已经达到8个,看是否需要改变冲突节点的存储结构,treeifyBin首先判断当前hashMap的长度,如果不足64,只进行resize,扩容table,如果达到64,那么将冲突的存储结构为红黑树。
当链表的元素大于8时进行树化,小于6时进行反树化。
选择 8 作为阀值和加载因子为0.75的原因是参考 泊松概率函数(Poisson distribution)
在hashCode离散性很好的情况下,红黑树用到的概率非常小,因为数据均匀分布在每个桶中,几乎不会有桶中链表长度会达到阈值(8)。但是在随机hashCode下,离散性可能会变差,然而JDK又不能阻止用户实现这种不好的hash算法,因此就可能导致不均匀的数据分布。
事实上,随机hashCode算法下所有桶中节点的分布频率遵循如下的泊松分布。在扩容阈值为0.75的情况下,(即使因为扩容而方差很大)遵循着参数平均为0.5的泊松分布。一个桶中链表长度达到8个元素的概率为0.00000006,几乎是不可能事件。之所以选择8,是时间和空间的权衡(trade-off),是根据概率统计决定的, 是非常严谨和科学的。
通俗点将就是put进去的key进行计算hashCode时 只要选择计算hash值的算法足够好(hash碰撞率极低),从而遵循泊松分布,使得桶中挂载的bin的数量等于8的概率非常小,从而转换为红黑树的概率也小,反之则概率大。
为什么转化为红黑树的阈值8和转化为链表的阈值6不一样?
为了避免频繁来回转化。
泊松分布与指数分布
泊松分布
Poisson分布,是一种统计与概率论中常见的离散概率分布,其适合于描述单位时间内随机事件发生的次数的概率分布。
如某一服务设施在一定时间内受到的服务请求的次数,电话交换机接到呼叫的次数、汽车站台的候客人数、机器出现的故障数、自然灾害发生的次数、DNA序列的变异数、放射性原子核的衰变数、激光的光子数分布等等;
指数分布
指数分布(Exponential distribution)是一种连续概率分布。指数分配可以用来表示独立随机事件发生的时间间隔,比如旅客进入机场的时间间隔、打进客服中心电话的时间间隔、中文维基百科新条目出现的时间间隔等等;
与泊松分布相比,其最大的差异就是指数分布是针对连续随机变量定义,即时间这个变量。时间必须是连续的。而泊松分布是针对随机事件发生次数定义的,发生次数是离散的。粗略地可以认为这两个分布之间有一种“倒数”的关系
HashMap是怎么进行缩容的?
它不会动态地进行缩容,也就是说,你不应该保留一个已经删除过大量Entry的HashMap(如果不打算继续添加元素的话),此时它的buckets数组经过多次扩容已经变得非常大了,这会占用非常多的无用内存,这样做的好处是不用多次对数组进行扩容或缩容操作。不过一般也不会出现这种情况,如果遇见了,请毫不犹豫地丢掉它,或者把数据转移到一个新的HashMap。
HashMap插入、删除、查询元素的时间复杂度各是多少?
因为HashMap很少出现hash冲突了,因为哈希算法足够优秀,那么全是o(1)
但是当有链表的时候,那么就是o(n)的复杂度
如果转成红黑树 也就是二叉树的一种,那么应该是o(logN)的平均复杂度
HashMap中的红黑树实现部分可以用其它数据结构代替吗?
之所以选择红黑树是为了解决二叉查找树的缺陷,二叉查找树在特殊情况下会变成一条线性结构(这就跟原来使用链表结构一样了,造成很深的问题),遍历查找会非常慢。
而红黑树在插入新数据后可能需要通过左旋,右旋、变色这些操作来保持平衡,引入红黑树就是为了查找数据快,解决链表查询深度的问题,我们知道红黑树属于平衡二叉树,但是为了保持“平衡”是需要付出代价的,但是该代价所损耗的资源要比遍历线性链表要少,所以当长度大于8的时候,会使用红黑树,如果链表长度很短的话,根本不需要引入红黑树,引入反而会慢。
LinkedHashMap是怎么实现的?
LinkedHashMap内部维护了一个双向链表,能保证元素按插入的顺序访问,也能以访问顺序访问,可以用来实现LRU缓存策略。
LinkedHashMap可以看成是 LinkedList + HashMap。
LinkedHashMap是有序的吗?怎么个有序法?
如果accessOrder为false,则可以按插入元素的顺序遍历元素;
如果accessOrder为true,则可以按访问元素的顺序遍历元素;
/**
* 双向链表头节点
*/
transient LinkedHashMap.Entry<K,V> head;
/**
* 双向链表尾节点
*/
transient LinkedHashMap.Entry<K,V> tail;
/**
* 是否按访问顺序排序
*/
final boolean accessOrder;
LinkedHashMap 通过重写 HashMap 提供的回调方法,从而实现其对顺序的特性的处理。同时,因为 LinkedHashMap 的顺序特性,需要重写 #keysToArray(T[] a)
等遍历相关的方法。
LinkedHashMap如何实现LRU缓存淘汰策略?
首先,我们先来看看LRU是个什么鬼。LRU,Least Recently Used,最近最少使用,也就是优先淘汰最近最少使用的元素。
如果使用LinkedHashMap,我们把accessOrder设置为true是不是就差不多能实现这个策略了呢?答案是肯定的。请看下面的代码:
import java.util.LinkedHashMap;
import java.util.Map;
public class LRUTest {
public static void main(String[] args) {
// 创建一个只有5个元素的缓存
LRU<Integer, Integer> lru = new LRU<>(5, 0.75f);
lru.put(1, 1);
lru.put(2, 2);
lru.put(3, 3);
lru.put(4, 4);
lru.put(5, 5);
lru.put(6, 6);
lru.put(7, 7);
System.out.println(lru.get(4));
lru.put(6, 666);
// 输出: {3=3, 5=5, 7=7, 4=4, 6=666}
// 可以看到最旧的元素被删除了
// 且最近访问的4被移到了后面
System.out.println(lru);
}
}
class LRU<K, V> extends LinkedHashMap<K, V> {
// 保存缓存的容量
private int capacity;
public LRU(int capacity, float loadFactor) {
super(capacity, loadFactor, true);
this.capacity = capacity;
}
/**
* 重写removeEldestEntry()方法设置何时移除旧元素
* @param eldest
* @return
*/
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
// 当元素个数大于了缓存的容量, 就移除元素
return size() > this.capacity;
}
}
WeakHashMap使用的数据结构?
WeakHashMap因为gc的时候会把没有强引用的key回收掉,所以注定了它里面的元素不会太多,因此也就不需要像HashMap那样元素多的时候转化为红黑树来处理了。
因此,WeakHashMap的存储结构只有(数组 + 链表)。
WeakHashMap具有什么特性?
WeakHashMap是一种弱引用map,内部的key会存储为弱引用,当jvm gc的时候,如果这些key没有强引用存在的话,会被gc回收掉,下一次当我们操作map的时候会把对应的Entry整个删除掉,基于这种特性,WeakHashMap特别适用于缓存处理。
WeakHashMap通常用来做什么?
任何事物都有他存在的道理,WeakHashmap业务场景就是缓存,可以有效的节省内存,缓存丢失也不会出太大问题,可以再次获取。许多开源框架,例如tomcat等都使用了weakHashmap做为缓存处理。
WeakHashMap使用String作为key是需要注意些什么?为什么?
import java.util.Map;
import java.util.WeakHashMap;
/**
* @description:
* @author: LvXueYang
* @creat:2021-08-17 09:29
**/
public class WeakHashMapTest {
public static void main(String[] args) {
Map<String, Integer> map = new WeakHashMap<>();
map.put(new String("1"), 1);
map.put(new String("2"), 2);
map.put(new String("3"), 3);
// 放入不用new String()声明的字符串
map.put("6", 6);
String key = null;
for (String s : map.keySet()) {
// 这个"3"和new String("3")不是一个引用
if (s.equals("3")) {
key = s;
}
}
System.out.println(map);
// gc一下
System.gc();
// 放一个new String()声明的字符串
map.put(new String("4"), 4);
// 输出{4=4, 6=6, 3=3},gc后放入的值和强引用的key可以打印出来
System.out.println(map);
// key与"3"的引用断裂
key = null;
// gc一下
System.gc();
// 输出{6=6},gc后强引用的key可以打印出来
System.out.println(map);
}
}
什么是强引用、软引用、弱引用、虚引用?
(1)强引用
使用最普遍的引用。如果一个对象具有强引用,它绝对不会被gc回收。如果内存空间不足了,gc宁愿抛出OutOfMemoryError,也不是会回收具有强引用的对象。
(2)软引用
如果一个对象只具有软引用,则内存空间足够时不会回收它,但内存空间不够时就会回收这部分对象。只要这个具有软引用对象没有被回收,程序就可以正常使用。
(3)弱引用
如果一个对象只具有弱引用,则不管内存空间够不够,当gc扫描到它时就会回收它。
(4)虚引用
如果一个对象只具有虚引用,那么它就和没有任何引用一样,任何时候都可能被gc回收。
软(弱、虚)引用必须和一个引用队列(ReferenceQueue)一起使用,当gc回收这个软(弱、虚)引用引用的对象时,会把这个软(弱、虚)引用放到这个引用队列中。
红黑树具有哪些特性?
红黑树具有以下5种性质:
(1)节点是红色或黑色。
(2)根节点是黑色。
(3)每个叶节点(NIL节点,空节点)是黑色的。
(4)每个红色节点的两个子节点都是黑色。(从每个叶子到根的所有路径上不能有两个连续的红色节点)
(5)从任一节点到其每个叶子的所有路径都包含相同数目的黑色节点。
红黑树的时间复杂度为O(log n),与树的高度成正比。
红黑树每次的插入、删除操作都需要做平衡,平衡时有可能会改变根节点的位置,颜色转换,左旋,右旋等
TreeMap就有序的吗?怎么个有序法?
TreeMap中的元素是有序的,按key的顺序排列;
TreeMap是否需要扩容?
TreeMap 因为采用树结构,所以无需初始考虑像 HashMap 考虑容量问题,也不存在扩容问题。
,TreeMap的存储结构只有一颗红黑树
什么是左旋?什么是右旋?
左旋,就是以某个节点为支点向左旋转。
整个左旋过程如下:
(1)将 y的左节点 设为 x的右节点,即将 β 设为 x的右节点;
(2)将 x 设为 y的左节点的父节点,即将 β的父节点 设为 x;
(3)将 x的父节点 设为 y的父节点;
(4)如果 x的父节点 为空节点,则将y设置为根节点;如果x是它父节点的左(右)节点,则将y设置为x父节点的左(右)节点;
(5)将 x 设为 y的左节点;
(6)将 x的父节点 设为 y;
让我们来看看TreeMap中的实现:
/**
* 以p为支点进行左旋
* 假设p为图中的x
*/
private void rotateLeft(Entry<K,V> p) {
if (p != null) {
// p的右节点,即y
Entry<K,V> r = p.right;
// (1)将 y的左节点 设为 x的右节点
p.right = r.left;
// (2)将 x 设为 y的左节点的父节点(如果y的左节点存在的话)
if (r.left != null)
r.left.parent = p;
// (3)将 x的父节点 设为 y的父节点
r.parent = p.parent;
// (4)...
if (p.parent == null)
// 如果 x的父节点 为空,则将y设置为根节点
root = r;
else if (p.parent.left == p)
// 如果x是它父节点的左节点,则将y设置为x父节点的左节点
p.parent.left = r;
else
// 如果x是它父节点的右节点,则将y设置为x父节点的右节点
p.parent.right = r;
// (5)将 x 设为 y的左节点
r.left = p;
// (6)将 x的父节点 设为 y
p.parent = r;
}
}
右旋,就是以某个节点为支点向右旋转。
整个右旋过程如下:
(1)将 x的右节点 设为 y的左节点,即 将 β 设为 y的左节点;
(2)将 y 设为 x的右节点的父节点,即 将 β的父节点 设为 y;
(3)将 y的父节点 设为 x的父节点;
(4)如果 y的父节点 是 空节点,则将x设为根节点;如果y是它父节点的左(右)节点,则将x设为y的父节点的左(右)节点;
(5)将 y 设为 x的右节点;
(6)将 y的父节点 设为 x;
让我们来看看TreeMap中的实现:
/**
* 以p为支点进行右旋
* 假设p为图中的y
*/
private void rotateRight(Entry<K,V> p) {
if (p != null) {
// p的左节点,即x
Entry<K,V> l = p.left;
// (1)将 x的右节点 设为 y的左节点
p.left = l.right;
// (2)将 y 设为 x的右节点的父节点(如果x有右节点的话)
if (l.right != null) l.right.parent = p;
// (3)将 y的父节点 设为 x的父节点
l.parent = p.parent;
// (4)...
if (p.parent == null)
// 如果 y的父节点 是 空节点,则将x设为根节点
root = l;
else if (p.parent.right == p)
// 如果y是它父节点的右节点,则将x设为y的父节点的右节点
p.parent.right = l;
else
// 如果y是它父节点的左节点,则将x设为y的父节点的左节点
p.parent.left = l;
// (5)将 y 设为 x的右节点
l.right = p;
// (6)将 y的父节点 设为 x
p.parent = l;
}
}
红黑树怎么插入元素?
插入元素,如果元素在树中存在,则替换value;如果元素不存在,则插入到对应的位置,再平衡树。
public V put(K key, V value) {
Entry<K,V> t = root;
if (t == null) {
// 如果没有根节点,直接插入到根节点
compare(key, key); // type (and possibly null) check
root = new Entry<>(key, value, null);
size = 1;
modCount++;
return null;
}
// key比较的结果
int cmp;
// 用来寻找待插入节点的父节点
Entry<K,V> parent;
// 根据是否有comparator使用不同的分支
Comparator<? super K> cpr = comparator;
if (cpr != null) {
// 如果使用的是comparator方式,key值可以为null,只要在comparator.compare()中允许即可
// 从根节点开始遍历寻找
do {
parent = t;
cmp = cpr.compare(key, t.key);
if (cmp < 0)
// 如果小于0从左子树寻找
t = t.left;
else if (cmp > 0)
// 如果大于0从右子树寻找
t = t.right;
else
// 如果等于0,说明插入的节点已经存在了,直接更换其value值并返回旧值
return t.setValue(value);
} while (t != null);
}
else {
// 如果使用的是Comparable方式,key不能为null
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
// 从根节点开始遍历寻找
do {
parent = t;
cmp = k.compareTo(t.key);
if (cmp < 0)
// 如果小于0从左子树寻找
t = t.left;
else if (cmp > 0)
// 如果大于0从右子树寻找
t = t.right;
else
// 如果等于0,说明插入的节点已经存在了,直接更换其value值并返回旧值
return t.setValue(value);
} while (t != null);
}
// 如果没找到,那么新建一个节点,并插入到树中
Entry<K,V> e = new Entry<>(key, value, parent);
if (cmp < 0)
// 如果小于0插入到左子节点
parent.left = e;
else
// 如果大于0插入到右子节点
parent.right = e;
// 插入之后的平衡
fixAfterInsertion(e);
// 元素个数加1(不需要扩容)
size++;
// 修改次数加1
modCount++;
// 如果插入了新节点返回空
return null;
}
插入再平衡
插入的元素默认都是红色,因为插入红色元素只违背了第4条特性,那么我们只要根据这个特性来平衡就容易多了。
根据不同的情况有以下几种处理方式:
- 插入的元素如果是根节点,则直接涂成黑色即可,不用平衡;
- 插入的元素的父节点如果为黑色,不需要平衡;
- 插入的元素的父节点如果为红色,则违背了特性4,需要平衡,平衡时又分成下面三种情况:
(如果父节点是祖父节点的左节点)
情况 | 策略 |
---|---|
1)父节点为红色,叔叔节点也为红色 | (1)将父节点设为黑色;(2)将叔叔节点设为黑色;(3)将祖父节点设为红色;(4)将祖父节点设为新的当前节点,进入下一次循环判断; |
2)父节点为红色,叔叔节点为黑色,且当前节点是其父节点的右节点 | (1)将父节点作为新的当前节点;(2)以新当节点为支点进行左旋,进入情况3); |
3)父节点为红色,叔叔节点为黑色,且当前节点是其父节点的左节点 | (1)将父节点设为黑色;(2)将祖父节点设为红色;(3)以祖父节点为支点进行右旋,进入下一次循环判断; |
(如果父节点是祖父节点的右节点,则正好与上面反过来)
情况 | 策略 |
---|---|
1)父节点为红色,叔叔节点也为红色 | (1)将父节点设为黑色;(2)将叔叔节点设为黑色;(3)将祖父节点设为红色;(4)将祖父节点设为新的当前节点,进入下一次循环判断; |
2)父节点为红色,叔叔节点为黑色,且当前节点是其父节点的左节点 | (1)将父节点作为新的当前节点;(2)以新当节点为支点进行右旋; |
3)父节点为红色,叔叔节点为黑色,且当前节点是其父节点的右节点 | (1)将父节点设为黑色;(2)将祖父节点设为红色;(3)以祖父节点为支点进行左旋,进入下一次循环判断; |
让我们来看看TreeMap中的实现:
/**
* 插入再平衡
*(1)每个节点或者是黑色,或者是红色。
*(2)根节点是黑色。
*(3)每个叶子节点(NIL)是黑色。(注意:这里叶子节点,是指为空(NIL或NULL)的叶子节点!)
*(4)如果一个节点是红色的,则它的子节点必须是黑色的。
*(5)从一个节点到该节点的子孙节点的所有路径上包含相同数目的黑节点。
*/
private void fixAfterInsertion(Entry<K,V> x) {
// 插入的节点为红节点,x为当前节点
x.color = RED;
// 只有当插入节点不是根节点且其父节点为红色时才需要平衡(违背了特性4)
while (x != null && x != root && x.parent.color == RED) {
if (parentOf(x) == leftOf(parentOf(parentOf(x)))) {
// a)如果父节点是祖父节点的左节点
// y为叔叔节点
Entry<K,V> y = rightOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
// 情况1)如果叔叔节点为红色
// (1)将父节点设为黑色
setColor(parentOf(x), BLACK);
// (2)将叔叔节点设为黑色
setColor(y, BLACK);
// (3)将祖父节点设为红色
setColor(parentOf(parentOf(x)), RED);
// (4)将祖父节点设为新的当前节点
x = parentOf(parentOf(x));
} else {
// 如果叔叔节点为黑色
// 情况2)如果当前节点为其父节点的右节点
if (x == rightOf(parentOf(x))) {
// (1)将父节点设为当前节点
x = parentOf(x);
// (2)以新当前节点左旋
rotateLeft(x);
}
// 情况3)如果当前节点为其父节点的左节点(如果是情况2)则左旋之后新当前节点正好为其父节点的左节点了)
// (1)将父节点设为黑色
setColor(parentOf(x), BLACK);
// (2)将祖父节点设为红色
setColor(parentOf(parentOf(x)), RED);
// (3)以祖父节点为支点进行右旋
rotateRight(parentOf(parentOf(x)));
}
} else {
// b)如果父节点是祖父节点的右节点
// y是叔叔节点
Entry<K,V> y = leftOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
// 情况1)如果叔叔节点为红色
// (1)将父节点设为黑色
setColor(parentOf(x), BLACK);
// (2)将叔叔节点设为黑色
setColor(y, BLACK);
// (3)将祖父节点设为红色
setColor(parentOf(parentOf(x)), RED);
// (4)将祖父节点设为新的当前节点
x = parentOf(parentOf(x));
} else {
// 如果叔叔节点为黑色
// 情况2)如果当前节点为其父节点的左节点
if (x == leftOf(parentOf(x))) {
// (1)将父节点设为当前节点
x = parentOf(x);
// (2)以新当前节点右旋
rotateRight(x);
}
// 情况3)如果当前节点为其父节点的右节点(如果是情况2)则右旋之后新当前节点正好为其父节点的右节点了)
// (1)将父节点设为黑色
setColor(parentOf(x), BLACK);
// (2)将祖父节点设为红色
setColor(parentOf(parentOf(x)), RED);
// (3)以祖父节点为支点进行左旋
rotateLeft(parentOf(parentOf(x)));
}
}
}
// 平衡完成后将根节点设为黑色
root.color = BLACK;
}
红黑树怎么删除元素?
删除元素本身比较简单,就是采用二叉树的删除规则。
(1)如果删除的位置有两个叶子节点,则从其右子树中取最小的元素放到删除的位置,然后把删除位置移到替代元素的位置,进入下一步。
(2)如果删除的位置只有一个叶子节点(有可能是经过第一步转换后的删除位置),则把那个叶子节点作为替代元素,放到删除的位置,然后把这个叶子节点删除。
(3)如果删除的位置没有叶子节点,则直接把这个删除位置的元素删除即可。
(4)针对红黑树,如果删除位置是黑色节点,还需要做再平衡。
(5)如果有替代元素,则以替代元素作为当前节点进入再平衡。
(6)如果没有替代元素,则以删除的位置的元素作为当前节点进入再平衡,平衡之后再删除这个节点。
public V remove(Object key) {
// 获取节点
Entry<K,V> p = getEntry(key);
if (p == null)
return null;
V oldValue = p.value;
// 删除节点
deleteEntry(p);
// 返回删除的value
return oldValue;
}
private void deleteEntry(Entry<K,V> p) {
// 修改次数加1
modCount++;
// 元素个数减1
size--;
if (p.left != null && p.right != null) {
// 如果当前节点既有左子节点,又有右子节点
// 取其右子树中最小的节点
Entry<K,V> s = successor(p);
// 用右子树中最小节点的值替换当前节点的值
p.key = s.key;
p.value = s.value;
// 把右子树中最小节点设为当前节点
p = s;
// 这种情况实际上并没有删除p节点,而是把p节点的值改了,实际删除的是p的后继节点
}
// 如果原来的当前节点(p)有2个子节点,则当前节点已经变成原来p的右子树中的最小节点了,也就是说其没有左子节点了
// 到这一步,p肯定只有一个子节点了
// 如果当前节点有子节点,则用子节点替换当前节点
Entry<K,V> replacement = (p.left != null ? p.left : p.right);
if (replacement != null) {
// 把替换节点直接放到当前节点的位置上(相当于删除了p,并把替换节点移动过来了)
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;
// 将p的各项属性都设为空
p.left = p.right = p.parent = null;
// 如果p是黑节点,则需要再平衡
if (p.color == BLACK)
fixAfterDeletion(replacement);
} else if (p.parent == null) {
// 如果当前节点就是根节点,则直接将根节点设为空即可
root = null;
} else {
// 如果当前节点没有子节点且其为黑节点,则把自己当作虚拟的替换节点进行再平衡
if (p.color == BLACK)
fixAfterDeletion(p);
// 平衡完成后删除当前节点(与父节点断绝关系)
if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}
经过上面的处理,真正删除的肯定是黑色节点才会进入到再平衡阶段。
因为删除的是黑色节点,导致整颗树不平衡了,所以这里我们假设把删除的黑色赋予当前节点,这样当前节点除了它自已的颜色还多了一个黑色,那么:
(1)如果当前节点是根节点,则直接涂黑即可,不需要再平衡;
(2)如果当前节点是红+黑节点,则直接涂黑即可,不需要平衡;
(3)如果当前节点是黑+黑节点,则我们只要通过旋转把这个多出来的黑色不断的向上传递到一个红色节点即可,这又可能会出现以下四种情况:
(假设当前节点为父节点的左子节点)
情况 | 策略 |
---|---|
1)x是黑+黑节点,x的兄弟是红节点 | (1)将兄弟节点设为黑色;(2)将父节点设为红色;(3)以父节点为支点进行左旋;(4)重新设置x的兄弟节点,进入下一步; |
2)x是黑+黑节点,x的兄弟是黑节点,且兄弟节点的两个子节点都是黑色 | (1)将兄弟节点设置为红色;(2)将x的父节点作为新的当前节点,进入下一次循环; |
3)x是黑+黑节点,x的兄弟是黑节点,且兄弟节点的右子节点为黑色,左子节点为红色 | (1)将兄弟节点的左子节点设为黑色;(2)将兄弟节点设为红色;(3)以兄弟节点为支点进行右旋;(4)重新设置x的兄弟节点,进入下一步; |
3)x是黑+黑节点,x的兄弟是黑节点,且兄弟节点的右子节点为红色,左子节点任意颜色 | (1)将兄弟节点的颜色设为父节点的颜色;(2)将父节点设为黑色;(3)将兄弟节点的右子节点设为黑色;(4)以父节点为支点进行左旋;(5)将root作为新的当前节点(退出循环); |
(假设当前节点为父节点的右子节点,正好反过来)
情况 | 策略 |
---|---|
1)x是黑+黑节点,x的兄弟是红节点 | (1)将兄弟节点设为黑色;(2)将父节点设为红色;(3)以父节点为支点进行右旋;(4)重新设置x的兄弟节点,进入下一步; |
2)x是黑+黑节点,x的兄弟是黑节点,且兄弟节点的两个子节点都是黑色 | (1)将兄弟节点设置为红色;(2)将x的父节点作为新的当前节点,进入下一次循环; |
3)x是黑+黑节点,x的兄弟是黑节点,且兄弟节点的左子节点为黑色,右子节点为红色 | (1)将兄弟节点的右子节点设为黑色;(2)将兄弟节点设为红色;(3)以兄弟节点为支点进行左旋;(4)重新设置x的兄弟节点,进入下一步; |
3)x是黑+黑节点,x的兄弟是黑节点,且兄弟节点的左子节点为红色,右子节点任意颜色 | (1)将兄弟节点的颜色设为父节点的颜色;(2)将父节点设为黑色;(3)将兄弟节点的左子节点设为黑色;(4)以父节点为支点进行右旋;(5)将root作为新的当前节点(退出循环); |
让我们来看看TreeMap中的实现:
/**
* 删除再平衡
*(1)每个节点或者是黑色,或者是红色。
*(2)根节点是黑色。
*(3)每个叶子节点(NIL)是黑色。(注意:这里叶子节点,是指为空(NIL或NULL)的叶子节点!)
*(4)如果一个节点是红色的,则它的子节点必须是黑色的。
*(5)从一个节点到该节点的子孙节点的所有路径上包含相同数目的黑节点。
*/
private void fixAfterDeletion(Entry<K,V> x) {
// 只有当前节点不是根节点且当前节点是黑色时才进入循环
while (x != root && colorOf(x) == BLACK) {
if (x == leftOf(parentOf(x))) {
// 如果当前节点是其父节点的左子节点
// sib是当前节点的兄弟节点
Entry<K,V> sib = rightOf(parentOf(x));
// 情况1)如果兄弟节点是红色
if (colorOf(sib) == RED) {
// (1)将兄弟节点设为黑色
setColor(sib, BLACK);
// (2)将父节点设为红色
setColor(parentOf(x), RED);
// (3)以父节点为支点进行左旋
rotateLeft(parentOf(x));
// (4)重新设置x的兄弟节点,进入下一步
sib = rightOf(parentOf(x));
}
if (colorOf(leftOf(sib)) == BLACK &&
colorOf(rightOf(sib)) == BLACK) {
// 情况2)如果兄弟节点的两个子节点都是黑色
// (1)将兄弟节点设置为红色
setColor(sib, RED);
// (2)将x的父节点作为新的当前节点,进入下一次循环
x = parentOf(x);
} else {
if (colorOf(rightOf(sib)) == BLACK) {
// 情况3)如果兄弟节点的右子节点为黑色
// (1)将兄弟节点的左子节点设为黑色
setColor(leftOf(sib), BLACK);
// (2)将兄弟节点设为红色
setColor(sib, RED);
// (3)以兄弟节点为支点进行右旋
rotateRight(sib);
// (4)重新设置x的兄弟节点
sib = rightOf(parentOf(x));
}
// 情况4)
// (1)将兄弟节点的颜色设为父节点的颜色
setColor(sib, colorOf(parentOf(x)));
// (2)将父节点设为黑色
setColor(parentOf(x), BLACK);
// (3)将兄弟节点的右子节点设为黑色
setColor(rightOf(sib), BLACK);
// (4)以父节点为支点进行左旋
rotateLeft(parentOf(x));
// (5)将root作为新的当前节点(退出循环)
x = root;
}
} else { // symmetric
// 如果当前节点是其父节点的右子节点
// sib是当前节点的兄弟节点
Entry<K,V> sib = leftOf(parentOf(x));
// 情况1)如果兄弟节点是红色
if (colorOf(sib) == RED) {
// (1)将兄弟节点设为黑色
setColor(sib, BLACK);
// (2)将父节点设为红色
setColor(parentOf(x), RED);
// (3)以父节点为支点进行右旋
rotateRight(parentOf(x));
// (4)重新设置x的兄弟节点
sib = leftOf(parentOf(x));
}
if (colorOf(rightOf(sib)) == BLACK &&
colorOf(leftOf(sib)) == BLACK) {
// 情况2)如果兄弟节点的两个子节点都是黑色
// (1)将兄弟节点设置为红色
setColor(sib, RED);
// (2)将x的父节点作为新的当前节点,进入下一次循环
x = parentOf(x);
} else {
if (colorOf(leftOf(sib)) == BLACK) {
// 情况3)如果兄弟节点的左子节点为黑色
// (1)将兄弟节点的右子节点设为黑色
setColor(rightOf(sib), BLACK);
// (2)将兄弟节点设为红色
setColor(sib, RED);
// (3)以兄弟节点为支点进行左旋
rotateLeft(sib);
// (4)重新设置x的兄弟节点
sib = leftOf(parentOf(x));
}
// 情况4)
// (1)将兄弟节点的颜色设为父节点的颜色
setColor(sib, colorOf(parentOf(x)));
// (2)将父节点设为黑色
setColor(parentOf(x), BLACK);
// (3)将兄弟节点的左子节点设为黑色
setColor(leftOf(sib), BLACK);
// (4)以父节点为支点进行右旋
rotateRight(parentOf(x));
// (5)将root作为新的当前节点(退出循环)
x = root;
}
}
}
// 退出条件为多出来的黑色向上传递到了根节点或者红节点
// 则将x设为黑色即可满足红黑树规则
setColor(x, BLACK);
}
为什么要进行平衡?
因为违背了红黑树的性质,必须保证红黑树的五大性质。
(1)节点是红色或黑色。
(2)根节点是黑色。
(3)每个叶节点(NIL节点,空节点)是黑色的。
(4)每个红色节点的两个子节点都是黑色。(从每个叶子到根的所有路径上不能有两个连续的红色节点)
(5)从任一节点到其每个叶子的所有路径都包含相同数目的黑色节点。
如何实现红黑树的遍历?
(1)从root遍历整个树;
(2)如果待查找的key比当前遍历的key小,则在其左子树中查找;
(3)如果待查找的key比当前遍历的key大,则在其右子树中查找;
(4)如果待查找的key与当前遍历的key相等,则找到了该元素,直接返回;
(5)从这里可以看出是否有comparator分化成了两个方法,但是内部逻辑一模一样,因此可见笔者comparator = (k1, k2) -> ((Comparable<? super K>)k1).compareTo(k2);
这种改造的必要性。
public V get(Object key) {
// 根据key查找元素
Entry<K,V> p = getEntry(key);
// 找到了返回value值,没找到返回null
return (p==null ? null : p.value);
}
final Entry<K,V> getEntry(Object key) {
// 如果comparator不为空,使用comparator的版本获取元素
if (comparator != null)
return getEntryUsingComparator(key);
// 如果key为空返回空指针异常
if (key == null)
throw new NullPointerException();
// 将key强转为Comparable
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
// 从根元素开始遍历
Entry<K,V> p = root;
while (p != null) {
int cmp = k.compareTo(p.key);
if (cmp < 0)
// 如果小于0从左子树查找
p = p.left;
else if (cmp > 0)
// 如果大于0从右子树查找
p = p.right;
else
// 如果相等说明找到了直接返回
return p;
}
// 没找到返回null
return null;
}
final Entry<K,V> getEntryUsingComparator(Object key) {
@SuppressWarnings("unchecked")
K k = (K) key;
Comparator<? super K> cpr = comparator;
if (cpr != null) {
// 从根元素开始遍历
Entry<K,V> p = root;
while (p != null) {
int cmp = cpr.compare(k, p.key);
if (cmp < 0)
// 如果小于0从左子树查找
p = p.left;
else if (cmp > 0)
// 如果大于0从右子树查找
p = p.right;
else
// 如果相等说明找到了直接返回
return p;
}
}
// 没找到返回null
return null;
}
TreeMap为什么不允许key为null?
TreeMap 的 key 不允许为空( null
),可能是因为红黑树是一颗二叉查找树,需要对 key 进行排序。
TreeMap中是怎么遍历的?
TreeMap插入、删除、查询元素的时间复杂度各是多少?
TreeMap 的查找、添加、删除 key-value 键值对的平均时间复杂度为 O(logN)
。原因是,TreeMap 采用红黑树,操作都需要经过二分查找,而二分查找的时间复杂度是 O(logN)
。
HashMap在多线程环境中什么时候会出现问题?
在多线程使用场景中,应该尽量避免使用线程不安全的HashMap,而使用线程安全的ConcurrentHashMap。那么为什么说HashMap是线程不安全的,下面举例子说明在并发的多线程使用场景中使用HashMap可能造成死循环。代码例子如下(便于理解,仍然使用JDK1.7的环境):
public class HashMapInfiniteLoop {
private static HashMap<Integer,String> map = new HashMap<Integer,String>(2,0.75f);
public static void main(String[] args) {
map.put(5, "C");
new Thread("Thread1") {
public void run() {
map.put(7, "B");
System.out.println(map);
};
}.start();
new Thread("Thread2") {
public void run() {
map.put(3, "A);
System.out.println(map);
};
}.start();
}
}
其中,map初始化为一个长度为2的数组,loadFactor=0.75,threshold=2*0.75=1,也就是说当put第二个key的时候,map就需要进行resize。
通过设置断点让线程1和线程2同时debug到transfer方法(3.3小节代码块)的首行。注意此时两个线程已经成功添加数据。放开thread1的断点至transfer方法的“Entry next = e.next;” 这一行;然后放开线程2的的断点,让线程2进行resize。结果如下图。
注意,Thread1的 e 指向了key(3),而next指向了key(7),其在线程二rehash后,指向了线程二重组后的链表。
线程一被调度回来执行,先是执行 newTalbe[i] = e, 然后是e = next,导致了e指向了key(7),而下一次循环的next = e.next导致了next指向了key(3)。
于是,当我们用线程一调用map.get(11)时,悲剧就出现了——Infinite Loop。
ConcurrentHashMap的存储结构?
ConcurrentHashMap采用(数组 + 链表 + 红黑树)的结构存储元素;
ConcurrentHashMap是怎么保证并发安全的?
ConcurrentHashMap采用的锁有 synchronized,CAS,自旋锁,分段锁,volatile等
ConcurrentHashMap是怎么扩容的?
1.每次添加元素后,元素数量加1,并判断是否达到扩容门槛,达到了则进行扩容或协助扩容。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 这里使用的思想跟LongAdder类是一模一样的(后面会讲)
// 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想)
// 并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段
// 这样可以保证尽量小的减少冲突
// 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果as为空
// 或者长度为0
// 或者当前线程所在的段为null
// 或者在当前线程的段上加数量失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋)
// 不同线程对应不同的段都更新失败了
// 说明已经发生冲突了,那么就对counterCells进行扩容
// 以减少多个线程hash到同一个段的概率
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 计算元素个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果元素个数达到了扩容门槛,则进行扩容
// 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// rs是扩容时的一个邮戳标识
int rs = resizeStamp(n);
if (sc < 0) {
// sc<0说明正在扩容中
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 扩容已经完成了,退出循环
// 正常应该只会触发nextTable==null这个条件,其它条件没看出来何时触发
break;
// 扩容未完成,则当前线程加入迁移元素中
// 并把扩容线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 这里是触发扩容的那个线程进入的地方
// sizeCtl的高16位存储着rs这个扩容邮戳
// sizeCtl的低16位存储着扩容线程数加1,即(1+nThreads)
// 所以官方说的扩容时sizeCtl的值为 -(1+nThreads)是错误的
// 进入迁移元素
transfer(tab, null);
// 重新计算元素个数
s = sumCount();
}
}
}
(1)元素个数的存储方式类似于LongAdder类,存储在不同的段上,减少不同线程同时更新size时的冲突;
(2)计算元素个数时把这些段的值及baseCount相加算出总的元素个数;
(3)正常情况下sizeCtl存储着扩容门槛,扩容门槛为容量的0.75倍;
(4)扩容时sizeCtl高位存储扩容邮戳(resizeStamp),低位存储扩容线程数加1(1+nThreads);
(5)其它线程添加元素后如果发现存在扩容,也会加入的扩容行列中来;
2.线程添加元素时发现正在扩容且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空
// 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素
// 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// sizeCtl<0,说明正在扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 扩容线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 当前线程帮忙迁移元素
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
当前桶元素迁移完成了才去协助迁移其它桶元素;
3.扩容时容量变为两倍,并把部分元素迁移到其它桶中。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
// 如果nextTab为空,说明还没开始迁移
// 就新建一个新桶数组
try {
// 新桶数组是原桶的两倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// 新桶数组大小
int nextn = nextTab.length;
// 新建一个ForwardingNode类型的节点,并把新桶数组存储在里面
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 整个while循环就是在算i的值,过程太复杂,不用太关心
// i的值会从n-1依次递减,感兴趣的可以打下断点就知道了
// 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
// 如果一次遍历完成了
// 也就是整个map所有桶中的元素都迁移完成了
int sc;
if (finishing) {
// 如果全部迁移完成了,则替换旧桶数组
// 并设置下一次扩容门槛为新桶数组容量的0.75倍
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 当前线程扩容完成,把扩容线程数-1
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 扩容完成两边肯定相等
return;
// 把finishing设置为true
// finishing为true才会走到上面的if条件
finishing = advance = true;
// i重新赋值为n
// 这样会再重新遍历一次桶数组,看看是不是都迁移完成了
// 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// 如果桶中无数据,直接放入ForwardingNode标记该桶已迁移
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 如果桶中第一个元素的hash值为MOVED
// 说明它是ForwardingNode节点
// 也就是该桶已迁移
advance = true; // already processed
else {
// 锁定该桶并迁移元素
synchronized (f) {
// 再次判断当前桶第一个元素是否有修改
// 也就是可能其它线程先一步迁移了元素
if (tabAt(tab, i) == f) {
// 把一个链表分化成两个链表
// 规则是桶中各元素的hash与桶大小n进行与操作
// 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中
// 其中低位链表迁移到新桶中的位置相对旧桶不变
// 高位链表迁移到新桶中位置正好是其在旧桶的位置加n
// 这也正是为什么扩容时容量在变成两倍的原因
Node<K,V> ln, hn;
if (fh >= 0) {
// 第一个元素的hash值大于等于0
// 说明该桶中元素是以链表形式存储的
// 这里与HashMap迁移算法基本类似
// 唯一不同的是多了一步寻找lastRun
// 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表
// 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0
// 则最后后面三个0对应的元素肯定还是在同一个桶中
// 这时lastRun对应的就是倒数第三个节点
// 至于为啥要这样处理,我也没太搞明白
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 看看最后这几个元素归属于低位链表还是高位链表
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 遍历链表,把hash&n为0的放在低位链表中
// 不为0的放在高位链表中
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 低位链表的位置不变
setTabAt(nextTab, i, ln);
// 高位链表的位置是原位置加n
setTabAt(nextTab, i + n, hn);
// 标记当前桶已迁移
setTabAt(tab, i, fwd);
// advance为true,返回上面进行--i操作
advance = true;
}
else if (f instanceof TreeBin) {
// 如果第一个元素是树节点
// 也是一样,分化成两颗树
// 也是根据hash&n为0放在低位树中
// 不为0放在高位树中
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历整颗树,根据hash&n是否为0分化成两颗树
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果分化的树中元素个数小于等于6,则退化成链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树的位置不变
setTabAt(nextTab, i, ln);
// 高位树的位置是原位置加n
setTabAt(nextTab, i + n, hn);
// 标记该桶已迁移
setTabAt(tab, i, fwd);
// advance为true,返回上面进行--i操作
advance = true;
}
}
}
}
}
}
(1)新桶数组大小是旧桶数组的两倍;
(2)迁移元素先从靠后的桶开始;
(3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;
(4)迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;
(5)低位链表(树)存储在原来的位置;
(6)高们链表(树)存储在原来的位置加n的位置;
(7)迁移元素时会锁住当前桶,也是分段锁的思想;
ConcurrentHashMap的size()方法的实现知道多少?
元素个数的存储也是采用分段的思想,获取元素个数时需要把所有段加起来。
public int size() {
// 调用sumCount()计算元素个数
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
// 计算CounterCell所有段及baseCount的数量之和
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
(1)元素的个数依据不同的线程存在在不同的段里;(见addCounter()分析)
(2)计算CounterCell所有段及baseCount的数量之和;
(3)获取元素个数没有加锁;
ConcurrentHashMap是强一致性的吗?
查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的;
ConcurrentHashMap不能解决什么问题?
ConcurrentHashMap中不能存储key或value为null的元素;
例子:
private static final Map<Integer, Integer> map = new ConcurrentHashMap<>();
public void unsafeUpdate(Integer key, Integer value) {
Integer oldValue = map.get(key);
if (oldValue == null) {
map.put(key, value);
}
}
这里如果有多个线程同时调用unsafeUpdate()这个方法,ConcurrentHashMap还能保证线程安全吗?
答案是不能。因为get()之后if之前可能有其它线程已经put()了这个元素,这时候再put()就把那个线程put()的元素覆盖了。
那怎么修改呢?
答案也很简单,使用putIfAbsent()方法,它会保证元素不存在时才插入元素,如下:
public void safeUpdate(Integer key, Integer value) {
map.putIfAbsent(key, value);
}
那么,如果上面oldValue不是跟null比较,而是跟一个特定的值比如1进行比较怎么办?也就是下面这样:
public void unsafeUpdate(Integer key, Integer value) {
Integer oldValue = map.get(key);
if (oldValue == 1) {
map.put(key, value);
}
}
这样的话就没办法使用putIfAbsent()方法了。
其实,ConcurrentHashMap还提供了另一个方法叫replace(K key, V oldValue, V newValue)可以解决这个问题。
replace(K key, V oldValue, V newValue)这个方法可不能乱用,如果传入的newValue是null,则会删除元素。
public void safeUpdate(Integer key, Integer value) {
map.replace(key, 1, value);
}
那么,如果if之后不是简单的put()操作,而是还有其它业务操作,之后才是put(),比如下面这样,这该怎么办呢?
public void unsafeUpdate(Integer key, Integer value) {
Integer oldValue = map.get(key);
if (oldValue == 1) {
System.out.println(System.currentTimeMillis());
/**
* 其它业务操作
*/
System.out.println(System.currentTimeMillis());
map.put(key, value);
}
}
这时候就没办法使用ConcurrentHashMap提供的方法了,只能业务自己来保证线程安全了,比如下面这样:
public void safeUpdate(Integer key, Integer value) {
synchronized (map) {
Integer oldValue = map.get(key);
if (oldValue == null) {
System.out.println(System.currentTimeMillis());
/**
* 其它业务操作
*/
System.out.println(System.currentTimeMillis());
map.put(key, value);
}
}
}
这样虽然不太友好,但是最起码能保证业务逻辑是正确的。
当然,这里使用ConcurrentHashMap的意义也就不大了,可以换成普通的HashMap了。
上面只是举一个简单的例子,我们不能听说ConcurrentHashMap是线程安全的,就认为它无论什么情况下都是线程安全的,还是那句话尽信书不如无书。
ConcurrentHashMap的sizeCtl知道吗?
ConcurrentHashMap中没有threshold和loadFactor这两个字段,而是采用sizeCtl来控制;
sizeCtl = -1,表示正在进行初始化;
sizeCtl = 0,默认值,表示后续在真正初始化的时候使用默认容量;
sizeCtl > 0,在初始化之前存储的是传入的容量,在初始化或扩容后存储的是下一次的扩容门槛;
sizeCtl = (resizeStamp << 16) + (1 + nThreads),表示正在进行扩容,高位存储扩容邮戳,低位存储扩容线程数加1;
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
ConcurrentHashMap中哪些地方运用到分段锁的思想?
分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。
什么是伪共享?为何会出现伪共享?怎么避免伪共享?
什么是伪共享?
计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加了一级或者多级高速缓冲存储器(Cache),这个Cache一般是集成到CPU内部的,所以也叫 CPU Cache,如下图是两级cache结构:
Cache内部是按行存储的,其中每一行称为一个cache行,cache行是Cache与主内存进行数据交换的单位,cache行的大小一般为2的幂次数字节。
当CPU访问某一个变量时候,首先会去看CPU Cache内是否有该变量,如果有则直接从中获取,否者就去主内存里面获取该变量,然后把该变量所在内存区域的一个Cache行大小的内存拷贝到Cache(cache行是Cache与主内存进行数据交换的单位)。由于存放到Cache行的的是内存块而不是单个变量,所以可能会把多个变量存放到了一个cache行。当多个线程同时修改一个缓存行里面的多个变量时候,由于同时只能有一个线程操作缓存行,所以相比每个变量放到一个缓存行性能会有所下降,这就是伪共享。
如上图变量x,y同时被放到了CPU的一级和二级缓存,当线程1使用CPU1对变量x进行更新时候,首先会修改cpu1的一级缓存变量x所在缓存行,这时候缓存一致性协议会导致cpu2中变量x对应的缓存行失效,那么线程2写入变量x的时候就只能去二级缓存去查找,这就破坏了一级缓存,而一级缓存比二级缓存更快。更坏的情况下如果cpu只有一级缓存,那么会导致频繁的直接访问主内存。
为何会出现伪共享
伪共享的产生是因为多个变量被放入了一个缓存行,并且多个线程同时去写入缓存行中不同变量。那么为何多个变量会被放入一个缓存行那。其实是因为Cache与内存交换数据的单位就是Cache,当CPU要访问的变量没有在Cache命中时候,根据程序运行的局部性原理会把该变量在内存中大小为Cache行的内存放如缓存行。
long a;
long b;
long c;
long d;
如上代码,声明了四个long变量,假设cache行的大小为32个字节,那么当cpu访问变量a时候发现该变量没有在cache命中,那么就会去主内存把变量a以及内存地址附近的b,c,d放入缓存行。也就是地址连续的多个变量才有可能会被放到一个缓存行中,当创建数组时候,数组里面的多个元素就会被放入到同一个缓存行。那么单线程下多个变量放入缓存行对性能有影响?其实正常情况下单线程访问时候由于数组元素被放入到了一个或者多个cache行对代码执行是有利的,因为数据都在缓存中,代码执行会更快。
如何解决伪共享
@Contended注解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
String value() default "";
}
Contended注解可以用于类型上和属性上,加上这个注解之后虚拟机会自动进行填充,从而避免伪共享。这个注解在Java8 ConcurrentHashMap、ForkJoinPool和Thread等类中都有应用。我们来看一下Java8中ConcurrentHashMap中如何运用Contended这个注解来解决伪共享问题。以下说的ConcurrentHashMap都是Java8版本。
注意:在Java8中提供了@sun.misc.Contended来避免伪共享时,在运行时需要设置JVM启动参数-XX:-RestrictContended否则可能不生效。
缓存行填充的威力
/**
* 缓存行测试
*/
public class CacheLineTest {
/**
* 是否启用缓存行填充
*/
private final boolean isDataPadding = false;
/**
* 正常定义的变量
*/
private volatile long x = 0;
private volatile long y = 0;
private volatile long z = 0;
/**
* 通过缓存行填充的变量
*/
private volatile VolatileData volatileDataX = new VolatileData(0);
private volatile VolatileData volatileDataY = new VolatileData(0);
private volatile VolatileData volatileDataZ = new VolatileData(0);
/**
* 循环次数
*/
private final long size = 100000000;
/**
* 进行累加操作
*/
public void accumulationX() {
//计算耗时
long currentTime = System.currentTimeMillis();
long value = 0;
//循环累加
for (int i = 0; i < size; i++) {
//使用缓存行填充的方式
if (isDataPadding) {
value = volatileDataX.accumulationAdd();
} else {
//不使用缓存行填充的方式 因为时单线程操作不需要加锁
value = (++x);
}
}
//打印
System.out.println(value);
//打印耗时
System.out.println("耗时:" + (System.currentTimeMillis() - currentTime));
}
/**
* 进行累加操作
*/
public void accumulationY() {
long currentTime = System.currentTimeMillis();
long value = 0;
for (int i = 0; i < size; i++) {
if (isDataPadding) {
value = volatileDataY.accumulationAdd();
} else {
value = ++y;
}
}
System.out.println(value);
System.out.println("耗时:" + (System.currentTimeMillis() - currentTime));
}
/**
* 进行累加操作
*/
public void accumulationZ() {
long currentTime = System.currentTimeMillis();
long value = 0;
for (int i = 0; i < size; i++) {
if (isDataPadding) {
value = volatileDataZ.accumulationAdd();
} else {
value = ++z;
}
}
System.out.println(value);
System.out.println("耗时:" + (System.currentTimeMillis() - currentTime));
}
public static void main(String[] args) {
//创建对象
CacheLineTest cacheRowTest = new CacheLineTest();
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
//启动三个线程个调用他们各自的方法
executorService.execute(() -> cacheRowTest.accumulationX());
executorService.execute(() -> cacheRowTest.accumulationY());
executorService.execute(() -> cacheRowTest.accumulationZ());
executorService.shutdown();
}
}
当多个线程同时对共享的缓存行进行写操作的时候,因为缓存系统自身的缓存一致性原则,会引发伪共享问题,解决的常用办法是将共享变量根据缓存行大小进行补充对齐,使其加载到缓存时能够独享缓存行,避免与其他共享变量存储在同一个缓存行。
什么是跳表?
跳表是一个随机化的数据结构,实质就是一种可以进行二分查找的有序链表。
跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。
跳表不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。
// 数据节点,典型的单链表结构
static final class Node<K,V> {
final K key;
// 注意:这里value的类型是Object,而不是V
// 在删除元素的时候value会指向当前元素本身
volatile Object value;
volatile Node<K,V> next;
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
Node(Node<K,V> next) {
this.key = null;
this.value = this; // 当前元素本身(marker)
this.next = next;
}
}
// 索引节点,存储着对应的node值,及向下和向右的索引指针
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
}
// 头索引节点,继承自Index,并扩展一个level字段,用于记录索引的层级
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
(1)Node,数据节点,存储数据的节点,典型的单链表结构;
(2)Index,索引节点,存储着对应的node值,及向下和向右的索引指针;
(3)HeadIndex,头索引节点,继承自Index,并扩展一个level字段,用于记录索引的层级;
ConcurrentSkipList是有序的吗?
ConcurrentSkipListMap 的key是有序的。
ConcurrentSkipList是如何保证线程安全的?
ConcurrentSkipList插入、删除、查询元素的时间复杂度各是多少?
跳表查询、插入、删除的时间复杂度为O(log n),与平衡二叉树接近;
添加
public V put(K key, V value) {
// 不能存储value为null的元素
// 因为value为null标记该元素被删除(后面会看到)
if (value == null)
throw new NullPointerException();
// 调用doPut()方法添加元素
return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
// 添加元素后存储在z中
Node<K,V> z; // added node
// key也不能为null
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// Part I:找到目标节点的位置并插入
// 这里的目标节点是数据节点,也就是最底层的那条链
// 自旋
outer: for (;;) {
// 寻找目标节点之前最近的一个索引对应的数据节点,存储在b中,b=before
// 并把b的下一个数据节点存储在n中,n=next
// 为了便于描述,我这里把b叫做当前节点,n叫做下一个节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
// 如果下一个节点不为空
// 就拿其key与目标节点的key比较,找到目标节点应该插入的位置
if (n != null) {
// v=value,存储节点value值
// c=compare,存储两个节点比较的大小
Object v; int c;
// n的下一个数据节点,也就是b的下一个节点的下一个节点(孙子节点)
Node<K,V> f = n.next;
// 如果n不为b的下一个节点
// 说明有其它线程修改了数据,则跳出内层循环
// 也就是回到了外层循环自旋的位置,从头来过
if (n != b.next) // inconsistent read
break;
// 如果n的value值为空,说明该节点已删除,协助删除节点
if ((v = n.value) == null) { // n is deleted
// todo 这里为啥会协助删除?后面讲
n.helpDelete(b, f);
break;
}
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
if (b.value == null || v == n) // b is deleted
break;
// 如果目标key与下一个节点的key大
// 说明目标元素所在的位置还在下一个节点的后面
if ((c = cpr(cmp, key, n.key)) > 0) {
// 就把当前节点往后移一位
// 同样的下一个节点也往后移一位
// 再重新检查新n是否为空,它与目标key的关系
b = n;
n = f;
continue;
}
// 如果比较时发现下一个节点的key与目标key相同
// 说明链表中本身就存在目标节点
if (c == 0) {
// 则用新值替换旧值,并返回旧值(onlyIfAbsent=false)
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 如果替换旧值时失败,说明其它线程先一步修改了值,从头来过
break; // restart if lost race to replace value
}
// 如果c<0,就往下走,也就是找到了目标节点的位置
// else c < 0; fall through
}
// 有两种情况会到这里
// 一是到链表尾部了,也就是n为null了
// 二是找到了目标节点的位置,也就是上面的c<0
// 新建目标节点,并赋值给z
// 这里把n作为新节点的next
// 如果到链表尾部了,n为null,这毫无疑问
// 如果c<0,则n的key比目标key大,相妆于在b和n之间插入目标节点z
z = new Node<K,V>(key, value, n);
// 原子更新b的下一个节点为目标节点z
if (!b.casNext(n, z))
// 如果更新失败,说明其它线程先一步修改了值,从头来过
break; // restart if lost race to append to b
// 如果更新成功,跳出自旋状态
break outer;
}
}
// 经过Part I,目标节点已经插入到有序链表中了
// Part II:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
// 取个随机数
int rnd = ThreadLocalRandom.nextSecondarySeed();
// 0x80000001展开为二进制为10000000000000000000000000000001
// 只有两头是1
// 这里(rnd & 0x80000001) == 0
// 相当于排除了负数(负数最高位是1),排除了奇数(奇数最低位是1)
// 只有最高位最低位都不为1的数跟0x80000001做&操作才会为0
// 也就是正偶数
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
// 默认level为1,也就是只要到这里了就会至少建立一层索引
int level = 1, max;
// 随机数从最低位的第二位开始,有几个连续的1则level就加几
// 因为最低位肯定是0,正偶数嘛
// 比如,1100110,level就加2
while (((rnd >>>= 1) & 1) != 0)
++level;
// 用于记录目标节点建立的最高的那层索引节点
Index<K,V> idx = null;
// 取头索引节点(这是最高层的头索引节点)
HeadIndex<K,V> h = head;
// 如果生成的层数小于等于当前最高层的层级
// 也就是跳表的高度不会超过现有高度
if (level <= (max = h.level)) {
// 从第一层开始建立一条竖直的索引链表
// 这条链表使用down指针连接起来
// 每个索引节点里面都存储着目标节点这个数据节点
// 最后idx存储的是这条索引链表的最高层节点
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level
// 如果新的层数超过了现有跳表的高度
// 则最多只增加一层
// 比如现在只有一层索引,那下一次最多增加到两层索引,增加多了也没有意义
level = max + 1; // hold in array and later pick the one to use
// idxs用于存储目标节点建立的竖起索引的所有索引节点
// 其实这里直接使用idx这个最高节点也是可以完成的
// 只是用一个数组存储所有节点要方便一些
// 注意,这里数组0号位是没有使用的
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
// 从第一层开始建立一条竖的索引链表(跟上面一样,只是这里顺便把索引节点放到数组里面了)
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
// 自旋
for (;;) {
// 旧的最高层头索引节点
h = head;
// 旧的最高层级
int oldLevel = h.level;
// 再次检查,如果旧的最高层级已经不比新层级矮了
// 说明有其它线程先一步修改了值,从头来过
if (level <= oldLevel) // lost race to add level
break;
// 新的最高层头索引节点
HeadIndex<K,V> newh = h;
// 头节点指向的数据节点
Node<K,V> oldbase = h.node;
// 超出的部分建立新的头索引节点
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// 原子更新头索引节点
if (casHead(h, newh)) {
// h指向新的最高层头索引节点
h = newh;
// 把level赋值为旧的最高层级的
// idx指向的不是最高的索引节点了
// 而是与旧最高层平齐的索引节点
idx = idxs[level = oldLevel];
break;
}
}
}
// 经过上面的步骤,有两种情况
// 一是没有超出高度,新建一条目标节点的索引节点链
// 二是超出了高度,新建一条目标节点的索引节点链,同时最高层头索引节点同样往上长
// Part III:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起
// 这时level是等于旧的最高层级的,自旋
splice: for (int insertionLevel = level;;) {
// h为最高头索引节点
int j = h.level;
// 从头索引节点开始遍历
// 为了方便,这里叫q为当前节点,r为右节点,d为下节点,t为目标节点相应层级的索引
for (Index<K,V> q = h, r = q.right, t = idx;;) {
// 如果遍历到了最右边,或者最下边,
// 也就是遍历到头了,则退出外层循环
if (q == null || t == null)
break splice;
// 如果右节点不为空
if (r != null) {
// n是右节点的数据节点,为了方便,这里直接叫右节点的值
Node<K,V> n = r.node;
// 比较目标key与右节点的值
int c = cpr(cmp, key, n.key);
// 如果右节点的值为空了,则表示此节点已删除
if (n.value == null) {
// 则把右节点删除
if (!q.unlink(r))
// 如果删除失败,说明有其它线程先一步修改了,从头来过
break;
// 删除成功后重新取右节点
r = q.right;
continue;
}
// 如果比较c>0,表示目标节点还要往右
if (c > 0) {
// 则把当前节点和右节点分别右移
q = r;
r = r.right;
continue;
}
}
// 到这里说明已经到当前层级的最右边了
// 这里实际是会先走第二个if
// 第一个if
// j与insertionLevel相等了
// 实际是先走的第二个if,j自减后应该与insertionLevel相等
if (j == insertionLevel) {
// 这里是真正连右指针的地方
if (!q.link(r, t))
// 连接失败,从头来过
break; // restart
// t节点的值为空,可能是其它线程删除了这个元素
if (t.node.value == null) {
// 这里会去协助删除元素
findNode(key);
break splice;
}
// 当前层级右指针连接完毕,向下移一层继续连接
// 如果移到了最下面一层,则说明都连接完成了,退出外层循环
if (--insertionLevel == 0)
break splice;
}
// 第二个if
// j先自减1,再与两个level比较
// j、insertionLevel和t(idx)三者是对应的,都是还未把右指针连好的那个层级
if (--j >= insertionLevel && j < level)
// t往下移
t = t.down;
// 当前层级到最右边了
// 那只能往下一层级去走了
// 当前节点下移
// 再取相应的右节点
q = q.down;
r = q.right;
}
}
}
return null;
}
// 寻找目标节点之前最近的一个索引对应的数据节点
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
// key不能为空
if (key == null)
throw new NullPointerException(); // don't postpone errors
// 自旋
for (;;) {
// 从最高层头索引节点开始查找,先向右,再向下
// 直到找到目标位置之前的那个索引
for (Index<K,V> q = head, r = q.right, d;;) {
// 如果右节点不为空
if (r != null) {
// 右节点对应的数据节点,为了方便,我们叫右节点的值
Node<K,V> n = r.node;
K k = n.key;
// 如果右节点的value为空
// 说明其它线程把这个节点标记为删除了
// 则协助删除
if (n.value == null) {
if (!q.unlink(r))
// 如果删除失败
// 说明其它线程先删除了,从头来过
break; // restart
// 删除之后重新读取右节点
r = q.right; // reread r
continue;
}
// 如果目标key比右节点还大,继续向右寻找
if (cpr(cmp, key, k) > 0) {
// 往右移
q = r;
// 重新取右节点
r = r.right;
continue;
}
// 如果c<0,说明不能再往右了
}
// 到这里说明当前层级已经到最右了
// 两种情况:一是r==null,二是c<0
// 再从下一级开始找
// 如果没有下一级了,就返回这个索引对应的数据节点
if ((d = q.down) == null)
return q.node;
// 往下移
q = d;
// 重新取右节点
r = d.right;
}
}
}
// Node.class中的方法,协助删除元素
void helpDelete(Node<K,V> b, Node<K,V> f) {
/*
* Rechecking links and then doing only one of the
* help-out stages per call tends to minimize CAS
* interference among helping threads.
*/
// 这里的调用者this==n,三者关系是b->n->f
if (f == next && this == b.next) {
// 将n的值设置为null后,会先把n的下个节点设置为marker节点
// 这个marker节点的值是它自己
// 这里如果不是它自己说明marker失败了,重新marker
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
// marker过了,就把b的下个节点指向marker的下个节点
b.casNext(this, f.next);
}
}
// Index.class中的方法,删除succ节点
final boolean unlink(Index<K,V> succ) {
// 原子更新当前节点指向下一个节点的下一个节点
// 也就是删除下一个节点
return node.value != null && casRight(succ, succ.right);
}
// Index.class中的方法,在当前节点与succ之间插入newSucc节点
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
// 在当前节点与下一个节点中间插入一个节点
Node<K,V> n = node;
// 新节点指向当前节点的下一个节点
newSucc.right = succ;
// 原子更新当前节点的下一个节点指向新节点
return n.value != null && casRight(succ, newSucc);
}
我们这里把整个插入过程分成三个部分:
Part I:找到目标节点的位置并插入
(1)这里的目标节点是数据节点,也就是最底层的那条链;
(2)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(3)从这个数据节点开始往后遍历,直到找到目标节点应该插入的位置;
(4)如果这个位置有元素,就更新其值(onlyIfAbsent=false);
(5)如果这个位置没有元素,就把目标节点插入;
(6)至此,目标节点已经插入到最底层的数据节点链表中了;
Part II:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
(1)取个随机数rnd,计算(rnd & 0x80000001);
(2)如果不等于0,结束插入过程,也就是不需要创建索引,返回;
(3)如果等于0,才进入创建索引的过程(只要正偶数才会等于0);
(4)计算while (((rnd >>>= 1) & 1) != 0)
,决定层级数,level从1开始;
(5)如果算出来的层级不高于现有最高层级,则直接建立一条竖直的索引链表(只有down有值),并结束Part II;
(6)如果算出来的层级高于现有最高层级,则新的层级只能比现有最高层级多1;
(7)同样建立一条竖直的索引链表(只有down有值);
(8)将头索引也向上增加到相应的高度,结束Part II;
(9)也就是说,如果层级不超过现有高度,只建立一条索引链,否则还要额外增加头索引链的高度(脑补一下,后面举例说明);
Part III:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起(补上right指针)
(1)从最高层级的头索引节点开始,向右遍历,找到目标索引节点的位置;
(2)如果当前层有目标索引,则把目标索引插入到这个位置,并把目标索引前一个索引向下移一个层级;
(3)如果当前层没有目标索引,则把目标索引位置前一个索引向下移一个层级;
(4)同样地,再向右遍历,寻找新的层级中目标索引的位置,回到第(2)步;
(5)依次循环找到所有层级目标索引的位置并把它们插入到横向的索引链表中;
总结起来,一共就是三大步:
(1)插入目标节点到数据节点链表中;
(2)建立竖直的down链表;
(3)建立横向的right链表;
删除
public V remove(Object key) {
return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
// key不为空
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 自旋
outer: for (;;) {
// 寻找目标节点之前的最近的索引节点对应的数据节点
// 为了方便,这里叫b为当前节点,n为下一个节点,f为下下个节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 整个链表都遍历完了也没找到目标节点,退出外层循环
if (n == null)
break outer;
// 下下个节点
Node<K,V> f = n.next;
// 再次检查
// 如果n不是b的下一个节点了
// 说明有其它线程先一步修改了,从头来过
if (n != b.next) // inconsistent read
break;
// 如果下个节点的值奕为null了
// 说明有其它线程标记该元素为删除状态了
if ((v = n.value) == null) { // n is deleted
// 协助删除
n.helpDelete(b, f);
break;
}
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
if (b.value == null || v == n) // b is deleted
break;
// 如果c<0,说明没找到元素,退出外层循环
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
// 如果c>0,说明还没找到,继续向右找
if (c > 0) {
// 当前节点往后移
b = n;
// 下一个节点往后移
n = f;
continue;
}
// c=0,说明n就是要找的元素
// 如果value不为空且不等于找到元素的value,不需要删除,退出外层循环
if (value != null && !value.equals(v))
break outer;
// 如果value为空,或者相等
// 原子标记n的value值为空
if (!n.casValue(v, null))
// 如果删除失败,说明其它线程先一步修改了,从头来过
break;
// P.S.到了这里n的值肯定是设置成null了
// 关键!!!!
// 让n的下一个节点指向一个market节点
// 这个market节点的key为null,value为marker自己,next为n的下个节点f
// 或者让b的下一个节点指向下下个节点
// 注意:这里是或者||,因为两个CAS不能保证都成功,只能一个一个去尝试
// 这里有两层意思:
// 一是如果标记market成功,再尝试将b的下个节点指向下下个节点,如果第二步失败了,进入条件,如果成功了就不用进入条件了
// 二是如果标记market失败了,直接进入条件
if (!n.appendMarker(f) || !b.casNext(n, f))
// 通过findNode()重试删除(里面有个helpDelete()方法)
findNode(key); // retry via findNode
else {
// 上面两步操作都成功了,才会进入这里,不太好理解,上面两个条件都有非"!"操作
// 说明节点已经删除了,通过findPredecessor()方法删除索引节点
// findPredecessor()里面有unlink()操作
findPredecessor(key, cmp); // clean index
// 如果最高层头索引节点没有右节点,则跳表的高度降级
if (head.right == null)
tryReduceLevel();
}
// 返回删除的元素值
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
(1)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(2)从这个数据节点开始往后遍历,直到找到目标节点的位置;
(3)如果这个位置没有元素,直接返回null,表示没有要删除的元素;
(4)如果这个位置有元素,先通过n.casValue(v, null)
原子更新把其value设置为null;
(5)通过n.appendMarker(f)
在当前元素后面添加一个marker元素标记当前元素是要删除的元素;
(6)通过b.casNext(n, f)
尝试删除元素;
(7)如果上面两步中的任意一步失败了都通过findNode(key)
中的n.helpDelete(b, f)
再去不断尝试删除;
(8)如果上面两步都成功了,再通过findPredecessor(key, cmp)
中的q.unlink(r)
删除索引节点;
(9)如果head的right指针指向了null,则跳表高度降级;
查询
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
// key不为空
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 自旋
outer: for (;;) {
// 寻找目标节点之前最近的索引对应的数据节点
// 为了方便,这里叫b为当前节点,n为下个节点,f为下下个节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 如果链表到头还没找到元素,则跳出外层循环
if (n == null)
break outer;
// 下下个节点
Node<K,V> f = n.next;
// 如果不一致读,从头来过
if (n != b.next) // inconsistent read
break;
// 如果n的值为空,说明节点已被其它线程标记为删除
if ((v = n.value) == null) { // n is deleted
// 协助删除,再重试
n.helpDelete(b, f);
break;
}
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
if (b.value == null || v == n) // b is deleted
break;
// 如果c==0,说明找到了元素,就返回元素值
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 如果c<0,说明没找到元素
if (c < 0)
break outer;
// 如果c>0,说明还没找到,继续寻找
// 当前节点往后移
b = n;
// 下一个节点往后移
n = f;
}
}
return null;
}
(1)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(2)从这个数据节点开始往后遍历,直到找到目标节点的位置;
(3)如果这个位置没有元素,直接返回null,表示没有找到元素;
(4)如果这个位置有元素,返回元素的value值;
ConcurrentSkipList的索引具有什么特性?
ConcurrentSkipListMap提供了三个内部类来构建这样的链表结构:Node、Index、HeadIndex。其中Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,HeadIndex用来维护索引层次。到这里我们可以这样说ConcurrentSkipListMap是通过HeadIndex维护索引层次,通过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只需要比较很小一部分数据了。
Index提供了一个基于Node节点的索引Node,一个指向下一个Index的right,一个指向下层的down节点。
为什么Redis选择使用跳表而不是红黑树来实现有序集合?
首先,我们来分析下Redis的有序集合支持的操作:
1)插入元素
2)删除元素
3)查找元素
4)有序输出所有元素
5)查找区间内所有元素
其中,前4项红黑树都可以完成,且时间复杂度与跳表一致。
但是,最后一项,红黑树的效率就没有跳表高了。
在跳表中,要查找区间的元素,我们只要定位到两个区间端点在最低层级的位置,然后按顺序遍历元素就可以了,非常高效。
而红黑树只能定位到端点后,再从首位置开始每次都要查找后继节点,相对来说是比较耗时的。
此外,跳表实现起来很容易且易读,红黑树实现起来相对困难,所以Redis选择使用跳表来实现有序集合。
HashSet怎么保证添加元素不重复?
HashSet内部使用HashMap的key存储元素,以此来保证元素不重复;
// 内部使用HashMap
private transient HashMap<E,Object> map;
// 虚拟对象,用来作为value放到map中
private static final Object PRESENT = new Object();
HashSet是有序的吗?
HashSet是无序的,因为HashMap的key是无序的;
HashSet是否允许null元素?
HashSet中允许有一个null元素,因为HashMap允许key为null;
Set是否有get()方法?
HashSet是没有get()方法的,,因为get似乎没有意义,不像List那样可以按index获取元素。
这里只要一个检查元素是否存在的方法contains(),直接调用map的containsKey()方法。
public boolean contains(Object o) {
return map.containsKey(o);
}
HashSet源码?
package java.util;
import java.io.InvalidObjectException;
import sun.misc.SharedSecrets;
public class HashSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable
{
static final long serialVersionUID = -5024744406713321676L;
// 内部元素存储在HashMap中
private transient HashMap<E,Object> map;
// 虚拟元素,用来存到map元素的value中的,没有实际意义
private static final Object PRESENT = new Object();
// 空构造方法
public HashSet() {
map = new HashMap<>();
}
// 把另一个集合的元素全都添加到当前Set中
// 注意,这里初始化map的时候是计算了它的初始容量的
public HashSet(Collection<? extends E> c) {
map = new HashMap<>(Math.max((int) (c.size()/.75f) + 1, 16));
addAll(c);
}
// 指定初始容量和装载因子
public HashSet(int initialCapacity, float loadFactor) {
map = new HashMap<>(initialCapacity, loadFactor);
}
// 只指定初始容量
public HashSet(int initialCapacity) {
map = new HashMap<>(initialCapacity);
}
// LinkedHashSet专用的方法
// dummy是没有实际意义的, 只是为了跟上上面那个操持方法签名不同而已
HashSet(int initialCapacity, float loadFactor, boolean dummy) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}
// 迭代器
public Iterator<E> iterator() {
return map.keySet().iterator();
}
// 元素个数
public int size() {
return map.size();
}
// 检查是否为空
public boolean isEmpty() {
return map.isEmpty();
}
// 检查是否包含某个元素
public boolean contains(Object o) {
return map.containsKey(o);
}
// 添加元素
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
// 删除元素
public boolean remove(Object o) {
return map.remove(o)==PRESENT;
}
// 清空所有元素
public void clear() {
map.clear();
}
// 克隆方法
@SuppressWarnings("unchecked")
public Object clone() {
try {
HashSet<E> newSet = (HashSet<E>) super.clone();
newSet.map = (HashMap<E, Object>) map.clone();
return newSet;
} catch (CloneNotSupportedException e) {
throw new InternalError(e);
}
}
// 序列化写出方法
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
// 写出非static非transient属性
s.defaultWriteObject();
// 写出map的容量和装载因子
s.writeInt(map.capacity());
s.writeFloat(map.loadFactor());
// 写出元素个数
s.writeInt(map.size());
// 遍历写出所有元素
for (E e : map.keySet())
s.writeObject(e);
}
// 序列化读入方法
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
// 读入非static非transient属性
s.defaultReadObject();
// 读入容量, 并检查不能小于0
int capacity = s.readInt();
if (capacity < 0) {
throw new InvalidObjectException("Illegal capacity: " +
capacity);
}
// 读入装载因子, 并检查不能小于等于0或者是NaN(Not a Number)
// java.lang.Float.NaN = 0.0f / 0.0f;
float loadFactor = s.readFloat();
if (loadFactor <= 0 || Float.isNaN(loadFactor)) {
throw new InvalidObjectException("Illegal load factor: " +
loadFactor);
}
// 读入元素个数并检查不能小于0
int size = s.readInt();
if (size < 0) {
throw new InvalidObjectException("Illegal size: " +
size);
}
// 根据元素个数重新设置容量
// 这是为了保证map有足够的容量容纳所有元素, 防止无意义的扩容
capacity = (int) Math.min(size * Math.min(1 / loadFactor, 4.0f),
HashMap.MAXIMUM_CAPACITY);
// 再次检查某些东西, 不重要的代码忽视掉
SharedSecrets.getJavaOISAccess()
.checkArray(s, Map.Entry[].class, HashMap.tableSizeFor(capacity));
// 创建map, 检查是不是LinkedHashSet类型
map = (((HashSet<?>)this) instanceof LinkedHashSet ?
new LinkedHashMap<E,Object>(capacity, loadFactor) :
new HashMap<E,Object>(capacity, loadFactor));
// 读入所有元素, 并放入map中
for (int i=0; i<size; i++) {
@SuppressWarnings("unchecked")
E e = (E) s.readObject();
map.put(e, PRESENT);
}
}
// 可分割的迭代器, 主要用于多线程并行迭代处理时使用
public Spliterator<E> spliterator() {
return new HashMap.KeySpliterator<E,Object>(map, 0, -1, 0, 0);
}
}
LinkedHashSet的底层使用什么存储元素?
LinkedHashSet的底层使用LinkedHashMap存储元素。
package java.util;
// LinkedHashSet继承自HashSet
public class LinkedHashSet<E>
extends HashSet<E>
implements Set<E>, Cloneable, java.io.Serializable {
private static final long serialVersionUID = -2851667679971038690L;
// 传入容量和装载因子
public LinkedHashSet(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor, true);
}
// 只传入容量, 装载因子默认为0.75
public LinkedHashSet(int initialCapacity) {
super(initialCapacity, .75f, true);
}
// 使用默认容量16, 默认装载因子0.75
public LinkedHashSet() {
super(16, .75f, true);
}
// 将集合c中的所有元素添加到LinkedHashSet中
// 好奇怪, 这里计算容量的方式又变了
// HashSet中使用的是Math.max((int) (c.size()/.75f) + 1, 16)
// 这一点有点不得其解, 是作者偷懒?
public LinkedHashSet(Collection<? extends E> c) {
super(Math.max(2*c.size(), 11), .75f, true);
addAll(c);
}
// 可分割的迭代器, 主要用于多线程并行迭代处理时使用
@Override
public Spliterator<E> spliterator() {
return Spliterators.spliterator(this, Spliterator.DISTINCT | Spliterator.ORDERED);
}
}
LinkedHashSet是有序的吗?怎么个有序法?
LinkedHashSet是有序的,它是按照插入的顺序排序的。
LinkedHashSet支持按元素访问顺序排序吗?
不支持
首先,LinkedHashSet所有的构造方法都是调用HashSet的同一个构造方法,如下:
// HashSet的构造方法
HashSet(int initialCapacity, float loadFactor, boolean dummy) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}
然后,通过调用LinkedHashMap的构造方法初始化map,如下所示:
public LinkedHashMap(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor);
}
可以看到,这里把accessOrder写死为false了。
所以,LinkedHashSet是不支持按访问顺序对元素排序的,只能按插入顺序排序。
TreeSet真的是使用TreeMap来存储元素的吗?
通过源码分析我们知道TreeSet里面实际上是使用的NavigableMap来存储元素,虽然大部分时候这个map确实是TreeMap,但不是所有时候都是TreeMap。
因为有一个构造方法是TreeSet(NavigableMap<E,Object> m)
,而且这是一个非public方法,通过调用关系我们可以发现这个构造方法都是在自己类中使用的,比如下面这个:
public NavigableSet<E> tailSet(E fromElement, boolean inclusive) {
return new TreeSet<>(m.tailMap(fromElement, inclusive));
}
而这个m我们姑且认为它是TreeMap,也就是调用TreeMap的tailMap()方法:
public NavigableMap<K,V> tailMap(K fromKey, boolean inclusive) {
return new AscendingSubMap<>(this,
false, fromKey, inclusive,
true, null, true);
}
可以看到,返回的是AscendingSubMap对象,这个类的继承链是怎么样的呢?
可以看到,这个类并没有继承TreeMap,不过通过源码分析也可以看出来这个类是组合了TreeMap,也算和TreeMap有点关系,只是不是继承关系。
所以,TreeSet的底层不完全是使用TreeMap来实现的,更准确地说,应该是NavigableMap。
TreeSet是有序的吗?怎么个有序法?
有序的,TreeSet实现了SortedSet接口,它的有序性主要依赖于NavigableMap的有序性,而NavigableMap又继承自SortedMap,这个接口的有序性是指按照key的自然排序保证的有序性,而key的自然排序又有两种实现方式,一种是key实现Comparable接口,一种是构造方法传入Comparator比较器。
TreeSet和LinkedHashSet有何不同?
LinkedHashSet并没有实现SortedSet接口,它的有序性主要依赖于LinkedHashMap的有序性,所以它的有序性是指按照插入顺序保证的有序性;
而TreeSet实现了SortedSet接口,它的有序性主要依赖于NavigableMap的有序性,而NavigableMap又继承自SortedMap,这个接口的有序性是指按照key的自然排序保证的有序性,而key的自然排序又有两种实现方式,一种是key实现Comparable接口,一种是构造方法传入Comparator比较器。
TreeSet和SortedSet有什么区别和联系?
TreeSet实现了NavigableSet接口,而NavigableSet继承自SortedSet接口;TreeSet实现了SortedSet接口;
CopyOnWriteArraySet是用Map实现的吗?
CopyOnWriteArraySet底层是使用CopyOnWriteArrayList存储元素的,所以它并不是使用Map来存储元素的。
CopyOnWriteArraySet是有序的吗?怎么个有序法?
有序,因为底层是CopyOnWriteArrayList存储元素的,所以是个数组。
CopyOnWriteArraySet怎么保证并发安全?
CopyOnWriteArraySet是并发安全的,而且实现了读写分离;因为底层是CopyOnWriteArrayList存储元素的,所以见CopyOnWriteArrayList是如何实现并发安全的。
CopyOnWriteArraySet以何种方式保证元素不重复?
在添加元素时调用了CopyOnWriteArrayList的addIfAbsent()方法来保证元素不重复。
CopyOnWriteArraySet通过调用CopyOnWriteArrayList的addIfAbsent()方法来保证元素不重复
具体的见前文
如何比较两个Set中的元素是否完全一致?
假设有两个Set,一个是A,一个是B。
最简单的方式就是判断是否A中的元素都在B中,B中的元素是否都在A中,也就是两次两层循环。
其实,并不需要。
因为Set中的元素并不重复,所以只要先比较两个Set的元素个数是否相等,再作一次两层循环就可以了,需要仔细体味。代码如下:
public class CopyOnWriteArraySetTest {
public static void main(String[] args) {
Set<Integer> set1 = new CopyOnWriteArraySet<>();
set1.add(1);
set1.add(5);
set1.add(2);
set1.add(7);
// set1.add(3);
set1.add(4);
Set<Integer> set2 = new HashSet<>();
set2.add(1);
set2.add(5);
set2.add(2);
set2.add(7);
set2.add(3);
System.out.println(eq(set1, set2));
System.out.println(eq(set2, set1));
}
private static <T> boolean eq(Set<T> set1, Set<T> set2) {
if (set1.size() != set2.size()) {
return false;
}
for (T t : set1) {
// contains相当于一层for循环
if (!set2.contains(t)) {
return false;
}
}
return true;
}
}
如何比较两个List中的元素是否完全相等呢?
我们知道,List中元素是可以重复的,那是不是要做两次两层循环呢?
其实,也不需要做两次两层遍历,一次也可以搞定,设定一个标记数组,标记某个位置的元素是否找到过,请仔细体味。代码如下:
public class ListEqTest {
public static void main(String[] args) {
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(3);
list1.add(6);
list1.add(3);
list1.add(8);
list1.add(5);
List<Integer> list2 = new ArrayList<>();
list2.add(3);
list2.add(1);
list2.add(3);
list2.add(8);
list2.add(5);
list2.add(6);
System.out.println(eq(list1, list2));
System.out.println(eq(list2, list1));
}
private static <T> boolean eq(List<T> list1, List<T> list2) {
if (list1.size() != list2.size()) {
return false;
}
// 标记某个元素是否找到过,防止重复
boolean matched[] = new boolean[list2.size()];
outer: for (T t : list1) {
for (int i = 0; i < list2.size(); i++) {
// i这个位置没找到过才比较大小
if (!matched[i] && list2.get(i).equals(t)) {
matched[i] = true;
continue outer;
}
}
return false;
}
return true;
}
}
ConcurrentSkipListSet的底层是ConcurrentSkipListMap吗?
ConcurrentSkipListSet底层是通过ConcurrentNavigableMap来实现的,它是一个有序的线程安全的集合。
ConcurrentSkipListSet是有序的吗?怎么个有序法?
ConcurrentSkipListSet有序的,基于元素的自然排序或者通过比较器确定的顺序;
ConcurrentSkipListSet源码?
// 实现了NavigableSet接口,并没有所谓的ConcurrentNavigableSet接口
public class ConcurrentSkipListSet<E>
extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {
private static final long serialVersionUID = -2479143111061671589L;
// 存储使用的map
private final ConcurrentNavigableMap<E,Object> m;
// 初始化
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();
}
// 传入比较器
public ConcurrentSkipListSet(Comparator<? super E> comparator) {
m = new ConcurrentSkipListMap<E,Object>(comparator);
}
// 使用ConcurrentSkipListMap初始化map
// 并将集合c中所有元素放入到map中
public ConcurrentSkipListSet(Collection<? extends E> c) {
m = new ConcurrentSkipListMap<E,Object>();
addAll(c);
}
// 使用ConcurrentSkipListMap初始化map
// 并将有序Set中所有元素放入到map中
public ConcurrentSkipListSet(SortedSet<E> s) {
m = new ConcurrentSkipListMap<E,Object>(s.comparator());
addAll(s);
}
// ConcurrentSkipListSet类内部返回子set时使用的
ConcurrentSkipListSet(ConcurrentNavigableMap<E,Object> m) {
this.m = m;
}
// 克隆方法
public ConcurrentSkipListSet<E> clone() {
try {
@SuppressWarnings("unchecked")
ConcurrentSkipListSet<E> clone =
(ConcurrentSkipListSet<E>) super.clone();
clone.setMap(new ConcurrentSkipListMap<E,Object>(m));
return clone;
} catch (CloneNotSupportedException e) {
throw new InternalError();
}
}
/* ---------------- Set operations -------------- */
// 返回元素个数
public int size() {
return m.size();
}
// 检查是否为空
public boolean isEmpty() {
return m.isEmpty();
}
// 检查是否包含某个元素
public boolean contains(Object o) {
return m.containsKey(o);
}
// 添加一个元素
// 调用map的putIfAbsent()方法
public boolean add(E e) {
return m.putIfAbsent(e, Boolean.TRUE) == null;
}
// 移除一个元素
public boolean remove(Object o) {
return m.remove(o, Boolean.TRUE);
}
// 清空所有元素
public void clear() {
m.clear();
}
// 迭代器
public Iterator<E> iterator() {
return m.navigableKeySet().iterator();
}
// 降序迭代器
public Iterator<E> descendingIterator() {
return m.descendingKeySet().iterator();
}
/* ---------------- AbstractSet Overrides -------------- */
// 比较相等方法
public boolean equals(Object o) {
// Override AbstractSet version to avoid calling size()
if (o == this)
return true;
if (!(o instanceof Set))
return false;
Collection<?> c = (Collection<?>) o;
try {
// 这里是通过两次两层for循环来比较
// 这里是有很大优化空间的,参考上篇文章CopyOnWriteArraySet中的彩蛋
return containsAll(c) && c.containsAll(this);
} catch (ClassCastException unused) {
return false;
} catch (NullPointerException unused) {
return false;
}
}
// 移除集合c中所有元素
public boolean removeAll(Collection<?> c) {
// Override AbstractSet version to avoid unnecessary call to size()
boolean modified = false;
for (Object e : c)
if (remove(e))
modified = true;
return modified;
}
/* ---------------- Relational operations -------------- */
// 小于e的最大元素
public E lower(E e) {
return m.lowerKey(e);
}
// 小于等于e的最大元素
public E floor(E e) {
return m.floorKey(e);
}
// 大于等于e的最小元素
public E ceiling(E e) {
return m.ceilingKey(e);
}
// 大于e的最小元素
public E higher(E e) {
return m.higherKey(e);
}
// 弹出最小的元素
public E pollFirst() {
Map.Entry<E,Object> e = m.pollFirstEntry();
return (e == null) ? null : e.getKey();
}
// 弹出最大的元素
public E pollLast() {
Map.Entry<E,Object> e = m.pollLastEntry();
return (e == null) ? null : e.getKey();
}
/* ---------------- SortedSet operations -------------- */
// 取比较器
public Comparator<? super E> comparator() {
return m.comparator();
}
// 最小的元素
public E first() {
return m.firstKey();
}
// 最大的元素
public E last() {
return m.lastKey();
}
// 取两个元素之间的子set
public NavigableSet<E> subSet(E fromElement,
boolean fromInclusive,
E toElement,
boolean toInclusive) {
return new ConcurrentSkipListSet<E>
(m.subMap(fromElement, fromInclusive,
toElement, toInclusive));
}
// 取头子set
public NavigableSet<E> headSet(E toElement, boolean inclusive) {
return new ConcurrentSkipListSet<E>(m.headMap(toElement, inclusive));
}
// 取尾子set
public NavigableSet<E> tailSet(E fromElement, boolean inclusive) {
return new ConcurrentSkipListSet<E>(m.tailMap(fromElement, inclusive));
}
// 取子set,包含from,不包含to
public NavigableSet<E> subSet(E fromElement, E toElement) {
return subSet(fromElement, true, toElement, false);
}
// 取头子set,不包含to
public NavigableSet<E> headSet(E toElement) {
return headSet(toElement, false);
}
// 取尾子set,包含from
public NavigableSet<E> tailSet(E fromElement) {
return tailSet(fromElement, true);
}
// 降序set
public NavigableSet<E> descendingSet() {
return new ConcurrentSkipListSet<E>(m.descendingMap());
}
// 可分割的迭代器
@SuppressWarnings("unchecked")
public Spliterator<E> spliterator() {
if (m instanceof ConcurrentSkipListMap)
return ((ConcurrentSkipListMap<E,?>)m).keySpliterator();
else
return (Spliterator<E>)((ConcurrentSkipListMap.SubMap<E,?>)m).keyIterator();
}
// 原子更新map,给clone方法使用
private void setMap(ConcurrentNavigableMap<E,Object> map) {
UNSAFE.putObjectVolatile(this, mapOffset, map);
}
// 原子操作相关内容
private static final sun.misc.Unsafe UNSAFE;
private static final long mapOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentSkipListSet.class;
mapOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("m"));
} catch (Exception e) {
throw new Error(e);
}
}
}
什么是堆?什么是堆化?
堆是一种特殊的树,只要满足下面两个条件,它就是一个堆:
(1)堆是一颗完全二叉树;
(2)堆中某个节点的值总是不大于(或不小于)其父节点的值。
其中,我们把根节点最大的堆叫做大顶堆,根节点最小的堆叫做小顶堆。
堆化(向下调整)、向上调整的前提都是:在二叉树中,只有一个位置不满足堆的性质,其它位置都满足堆的性质。
向下调整 是让调整的结点与其孩子节点进行比较
向上调整 是让调整的结点与其父亲结点进行比较
什么是优先级队列?
PriorityQueue是怎么实现的?
// 默认容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储元素的地方
transient Object[] queue; // non-private to simplify nested class access
// 元素个数
private int size = 0;
// 比较器
private final Comparator<? super E> comparator;
// 修改次数
transient int modCount = 0; // non-private to simplify nested class access
PriorityQueue是一个小顶堆;
PriorityQueue是有序的吗?
PriorityQueue不是有序的,只有堆顶存储着最小的元素;
PriorityQueue入队、出队的时间复杂度各是多少?
建堆的时间复杂度是O(n);
堆的插入、删除元素的时间复杂度都是O(log n);
入队有两个方法,add(E e)和offer(E e),两者是一致的,add(E e)也是调用的offer(E e)。
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 不支持null元素
if (e == null)
throw new NullPointerException();
modCount++;
// 取size
int i = size;
// 元素个数达到最大容量了,扩容
if (i >= queue.length)
grow(i + 1);
// 元素个数加1
size = i + 1;
// 如果还没有元素
// 直接插入到数组第一个位置
// 这里跟我们之前讲堆不一样了
// java里面是从0开始的
// 我们说的堆是从1开始的
if (i == 0)
queue[0] = e;
else
// 否则,插入元素到数组size的位置,也就是最后一个元素的下一位
// 注意这里的size不是数组大小,而是元素个数
// 然后,再做自下而上的堆化
siftUp(i, e);
return true;
}
private void siftUp(int k, E x) {
// 根据是否有比较器,使用不同的方法
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
// 找到父节点的位置
// 因为元素是从0开始的,所以减1之后再除以2
int parent = (k - 1) >>> 1;
// 父节点的值
Object e = queue[parent];
// 比较插入的元素与父节点的值
// 如果比父节点大,则跳出循环
// 否则交换位置
if (key.compareTo((E) e) >= 0)
break;
// 与父节点交换位置
queue[k] = e;
// 现在插入的元素位置移到了父节点的位置
// 继续与父节点再比较
k = parent;
}
// 最后找到应该插入的位置,放入元素
queue[k] = key;
}
(1)入队不允许null元素;
(2)如果数组不够用了,先扩容;
(3)如果还没有元素,就插入下标0的位置;
(4)如果有元素了,就插入到最后一个元素往后的一个位置(实际并没有插入哈);
(5)自下而上堆化,一直往上跟父节点比较;
(6)如果比父节点小,就与父节点交换位置,直到出现比父节点大为止;
(7)由此可见,PriorityQueue是一个小顶堆。
出队有两个方法,remove()和poll(),remove()也是调用的poll(),只是没有元素的时候抛出异常。
public E remove() {
// 调用poll弹出队首元素
E x = poll();
if (x != null)
// 有元素就返回弹出的元素
return x;
else
// 没有元素就抛出异常
throw new NoSuchElementException();
}
@SuppressWarnings("unchecked")
public E poll() {
// 如果size为0,说明没有元素
if (size == 0)
return null;
// 弹出元素,元素个数减1
int s = --size;
modCount++;
// 队列首元素
E result = (E) queue[0];
// 队列末元素
E x = (E) queue[s];
// 将队列末元素删除
queue[s] = null;
// 如果弹出元素后还有元素
if (s != 0)
// 将队列末元素移到队列首
// 再做自上而下的堆化
siftDown(0, x);
// 返回弹出的元素
return result;
}
private void siftDown(int k, E x) {
// 根据是否有比较器,选择不同的方法
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
// 只需要比较一半就行了,因为叶子节点占了一半的元素
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
// 寻找子节点的位置,这里加1是因为元素从0号位置开始
int child = (k << 1) + 1; // assume left child is least
// 左子节点的值
Object c = queue[child];
// 右子节点的位置
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
// 左右节点取其小者
c = queue[child = right];
// 如果比子节点都小,则结束
if (key.compareTo((E) c) <= 0)
break;
// 如果比最小的子节点大,则交换位置
queue[k] = c;
// 指针移到最小子节点的位置继续往下比较
k = child;
}
// 找到正确的位置,放入元素
queue[k] = key;
}
(1)将队列首元素弹出;
(2)将队列末元素移到队列首;
(3)自上而下堆化,一直往下与最小的子节点比较;
(4)如果比最小的子节点大,就交换位置,再继续与最小的子节点比较;
(5)如果比最小的子节点小,就不用交换位置了,堆化结束;
(6)这就是堆中的删除堆顶元素;
PriorityQueue是否需要扩容?扩容规则呢?
会因为PriorityQueue是无限增长的队列,元素不够用了会扩容,所以添加元素不会失败。
private void grow(int minCapacity) {
// 旧容量
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
// 旧容量小于64时,容量翻倍
// 旧容量大于等于64,容量只增加旧容量的一半
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
// 检查是否溢出
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// 创建出一个新容量大小的新数组并把旧数组元素拷贝过去
queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
(1)当数组比较小(小于64)的时候每次扩容容量翻倍;
(2)当数组比较大的时候每次扩容只增加一半的容量;
ArrayBlockingQueue的实现方式?
ArrayBlockingQueue是java并发包下一个以数组实现的阻塞队列。
// 使用数组存储元素
final Object[] items;
// 取元素的指针
int takeIndex;
// 放元素的指针
int putIndex;
// 元素数量
int count;
// 保证并发访问的锁
final ReentrantLock lock;
// 非空条件
private final Condition notEmpty;
// 非满条件
private final Condition notFull;
ArrayBlockingQueue是否需要扩容?
ArrayBlockingQueue不需要扩容,因为是初始化时指定容量,并循环利用数组;
ArrayBlockingQueue利用takeIndex和putIndex循环利用数组
ArrayBlockingQueue怎么保证线程安全?
利用重入锁和两个条件保证并发安全
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化数组
this.items = new Object[capacity];
// 创建重入锁及两个条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
入队有四个方法,它们分别是add(E e)、offer(E e)、put(E e)、offer(E e, long timeout, TimeUnit unit),它们有什么区别呢?
public boolean add(E e) {
// 调用父类的add(e)方法
return super.add(e);
}
// super.add(e)
public boolean add(E e) {
// 调用offer(e)如果成功返回true,如果失败抛出异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
// 元素不可为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
if (count == items.length)
// 如果数组满了就返回false
return false;
else {
// 如果数组没满就调用入队方法并返回true
enqueue(e);
return true;
}
} finally {
// 解锁
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,如果线程中断了抛出异常
lock.lockInterruptibly();
try {
// 如果数组满了,使用notFull等待
// notFull等待的意思是说现在队列满了
// 只有取走一个元素后,队列才不满
// 然后唤醒notFull,然后继续现在的逻辑
// 这里之所以使用while而不是if
// 是因为有可能多个线程阻塞在lock上
// 即使唤醒了可能其它线程先一步修改了队列又变成满的了
// 这时候需要再次等待
while (count == items.length)
notFull.await();
// 入队
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 如果数组满了,就阻塞nanos纳秒
// 如果唤醒这个线程时依然没有空间且时间到了就返回false
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 入队
enqueue(e);
return true;
} finally {
// 解锁
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
// 把元素直接放在放指针的位置上
items[putIndex] = x;
// 如果放指针到数组尽头了,就返回头部
if (++putIndex == items.length)
putIndex = 0;
// 数量加1
count++;
// 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了
notEmpty.signal();
}
(1)add(e)时如果队列满了则抛出异常;
(2)offer(e)时如果队列满了则返回false;
(3)put(e)时如果队列满了则使用notFull等待;
(4)offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
(5)利用放指针循环使用数组来存储元素;
出队有四个方法,它们分别是remove()、poll()、take()、poll(long timeout, TimeUnit unit),它们有什么区别呢?
public E remove() {
// 调用poll()方法出队
E x = poll();
if (x != null)
// 如果有元素出队就返回这个元素
return x;
else
// 如果没有元素出队就抛出异常
throw new NoSuchElementException();
}
public E poll() {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 如果队列没有元素则返回null,否则出队
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 如果队列无元素,则阻塞等待在条件notEmpty上
while (count == 0)
notEmpty.await();
// 有元素了再出队
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 如果队列无元素,则阻塞等待nanos纳秒
// 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 取取指针位置的元素
E x = (E) items[takeIndex];
// 把取指针位置设为null
items[takeIndex] = null;
// 取指针前移,如果数组到头了就返回数组前端循环利用
if (++takeIndex == items.length)
takeIndex = 0;
// 元素数量减1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒notFull条件
notFull.signal();
return x;
}
(1)remove()时如果队列为空则抛出异常;
(2)poll()时如果队列为空则返回null;
(3)take()时如果队列为空则阻塞等待在条件notEmpty上;
(4)poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
(5)利用取指针循环从数组中取元素;
ArrayBlockingQueue有什么缺点?
a)队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;
b)如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;
c)只使用了一个锁来控制入队出队,效率较低。
LinkedBlockingQueue的实现方式?
LinkedBlockingQueue采用单链表的形式实现;
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
LinkedBlockingQueue是有界的还是无界的队列?
LinkedBlockingQueue是有界队列,不传入容量时默认为最大int值;
public LinkedBlockingQueue() {
// 如果没传容量,就使用最大int值初始化其容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化head和last指针为空值节点
last = head = new Node<E>(null);
}
LinkedBlockingQueue怎么保证线程安全?
LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞;
// 容量
private final int capacity;
// 元素数量
private final AtomicInteger count = new AtomicInteger();
// 链表头
transient Node<E> head;
// 链表尾
private transient Node<E> last;
// take锁
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty条件
// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
// 放锁
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue与ArrayBlockingQueue对比?
a)后者入队出队采用一把锁,导致入队出队相互阻塞,效率低下;
b)前才入队出队采用两把锁,入队出队互不干扰,效率较高;
c)二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
d)前者如果初始化不传入初始容量,则使用最大int值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;
SynchronousQueue的实现方式?
// Transferer抽象类,主要定义了一个transfer方法用来传输元素
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
// 以栈方式实现的Transferer
static final class TransferStack<E> extends Transferer<E> {
// 栈中节点的几种类型:
// 1. 消费者(请求数据的)
static final int REQUEST = 0;
// 2. 生产者(提供数据的)
static final int DATA = 1;
// 3. 二者正在撮合中
static final int FULFILLING = 2;
// 栈中的节点
static final class SNode {
// 下一个节点
volatile SNode next; // next node in stack
// 匹配者
volatile SNode match; // the node matched to this
// 等待着的线程
volatile Thread waiter; // to control park/unpark
// 元素
Object item; // data; or null for REQUESTs
// 模式,也就是节点的类型,是消费者,是生产者,还是正在撮合中
int mode;
}
// 栈的头节点
volatile SNode head;
}
// 以队列方式实现的Transferer
static final class TransferQueue<E> extends Transferer<E> {
// 队列中的节点
static final class QNode {
// 下一个节点
volatile QNode next; // next node in queue
// 存储的元素
volatile Object item; // CAS'ed to or from null
// 等待着的线程
volatile Thread waiter; // to control park/unpark
// 是否是数据节点
final boolean isData;
}
// 队列的头节点
transient volatile QNode head;
// 队列的尾节点
transient volatile QNode tail;
}
(1)定义了一个抽象类Transferer,里面定义了一个传输元素的方法;
(2)有两种传输元素的方法,一种是栈,一种是队列;
(3)栈的特点是后进先出,队列的特点是先进行出;
(4)栈只需要保存一个头节点就可以了,因为存取元素都是操作头节点;
(5)队列需要保存一个头节点一个尾节点,因为存元素操作尾节点,取元素操作头节点;
(6)每个节点中保存着存储的元素、等待着的线程,以及下一个节点;
SynchronousQueue真的是无缓冲的吗?
通过源码分析,我们可以发现其实SynchronousQueue内部或者使用栈或者使用队列来存储包含线程和元素值的节点,如果同一个模式的节点过多的话,它们都会存储进来,且都会阻塞着,所以,严格上来说,SynchronousQueue并不能算是一个无缓冲队列。
SynchronousQueue怎么保证线程安全?
我们这里主要介绍以栈方式实现的传输模式,以put(E e)方法为例。
public void put(E e) throws InterruptedException {
// 元素不可为空
if (e == null) throw new NullPointerException();
// 直接调用传输器的transfer()方法
// 三个参数分别是:传输的元素,是否需要超时,超时的时间
if (transferer.transfer(e, false, 0) == null) {
// 如果传输失败,直接让线程中断并抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
}
调用transferer的transfer()方法,传入元素e,说明是生产者
我们这里主要介绍以栈方式实现的传输模式,以take()方法为例。
public E take() throws InterruptedException {
// 直接调用传输器的transfer()方法
// 三个参数分别是:null,是否需要超时,超时的时间
// 第一个参数为null表示是消费者,要取元素
E e = transferer.transfer(null, false, 0);
// 如果取到了元素就返回
if (e != null)
return e;
// 否则让线程中断并抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
调用transferer的transfer()方法,传入null,说明是消费者。
transfer()方法
transfer()方法同时实现了取元素和放元素的功能,下面我再来看看这个transfer()方法里究竟干了什么。
// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// 根据e是否为null决定是生产者还是消费者
int mode = (e == null) ? REQUEST : DATA;
// 自旋+CAS,熟悉的套路,熟悉的味道
for (;;) {
// 栈顶元素
SNode h = head;
// 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的
// 也就是都是生产者节点或者都是消费者节点
if (h == null || h.mode == mode) { // empty or same-mode
// 如果有超时而且已到期
if (timed && nanos <= 0) { // can't wait
// 如果头节点不为空且是取消状态
if (h != null && h.isCancelled())
// 就把头节点弹出,并进入下一次循环
casHead(h, h.next); // pop cancelled node
else
// 否则,直接返回null(超时返回null)
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 入栈成功(因为是模式相同的,所以只能入栈)
// 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到
SNode m = awaitFulfill(s, timed, nanos);
// 如果m等于s,说明取消了,那么就把它清除掉,并返回null
if (m == s) { // wait was cancelled
clean(s);
// 被取消了返回null
return null;
}
// 到这里说明匹配到元素了
// 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了
// 如果头节点不为空,并且头节点的下一个节点是s
// 就把头节点换成s的下一个节点
// 也就是把h和s都弹出了
// 也就是把栈顶两个元素都弹出了
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 根据当前节点的模式判断返回m还是s中的值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 到这里说明头节点和当前节点模式不一样
// 如果头节点不是正在撮合中
// 如果头节点已经取消了,就把它弹出栈
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 头节点没有在撮合中,就让当前节点先入队,再让他们尝试匹配
// 且s成为了新的头节点,它的状态是正在撮合中
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
// 如果m为null,说明除了s节点外的节点都被其它线程先一步撮合掉了
// 就清空栈并跳出内部循环,到外部循环再重新入栈判断
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
// 如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
// 返回撮合结果
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 尝试撮合失败,说明m已经先一步被其它线程撮合了
// 就协助清除它
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
// 到这里说明当前节点和头节点模式不一样
// 且头节点是正在撮合中
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
// 如果m为null,说明m已经被其它线程先一步撮合了
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
// 协助匹配,如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s
if (m.tryMatch(h)) // help match
// 将栈顶的两个元素弹出后,再让s重新入栈
casHead(h, mn); // pop both h and m
else // lost match
// 尝试撮合失败,说明m已经先一步被其它线程撮合了
// 就协助清除它
h.casNext(m, mn); // help unlink
}
}
}
}
// 三个参数:需要等待的节点,是否需要超时,超时时间
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 到期时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 当前线程中断了,尝试清除s
if (w.isInterrupted())
s.tryCancel();
// 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)
SNode m = s.match;
// 如果匹配到了,直接返回m
if (m != null)
return m;
// 如果需要超时
if (timed) {
// 检查超时时间如果小于0了,尝试清除s
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0)
// 如果还有自旋次数,自旋次数减一,并进入下一次自旋
spins = shouldSpin(s) ? (spins-1) : 0;
// 后面的elseif都是自旋次数没有了
else if (s.waiter == null)
// 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
// 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
// 如果允许超时且还有剩余时间,就阻塞相应时间
LockSupport.parkNanos(this, nanos);
}
}
// SNode里面的方向,调用者m是s的下一个节点
// 这时候m节点的线程应该是阻塞状态的
boolean tryMatch(SNode s) {
// 如果m还没有匹配者,就把s作为它的匹配者
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
// 唤醒m中的线程,两者匹配完毕
LockSupport.unpark(w);
}
// 匹配到了返回true
return true;
}
// 可能其它线程先一步匹配了m,返回其是否是s
return match == s;
}
整个逻辑比较复杂,这里为了简单起见,屏蔽掉多线程处理的细节,只描述正常业务场景下的逻辑:
(1)如果栈中没有元素,或者栈顶元素跟将要入栈的元素模式一样,就入栈;
(2)入栈后自旋等待一会看有没有其它线程匹配到它,自旋完了还没匹配到元素就阻塞等待;
(3)阻塞等待被唤醒了说明其它线程匹配到了当前的元素,就返回匹配到的元素;
(4)如果两者模式不一样,且头节点没有在匹配中,就拿当前节点跟它匹配,匹配成功了就返回匹配到的元素;
(5)如果两者模式不一样,且头节点正在匹配中,当前线程就协助去匹配,匹配完成了再让当前节点重新入栈重新匹配;
SynchronousQueue的公平模式和非公平模式有什么区别?
SynchronousQueue有两种实现方式,一种是公平(队列)方式,一种是非公平(栈)方式;
public SynchronousQueue() {
// 默认非公平模式
this(false);
}
public SynchronousQueue(boolean fair) {
// 如果是公平模式就使用队列,如果是非公平模式就使用栈
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue在高并发情景下会有什么问题?
试想一下,如果有多个生产者,但只有一个消费者,如果消费者处理不过来,是不是生产者都会阻塞起来?反之亦然。
这是一件很危险的事,所以,SynchronousQueue一般用于生产、消费的速度大致相当的情况,这样才不会导致系统中过多的线程处于阻塞状态。
PriorityBlockingQueue的实现方式?
// 默认容量为11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大数组大小
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 存储元素的地方
private transient Object[] queue;
// 元素个数
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 重入锁
private final ReentrantLock lock;
// 非空条件
private final Condition notEmpty;
// 扩容的时候使用的控制变量,CAS更新这个值,谁更新成功了谁扩容,其它线程让出CPU
private transient volatile int allocationSpinLock;
// 不阻塞的优先级队列,非存储元素的地方,仅用于序列化/反序列化时
private PriorityQueue<E> q;
(1)依然是使用一个数组来使用元素;
(2)使用一个锁加一个notEmpty条件来保证并发安全;
(3)使用一个变量的CAS操作来控制扩容;
PriorityBlockingQueue是否需要扩容?
PriorityBlockingQueue扩容时使用一个单独变量的CAS操作来控制只有一个线程进行扩容;
private void tryGrow(Object[] array, int oldCap) {
// 先释放锁,因为是从offer()方法的锁内部过来的
// 这里先释放锁,使用allocationSpinLock变量控制扩容的过程
// 防止阻塞的线程过多
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// CAS更新allocationSpinLock变量为1的线程获得扩容资格
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 旧容量小于64则翻倍,旧容量大于64则增加一半
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 判断新容量是否溢出
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 创建新数组
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 相当于解锁
allocationSpinLock = 0;
}
}
// 只有进入了上面条件的才会满足这个条件
// 意思是让其它线程让出CPU
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 再次加锁
lock.lock();
// 判断新数组创建成功并且旧数组没有被替换过
if (newArray != null && queue == array) {
// 队列赋值为新数组
queue = newArray;
// 并拷贝旧数组元素到新数组中
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
(1)解锁,解除offer()方法中加的锁;
(2)使用allocationSpinLock变量的CAS操作来控制扩容的过程;
(3)旧容量小于64则翻倍,旧容量大于64则增加一半;
(4)创建新数组;
(5)修改allocationSpinLock为0,相当于解锁;
(6)其它线程在扩容的过程中要让出CPU;
(7)再次加锁;
(8)新数组创建成功,把旧数组元素拷贝过来,并返回到offer()方法中继续添加元素操作;
PriorityBlockingQueue怎么保证线程安全?
PriorityBlockingQueue使用一个锁+一个notEmpty条件控制并发安全;
每个阻塞队列都有四个方法,我们这里只分析一个offer(E e)方法:
public boolean offer(E e) {
// 元素不能为空
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
int n, cap;
Object[] array;
// 判断是否需要扩容,即元素个数达到了数组容量
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// 根据是否有比较器选择不同的方法
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 插入元素完毕,元素个数加1
size = n + 1;
// 唤醒notEmpty条件
notEmpty.signal();
} finally {
// 解锁
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 取父节点
int parent = (k - 1) >>> 1;
// 父节点的元素值
Object e = array[parent];
// 如果key大于父节点,堆化结束
if (key.compareTo((T) e) >= 0)
break;
// 否则,交换二者的位置,继续下一轮比较
array[k] = e;
k = parent;
}
// 找到了应该放的位置,放入元素
array[k] = key;
}
入队的整个操作跟PriorityQueue几乎一致:
(1)加锁;
(2)判断是否需要扩容;
(3)添加元素并做自下而上的堆化;
(4)元素个数加1并唤醒notEmpty条件,唤醒取元素的线程;
(5)解锁;
阻塞队列的出队方法也有四个,我们这里只分析一个take()方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
E result;
try {
// 队列没有元素,就阻塞在notEmpty条件上
// 出队成功,就跳出这个循环
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 解锁
lock.unlock();
}
// 返回出队的元素
return result;
}
private E dequeue() {
// 元素个数减1
int n = size - 1;
if (n < 0)
// 数组元素不足,返回null
return null;
else {
Object[] array = queue;
// 弹出堆顶元素
E result = (E) array[0];
// 把堆尾元素拿到堆顶
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 并做自上而下的堆化
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 修改size
size = n;
// 返回出队的元素
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
// 只需要遍历到叶子节点就够了
while (k < half) {
// 左子节点
int child = (k << 1) + 1; // assume left child is least
// 左子节点的值
Object c = array[child];
// 右子节点
int right = child + 1;
// 取左右子节点中最小的值
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key如果比左右子节点都小,则堆化结束
if (key.compareTo((T) c) <= 0)
break;
// 否则,交换key与左右子节点中最小的节点的位置
array[k] = c;
k = child;
}
// 找到了放元素的位置,放置元素
array[k] = key;
}
}
出队的过程与PriorityQueue基本类似:
(1)加锁;
(2)判断是否出队成功,未成功就阻塞在notEmpty条件上;
(3)出队时弹出堆顶元素,并把堆尾元素拿到堆顶;
(4)再做自上而下的堆化;
(5)解锁;
PriorityBlockingQueue为什么不需要notFull条件?
因为PriorityBlockingQueue在入队的时候如果没有空间了是会自动扩容的,也就不存在队列满了的状态,也就是不需要等待通知队列不满了可以放元素了,所以也就不需要notFull条件了
什么是双重队列?
放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。
取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。
用图形来表示就是下面这样:
LinkedTransferQueue是怎么实现阻塞队列的?
LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体;
LinkedTransferQueue的实现方式是使用一种叫做双重队列
的数据结构;
static final class Node {
// 是否是数据节点(也就标识了是生产者还是消费者)
final boolean isData; // false if this is a request node
// 元素的值
volatile Object item; // initially non-null if isData; CASed to match
// 下一个节点
volatile Node next;
// 持有元素的线程
volatile Thread waiter; // null until waiting
}
LinkedTransferQueue是怎么控制并发安全的?
LinkedTransferQueue全程都没有使用synchronized、重入锁等比较重的锁,基本是通过 自旋+CAS 实现;
不管是取元素还是放元素都会入队;
先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;
如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;
至于是否入队及阻塞有四种模式,NOW、ASYNC、SYNC、TIMED;
xfer()
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允许放入空元素
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
// 外层循环,自旋,失败就重试
retry:
for (;;) { // restart on append race
// 下面这个for循环用于控制匹配的过程
// 同一时刻队列中只会存储一种类型的节点
// 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
// 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
for (Node h = head, p = h; p != null;) { // find & match first node
// p节点的模式
boolean isData = p.isData;
// p节点的值
Object item = p.item;
// p没有被匹配到
if (item != p && (item != null) == isData) { // unmatched
// 如果两者模式一样,则不能匹配,跳出循环后尝试入队
if (isData == haveData) // can't match
break;
// 如果两者模式不一样,则尝试匹配
// 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
if (p.casItem(item, e)) { // match
// 匹配成功
// for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
// 看不懂可以直接跳过
for (Node q = p; q != h;) {
// 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
Node n = q.next; // update by 2 unless singleton
// 如果head还没变,就把它更新成新的节点
// 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
// 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
// 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
// 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
// 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒p中等待的线程
LockSupport.unpark(p.waiter);
// 并返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
}
// p已经被匹配了或者尝试匹配的时候失败了
// 也就是其它线程先一步匹配了p
// 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
// 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 到这里肯定是队列中存储的节点类型和自己一样
// 或者队列中没有元素了
// 就入队(不管放元素还是取元素都得入队)
// 入队又分成四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
// 如果不是立即返回
if (how != NOW) { // No matches available
// 新建s节点
if (s == null)
s = new Node(e, haveData);
// 尝试入队
Node pred = tryAppend(s, haveData);
// 入队失败,重试
if (pred == null)
continue retry; // lost race vs opposite mode
// 如果不是异步(同步或者有超时)
// 就等待被匹配
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
private Node tryAppend(Node s, boolean haveData) {
// 从tail开始遍历,把s放到链表尾端
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 如果首尾都是null,说明链表中还没有元素
if (p == null && (p = head) == null) {
// 就让首节点指向s
// 注意,这里插入第一个元素的时候tail指针并没有指向s
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
// 如果p无法处理,则返回null
// 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
// 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
// 队列中所有的元素都要保证是同一种类型的节点
// 返回null后外面的方法会重新尝试匹配重新入队等
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
// 如果p的next不为空,说明不是最后一个节点
// 则让p重新指向最后一个节点
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
// 如果CAS更新s为p的next失败
// 则说明有其它线程先一步更新到p的next了
// 就让p指向p的next,重新尝试让s入队
p = p.next; // re-read on CAS failure
else {
// 到这里说明s成功入队了
// 如果p不等于t,就更新tail指针
// 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
// 这里就是用来更新tail指针的
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
// 返回p,即s的前一个元素
return p;
}
}
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 如果是有超时的,计算其超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = -1; // initialized after first item and cancel checks
// 随机数,随机让一些自旋的线程让出CPU
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 如果s元素的值不等于e,说明它被匹配到了
if (item != e) { // matched
// assert item != s;
// 把s的item更新为s本身
// 并把s中的waiter置为空
s.forgetContents(); // avoid garbage
// 返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
// 如果当前线程中断了,或者有超时的到期了
// 就更新s的元素值指向s本身
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 尝试解除s与其前一个节点的关系
// 也就是删除s节点
unsplice(pred, s);
// 返回元素的值本身,说明没匹配到
return e;
}
// 如果自旋次数小于0,就计算自旋次数
if (spins < 0) { // establish spins at/near front
// spinsFor()计算自旋次数
// 如果前面有节点未被匹配就返回0
// 如果前面有节点且正在匹配中就返回一定的次数,等待
if ((spins = spinsFor(pred, s.isData)) > 0)
// 初始化随机数
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
// 还有自旋次数就减1
--spins;
// 并随机让出CPU
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
// 更新s的waiter为当前线程
s.waiter = w; // request unpark then recheck
}
else if (timed) {
// 如果有超时,计算超时时间,并阻塞一定时间
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超时的,直接阻塞,等待被唤醒
// 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
LockSupport.park(this);
}
}
}
这三个方法里的内容特别复杂,很大一部分代码都是在控制线程安全,各种CAS,我们这里简单描述一下大致的逻辑:
(1)来了一个元素,我们先查看队列头的节点,是否与这个元素的模式一样;
(2)如果模式不一样,就尝试让他们匹配,如果头节点被别的线程先匹配走了,就尝试与头节点的下一个节点匹配,如此一直往后,直到匹配到或到链表尾为止;
(3)如果模式一样,或者到链表尾了,就尝试入队;
(4)入队的时候有可能链表尾修改了,那就尾指针后移,再重新尝试入队,依此往复;
(5)入队成功了,就自旋或阻塞,阻塞了就等待被其它线程匹配到并唤醒;
(6)唤醒之后进入下一次循环就匹配到元素了,返回匹配到的元素;
(7)是否需要入队及阻塞有四种情况:
a)NOW,立即返回,没有匹配到立即返回,不做入队操作
对应的方法有:poll()、tryTransfer(e)
b)ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
对应的方法有:add(e)、offer(e)、put(e)、offer(e, timeout, unit)
c)SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
对应的方法有:take()、transfer(e)
d)TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
对应的方法有:poll(timeout, unit)、tryTransfer(e, timeout, unit)
LinkedTransferQueue与SynchronousQueue有什么异同?
(1)在java8中两者的实现方式基本一致,都是使用的双重队列;
(2)前者完全实现了后者,但比后者更灵活;
(3)后者不管放元素还是取元素,如果没有可匹配的元素,所在的线程都会阻塞;
(4)前者可以自己控制放元素是否需要阻塞线程,比如使用四个添加元素的方法就不会阻塞线程,只入队元素,使用transfer()会阻塞线程;
(5)取元素两者基本一样,都会阻塞等待有新的元素进入被匹配到;
ConcurrentLinkedQueue是阻塞队列吗?
ConcurrentLinkedQueue不是阻塞队列;ConcurrentLinkedQueue只实现了Queue接口,并没有实现BlockingQueue接口,所以它不是阻塞队列,也不能用于线程池中,但是它是线程安全的,可用于多线程环境中。
ConcurrentLinkedQueue如何保证并发安全?
能
因为它不是阻塞队列,所以只有两个入队的方法,add(e)和offer(e)。
因为是无界队列,所以add(e)方法也不用抛出异常了。
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 不能添加空元素
checkNotNull(e);
// 新节点
final Node<E> newNode = new Node<E>(e);
// 入队到链表尾
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果没有next,说明到链表尾部了,就入队
if (q == null) {
// CAS更新p的next为新节点
// 如果成功了,就返回true
// 如果不成功就重新取next重新尝试
if (p.casNext(null, newNode)) {
// 如果p不等于t,说明有其它线程先一步更新tail
// 也就不会走到q==null这个分支了
// p取到的可能是t后面的值
// 把tail原子更新为新节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
// 返回入队成功
return true;
}
}
else if (p == q)
// 如果p的next等于p,说明p已经被删除了(已经出队了)
// 重新设置p的值
p = (t != (t = tail)) ? t : head;
else
// t后面还有值,重新设置p的值
p = (p != t && t != (t = tail)) ? t : q;
}
}
入队整个流程还是比较清晰的,这里有个前提是出队时会把出队的那个节点的next设置为节点本身。
(1)定位到链表尾部,尝试把新节点到后面;
(2)如果尾部变化了,则重新获取尾部,再重试;
因为它不是阻塞队列,所以只有两个出队的方法,remove()和poll()。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
restartFromHead:
for (;;) {
// 尝试弹出链表的头节点
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果节点的值不为空,并且将其更新为null成功了
if (item != null && p.casItem(item, null)) {
// 如果头节点变了,则不会走到这个分支
// 会先走下面的分支拿到新的头节点
// 这时候p就不等于h了,就更新头节点
// 在updateHead()中会把head更新为新节点
// 并让head的next指向其自己
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
// 上面的casItem()成功,就可以返回出队的元素了
return item;
}
// 下面三个分支说明头节点变了
// 且p的item肯定为null
else if ((q = p.next) == null) {
// 如果p的next为空,说明队列中没有元素了
// 更新h为p,也就是空元素的节点
updateHead(h, p);
// 返回null
return null;
}
else if (p == q)
// 如果p等于p的next,说明p已经出队了,重试
continue restartFromHead;
else
// 将p设置为p的next
p = q;
}
}
}
// 更新头节点的方法
final void updateHead(Node<E> h, Node<E> p) {
// 原子更新h为p成功后,延迟更新h的next为它自己
// 这里用延迟更新是安全的,因为head节点已经变了
// 只要入队出队的时候检查head有没有变化就行了,跟它的next关系不大
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
出队的整个逻辑也是比较清晰的:
(1)定位到头节点,尝试更新其值为null;
(2)如果成功了,就成功出队;
(3)如果失败或者头节点变化了,就重新寻找头节点,并重试;
(4)整个出队过程没有一点阻塞相关的代码,所以出队的时候不会阻塞线程,没找到元素就返回null;
ConcurrentLinkedQueue能用于线程池吗?
ConcurrentLinkedQueue不能用在线程池中;因为不是阻塞队列。
ConcurrentLinkedQueue与LinkedBlockingQueue对比?
(1)两者都是线程安全的队列;
(2)两者都可以实现取元素时队列为空直接返回null,后者的poll()方法可以实现此功能;
(3)前者全程无锁,后者全部都是使用重入锁控制的;
(4)前者效率较高,后者效率较低;
(5)前者无法实现如果队列为空等待元素到来的操作;
(6)前者是非阻塞队列,后者是阻塞队列;
(7)前者无法用在线程池中,后者可以;
DelayQueue是阻塞-队列吗?
从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列。
DelayQueue的实现方式?
DelayQueue内部存储结构使用优先级队列;
// 用于控制并发的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素
private final Condition available = lock.newCondition();
DelayQueue如何保证并发安全?
DelayQueue使用重入锁和条件来控制并发安全;
因为DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
入队方法比较简单:
(1)加锁;
(2)添加元素到优先级队列中;
(3)如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
(4)解锁;
因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,有抛出异常的,有阻塞的,有不阻塞的,有超时的。
我们这里主要分析两个,poll()和take()方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
poll()方法比较简单:
(1)加锁;
(2)检查第一个元素,如果为空或者还没到期,就返回null;
(3)如果第一个元素到期了就调用poll()弹出第一个元素;
(4)解锁。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆顶元素
E first = q.peek();
// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
if (first == null)
available.await();
else {
// 堆顶元素的到期时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
if (delay <= 0)
return q.poll();
// 如果delay大于0 ,则下面要阻塞了
// 将first置为空方便gc,因为有可能其它元素弹出了这个元素
// 这里还持有着引用不会被清理
first = null; // don't retain ref while waiting
// 如果前面有其它线程在等待,直接进入等待
if (leader != null)
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay时间后自动醒过来
// 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
// 这里即使醒过来后也不一定能获取到元素
// 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
// 条件锁的唤醒分成两步,先从Condition的队列里出队
// 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒
// 关于AQS我们后面会讲的^^
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
// signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒
available.signal();
// 解锁,这才是真正的唤醒
lock.unlock();
}
}
take()方法稍微要复杂一些:
(1)加锁;
(2)判断堆顶元素是否为空,为空的话直接阻塞等待;
(3)判断堆顶元素是否到期,到期了直接poll()出元素;
(4)没到期,再判断前面是否有其它线程在等待,有则直接等待;
(5)前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
(6)获取到元素之后再唤醒下一个等待的线程;
(7)解锁;
DelayQueue主要用于什么场景?
DelayQueue常用于定时任务
java中的线程池实现定时任务是直接用的DelayQueue吗?
当然不是,ScheduledThreadPoolExecutor中使用的是它自己定义的内部类DelayedWorkQueue,其实里面的实现逻辑基本都是一样的,只不过DelayedWorkQueue里面没有使用现在的PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上没有什么区别。
什么是双端队列?
双端队列是一种特殊的队列,它的两端都可以进出元素,故而得名双端队列。
ArrayDeque是怎么实现双端队列的?
ArrayDeque是一种以数组方式实现的双端队列,它是非线程安全的。
// 存储元素的数组
transient Object[] elements; // non-private to simplify nested class access
// 队列头位置
transient int head;
// 队列尾位置
transient int tail;
// 最小初始容量
private static final int MIN_INITIAL_CAPACITY = 8;
LinkedList与ArrayDeque的对比?
ArrayList、ArrayDeque内部以数组的形式保存集合中的元素,因此随机访问元素时有较好的性能;而LinkedList内部以链表的形式来保存集合中的元素,因此随机访问集合中的元素时虽然性能较差,但在插入、删除元素时性能非常出色(只需要改变指针所指的地址即可)。需要指出的是,虽然Vector也是以数组的形式来存储
双端队列是否可以作为栈使用?
可以
ArrayDeque可以直接作为栈使用;
LinkedList是List和Deque的集合体?
是
LinkedList在功能上等于ArrayList + ArrayDeque;
集合指定初始容量的好处?
集合是我们在Java编程中使用非常广泛的,它就像大海,海纳百川,像万能容器,盛装万物,而且这个大海,万能容器还可以无限变大(如果条件允许)。当这个海、容器的量变得非常大的时候,它的初始容量就会显得很重要了,因为挖海、扩容是需要消耗大量的人力物力财力的。同样的道理,Collection的初始容量也显得异常重要。所以:对于已知的情景,请为集合指定初始容量。
如ArrayList每次新增一个元素,就会检测ArrayList的当前容量是否已经到达临界点,如果到达临界点则会扩容1.5倍。然而ArrayList的扩容以及数组的拷贝生成新的数组是相当耗资源的。所以若我们事先已知集合的使用场景,知道集合的大概范围,我们最好是指定初始化容量,这样对资源的利用会更加好,尤其是大数据量的前提下,效率的提升和资源的利用会显得更加具有优势。