java并发工具类-Callable、Future 和FutureTask

img

前言

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。

这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。
如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了CallableFuture,通过它们可以在任务执行完毕之后得到任务执行结果

Callable接口

Callable位于JUC包下,它也是一个接口,在它里面也只声明了一个方法叫做call():

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable接口代表一段可以调用并返回结果的代码。

Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

Callable接口使用泛型去定义它的返回类型。

Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。
java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一个方法:submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。

第二个方法:submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。

第三个方法:submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。

因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。

Future接口

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果

Future接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法,下面依次解释每个方法的作用

方法 作用
cance(boolean mayInterruptIfRunning) 试图取消执行的任务,参数为true时直接中断正在执行的任务,否则直到当前任务执行完成,成功取消后返回true,否则返回false
isCancelled() 方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone() 方法表示任务是否已经完成,若任务完成,则返回true;
get() 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit) 设定计算结果的返回时间,如果在规定时间内没有返回计算结果则抛出TimeOutException

Future提供了三种功能

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

RunnableFuture接口

RunnableFuture实现了Runnable和Future。因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask

我们先来看一下FutureTask的实现:

public class FutureTask<V> implements RunnableFuture<V> 

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现

public interface RunnableFuture<V> extends Runnable, Future<V>

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

分析

FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。接下来我们根据FutureTask.run()的执行时机来分析其所处的3种状态:

  1. 未启动,FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态,当创建一个FutureTask,而且没有执行FutureTask.run()方法前,这个FutureTask也处于未启动状态。
  2. 已启动,FutureTask.run()被执行的过程中,FutureTask处于已启动状态。
  3. 已完成,FutureTask.run()方法执行完正常结束,或者被取消或者抛出异常而结束,FutureTask都处于完成状态。

img

下面我们再来看看FutureTask的方法执行示意图(方法和Future接口基本是一样的,这里就不过多描述了)

img

  • 当FutureTask处于未启动或已启动状态时,如果此时我们执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
  • 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行。当FutureTask处于已启动状态时,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功,cancel(…)返回true;但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(…)返回false。当任务已经完成,执行cancel(…)方法将返回false。

最后我们给出FutureTask的两种构造函数:

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

事实上,FutureTask是Future接口的一个唯一实现类。

使用创景

通过上面的介绍,我们对Callable,Future,RunnableFuture,FutureTask都有了比较清晰的了解了,那么它们到底有什么用呢?我们前面说过通过这样的方式去创建线程的话,最大的好处就是能够返回结果,加入有这样的场景,我们现在需要计算一个数据,而这个数据的计算比较耗时,而我们后面的程序也要用到这个数据结果,那么这个时Callable岂不是最好的选择?我们可以开设一个线程去执行计算,而主线程继续做其他事,而后面需要使用到这个数据时,我们再使用Future获取不就可以了吗?下面我们就来编写一个这样的实例。

多任务计算

利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。

例如主线程执行到需要发邮件,发短信的环节,发送完成后才能继续下一次任务,对于发短信发邮件可以使用FutureTask进行多线程进行并行执行,通过get来阻塞返回结果,如果发邮件需要5s,发短信需要8s,传统的方式需要13s,使用FutureTask异步执行只需要8s。

package chapter02.future;

import util.ThreadUtils;

import java.util.concurrent.*;

/**
 * 发送消息的FutuerTask
 * 将原来发送邮件发送短信的串行任务改为FutuerTask的并行任务提高CPU利用率以及速度
 */
public class FutuerSendMessage {
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutuerSendMessage futuerSendMessage = new FutuerSendMessage();
        futuerSendMessage.sendmessage();
    }

    public void sendmessage() throws ExecutionException, InterruptedException {
        long currentTime = System.currentTimeMillis();
        //发送邮件
        Future<Boolean> sendMailFuture = executorService.submit(() -> {
            return sendMail();
        });
        //发送短信
        Future<Boolean> sendMsgFuture = executorService.submit(() -> {
            return sendMsg();
        });
        System.out.println("提交任务到线程池,耗时:" + (System.currentTimeMillis() - currentTime));
        System.out.println("主线程继续任务...");
        //如果发送都成功了
        if (sendMailFuture.get() && sendMsgFuture.get()) {
            System.out.println("消息发送成功,耗时:" + (System.currentTimeMillis() - currentTime));
        }
        executorService.shutdown();
    }

    /**
     * 发送邮件
     *
     * @return
     */
    private boolean sendMail() {
        //睡眠5s
        ThreadUtils.sleep(5, TimeUnit.SECONDS);
        System.out.println("发送邮件");
        return true;
    }

    /**
     * 发送短信
     *
     * @return
     */
    private boolean sendMsg() {
        //睡眠8s
        ThreadUtils.sleep(8, TimeUnit.SECONDS);
        System.out.println("发送短信");
        return true;
    }

    //-----------------------------原始的代码---------------

    public void orginSendmessage() {
        long currentTime = System.currentTimeMillis();
        boolean sendMail = sendMail();
        boolean sendMsg = sendMsg();
        if (sendMail && sendMsg) {
            System.out.println("消息发送成功,耗时:" + (System.currentTimeMillis() - currentTime));
        }
    }
}

