学习Future先来认识一个接口Callable
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以理解这哥们就是带有返回值的Runnable
Future接口可以对具体的Callable任务的执行结果进行操作,可以查询,取消,判断是否Callable的任务是否执行完成等。
Future和Callable接口是相互配合的,Future获取Callable任务的结果。
下面我们看2小demo
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<Integer> future = executorService.submit(new CallableTask());
System.out.println(future.get());
executorService.shutdown();
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 1;
}
}
}
从future中获取callable的执行结果
future的获取可以给个超时时间
如果有一批任务,我们一般怎么写呢?
public class FutureTest1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<?>> futureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<?> future = executorService.submit(new CallableTask());
futureList.add(future);
}
futureList.forEach(future -> {
try {
System.out.println(future.get());
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.shutdown();
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 1;
}
}
}
一般就是一起提交到线程池,记录future, foreach获取结果,生产环境也经常这样写
再来认识下FutureTask
可以看到FutureTask是Future的一个实现类,其实也是唯一的一个,FutureTask实现了RunnableFuture接口,这个接口同时继承了Runnable和Future。所以FutureTask既可以作为Runnable被当做一个线程执行,同时又可以作为Future来处理Callable的返回结果。牛逼不牛逼
public class FutureTest2 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
FutureTask futureTask = new FutureTask(new CallableTask());
new Thread(futureTask).start();
System.out.println(futureTask.get(3500, TimeUnit.MILLISECONDS));
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 1;
}
}
}
可以看到 FutureTask 既可以作为任务提交执行,又可以接受任务执行结果
Future有一定的局限性:
- 不能手动完成 当你写了一个函数,用于通过一个远程API获取一个电子商务产品最新价格。因为这个 API 太耗时,你把它允许在一个独立的线程中,并且从你的函数中返回一个 Future。现在假设这个API服务宕机了,这时你想通过该产品的最新缓存价格手工完成这个Future 。你会发现无法这样做。
- Future 的结果在非阻塞的情况下,不能执行更进一步的操作 Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你无法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。
- 多个 Future 不能串联在一起组成链式调用 有时候你需要执行一个长时间运行的计算任务,并且当计算任务完成的时候,你需要把它的计算结果发送给另外一个长时间运行的计算任务等等。你会发现你无法使用 Future 创建这样的一个工作流。
- 不能组合多个 Future 的结果 假设你有10个不同的Future,你想并行的运行,然后在它们运行未完成后运行一些函数。你会发现你也无法使用 Future 这样做。
因此我们引入了CompletableFuture
public class FutureTest3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("CompletableFuture ...");
});
future.get();
}
}
runAsync 当无需获得执行结果时使用
public class FutureTest4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "haha";
});
System.out.println(future.get());
}
}
supplyAsync 需要获得执行结果时使用
public class FutureTest5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 传递线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "haha";
}, executorService);
System.out.println(future.get());
}
}
传递线程池
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "haha";
});
CompletableFuture<String> thenApplyFuture =
future.thenApply(str -> "hehe " + str);
System.out.println(thenApplyFuture.get());
}
对future的结果做回调执行
可以接受一系列的转换
可以组合多个future
最后再看下CompletableFuture对异常的处理
public class FutureTest7 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int a = 100 / 0;
return "haha";
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return "exception";
});
System.out.println(future.get());
}
}
还可以使用handle
handle()
从异常恢复,无论一个异常是否发生它都会被调用
public class FutureTest8 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int a = 100 / 0;
return "haha";
}).handle((res, ex) -> {
if (ex != null) {
System.out.println(ex.getMessage());
return "error";
}
return res;
});
System.out.println(future.get());
}
}
转载请注明:汪明鑫的个人博客 » Future学习笔记
说点什么
您将是第一位评论人!