Advertisement

异步超时中断,知其然,也要知其所以然~

阅读量:

异步编排

在业务开发过程中,我们通常会使用线程池来处理多线程数据获取逻辑,并确保数据获取过程中的同步与阻塞问题得到处理,并完成相关业务逻辑的设计。

常见的使用方法如下:

Future

复制代码
 @Slf4j

    
 @SpringBootTest
    
 public class OtherTest {
    
 ​
    
    public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
    
          new LinkedBlockingQueue<>(100));
    
 ​
    
    public static void main(String[] args) {
    
 ​
    
 ​
    
       Future<Integer> submit1 = executor.submit(() -> {
    
          // 业务耗时逻辑1
    
          return 1;
    
       });
    
 ​
    
       Future<Integer> submit2 = executor.submit(() -> {
    
          // 业务耗时逻辑2
    
          return 2;
    
       });
    
 ​
    
       Future<Integer> submit3 = executor.submit(() -> {
    
          // 业务耗时逻辑3
    
          return 3;
    
       });
    
 ​
    
       try {
    
          Integer integer1 = submit1.get();
    
          Integer integer2 = submit2.get();
    
          Integer integer3 = submit3.get();
    
 ​
    
          System.out.println(integer1);
    
          System.out.println(integer2);
    
          System.out.println(integer3);
    
       } catch (Exception e) {
    
          e.printStackTrace();
    
       }
    
 ​
    
    }
    
 ​
    
 }
    
 复制代码

假设一个接口涉及到3个业务逻辑,如下:

  • 业务逻辑1耗时: 50ms
  • 业务逻辑2耗时: 30ms
  • 业务逻辑3耗时: 70ms

那么如果是传统的串行调用 ,接口总耗时:150ms

当采用多线程机制来实现调用时,则该接口的时间开销将由执行时间最长的业务流程所决定;具体而言,在这种情况下,该接口的时间开销为:70ms

可以看到,接口耗时是有明显降低的~


CompletableFuture

不过,在对接口进行异步编排后,虽然 interfaces 的运行时间有所下降;但如果我们拥有多个 time-consuming 的业务逻辑,并且这些业务逻辑之间相互依赖的话?那该怎么办呢?

不言而喻的是,在之前的实现中使用Future已经无法满足当前的需求。实际上,在Java 8及更高版本的JDK中,默认引入了新的工具类——...。这一改进极大地方便了我们进行异步处理。

复制代码
 @Slf4j

    
 @SpringBootTest
    
 public class OtherTest {
    
 ​
    
    public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
    
          new LinkedBlockingQueue<>(100));
    
 ​
    
    public static void main(String[] args) {
    
 ​
    
 ​
    
       CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    
          // 业务耗时逻辑1
    
          return 1;
    
       }, executor);
    
 ​
    
       CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
    
          // 业务耗时逻辑2
    
          return 2;
    
       }, executor);
    
 ​
    
       CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
    
          // 业务耗时逻辑3
    
          return 3;
    
       }, executor);
    
 ​
    
       try {
    
          // 等待任务全部执行完毕
    
          CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).get();
    
 ​
    
          System.out.println(completableFuture1.get());
    
          System.out.println(completableFuture2.get());
    
          System.out.println(completableFuture3.get());
    
 ​
    
       } catch (Exception e) {
    
          e.printStackTrace();
    
       }
    
 ​
    
    }
    
 ​
    
 }
    
 复制代码

因为案例较为简单,
难以体现CompletableFuture编排能力相比于Future的独特优势之处,
后续文章中将对此进行详细讲解,
不在本文讨论范围内。


超时中断

在上述案例中

此行为本质上是一种高度危险的做法。(可能导致)当下游rpc接口出现波动时(会导致)响应时间显著增加(而导致)系统资源紧张(进而导致)线程池中的资源难以及时释放(结果是)系统可能会因超负荷运行而拒绝接收新的请求(最终影响到的是)用户体验以及相关的工作状况

因此,在获取任务结果的过程中应预留必要的等待时间以避免可能出现的问题。

FutureCompletableFutureget方法都支持传入等待时间~


Future超时中断机制

Future包含了一个名为get的方法,在请求完成后会阻塞地执行任务获取,并且能够接受超时时间参数。接下来让我们深入了解一下源码。

