JAVA中的死锁

img

什么是死锁

在多线程环境中,多个进程可以竞争有限数量的资源。当一个进程申请资源时,如果这时没有可用资源,那么这个进程进入等待状态。有时,如果所申请的资源被其他等待进程占有,那么该等待进程有可能再也无法改变状态。这种情况称为死锁

在Java中使用多线程,就会有可能导致死锁问题。死锁会让程序一直住,不再程序往下执行。我们只能通过中止并重启的方式来让程序重新执行。

造成死锁的原因

  • 当前线程拥有其他线程需要的资源
  • 当前线程等待其他线程已拥有的资源
  • 都不放弃自己拥有的资源

死锁的必要条件

互斥

进程要求对所分配的资源(如打印机)进行排他性控制,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。

不可剥夺

进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。

请求与保持

进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。

循环等待

是指进程发生死锁后,必然存在一个进程–资源之间的环形链,通俗讲就是你等我的资源,我等你的资源,大家一直等。

死锁的分类

静态顺序型死锁

线程之间形成相互等待资源的环时,就会形成顺序死锁lock-ordering deadlock,多个线程试图以不同的顺序来获取相同的锁时,容易形成顺序死锁,如果所有线程以固定的顺序来获取锁,就不会出现顺序死锁问题

经典案例是LeftRightDeadlock,两个方法,分别是leftRigth、rightLeft。如果一个线程调用leftRight,另一个线程调用rightLeft,且两个线程是交替执行的,就会发生死锁。

public class LeftRightDeadLock {

    //左边锁
    private static Object left = new Object();
    //右边锁
    private static Object right = new Object();

    /**
     * 现持有左边的锁,然后获取右边的锁
     */
    public static void leftRight() {
        synchronized (left) {
            System.out.println("leftRigth: left lock,threadId:" + Thread.currentThread().getId());
            //休眠增加死锁产生的概率
            sleep(100);
            synchronized (right) {
                System.out.println("leftRigth: right lock,threadId:" + Thread.currentThread().getId());
            }
        }
    }

    /**
     * 现持有右边的锁,然后获取左边的锁
     */
    public static void rightLeft() {
        synchronized (right) {
            System.out.println("rightLeft: right lock,threadId:" + Thread.currentThread().getId());
            //休眠增加死锁产生的概率
            sleep(100);
            synchronized (left) {
                System.out.println("rightLeft: left lock,threadId:" + Thread.currentThread().getId());
            }
        }
    }

    /**
     * 休眠
     *
     * @param time
     */
    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(() -> leftRight());
        executorService.execute(() -> rightLeft());
        executorService.shutdown();
    }
}

输出

leftRigth: left lock,threadId:12
rightLeft: right lock,threadId:13

我们发现,12号线程锁住了左边要向右边获取锁,13号锁住了右边,要向左边获取锁,因为两边都不释放自己的锁,互不相让,就产生了死锁。

解决方案

固定加锁的顺序(针对锁顺序死锁)

只要交换下锁的顺序,让线程来了之后先获取同一把锁,获取不到就等待,等待上一个线程释放锁再获取锁。

public static void leftRigth() {
       synchronized (left) {
         ...
           synchronized (right) {
            ...
           }
       }
   }

   public static void rightLeft() {
       synchronized (left) {
         ...
           synchronized (right) {
            ...
           }
       }
   }

动态锁顺序型死锁

由于方法入参由外部传递而来,方法内部虽然对两个参数按照固定顺序进行加锁,但是由于外部传递时顺序的不可控,而产生锁顺序造成的死锁,即动态锁顺序死锁。

上例告诉我们,交替的获取锁会导致死锁,且锁是固定的。有时候并锁的执行顺序并不那么清晰,参数导致不同的执行顺序。经典案例是银行账户转账,from账户向to账户转账,在转账之前先获取两个账户的锁,然后开始转账,如果这是to账户向from账户转账,角色互换,也会导致锁顺序死锁。

/**
 * 动态顺序型死锁
 * 转账业务
 */
public class TransferMoneyDeadlock {

