img

Fork-Join

java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序。

Fork-Join 是什么

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

分而治之

“分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采取了分而治之的思想。简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始的1000个数据的处理结果。

同时forkjoin在处理某一类问题时非常的有用,哪一类问题?分而治之的问题。十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、

深度优先、广度优先、Dijkstra、动态规划、朴素贝叶斯分类,有几个属于分而治之?3个,快速排序、归并排序、二分查找,还有大数据中M/R都是。

分治法的设计思想是:将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。

分治策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。

归并排序

img

归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。

若将两个有序表合并成一个有序表,称为2-路归并,与之对应的还有多路归并。

对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。

为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。

Fork-Join原理

Fork/Join框架要完成两件事情:

任务分割

Fork/Join框架的基本思想就是将一个大任务分解(Fork)成一系列子任务,子任务可以继续往下分解,当多个不同的子任务都执行完成后,可以将它们各自的结果合并(Join)成一个大结果,最终合并成大任务的结果:

img

ForkJoinTask

基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务
说明:
  • fork : 让task异步执行
  • join : 让task同步执行,可以获取返回值
  • ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

结果合并

ForkJoinPool 执行 ForkJoinTask
  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
三中提交方式:
  • execute: 异步,无返回结果
  • submit :异步,有返回结果 (返回Future<T>
  • invoke :同步,有返回结果 (会阻塞)

工作密取

即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。

ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

Fork/Join使用

我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。

  1. RecursiveAction,用于没有返回结果的任务

  2. RecursiveTask,用于有返回值的任务

    task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行。

join()和get方法当任务完成的时候返回计算结果。

img

在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

Fork/Join的使用

public class SubmitTask extends RecursiveTask<Long> {
    /**
     * 起始值
     */
    private long start;
    /**
     * 结束值
     */
    private long end;
    /**
     * 阈值
     */
    private long threshold = 10L;

    public SubmitTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 计算逻辑
     *
     * @return
     */
    @Override
    protected Long compute() {
        //校验是否达到了阈值
        if (isLessThanThreshold()) {
            //处理并返回结果
            return handle();
        } else {
            //没有达到阈值 计算一个中间值
            long mid = (start + end) / 2;
            //拆分 左边的
            SubmitTask left = new SubmitTask(start, mid);
            //拆分右边的
            SubmitTask right = new SubmitTask(mid + 1, end);
            //添加到任务列表
            invokeAll(left, right);
            //合并结果并返回
            return left.join() + right.join();
        }
    }

    /**
     * 处理的任务
     *
     * @return
     */
    public Long handle() {
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum += i;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return sum;
    }

    /*是否达到了阈值*/
    private boolean isLessThanThreshold() {
        return end - start <= threshold;
    }

    /**
     * forkJoin 方式调用
     *
     * @param start
     * @param end
     */
    public static void forkJoinInvok(long start, long end) {
        long sum = 0;
        long currentTime = System.currentTimeMillis();
        //创建ForkJoinPool 连接池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //创建初始化任务
        SubmitTask submitTask = new SubmitTask(start, end);
        // 将初始任务扔进连接池中执行
        forkJoinPool.invoke(submitTask);
        //等待返回结果
        sum = submitTask.join();
        System.out.println("forkJoin调用:result:" + sum);
        System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
    }

    /**
     * 普通方式调用
     *
     * @param start
     * @param end
     */
    public static void normalInvok(long start, long end) {
        long sum = 0;
        long currentTime = System.currentTimeMillis();
        for (long i = start; i <= end; i++) {
            sum += i;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("普通调用:result:" + sum);
        System.out.println("普通调用耗时:" + (System.currentTimeMillis() - currentTime));
    }

    public static void main(String[] args) {
        //起始值的大小
        long start = 0;
        //结束值的大小
        long end = 10000;
        //forkJoin 调用
        forkJoinInvok(start, end);
        System.out.println("========================");
        //普通调用
        normalInvok(start, end);
    }
}

运行结果

forkJoin调用:result:50005000

forkJoin调用耗时:2286


普通调用:result:50005000
普通调用耗时:17038

Fork/Join 同步用法

同步用法就是将初始化的任务扔进连接池,如果没有执行完成会阻塞

forkJoinPool.invoke(submitTask);

 /**
 * forkJoin 方式调用
 *
 * @param start
 * @param end
 */
public static void forkJoinInvok(long start, long end) {
    long sum = 0;
    long currentTime = System.currentTimeMillis();
    //创建ForkJoinPool 连接池
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    //创建初始化任务
    SubmitTask submitTask = new SubmitTask(start, end);
    //讲初始任务扔进连接池中执行 同步用法
    forkJoinPool.invoke(submitTask);
    System.out.println("同步方式,任务结束才会调用该方法,当前耗时"+(System.currentTimeMillis() - currentTime));
    //等待返回结果
    sum = submitTask.join();
    System.out.println("任务执行完成,当前耗时:"+(System.currentTimeMillis() - currentTime));
    System.out.println("forkJoin调用:result:" + sum);
    System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
}

打印结果

同步方式,任务结束才会调用该方法,当前耗时2367
任务执行完成,当前耗时:2368
forkJoin调用:result:50005000
forkJoin调用耗时:2368

Fork/Join 异步用法

异步用法就是将初始化的任务扔进连接池,然后继续其他任务

forkJoinPool.submit(submitTask);

/**
 * forkJoin 方式调用
 *
 * @param start
 * @param end
 */
public static void forkJoinInvok(long start, long end) {
    long sum = 0;
    long currentTime = System.currentTimeMillis();
    //创建ForkJoinPool 连接池
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    //创建初始化任务
    SubmitTask submitTask = new SubmitTask(start, end);
    //j初始任务扔进连接池中执行 异步方式
    forkJoinPool.submit(submitTask);
    System.out.println("异步方式,任务结束才会调用该方法,当前耗时"+(System.currentTimeMillis() - currentTime));
    //等待返回结果
    sum = submitTask.join();
    System.out.println("任务执行完成,当前耗时:"+(System.currentTimeMillis() - currentTime));
    System.out.println("forkJoin调用:result:" + sum);
    System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
}

打印结果

异步方式,任务结束才会调用该方法,当前耗时3
任务执行完成,当前耗时:2315
forkJoin调用:result:50005000
forkJoin调用耗时:2315

总结

关于ForkJoinPool

  • 可以使用ForkJoinPool.execute(异步,不返回结果)/invoke(同步,返回结果)/submit(异步,返回结果)方法,来执行ForkJoinTask。
  • ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。
    • 文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”,(Runtime.getRuntime().availableProcessors() - 1)
    • ForkJoinTask自己启动时,使用的就是这个静态实例。

关于ForkJoinTask

  • 可以使用invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(是同步的)
  • 还可以使用fork方法,让一个task执行(这个方法是异步的)
  • 还可以使用join方法,让一个task执行(这个方法是同步的,它和fork不同点是同步或者异步的区别)
  • 可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。

    • get()和join()有两个主要的区别:
      - join()方法不能被中断。
      - 如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。
      - 如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。
      
  • ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。
  • 使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。
  • ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。

    • 他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。
  • 看看ForkjoinTask的Complete方法的使用场景
    这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。

  • Task的completeExceptionally方法是怎么回事。

    • 这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务
      这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。
    • 当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。