java并发工具类-Semaphore

img

Semaphore简介

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。

Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

通俗的讲

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源

应用场景

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

工作原理

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

Semaphore主要方法

构造方法

创建具有给定许可数的计数信号量并设置为非公平信号量

public Semaphore(int permits)

当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

public Semaphore(int permits, boolean fair)
其他方法

从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位

public void acquire() throws InterruptedException

从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。

public void acquire(int permits) throws InterruptedException

释放一个许可,将其返回给信号量。就如同车开走返回一个车位。

public void release()

释放n个许可

public void release(int permits)

获取当前可用许可数

public int availablePermits()

代码实现

public class SemaphoreTest {
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    private static Random random = new Random();

    //阻塞队列
    private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);


    public static void execute(Semaphore semaphore) {
        //获取一个随机数
        long sleepTime = random.nextInt(10);
        long threadId = Thread.currentThread().getId();
        String park = null;
        try {
            /**
             * 获取许可,首先判断semaphore内部的数字是否大于0,如果大于0,
             * 才能获得许可,然后将初始值5减去1,线程才会接着去执行;如果没有
             * 获得许可(原因是因为已经有5个线程获得到许可,semaphore内部的数字为0),
             * 线程会阻塞直到已经获得到许可的线程,调用release()方法,释放掉许可,
             * 也就是将semaphore内部的数字加1,该线程才有可能获得许可。
             */
            semaphore.acquire();
            /**
             *  对应的线程会到阻塞对,对应车辆去获取到车位,如果没有拿到一致阻塞,
             *  直到其他车辆归还车位。
             */
            park = parks.take();
            System.out.println("线程ID" + threadId + ",开始占用车位:" + park + ",当前剩余车位" + semaphore.availablePermits());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            //睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //归还车位
        parks.offer(park);
        System.out.println("线程ID" + threadId + ",开始归还车位:" + park + ",共占用" + sleepTime + "秒");
        //线程释放掉许可,通俗来将就是将semaphore内部的数字加1
        semaphore.release();
    }

    public static void main(String[] args) {
        //初始化线程数量
        int threadNum = 100;
        parks.offer("车位一");
        parks.offer("车位二");
        parks.offer("车位三");
        parks.offer("车位四");
        parks.offer("车位五");


        // 初始化5个许可证
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < threadNum; i++) {
            executorService.submit(() -> {
                execute(semaphore);
            });
        }
    }
}

注意事项

我们知道可以通过信号量控制共享资源的访问,底层还是AQS这一套,这没什么难的。但是有一点可能被大家忽略:声明信号量的时候,比如只有3个许可证,但是运行过程中,某个时刻的许可证数量是没有限制的。

public static void main(String[] args) {
       //初始化线程数量
       int threadNum = 100;
       parks.offer("车位一");
       parks.offer("车位二");
       parks.offer("车位三");
       parks.offer("车位四");
       parks.offer("车位五");


       // 初始化 0 个许可证 应该是不可以 放行的
       Semaphore semaphore = new Semaphore(0);
       /**
        * 释放了5个许可证,Semaphore(5) 效果是一样的
        * 实际使用中注意不能过多的释放release
        */
       semaphore.release(5);
       for (int i = 0; i < threadNum; i++) {
           executorService.submit(() -> {
               execute(semaphore);
           });
       }
   }

即使创建信号量的时候,指定了信号量的大小。但是在通过 release()操作释放信号量任然能超过配置的大小。也就有可能同时执行的线程数量比最开始设置的要大。
没有任何线程获取信号量的时候,依然能够释放并且释放的有效。

推荐的做法是一个线程先 acquire 然后 release。如果释放线程和获取线程不是同一个,那么最好保证这种对应关系。不要释放过多的许可证。