Java CompletableFuture 异步超时实现探索()

  本篇文章为你整理了Java CompletableFuture 异步超时实现探索()的详细内容,包含有 Java CompletableFuture 异步超时实现探索,希望能帮助你了解 Java CompletableFuture 异步超时实现探索。

   JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future 的缺陷。

  在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到 CompletableFuture 的使用。

  
JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是CompletableFuture。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了Future的缺陷。

  在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到CompletableFuture的使用。

  常见使用方式

  下面举例一个常见场景。

  假如我们有两个 RPC 远程调用服务,我们需要获取两个 RPC 的结果后,再进行后续逻辑处理。

  

public static void main(String[] args) {

 

   // 任务 A,耗时 2 秒

   int resultA = compute(1);

   // 任务 B,耗时 2 秒

   int resultB = compute(2);

   // 后续业务逻辑处理

   System.out.println(resultA + resultB);

  

 

  可以预估到,串行执行最少耗时 4 秒,并且 B 任务并不依赖 A 任务结果。

  对于这种场景,我们通常会选择并行的方式优化,Demo 代码如下:

  

public static void main(String[] args) {

 

   // 仅简单举例,在生产代码中可别这么写!

   // 统计耗时的函数

   time(() - {

   CompletableFuture Integer result = Stream.of(1, 2)

   // 创建异步任务

   .map(x - CompletableFuture.supplyAsync(() - compute(x), executor))

   // 聚合

   .reduce(CompletableFuture.completedFuture(0), (x, y) - x.thenCombineAsync(y, Integer::sum, executor));

   // 等待结果

   try {

   System.out.println("结果:" + result.get());

   } catch (ExecutionException InterruptedException e) {

   System.err.println("任务执行异常");

  [async-1]: 任务执行开始:1

  [async-2]: 任务执行开始:2

  [async-1]: 任务执行完成:1

  [async-2]: 任务执行完成:2

  耗时:2 秒

  

 

  可以看到耗时变成了 2 秒。

  存在的问题

  看上去CompletableFuture现有功能可以满足我们诉求。但当我们引入一些现实常见情况时,一些潜在的不足便暴露出来了。

  compute(x)如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但compute(x)耗时却在 0.5 秒 ~ 无穷大波动。这时候我们就需要把耗时过长的compute(x)任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用。

  那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用get(long timeout, TimeUnit unit)来指定获取结果的超时时间,并且我们会给compute(x)设置一个超时时间,达到后自动抛异常来中断任务。

  

public static void main(String[] args) {

 

   // 仅简单举例,在生产代码中可别这么写!

   // 统计耗时的函数

   time(() - {

   List CompletableFuture Integer result = Stream.of(1, 2)

   // 创建异步任务,compute(x) 超时抛出异常

   .map(x - CompletableFuture.supplyAsync(() - compute(x), executor))

   .toList();

   // 等待结果

   int res = 0;

   for (CompletableFuture Integer future : result) {

   try {

   res += future.get(2, SECONDS);

   } catch (ExecutionException InterruptedException TimeoutException e) {

   System.err.println("任务执行异常或超时");

   System.out.println("结果:" + res);

  [async-2]: 任务执行开始:2

  [async-1]: 任务执行开始:1

  [async-1]: 任务执行完成:1

  任务执行异常或超时

  耗时:2 秒

  

 

  可以看到,只要我们能够给compute(x)设置一个超时时间将任务中断,结合get、getNow等获取结果的方式,就可以很好地管理整体耗时。

  那么问题也就转变成了,如何给任务设置异步超时时间呢?

  当异步任务是一个 RPC 请求时,我们可以设置一个 JSF 超时,以达到异步超时效果。

  当请求是一个 R2M 请求时,我们也可以控制 R2M 连接的最大超时时间来达到效果。

  这么看好像我们都是在依赖三方中间件的能力来管理任务超时时间?那么就存在一个问题,中间件超时控制能力有限,如果异步任务是中间件 IO 操作 + 本地计算操作怎么办?

  用 JSF 超时举一个具体的例子,反编译 JSF 的获取结果代码如下:

  

public V get(long timeout, TimeUnit unit) throws InterruptedException {

 

   // 配置的超时时间

   timeout = unit.toMillis(timeout);

   // 剩余等待时间

   long remaintime = timeout - (this.sentTime - this.genTime);

   if (remaintime = 0L) {

   if (this.isDone()) {

   // 反序列化获取结果

   return this.getNow();

   } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {

   // 等待时间内任务完成,反序列化获取结果

   return this.getNow();

   this.setDoneTime();

   // 超时抛出异常

   throw this.clientTimeoutException(false);

  

 

  当这个任务刚好卡在超时边缘完成时,这个任务的耗时时间就变成了超时时间 + 获取结果时间。而获取结果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。

  某些 CPU 使用率高的情况下,就会出现异步任务没能触发抛出异常中断,导致我们无法准确控制超时时间。对上游来说,本次请求全部失败。

  JDK 9

  这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现以上问题。

  那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture正式提供了orTimeout、completeTimeout方法,来准确实现异步超时控制。

  

public CompletableFuture T orTimeout(long timeout, TimeUnit unit) {

 

   if (unit == null)

   throw new NullPointerException();

   if (result == null)

   whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));

   return this;

  

 

  JDK 9orTimeout其实现原理是通过一个定时任务,在给定时间之后抛出异常。如果任务在指定时间内完成,则取消抛异常的操作。

  以上代码我们按执行顺序来看下:

  首先执行new Timeout(this)。

  

static final class Timeout implements Runnable {

 

   final CompletableFuture ?

   Timeout(CompletableFuture ? f) { this.f = f; }

   public void run() {

   if (f != null !f.isDone())

   // 抛出超时异常

   f.completeExceptionally(new TimeoutException());

  

 

  通过源码可以看到,Timeout是一个实现 Runnable 的类,run()方法负责给传入的异步任务通过completeExceptionallyCAS 赋值异常,将任务标记为异常完成。

  那么谁来触发这个run()方法呢?我们看下Delayer的实现。

  

static final class Delayer {

 

   static ScheduledFuture ? delay(Runnable command, long delay,

   TimeUnit unit) {

   // 到时间触发 command 任务

   return delayer.schedule(command, delay, unit);

   static final class DaemonThreadFactory implements ThreadFactory {

   public Thread newThread(Runnable r) {

   Thread t = new Thread(r);

   t.setDaemon(true);

   t.setName("CompletableFutureDelayScheduler");

   return t;

   static final ScheduledThreadPoolExecutor delayer;

   static {

   (delayer = new ScheduledThreadPoolExecutor(

   1, new DaemonThreadFactory())).

   setRemoveOnCancelPolicy(true);

  

 

  Delayer其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit)通过ScheduledThreadPoolExecutor实现指定时间后触发Timeout的run()方法。

  到这里就已经实现了超时抛出异常的操作。但当任务完成时,就没必要触发Timeout了。因此我们还需要实现一个取消逻辑。

  

static final class Canceller implements BiConsumer Object, Throwable {

 

   final Future ?

   Canceller(Future ? f) { this.f = f; }

   public void accept(Object ignore, Throwable ex) {

   if (ex == null f != null !f.isDone())

   // 3 未触发抛异常任务则取消

   f.cancel(false);

  

 

  当任务执行完成,或者任务执行异常时,我们也就没必要抛出超时异常了。因此我们可以把delayer.schedule(command, delay, unit)返回的定时超时任务取消,不再触发Timeout。 当我们的异步任务完成,并且定时超时任务未完成的时候,就是我们取消的时机。因此我们可以通过whenComplete(BiConsumer ? super T, ? super Throwable action)来完成。

  Canceller就是一个BiConsumer的实现。其持有了delayer.schedule(command, delay, unit)返回的定时超时任务,accept(Object ignore, Throwable ex)实现了定时超时任务未完成后,执行cancel(boolean mayInterruptIfRunning)取消任务的操作。

  JDK 8

  如果我们使用的是 JDK 9 或以上,我们可以直接用 JDK 的实现来完成异步超时操作。那么 JDK 8 怎么办呢?

  其实我们也可以根据上述逻辑简单实现一个工具类来辅助。

  以下是我们营销自己的工具类以及用法,贴出来给大家作为参考,大家也可以自己写的更优雅一些~

  调用方式:

  

CompletableFutureExpandUtils.orTimeout(异步任务, 超时时间, 时间单位);

 

  

 

  工具类源码:

  

package com.jd.jr.market.reduction.util;

 

  import com.jdpay.market.common.exception.UncheckedException;

  import java.util.concurrent.*;

  import java.util.function.BiConsumer;

   * CompletableFuture 扩展工具

   * @author zhangtianci7

  public class CompletableFutureExpandUtils {

   * 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。

   * @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位

   * @param unit 一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间

   * @return 入参的 CompletableFuture

   public static T CompletableFuture T orTimeout(CompletableFuture T future, long timeout, TimeUnit unit) {

   if (null == unit) {

   throw new UncheckedException("时间的给定粒度不能为空");

   if (null == future) {

   throw new UncheckedException("异步任务不能为空");

   if (future.isDone()) {

   return future;

   return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));

   * 超时时异常完成的操作

   static final class Timeout implements Runnable {

   final CompletableFuture ? future;

   Timeout(CompletableFuture ? future) {

   this.future = future;

   public void run() {

   if (null != future !future.isDone()) {

   future.completeExceptionally(new TimeoutException());

   * 取消不需要的超时的操作

   static final class Canceller implements BiConsumer Object, Throwable {

   final Future ? future;

   Canceller(Future ? future) {

   this.future = future;

   public void accept(Object ignore, Throwable ex) {

   if (null == ex null != future !future.isDone()) {

   future.cancel(false);

   * 单例延迟调度器,仅用于启动和取消任务,一个线程就足够

   static final class Delayer {

   static ScheduledFuture ? delay(Runnable command, long delay, TimeUnit unit) {

   return delayer.schedule(command, delay, unit);

   static final class DaemonThreadFactory implements ThreadFactory {

   public Thread newThread(Runnable r) {

   Thread t = new Thread(r);

   t.setDaemon(true);

   t.setName("CompletableFutureExpandUtilsDelayScheduler");

   return t;

   static final ScheduledThreadPoolExecutor delayer;

   static {

   delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());

   delayer.setRemoveOnCancelPolicy(true);

  

 

  JEP 266: JDK 9 并发包更新提案

  以上就是Java CompletableFuture 异步超时实现探索()的详细内容,想要了解更多 Java CompletableFuture 异步超时实现探索的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: