在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。
今天的分享主要带大家从一个实际的串行场景出发,如何一步步优化,同时也会分享在Java中实现并行处理的多种方式,以及它们之间的区别和优缺点,通过对比总结更加深入的了解并且使用Java中并发编程的相关技术。
现在我们有一个查询carrier下所有Load的接口,它需要查询Loads信息、Instruction信息、Stops信息、Actions信息后然后组装数据。
private List getHydratedLoads(Optional pageable, String predicate, List 这段代码会有什么问题?其实算是一段比较正常的代码,但是在某一个carrier下数据量比较大时,sql查询是相对较慢的,那有没有办法优化一下呢?
当前这个请求耗时总计就是12s。上面实现中查询Load、Instruction、Stop、Action 等信息是串行的,那串行的系统要做性能优化很常见的就是利用多线程并行了。
这种相互之间没有影响的任务,利用并行处理后耗时就可以优化为4s。
因为项目中多线程都用线程池,所以Thread.join()这种方式就不演示了。
Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让调用线程能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。
因为我们都是需要获取任务的返回值的,所以大家肯定想到是用 Future+Callable来做。
ThreadPoolExecutor提供了3个submit方法支持我们需要获取任务执行结果的需求。
Future submit(Callable task);
Future submit(Runnable task, T result);
Future<?> submit(Runnable task);
简单介绍下这三个submit方法:
这三个方法的返回值都是Future接口,Future 提供了5个方法:
image.png
分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。
需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
我们再介绍下FutureTask工具类,这是一个实实在在的工具类,有两个构造函数,和上面类似,一看就明白了。
FutureTask(Callable callable);
FutureTask(Runnable runnable, V result);
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static Callable callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter(task, result);
}
private static final class RunnableAdapter implements Callable {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
这个类实现了 Runnable 和 Future 接口,可以理解就是将任务和结果结合起来了,变成一个可以有响应结果的任务进行提交,本质上FutureTask里面封装的还是一个Callable接口,它实现可以有返回值就是因为它的run方法里面调用了Callable的call()方法,将结果赋值给result,然后返回。
下面我们看下如何优化我们上面的查询接口,实现并行查询:
private List getHydratedLoadsUsingFutureTask() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
FutureTask> queryLoadFutureTask = new FutureTask<>(() -> executeQuery("sql1"));
executorService.submit(queryLoadFutureTask);
FutureTask> queryInstructionFutureTask = new FutureTask<>(() -> executeQuery("sql2"));
executorService.submit(queryInstructionFutureTask);
FutureTask> queryStopFutureTask = new FutureTask<>(() -> executeQuery("sql3"));
executorService.submit(queryStopFutureTask);
FutureTask> queryActionFutureTask = new FutureTask<>(() -> executeQuery("sql4"));
executorService.submit(queryActionFutureTask);
// 获取结果
List loads = queryLoadFutureTask.get();
List instructions = queryInstructionFutureTask.get();
List stops = queryStopFutureTask.get();
List actions = queryActionFutureTask.get();
// We got all the entities we need, so now let's fill in all of their references to each other.
handleData(loads, instructions, stops, actions);
return loads;
}
那你可能会想到,如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,该怎么处理呢?
这种问题基本上也都可以用 Future 来解决,但是需要将对应的 FutureTask传入到当前任务中,然后调用get()方法即可。
比如,我们创建了两个 FutureTask——ft1 和 ft2,ft1 需要等待 ft2 执行完毕后才能做最后的数据处理,所以 ft1 内部需要引用 ft2,并在执行数据处理前,调用 ft2的 get() 方法实现等待。
// 创建任务T2的FutureTask
FutureTask ft2
= new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask ft1
= new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());
// T1Task需要执行的任务:
class T1Task implements Callable{
FutureTask ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
// 获取T2线程结果
String tf = ft2.get();
return "处理完的数据结果";
}
}
// T2Task需要执行的任务:
class T2Task implements Callable {
@Override
String call() throws Exception {
return "检验&查询数据";
}
}
通过这上面的的例子,我们明显的发现 Future 实现异步编程时的一些不足之处:
我们很难表述Future结果之间的依赖性,从文字描述上这很简单。比如,下面文字描述的关系,如果用Future去实现时还是很复杂的。
比如:“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”
在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture时传入回调对象,任务在完成或者异常时,自动回调,再也不需要每次主动通过 Future 去询问结果了,我们接着往下看。
Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程,CompletableFuture 类实现了CompletionStage 和 Future 接口,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过 完成时回调 的方式处理计算结果,并且提供了 转换和组合 CompletableFuture 的方法。
为了体会到 CompletableFuture 异步编程的优势,我们还是先用 CompletableFuture重新实现前面的程序。
public static List getHydratedLoadsUsingCompletableFuture()
throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// 任务1:查询loads列表
CompletableFuture> queryLoads = CompletableFuture.supplyAsync(() -> executeQuery("sql1"), executorService);
// 任务2:查询instructions列表
CompletableFuture> queryInstructions = CompletableFuture.supplyAsync(() -> executeQuery("sql2"),
executorService);
// 任务3:查询stops列表
CompletableFuture> queryStops = CompletableFuture.supplyAsync(() -> executeQuery("sql3"), executorService);
// 任务4:查询actions列表
CompletableFuture> queryActions = CompletableFuture.supplyAsync(() -> executeQuery("sql4"),
executorService);
// 任务1,2,3,4执行完成后执行数据组装
CompletableFuture combineFuture = CompletableFuture.allOf(queryLoads,
queryInstructions,
queryStops,
queryActions)
.thenRun(() -> handleData(queryLoads.join(), queryInstructions.join(), queryStops.join(), queryActions.join()));
System.out.println(Thread.currentThread().getName() + ": 主线程执行到这里了");
combineFuture.get();
System.out.println(String.format("""
queryLoads: %s ,queryInstructions: %s ,queryStops: %s ,queryActions: %s
""", queryLoads.isDone(), queryInstructions.isDone(), queryStops.isDone(), queryActions.isDone()));
return queryLoads.get();
} finally {
executorService.shutdown();
}
}
通过上面的代码我们可以发现 CompletableFuture 有以下优势:
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
这四个方法区别在于:
ForkJoinPool是JDK7提供的,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法,极大的提高效率,这个不属于今天我们讨论的点,感兴趣的话可以后面再聊。
注意⚠️:如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
问题:为什么supplyAsync方法接收一个 Supplier 函数式接口类型参数而不是一个Callable 类型的参数呢?
@FunctionalInterface
public interface Callable {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
@FunctionalInterface
public interface Supplier {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
看了接口定义,我们发现它们其实都是一个不接受任何参数类型的函数式接口,在实践中它们做的是相同的事情(定义一个业务逻辑去处理然后有返回值),但在原则上它们的目的是做不同的事情:
image.png
通过接口的继承关系,我们可以发现这里的异步操作到底什么时候结束、结果如何获取,都可以通过 Future接口来解决。
另外 CompletableFuture 类还实现了 CompletionStage 接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。
下面介绍下 CompletionStage 接口,看字面意思可以理解为“完成动作的一个阶段”,官方注释文档:CompletionStage 是一个可能执行异步计算的“阶段”,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式“完成”,并且它的“完成”会触发其他依赖的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。
这个看完还是有点懵逼的,不清楚什么是 CompletionStage?
在Java中什么是 CompletionStage ?
一个Function、Comsumer、Supplier 或者 Runnable 都会被描述为一个CompletionStage。
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
但是 CompletionStage 这里面一共有40多个方法,我们该如何理解呢?
CompletionStage 接口可以清晰的描述任务之间的关系,可以分为 顺序串行、并行、汇聚关系以及异常处理。
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。
public CompletionStage thenApply(Function<? super T,? extends U> fn);
public CompletionStage thenApplyAsync(Function<? super T,? extends U> fn);
public CompletionStage thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletionStage thenAccept(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
public CompletionStage thenCompose(Function<? super T, ? extends CompletionStage> fn);
public CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn) ;
public CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor) ;
thenApply() 和 thenCompose() 的区别?thenApply 转换的是泛型中的类型,是同一个CompletableFuture,thenCompose 用来连接两个CompletableFuture,是生成一个新的 CompletableFuture。他们都是让 CompletableFuture 可以对返回的结果进行后续操作,就像 Stream 一样进行 map 和 flatMap 的转换。
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture result1 = future.thenApply(param -> param + " World");
CompletableFuture result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));
System.out.println(result1.get());
System.out.println(result2.get());
}
这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。
CompletableFuture f0 =
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//输出结果
HELLO WORLD QQ
可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。
CompletableFuture 中 thenApply 如何实现?
//静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池
public static CompletableFuture supplyAsync(Supplier supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
static CompletableFuture asyncSupplyStage(Executor e,
Supplier f) {
if (f == null) throw new NullPointerException();
//新建CompletableFuture对象
CompletableFuture d = new CompletableFuture();
//构造AsyncSupply对象,线程池提交AsyncSupply任务
e.execute(new AsyncSupply(d, f));
//将CompletableFuture对象返回
return d;
}
static final class AsyncSupply extends ForkJoinTask
//可以看到AsyncSupply是一个Runnable对象
implements Runnable, AsynchronousCompletionTask {
CompletableFuture dep; Supplier<? extends T> fn;
AsyncSupply(CompletableFuture dep, Supplier<? extends T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return false; }
public void run() {
CompletableFuture d; Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
//CompletableFuture对象的result为空时
if (d.result == null) {
try {
//调用传入的supplier的get方法,并将结果放入result字段
//注意:这是在线程池中提交的,所以是异步处理的
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//处理完当前方法后,处理依赖它的栈顶方法,后面的回调方法入栈和这块呼应
d.postComplete();
}
}
}
final void postComplete() {
// 变量f存储的是当前已经完成的CompletableFuture
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// CAS操作,将依赖此阶段的栈顶元素取出,并且设置为下一个
if (STACK.compareAndSet(f, h, t = h.next)) {
if (t != null) {
if (f != this) {
//如果f不是this,将刚出栈的h入this的栈顶
pushStack(h);
continue;
}
// 将h剥离出来,h.next=null,帮助gc
NEXT.compareAndSet(h, t, null); // try to detach
}
//调用tryFire
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
public CompletableFuture thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private CompletableFuture uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
Object r;
// 如果当前阶段结果已经返回,则直接运行回调方法
if ((r = result) != null)
return uniApplyNow(r, e, f);
CompletableFuture d = newIncompleteFuture();
// 构造Completion放入等待栈的顶
unipush(new UniApply(e, d, this, f));
return d;
}
private CompletableFuture uniApplyNow(
Object r, Executor e, Function<? super T,? extends V> f) {
Throwable x;
CompletableFuture d = newIncompleteFuture();
// 如果依赖的方法异常中断,则直接处理并返回异常
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.result = encodeThrowable(x, r);
return d;
}
r = null;
}
try {
// 执行到这里说明依赖的任务已经有结果了,用它的结果当作参数调用回调方法
// 注意这里都是线程池中的线程在执行,所以是异步执行
if (e != null) {
e.execute(new UniApply(null, d, this, f));
} else {
@SuppressWarnings("unchecked") T t = (T) r;
d.result = d.encodeValue(f.apply(t));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
final void unipush(Completion c) {
if (c != null) {
// CAS自旋将回调方法压入栈顶
while (!tryPushStack(c)) {
if (result != null) {
NEXT.set(c, null);
break;
}
}
// 可能在重试中完成,判断result不为空就执行
if (result != null)
c.tryFire(SYNC);
}
}
//再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法
final CompletableFuture tryFire(int mode) {
CompletableFuture d; CompletableFuture a;
Object r; Throwable x; Function<? super T,? extends V> f;
if ((a = src) == null || (r = a.result) == null
|| (d = dep) == null || (f = fn) == null)
return null;
tryComplete: if (d.result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (mode <= 0 && !claim())
return null;
else {
@SuppressWarnings("unchecked") T t = (T) r;
d.completeValue(f.apply(t));
}
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
src = null; dep = null; fn = null;
//成功处理完依赖方法和回调方法后进行处理,可能唤醒其他的回调方法或者清理栈
return d.postFire(a, mode);
}
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别是源自 fn、consumer、action 这三个核心参数不同。
public CompletionStage thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public CompletionStage thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
public CompletionStage runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
Async后缀的方法表示,前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理,否则就还是使用同一个处理线程完成CompletableFuture的所有任务。
这三种方法意思都是等两个 CompletionStage 都完成了计算才会执行下一步的操作,区别在于参数接口类型不一样。
CompletableFuture 中 thenAcceptBoth 如何实现?talk is cheap!!
public CompletableFuture thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
private CompletableFuture biAcceptStage(
Executor e, CompletionStage o,
BiConsumer<? super T,? super U> f) {
CompletableFuture b; Object r, s;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture d = newIncompleteFuture();
// 如果两个阶段有任何一个没有执行完成,则将回调方法分别放到两个互相依赖阶段的栈顶
if ((r = result) == null || (s = b.result) == null)
bipush(b, new BiAccept(e, d, this, b, f));
else if (e == null)
// 如果两个依赖的阶段都执行完成则调用回调方法
d.biAccept(r, s, f, null);
else
try {
e.execute(new BiAccept(null, d, this, b, f));
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
OR的关系,表示谁运行快就用谁的结果执行下一步操作。
public CompletionStage applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
public CompletionStage acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
public CompletionStage runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
同样也是有Async后缀的表示,当前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理。applyToEither、acceptEither、runAfterEither 三个方法的区别还是来自于不同的接口参数类型:Function、Consumer、Runnable。
CompletableFuture 中 applyToEither 如何实现?
public CompletableFuture applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(defaultExecutor(), other, fn);
}
private CompletableFuture orApplyStage(
Executor e, CompletionStage o, Function<? super T, ? extends V> f) {
CompletableFuture b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
Object r; CompletableFuture<? extends T> z;
// 这块是重点,有任何一个阶段的结果不为空就直接执行function
if ((r = (z = this).result) != null ||
(r = (z = b).result) != null)
return z.uniApplyNow(r, e, f);
CompletableFuture d = newIncompleteFuture();
// 如果都为空则将回调方法分别push到被依赖的两个阶段的栈顶
orpush(b, new OrApply(e, d, this, b, f));
return d;
}
在Java编程中,异常处理当然是必不可少的一环,那你可能会想到如果在使用 CompletableFuture 进行异步链式编程时,如果出现异常该怎么处理呢?
首先上面我们提到的 fn、consumer、action 它们的核心方法是不允许抛出可检查异常的,但是却无法限制它们抛出运行时异常。在同步方法中,我们可以使用 try-catch{} 来捕获并处理异常,但在异步编程里面异常该如何处理 ?CompletionStage 接口给我们提供的方案非常简单,比 try-catch{} 还要简单。
下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function fn)
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 == 0) {
int i = 12 / 0;
}
System.out.println("执行结束!");
});
future.whenComplete(new BiConsumer() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("执行完成!");
}
});
future.exceptionally(new Function() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败:" + t.getMessage());
return null;
}
}).join();
handle 也是执行任务完成时对结果的处理,whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
当上一个的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发 handle 方法中定义的函数,结果由 BiFunction 参数计算而得,因此这组方法兼有 whenComplete 和转换的两个功能。
public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
public static List exampleCompletableFutureAndStream() {
ExecutorService executorService = Executors.newCachedThreadPool();
List loads = null;
try {
// 所有需要查询远程服务的load列表
List requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
List> completableFutures = requestList.stream()
// 使用CompletableFuture以异步方式查询数据
.map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
.map(future -> future.thenApply(Load::getStatus))
.map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
.toList();
loads = completableFutures.stream().map(CompletableFuture::join).toList();
System.out.println(Thread.currentThread().getName() + ": CompletableFuture异步方式查询请求已完成:" + loads.size());
} finally {
executorService.shutdown();
}
return loads;
}
注意到了吗?这里使用了两个不同的Stream流水线,是否可以在同一个处理流的流水线上一个接一个地放置多个map操作。
public static List exampleCompletableFutureAndStream() {
ExecutorService executorService = Executors.newCachedThreadPool();
List loads = null;
try {
// 所有需要查询远程服务的load列表
List requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
loads = requestList.stream()
// 使用CompletableFuture以异步方式查询数据
.map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
.map(future -> future.thenApply(Load::getStatus))
.map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
.map(CompletableFuture::join)
.toList();
System.out.println(Thread.currentThread().getName() + ": CompletableFuture异步方式查询请求已完成:" + loads.size());
} finally {
executorService.shutdown();
}
return loads;
}
这其实是有原因的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,不同的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定服务请求的动作、通知join方法返回结果。
再来看一个例子:
我们的系统提供的运费价格是以美元计价的,但是你希望以人民币(RMB)的方式提供给你的客户。你可以用异步的方式向计费中心查询指定Load的价格,同时从远程的汇率服务那里查到人民币和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以人民币计价的商品价格。
public class MultiThreadTest {
@Test
public void test18() {
long start = System.nanoTime();
List> futures = loads.stream()
.map(laod ->
CompletableFuture
// 查商品价格操作和查兑换汇率操作同时进行,当两者都完成时将结果进行整合
.supplyAsync(() -> load.getPrice("load1"))
.thenCombine(CompletableFuture.supplyAsync(() -> RateService.getRate("RMB", "USD")), (price, rate) -> price * rate)
)
.collect(toList());
List usdPrices = futures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
}
通过上述例子,可以看到相对于采用Java 8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。
为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,下面尝试仅使用Java 7中提供的特性,重新实现上述例子的功能。
public class MultiThreadTest {
@Test
public void test19() throws ExecutionException, InterruptedException {
long start = System.nanoTime();
List> usdFuturePrices = new ArrayList<>(shops.size());
for (Shop shop : shops) {
// 创建一个查询人民币到美元转换汇率的Future
final Future usdFutureRate = executor.submit(new Callable() {
public Double call() {
return RateService.getRate("RMB", "USD");
}
});
// 在第二个Future中查询指定商店中特定商品的价格
Future usdFuturePrice = executor.submit(new Callable() {
public Double call() throws ExecutionException, InterruptedException {
double rmbPrice = shop.getPrice("肥皂");
// 在查找价格操作的同一个Future中, 将价格和汇率做乘法计算出汇后价格
return rmbPrice * usdFutureRate.get();
}
});
usdFuturePrices.add(usdFuturePrice);
}
List usdPrices = new ArrayList<>(usdFuturePrices.size());
for (Future usdFuturePrice : usdFuturePrices) {
usdPrices.add(usdFuturePrice.get());
}
}
}
这里我们思考这样一个问题:并行使用流还是CompletableFuture?
对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。同时也可以提供更多描述任务之间关系的接口,我们不需要为之编写更多的代码。
这里对使用这些API的建议如下:
今天大家学到了哪些知识呢?
不足之处:今天只是对于并发编程中的工具类使用和相关原理做了分享,在实际开发过程中可能需要考虑到更多的通用性,封装通过调用模版方法,不要每一个地方都写一堆类似的代码。
| 留言与评论(共有 0 条评论) “” |