    public static void transfer(Account from, Account to, int amount) {
        //先锁住转账的账户
        synchronized (from) {
            System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
            //休眠增加死锁产生的概率
            sleep(100);
            //在锁住目标账户
            synchronized (to) {
                System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                if (from.balance < amount) {
                    System.out.println("余额不足");
                    return;
                } else {
                    from.debit(amount);
                    to.credit(amount);
                    System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                }
            }
        }
    }

    private static class Account {
        String name;
        int balance;

        public Account(String name, int balance) {
            this.name = name;
            this.balance = balance;
        }

        void debit(int amount) {
            this.balance = balance - amount;
        }

        void credit(int amount) {
            this.balance = balance + amount;
        }
    }


    /**
     * 休眠
     *
     * @param time
     */
    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //创建账户A
        Account A = new Account("A", 100);
        //创建账户B
        Account B = new Account("B", 200);
        //A -> B 的转账
        executorService.execute(() -> transfer(A, B, 5));
        //B -> A 的转账
        executorService.execute(() -> transfer(B, A, 10));
        executorService.shutdown();
    }
}

输出

线程【12】获取【A】账户锁成功
线程【13】获取【B】账户锁成功

然后就没有然后了,产生了死锁,我们发现 因为对象的调用关系,产生了互相锁住资源的问题。

解决方案

根据传入对象的hashCode硬性确定加锁顺序,消除可变性,避免死锁

package com.test.thread.deadlock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 动态顺序型死锁解决方案
 */
public class TransferMoneyDeadlock {
    /**
     * 监视器,第三把锁,为了方式HASH冲突
     */
    private static Object lock = new Object();

    /**
     * 我们经过上一次得失败,明白了不能依赖参数名称简单的确定锁的顺序,因为参数是
     * 具有动态性的,所以,我们改变一下思路,直接根据传入对象的hashCode()大小来
     * 对锁定顺序进行排序(这里要明白的是如何排序不是关键,有序才是关键)。
     *
     * @param from
     * @param to
     * @param amount
     */
    public static void transfer(Account from, Account to, int amount) {
        /**
         * 这里需要说明一下为什么不使用HashCode()因为HashCode方法可以被重写,
         * 所以,我们无法简单的使用父类或者当前类提供的简单的hashCode()方法,
         * 所以,我们就使用系统提供的identityHashCode()方法,该方法保证无论
         * 你是否重写了hashCode方法,都会在虚拟机层面上调用一个名为JVM_IHashCode
         * 的方法来根据对象的存储地址来获取该对象的hashCode(),HashCode如果不重写
         * 的话,其实也是通过这个虚拟机层面上的方法,JVM_IHashCode()方法实现的
         * 这个方法是用C++实现的。
         */
        int fromHash = System.identityHashCode(from);
        int toHash = System.identityHashCode(to);
        if (fromHash > toHash) {
            //先锁住转账的账户
            synchronized (from) {
                System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
                //休眠增加死锁产生的概率
                sleep(100);
                //在锁住目标账户
                synchronized (to) {
                    System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                    if (from.balance < amount) {
                        System.out.println("余额不足");
                        return;
                    } else {
                        from.debit(amount);
                        to.credit(amount);
                        System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                    }
                }
            }
        } else if (fromHash < toHash) {
            //先锁住转账的账户
            synchronized (to) {
                System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
                //休眠增加死锁产生的概率
                sleep(100);
                //在锁住目标账户
                synchronized (from) {
                    System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                    if (from.balance < amount) {
                        System.out.println("余额不足");
                        return;
                    } else {
                        from.debit(amount);
                        to.credit(amount);
                        System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                    }
                }
            }
        } else {
            //如果传入对象的Hash值相同,那就加让加第三层锁
            synchronized (lock) {
                //先锁住转账的账户
                synchronized (from) {
                    System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
                    //休眠增加死锁产生的概率
                    sleep(100);
                    //在锁住目标账户
                    synchronized (to) {
                        System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
                        if (from.balance < amount) {
                            System.out.println("余额不足");
                            return;
                        } else {
                            from.debit(amount);
                            to.credit(amount);
                            System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
                        }
                    }
                }
            }
        }

    }

    private static class Account {
        String name;
        int balance;

        public Account(String name, int balance) {
            this.name = name;
            this.balance = balance;
        }

        void debit(int amount) {
            this.balance = balance - amount;
        }

        void credit(int amount) {
            this.balance = balance + amount;
        }
    }