复制代码
 public V get(long timeout, TimeUnit unit)

    
   throws InterruptedException, ExecutionException, TimeoutException {
    
   // 参数校验
    
   if (unit == null)
    
     throw new NullPointerException();
    
   
    
   int s = state;
    
   
    
   // 阻塞等待,如果超过超时时间任务还未完成,那么抛出超时异常
    
   if (s <= COMPLETING &&
    
       (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    
     throw new TimeoutException();
    
   return report(s);
    
 }
    
 复制代码

阻塞等待,timedtrue代表存在超时时间

复制代码
 private int awaitDone(boolean timed, long nanos)

    
     throws InterruptedException {
    
   
    
     long startTime = 0L;
    
     WaitNode q = null;
    
     boolean queued = false;
    
     for (;;) {
    
         int s = state;
    
         // 任务状态 > COMPLETING说明已经执行完毕
    
         if (s > COMPLETING) {
    
             // 当前线程不用等待了,将等待节点里的Thread设置为null
    
             if (q != null)
    
                 q.thread = null;
    
             return s;
    
         }
    
         else if (s == COMPLETING)
    
             // COMPLETING是任务执行完毕到真正将任务设置为完成态的一个中间状态
    
             // 当任务的处于COMPLETING时,说明任务已经执行完了,但此时cpu时间不够没有继续执行
    
             // 此时需要yield一下,让其他线程执行,从而将任务正确设置为完成状态
    
             Thread.yield();
    
         else if (Thread.interrupted()) {
    
             // 如果当前线程被打断了,则把当前线程从等待该任务完成的阻塞线程链表中删除
    
             removeWaiter(q);
    
             // 抛出打断异常
    
             throw new InterruptedException();
    
         }
    
         else if (q == null) {
    
             // 如果是超时等待,且等待时间<=0,则直接返回当前任务状态
    
             if (timed && nanos <= 0L)
    
                 return s;
    
             // 初始化一个等待当前任务执行完的节点,内部包含
    
             q = new WaitNode();
    
         }
    
         else if (!queued)
    
             // 将WaitNode排队到线程等待链表中
    
             queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    
         else if (timed) {
    
             // 阻塞等待,存在超时时间
    
             final long parkNanos;
    
             if (startTime == 0L) { // first time
    
                 startTime = System.nanoTime();
    
                 if (startTime == 0L)
    
                     startTime = 1L;
    
                 parkNanos = nanos;
    
             } else {
    
                 long elapsed = System.nanoTime() - startTime;
    
                 if (elapsed >= nanos) {
    
                     removeWaiter(q);
    
                     return state;
    
                 }
    
                 parkNanos = nanos - elapsed;
    
             }
    
             if (state < COMPLETING)
    
                 LockSupport.parkNanos(this, parkNanos);
    
         }
    
         else
    
             // 阻塞等待,没有超时时间
    
             LockSupport.park(this);
    
     }
    
 }
    
 复制代码

上面源码注释已经比较完善了,但我们还是要总结一下

  • 任务COMPLETING状态被定义为从执行完毕到正式将其设为完成态的一个过渡阶段(参考FutureTask.run()的具体实现)
    • get()方法无论在超时时间设置与否的情况下,默认都会通过LockSupport的park()与unpark()机制来实现阻塞效果
    • 每个任务都会自动维持一条专门用于跟踪当前任务进展的一条线程链表waiters

CompletableFuture超时中断机制

JDK 9版本起,支持了新的异步超时功能模块,并用于实现异步操作的超时处理。

复制代码
 CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).orTimeout(1, TimeUnit.SECONDS).get();

    
 复制代码

根据上面代码的编写情况可知,在这些completableFuture1, comple CompletableFuture2, completableFuture3这三个任务之间分别用逗号分隔的情况下,并列结构中的每个任务都会耗时1秒等待完毕

如果超过1秒 ,则会抛出java.util.concurrent.TimeoutException

源码如下:

复制代码
 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {

    
     if (unit == null)
    
         throw new NullPointerException();
    
     if (result == null)
    
         whenComplete(new Canceller(Delayer.delay(new Timeout(this),
    
                                                  timeout, unit)));
    
     return this;
    
 }
    
 ​
    
 public CompletableFuture<T> completeOnTimeout(T value, long timeout,
    
                                               TimeUnit unit) {
    
   if (unit == null)
    
     throw new NullPointerException();
    
   if (result == null)
    
     whenComplete(new Canceller(Delayer.delay(
    
       new DelayedCompleter<T>(this, value),
    
       timeout, unit)));
    
   return this;
    
 }
    
 复制代码
复制代码
 static final class Delayer {

    
   static ScheduledFuture<?> delay(Runnable command, long delay,
    
                                   TimeUnit unit) {
    
     // 延时任务
    
     return delayer.schedule(command, delay, unit);
    
   }
    
   
    
   static final ScheduledThreadPoolExecutor delayer;
    
   static {
    
     // 单线程
    
     (delayer = new ScheduledThreadPoolExecutor(
    
       1, new DaemonThreadFactory())).
    
       setRemoveOnCancelPolicy(true);
    
   }
    
 }
    
 ​
    
 ​
    
 static final class Timeout implements Runnable {
    
   final CompletableFuture<?> f;
    
   Timeout(CompletableFuture<?> f) { this.f = f; }
    
   public void run() {
    
     // 如果CompletableFuture不为null,且定时任务没有被取消
    
     if (f != null && !f.isDone())
    
       // 设置超时异常
    
       f.completeExceptionally(new TimeoutException());
    
   }
    
 }
    
 ​
    
 static final class DelayedCompleter<U> implements Runnable {
    
   final CompletableFuture<U> f;
    
   final U u;
    
   DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
    
   public void run() {
    
     if (f != null)
    
       // 将任务结果设置为我们给定的value
    
       f.complete(u);
    
   }
    
 }
    
 复制代码
复制代码
 static final class Canceller implements BiConsumer<Object, Throwable> {

    
     final Future<?> f;
    
     Canceller(Future<?> f) { this.f = f; }
    
     public void accept(Object ignore, Throwable ex) {
    
         // 如果没有异常,且超时任务存在且没有被取消,那么则取消超时任务
    
         // 因为此时说明,CompletableFuture的任务在超时时间内完成了,则不需要在监控超时
    
         if (ex == null && f != null && !f.isDone())
    
             f.cancel(false);
    
     }
    
 }
    
 复制代码

通过对上面源码的了解,我们可以知道

CompletableFuture的方法如orTimeoutcompleteOnTimeout其内核本质上依赖于ScheduledThreadPoolExecutor来完成这些操作

当我们在一个CompletableFuture上设置了超时时间后,默认情况下底层实际上会启动一个基于ScheduledThreadPoolExecutor的延时任务。其中所设定的时间即为我们所说的超时时间。这种情况下通常分为两种情况:一种是执行成功并返回结果的时间小于或等于设置的超时时间;另一种是由于长时间未完成而导致的任务自动取消并重新发起执行的情况。

如果在超时时间内完成任务,则将在任务完成后调用cancel(false)来取消延时任务。
当某个任务的执行时间超过预设的超时时间,则对该异常进行捕获并处理。

Future cancel原理

另外,我们还能看到,CompletableFuture 的延时任务并没有进行try-catch,此处可以了解下->ScheduledThreadPoolExecutor有坑嗷~

orTimeoutcompleteOnTimeout的区别就在于

  • 如果是orTimeout的情况,则会在发生超时的情况下触发异常。
    • 如果是completeOnTimeout的情形,则会不触发异常并直接将任务的结果赋值为传入的参数value

扩展知识点

当我们深入探究;CompletableFuture;;orTimeout;;completeOnTimeout;方法时,在深入了解其内部实现原理后发现其核心组件通常基于;ScheduledThreadPoolExecutor;这一单线程执行器设计而成;然而通过细致分析源码后发现该技术的核心组件通常基于;ScheduledThreadPoolExecutor;这一单线程执行器设计而成;

复制代码
 static final ScheduledThreadPoolExecutor delayer;

    
 static {
    
     (delayer = new ScheduledThreadPoolExecutor(
    
         1, new DaemonThreadFactory())).
    
         setRemoveOnCancelPolicy(true);
    
 }
    
 复制代码

当存在大量设置有不同超时时间的CompletableFuture实例发生时,在其为单线程机制的情况下可能导致的任务预期超时值通常设定为1000ms;然而,在实际运行中可能会遇到队列等待现象导致真正完成任务的时间会超过预期。

具体来说,在配置 orTimeoutcompleteOnTimeout 时所设定的超时时间并不会达到很高的精准度

全部评论 (0)

还没有任何评论哟~