高并发环境下确保任务只执行一次

在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个map的缓存,当key存在时,即直接返回key对应的对象;当key不存在时,则创创建一个对象。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和换粗你对象的对应关系,典型的代码如下面所示:

原始的方式如下

private Map<String, String> orginCacheMap = new HashMap<String, String>();
public synchronized String getOrginValue(String key) {
    if (cacheMap.containsKey(key)) {
        return orginCacheMap.get(key);
    } else {
        String cacheValue = createCache();
        orginCacheMap.putIfAbsent(key, cacheValue);
        return cacheValue;
    }
}

在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了缓存对象只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建缓存对象的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:

package chapter02.future;

import util.ThreadUtils;

import java.sql.Connection;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * FutuerTask 多线程下只进行一次读取
 */
public class FutuerOnceExec {
    private Map<String, String> orginCacheMap = new HashMap<String, String>();
    private Map<String, Future<String>> cacheMap = new ConcurrentHashMap<String, Future<String>>();
    private ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        FutuerOnceExec futuerOnceExec = new FutuerOnceExec();
        long currentTime = System.currentTimeMillis();
        String value = futuerOnceExec.getValue("xx");
        System.out.println("value:" + value + ",第一次获取耗时:" + (System.currentTimeMillis() - currentTime));
        value = futuerOnceExec.getValue("xx");
        System.out.println("value:" + value + ",第二次获取耗时:" + (System.currentTimeMillis() - currentTime));
        value = futuerOnceExec.getValue("xx");
        System.out.println("value:" + value + ",第三次获取耗时:" + (System.currentTimeMillis() - currentTime));
    }


    /**
     * 获取缓存数据
     *
     * @param key
     * @return
     */
    public String getValue(String key) {
        Future<String> futureValue = cacheMap.get(key);
        //如果已经存在Future了直接get 完成的Future是不会阻塞的
        if (null != futureValue) {
            return getFutureValue(futureValue);
        } else {
            //使用异步线程常见Future任务
            futureValue = executorService.submit(() -> {
                ThreadUtils.sleep(1, TimeUnit.SECONDS);
                return createCache();
            });
            executorService.shutdown();
            cacheMap.putIfAbsent(key, futureValue);
        }
        //获取future中的值
        return getFutureValue(futureValue);
    }

    /**
     * 获取 Future中的值
     * get的时候可能会阻塞
     *
     * @param futureValue
     * @return
     */
    private String getFutureValue(Future<String> futureValue) {
        try {
            return futureValue.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }

    private String createCache() {
        //todo 业务代码
        return "缓存对象";
    }


    //--------------------原始的方式

    public synchronized String getOrginValue(String key) {
        if (cacheMap.containsKey(key)) {
            return orginCacheMap.get(key);
        } else {
            String cacheValue = createCache();
            orginCacheMap.putIfAbsent(key, cacheValue);
            return cacheValue;
        }
    }
}

总结

实现Runnable接口和实现Callable接口的区别:

  1. Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的。
  2. Callable规定的方法是call(),Runnable规定的方法是run()。
  3. Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。
  4. call方法可以抛出异常,run方法不可以。
  5. 运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。
  6. 加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法。

Callable、Runnable、Future和FutureTask 的区别

  1. Callable、Runnable、Future和FutureTask 做为java 线程池运行的重要载体,有必要深入理解。
  2. Callable 和 Runnable 都是执行的任务的接口,区别在于Callable有返回值,而Runnable无返回值。
  3. Future 表示异步任务返回结果的接口
  4. RunnableFuture 继承了Runnable, Future,表示可以带有返回值的run接口
  5. FutureTask是一个实现类,实现了RunnableFuture接口,既能接受Runnable类型的任务,也可以接受Callable类型的任务,这个类的作用主要是 有一个protected void done()方法用来扩展使用,作为一个回调方法