    /**
     * 休眠
     *
     * @param time
     */
    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //创建账户A
        Account A = new Account("A", 100);
        //创建账户B
        Account B = new Account("B", 200);
        //A -> B 的转账
        executorService.execute(() -> transfer(A, B, 5));
        //B -> A 的转账
        executorService.execute(() -> transfer(B, A, 10));
        executorService.shutdown();
    }
}

输出

复制线程【12】获取【A】账户锁成功
线程【12】获取【B】账户锁成功
线程【12】从【A】账户转账到【B】账户【5】元钱成功
线程【13】获取【B】账户锁成功
线程【13】获取【A】账户锁成功
线程【13】从【B】账户转账到【A】账户【10】元钱成功

协作对象间的死锁

在协作对象之间可能存在多个锁获取的情况,但是这些获取多个锁的操作并不像在LeftRightDeadLock或transferMoney中那么明显,这两个锁并不一定必须在同一个方法中被获取。如果在持有锁时调用某个外部方法,那么这就需要警惕死锁问题,因为在这个外部方法中可能会获取其他锁,或者阻塞时间过长,导致其他线程无法及时获取当前被持有的锁。

上述两例中,在同一个方法中获取两个锁。实际上,锁并不一定在同一方法中被获取。经典案例,如出租车调度系统。

/**
 * 协作对象间的死锁
 */
public class CoordinateDeadlock {
    /**
     * Taxi 类
     */
    static class Taxi {
        private String location;
        private String destination;
        private Dispatcher dispatcher;

        public Taxi(Dispatcher dispatcher, String destination) {
            this.dispatcher = dispatcher;
            this.destination = destination;
        }

        public synchronized String getLocation() {
            return this.location;
        }

        /**
         * 该方法先获取Taxi的this对象锁后,然后调用Dispatcher类的方法时,又需要获取
         * Dispatcher类的this方法。
         *
         * @param location
         */
        public synchronized void setLocation(String location) {
            this.location = location;
            System.out.println(Thread.currentThread().getName() + " taxi set location:" + location);
            if (this.location.equals(destination)) {
                dispatcher.notifyAvailable(this);
            }
        }
    }

    /**
     * 调度类
     */
    static class Dispatcher {
        private Set<Taxi> taxis;
        private Set<Taxi> availableTaxis;

        public Dispatcher() {
            taxis = new HashSet<Taxi>();
            availableTaxis = new HashSet<Taxi>();
        }

        public synchronized void notifyAvailable(Taxi taxi) {
            System.out.println(Thread.currentThread().getName() + " notifyAvailable.");
            availableTaxis.add(taxi);
        }

        /**
         * 打印当前位置:有死锁风险
         * 持有当前锁的时候,同时调用Taxi的getLocation这个外部方法;而这个外部方法也是需要加锁的
         * reportLocation的锁的顺序与Taxi的setLocation锁的顺序完全相反
         */
        public synchronized void reportLocation() {
            System.out.println(Thread.currentThread().getName() + " report location.");
            for (Taxi t : taxis) {
                t.getLocation();
            }
        }

        public void addTaxi(Taxi taxi) {
            taxis.add(taxi);
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        final Dispatcher dispatcher = new Dispatcher();
        final Taxi taxi = new Taxi(dispatcher, "软件园");
        dispatcher.addTaxi(taxi);
        //先获取dispatcher锁,然后是taxi的锁
        executorService.execute(() -> dispatcher.reportLocation());
        //先获取taxi锁,然后是dispatcher的锁
        executorService.execute(() -> taxi.setLocation("软件园"));
        executorService.shutdown();
    }
}
解决方案

使用开放调用,开放调用指调用该方法不需要持有锁。

开放调用,是指在调用某个方法时不需要持有锁。开放调用可以避免死锁,这种代码更容易编写。上述调度算法完全可以修改为开发调用,修改同步代码块的范围,使其仅用于保护那些涉及共享状态的操作,避免在同步代码块中执行方法调用。修改Dispatcher的reportLocation方法:

setLocation方法
/**
    * 开放调用,不持有锁期间进行外部方法调用
    *
    * @param location
    */
   public void setLocation(String location) {
       synchronized (this) {
           this.location = location;
       }
       System.out.println(Thread.currentThread().getName() + " taxi set location:" + location);
       if (this.location.equals(destination)) {
           dispatcher.notifyAvailable(this);
       }
   }
reportLocation 方法
/**
       * 同步块只包含对共享状态的操作代码
       */
      public synchronized void reportLocation() {
          System.out.println(Thread.currentThread().getName() + " report location.");
          Set<Taxi> taxisCopy;
          synchronized (this) {
              taxisCopy = new HashSet<Taxi>(taxis);
          }
          for (Taxi t : taxisCopy) {
              t.getLocation();
          }
      }

数据库死锁

可能会发生死锁的更复杂的情况是数据库事务。一个数据库事务可能包含许多SQL更新请求,在事务期间更新记录时,将锁定该记录防止其他事务更新,直到这个事务完成。因此,同一事务中的每个更新请求可能会锁定数据库中的某些记录。

如果多个事务同时执行,并且需要更新相同记录,则存在最终陷入死锁的风险。例如:

Transaction 1, request record 1, locks record 1 for update
Transaction 2, request record 2, locks record 2 for update
Transaction 1, request record 2, tries to lock record 2 for update.
Transaction 2, request record 1, tries to lock record 1 for update.

由于加锁是在不同的请求中进行的,并且无法提前知道给定事务所需的全部锁,因此很难检测或防止数据库事务中的死锁。

预防死锁

在某些情况下,可以防止死锁。我将在本文中描述三种技术:

  1. Lock Ordering
  2. Lock Timeout
  3. Deadlock Detection

顺序锁

当多个线程需要相同的一些锁但以不同的顺序获取它们时,会发生死锁。

如果确保所有锁始终由线程以相同的顺序获取,则不会发生死锁。看看这个例子:

Thread 1:
  lock A 
  lock B

Thread 2:
   wait for A
   lock C (when A locked)

Thread 3:
   wait for A
   wait for B
   wait for C

如果线程(如线程3)需要多个锁,则必须按照确定的顺序获取它们。在获得前面的锁之前,它不能获得在锁序列中后面的锁。

例如,线程2或线程3都不能锁定C,直到它们先锁定A. 由于线程1锁定A,因此线程2和3必须先等待锁定A解锁。然后他们必须成功锁定A,然后才能尝试锁定B或C.

顺序锁是一种简单而有效的死锁预防机制。但是,只有在获取任何锁之前先了解所需要的所有锁时才能使用它,而情况并非总是如此。

超时锁

另一种死锁预防机制是对尝试获取锁设置超时,这意味着尝试获取锁的线程只会在放弃之前尝试给定的时间。如果一个线程在给定的超时内没有成功获取所有必要的锁,它将回滚,释放所有锁,等待一段随机的时间,然后重试。等待随机数量的时间使得其他线程拥有获取所需要的锁的机会,从而让应用程序继续运行而不被锁定。

下面是两个线程尝试以不同顺序获取相同的两个锁的示例,其中线程回滚并重试:

Thread 1 locks A
Thread 2 locks B

Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked

Thread 1's lock attempt on B times out
Thread 1 backs up and releases A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.

Thread 2's lock attempt on A times out
Thread 2 backs up and releases B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.

在上面的示例中,线程2将在线程1重新获取锁之前尝试大约200毫秒去获取锁,因此可能成功获取两个锁。然后线程1将在重新尝试获取锁A时继续等待。当线程2完成工作解锁后,线程1也将能够同时获取两个锁(除非线程2或另一个线程获取其中的锁)。

要记住的一个问题是,仅仅因为超时获取锁失败,并不一定意味着线程已经发生死锁。它也可能只意味着持有锁的线程(导致另一个线程超时)需要很长时间才能完成其任务。

此外,如果有足够多的线程竞争相同的资源,即使发生了超时和回滚,他们仍然有可能一次又一次地尝试同时获取资源。在重试之前,等待0到500毫秒的2个线程可能不会发生这种情况,但是对于10或20个线程来说,情况就不同了,这比发生两个线程在重试之前等待相同时间(或者足够接近导致发生此问题)的可能性要高得多。

锁超时机制的一个问题是无法为进入Java中的同步块设置超时,您必须创建自定义锁类或使用Java 5 java.util.concurrency包中的并发结构之一。编写自定义锁并不困难,但它超出了本文的范围。

下面使用j.u.c包中的Lock构造一个获取多个锁的实例:

import util.SleepUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * @author shallowinggg
 */
public class MultiLock {
    /**
     * 请求获取的锁列表
     */
    private List<Lock> locks = new ArrayList<>();

    /**
     * 已经获取到的锁
     */
    private List<Lock> acquiredLocks = new ArrayList<>();

    /**
     * 锁获取超时时间,单位为mills
     */
    private long time = DEFAULT_TIME;

    /**
     * 获取锁失败最长等待重试时间,默认为500ms
     */
    private int maxWaitTime = DEFAULT_MAX_WAIT_TIME;

    /**
     * 生成随机数
     */
    private Random random = new Random();

    private static final int DEFAULT_MAX_WAIT_TIME = 500;

    private static final long DEFAULT_TIME = 100;

    public MultiLock(List<Lock> locks) {
        this.locks = locks;
    }

    public MultiLock(Lock... locks) {
        Collections.addAll(this.locks, locks);
    }

    /**
     * 获取所需要的所有锁
     * 如果无法一次成功获取所有锁,则释放所有已经获取的锁,并且等待一段时间然后重试
     */
    public void lock() {
        final List<Lock> locks = this.locks;
        boolean acquireLock = false;

        retry:
        while (!acquireLock) {
            for (Lock lock : locks) {
                try {
                    if (lock.tryLock(time, TimeUnit.MILLISECONDS)) {
                        acquiredLocks.add(lock);
                    } else {
                        rollback();
                        long waitTime = random.nextInt(maxWaitTime);
                        SleepUtils.mills(waitTime);

                        //此处为了在测试时观察重试,实际应用可以改为记录日志
                        System.out.println(Thread.currentThread().getName() + " retry");
                        continue retry;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //如果获取锁的过程中线程被打断,则回滚释放之前获取到的锁
                    if(Thread.currentThread().isInterrupted()) {
                        rollback();
                    }
                }
            }

            //成功获取所有锁,退出循环
            if (locks.size() == acquiredLocks.size()) {
                acquireLock = true;
            }
        }
    }

    /**
     * 释放获取的所有锁
     * 如果尚未成功获取锁,调用unlock()将抛出IllegalMonitorStateException
     */
    public void unlock() {
        if (locks.size() != acquiredLocks.size()) {
            throw new IllegalMonitorStateException("Not acquire all need lock!");
        }
        acquiredLocks.forEach(Lock::unlock);
        acquiredLocks.clear();
    }

    /**
     * 尝试一次获取所有锁
     * @return true 如果获取成功,否则返回false
     */
    public boolean tryLock() {
        final List<Lock> locks = this.locks;

        for (Lock lock : locks) {
            try {
                if (lock.tryLock(time, TimeUnit.NANOSECONDS)) {
                    acquiredLocks.add(lock);
                } else {
                    rollback();
                    return false;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if(Thread.currentThread().isInterrupted()) {
                    rollback();
                }
            }
        }
        return true;
    }

    /**
     * 失败回滚,释放之前获取的所有锁
     */
    private void rollback() {
        acquiredLocks.forEach(Lock::unlock);
        acquiredLocks.clear();
    }

    public void setTime(long time) {
        this.time = time;
    }

    public long getTime() {
        return time;
    }

    public int getMaxWaitTime() {
        return maxWaitTime;
    }

    public void setMaxWaitTime(int maxWaitTime) {
        this.maxWaitTime = maxWaitTime;
    }

}
import org.junit.Test;
import p5.MultiLock;
import util.SleepUtils;

import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MultiLockTest {
    @Test
    public void test() throws Exception {
        Lock lock1 = new ReentrantLock();
        Lock lock2 = new ReentrantLock();
        Lock lock3 = new ReentrantLock();
        Lock lock4 = new ReentrantLock();
        Lock lock5 = new ReentrantLock();
        Lock lock6 = new ReentrantLock();
        Lock lock7 = new ReentrantLock();
        Lock lock8 = new ReentrantLock();

        MultiLock locks1 = new MultiLock(lock1, lock2, lock3, lock4, lock5, lock6, lock7);
        MultiLock locks2 = new MultiLock(lock2, lock3, lock4, lock5, lock6, lock7, lock8);

        Thread thread1 = new Thread(() -> {
            System.out.println("begin acquire locks1 @ " + new Date());
            locks1.lock();
            try {
                System.out.println("acquire locks1 successfully @ " + new Date());
                SleepUtils.sleep(2);
            } finally {
                locks1.unlock();
            }
        }, "Thread-1");
        Thread thread2 = new Thread(() -> {
            System.out.println("begin acquire locks2 @ " + new Date());
            locks2.lock();
            try {
                System.out.println("acquire locks2 successfully @ " + new Date());
                SleepUtils.sleep(2);
            } finally {
                locks2.unlock();
            }
        }, "Thread-2");

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();
    }
}

输出如下:

begin acquire locks1 @ Tue Feb 26 22:50:40 CST 2019
begin acquire locks2 @ Tue Feb 26 22:50:40 CST 2019
acquire locks1 successfully @ Tue Feb 26 22:50:40 CST 2019
Thread-2 retry
Thread-2 retry
Thread-2 retry
Thread-2 retry
Thread-2 retry
acquire locks2 successfully @ Tue Feb 26 22:50:42 CST 2019

死锁检测

锁检测是一种较重(开销较大)的死锁防止机制,是针对无法进行顺序锁定,并且锁超时不可行的情况开发出的一种方法。

每次线程获取锁时,都会在线程和锁的数据结构(map,graph等)中标明。另外,每当线程请求锁时,也在该数据结构中被记录。

当线程请求锁但请求被拒绝时,线程可以遍历锁图以检查是否发生了死锁。例如,如果线程A请求锁7,但线程B持有锁7,则线程A可以检查线程B此时是否在请求线程A已持有的任何锁(如果有)。如果线程B请求了,则发生了死锁(线程A已持有锁1,请求锁7,线程B已持有锁7,请求锁1)

当然,死锁场景可能比两个持有彼此锁的线程复杂得多。线程A可能等待线程B,线程B等待线程C,线程C等待线程D,线程D等待线程A。为了使线程A检测到死锁,它必须通过线程B传递检查所有请求的锁。线程A将遍历线程B请求的锁,到线程C,然后到线程D,从中找到是否有线程A本身持有的锁之一,然后它才知道是否发生了死锁。

下面是4个线程(A,B,C和D)获取和请求的锁图。像这样的数据结构,可用于检测死锁。

img

那么如果检测到死锁,线程会做什么?

一种可能的操作是释放所有锁,回滚,等待一段随机时间,然后重试。这类似于更简单的锁超时机制,除了线程仅在实际发生死锁时进行回滚,而不是因为他们的锁请求超时。但是,如果许多线程竞争相同的锁,即使它们回滚并重试,它们也可能反复陷入死锁。

更好的选择是确定或分配线程的优先级,以便只有一个(或几个)线程回滚。其余的线程继续获取他们需要的锁,就像没有发生死锁一样。如果分配给线程的优先级是固定的,则相同的线程将始终具有更高的优先级。为避免这种情况,您可以在检测到死锁时随机分配优先级。

死锁问题排查

拿动态顺序型死锁举例,其他的都一样

public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Account A = new Account("A", 100);
        Account B = new Account("B", 200);
        executorService.execute(() -> transfer(A, B, 5));
        executorService.execute(() -> transfer(B, A, 10));
        executorService.shutdown();
    }

死锁的现象

系统越来越卡,没有任何报错信息,随机性比较高

排查死锁

使用 jps + jstack
  1. 在 window或linux中使用jps + jstack命令

img

  1. 找到可能发生死锁的类对应的PID

    我们对应的类是TransferMoneyDeadlock PID是 13964

使用jstack -l PID

执行 jstack -l 13964 命令

img

我们观察BLOCKED 就表示阻塞状态

  • pool-1-thread-2 等待锁 <0x00000000d673baa8>并且已经获取了锁 <0x00000000d673baf0>
  • pool-1-thread-1 等待锁 <0x00000000d673baf0> 并且已经获取了锁<0x00000000d673baa8>

我们发现他们互相持有各自的锁,并且想获取对方的锁,这就是明显的死锁。

使用jconsole

使用命令打开jconsole

img

打开jconsole界面工具选择我们需要检测的类

img

选择检查死锁

img

点击检查死锁

img