sách gpt4 ăn đã đi

Khám phá triển khai thời gian chờ không đồng bộ JavaCompletableFuture

In lại Tác giả: Tôi là chú chim nhỏ Thời gian cập nhật: 2023-02-08 22:31:31 26 4
mua khóa gpt4 giày nike

Tác giả: Zhang Tianci của JD Technology.

Lời nói đầu

JDK 8 là bản nâng cấp phiên bản chính bổ sung nhiều tính năng mới, một trong số đó là CompleteableFuture. Kể từ đó, mô hình lập trình không đồng bộ dựa trên sự kiện đã thực sự được hỗ trợ ở cấp JDK, bù đắp cho những thiếu sót của Future.

Trong tối ưu hóa hàng ngày của chúng tôi, phương pháp được sử dụng phổ biến nhất là thực thi song song đa luồng. Lần này sẽ liên quan đến việc sử dụng CompleteableFuture.

Cách sử dụng phổ biến

Đây là một ví dụ về một kịch bản phổ biến.

Nếu chúng ta có hai dịch vụ cuộc gọi từ xa RPC, chúng ta cần lấy kết quả của hai RPC trước khi thực hiện xử lý logic tiếp theo.

                        
                          public static void main(String[] args) { // Nhiệm vụ A, mất 2 giây int resultA = tính toán (1); // Nhiệm vụ B, mất 2 giây int resultB = tính toán (2); Hệ thống xử lý logic nghiệp vụ tiếp theo .out.println(kết quảA + kết quảB }

                        
                      

Có thể ước tính rằng quá trình thực hiện nối tiếp sẽ mất ít nhất 4 giây và tác vụ B không phụ thuộc vào kết quả của tác vụ A.

Đối với kịch bản này, chúng tôi thường chọn tối ưu hóa song song. Mã Demo như sau:

                        
                          public static void main(String[] args) { // Chỉ là một ví dụ đơn giản, đừng viết như thế này trong mã sản xuất! // Hàm tốn thời gian time(() -> { CompleteableFuture result = Stream.of(1, 2) // Tạo một task.map(x -> CompletableFuture.supplyAsync(() -> Calculate(x) không đồng bộ ), executor)) // Aggregation.reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor)); // Đợi kết quả thử { System.out.println("Result: " + result.get()); } Catch (ExecutionException | InterruptedException e) { System.err.println( "Ngoại lệ thực thi tác vụ"); } }); } Đầu ra: [async-1]: Đã bắt đầu thực hiện tác vụ: 1 [async-2]: Đã bắt đầu thực hiện tác vụ: 2 [async-1]: Đã hoàn thành thực hiện tác vụ: 1 [async-2]: Đã hoàn thành thực hiện tác vụ: 2 Kết quả: 3 Thời gian đã trôi qua: 2 giây

                        
                      

Bạn có thể thấy rằng thời gian thực hiện đã trở thành 2 giây.

Vấn đề

phân tích

Có vẻ như các chức năng hiện có của CompleteableFuture có thể đáp ứng được nhu cầu của chúng tôi. Nhưng khi chúng tôi giới thiệu một số tình huống thực tế phổ biến, một số khuyết điểm tiềm ẩn sẽ bộc lộ.

compute(x)  如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但  compute(x)  耗时却在 0.5 秒 ~ 无穷大波动。这时候我们就需要把耗时过长的  compute(x)  任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用.

那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用  get(long timeout, TimeUnit unit)  来指定获取结果的超时时间,并且我们会给  compute(x)  设置一个超时时间,达到后自动抛异常来中断任务.

                        
                          public static void main(String[] args) { // 仅简单举例,在生产代码中可别这么写! // 统计耗时的函数 time(() -> { List<>> result = Stream.of(1, 2) // 创建异步任务,compute(x) 超时抛出异常 .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor)) .toList(); // 等待结果 int res = 0; for (CompletableFuture future : result) { try { res += future.get(2, SECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { System.err.println("任务执行异常或超时"); } } System.out.println("结果:" + res); }); } 输出: [async-2]: 任务执行开始:2 [async-1]: 任务执行开始:1 [async-1]: 任务执行完成:1 任务执行异常或超时 结果:1 耗时:2 秒

                        
                      

可以看到,只要我们能够给  compute(x)  设置一个超时时间将任务中断,结合  get 、 getNow  等获取结果的方式,就可以很好地管理整体耗时.

那么问题也就转变成了, 如何给任务设置异步超时时间呢 ?

现有做法

当异步任务是一个 RPC 请求时,我们可以设置一个 JSF 超时,以达到异步超时效果.

当请求是一个 R2M 请求时,我们也可以控制 R2M 连接的最大超时时间来达到效果.

这么看好像我们都是在依赖三方中间件的能力来管理任务超时时间?那么就存在一个问题,中间件超时控制能力有限,如果异步任务是中间件 IO 操作 + 本地计算操作怎么办?

用 JSF 超时举一个具体的例子,反编译 JSF 的获取结果代码如下:

                        
                          public V get(long timeout, TimeUnit unit) throws InterruptedException { // 配置的超时时间 timeout = unit.toMillis(timeout); // 剩余等待时间 long remaintime = timeout - (this.sentTime - this.genTime); if (remaintime <= 0L) { if (this.isDone()) { // 反序列化获取结果 return this.getNow(); } } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) { // 等待时间内任务完成,反序列化获取结果 return this.getNow(); } this.setDoneTime(); // 超时抛出异常 throw this.clientTimeoutException(false); }

                        
                      

当这个任务刚好卡在超时边缘完成时,这个任务的耗时时间就变成了超时时间 + 获取结果时间。而获取结果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大.

某些 CPU 使用率高的情况下,就会出现异步任务没能触发抛出异常中断,导致我们无法准确控制超时时间。对上游来说,本次请求全部失败.

解决方式

JDK 9

这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现以上问题.

那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9, CompletableFuture  正式提供了  orTimeout 、 completeTimeout  方法,来准确实现异步超时控制.

                        
                          public CompletableFuture orTimeout(long timeout, TimeUnit unit) { if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit))); return this; }

                        
                      

JDK 9  orTimeout  其实现原理是通过一个定时任务,在给定时间之后抛出异常。如果任务在指定时间内完成,则取消抛异常的操作.

以上代码我们按执行顺序来看下:

首先执行  new Timeout(this) .

                        
                          static final class Timeout implements Runnable { final CompletableFuture f; Timeout(CompletableFuture f) { this.f = f; } public void run() { if (f != null && !f.isDone()) // 抛出超时异常 f.completeExceptionally(new TimeoutException()); } }

                        
                      

通过源码可以看到, Timeout  是一个实现 Runnable 的类, run()  方法负责给传入的异步任务通过  completeExceptionally  CAS 赋值异常,将任务标记为异常完成.

那么谁来触发这个  run()  方法呢?我们看下  Delayer  的实现.

                        
                          static final class Delayer { static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { // 到时间触发 command 任务 return delayer.schedule(command, delay, unit); } static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("CompletableFutureDelayScheduler"); return t; } } static final ScheduledThreadPoolExecutor delayer; static { (delayer = new ScheduledThreadPoolExecutor( 1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true); } }

                        
                      

Delayer  其实就是一个单例定时调度器, Delayer.delay(new Timeout(this), timeout, unit)  通过  ScheduledThreadPoolExecutor  实现指定时间后触发  Timeout  的  run()  方法.

到这里就已经实现了超时抛出异常的操作。但当任务完成时,就没必要触发  Timeout  了。因此我们还需要实现一个取消逻辑.

                        
                          static final class Canceller implements BiConsumer { final Future f; Canceller(Future f) { this.f = f; } public void accept(Object ignore, Throwable ex) { if (ex == null && f != null && !f.isDone()) // 3 未触发抛异常任务则取消 f.cancel(false); } }

                        
                      

当任务执行完成,或者任务执行异常时,我们也就没必要抛出超时异常了。因此我们可以把  delayer.schedule(command, delay, unit)  返回的定时超时任务取消,不再触发  Timeout 。 当我们的异步任务完成,并且定时超时任务未完成的时候,就是我们取消的时机。因此我们可以通过  whenComplete(BiConsumer action)  来完成.

Canceller  就是一个  BiConsumer  的实现。其持有了  delayer.schedule(command, delay, unit)  返回的定时超时任务, accept(Object ignore, Throwable ex)  实现了定时超时任务未完成后,执行  cancel(boolean mayInterruptIfRunning)  取消任务的操作.

JDK 8

如果我们使用的是 JDK 9 或以上,我们可以直接用 JDK 的实现来完成异步超时操作。那么 JDK 8 怎么办呢?

其实我们也可以根据上述逻辑简单实现一个工具类来辅助.

以下是我们营销自己的工具类以及用法,贴出来给大家作为参考,大家也可以自己写的更优雅一些~ 。

调用方式:

                        
                          CompletableFutureExpandUtils.orTimeout(异步任务, 超时时间, 时间单位);

                        
                      

工具类源码:

                        
                          package com.jd.jr.market.reduction.util; import com.jdpay.market.common.exception.UncheckedException; import java.util.concurrent.*; import java.util.function.BiConsumer; /** * CompletableFuture 扩展工具 * * @author zhangtianci7 */ public class CompletableFutureExpandUtils { /** * 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。 * * @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位 * @param unit 一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间 * @return 入参的 CompletableFuture */ public static  CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit unit) { if (null == unit) { throw new UncheckedException("时间的给定粒度不能为空"); } if (null == future) { throw new UncheckedException("异步任务不能为空"); } if (future.isDone()) { return future; } return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit))); } /** * 超时时异常完成的操作 */ static final class Timeout implements Runnable { final CompletableFuture future; Timeout(CompletableFuture future) { this.future = future; } public void run() { if (null != future && !future.isDone()) { future.completeExceptionally(new TimeoutException()); } } } /** * 取消不需要的超时的操作 */ static final class Canceller implements BiConsumer { final Future future; Canceller(Future future) { this.future = future; } public void accept(Object ignore, Throwable ex) { if (null == ex && null != future && !future.isDone()) { future.cancel(false); } } } /** * 单例延迟调度器,仅用于启动和取消任务,一个线程就足够 */ static final class Delayer { static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { return delayer.schedule(command, delay, unit); } static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("CompletableFutureExpandUtilsDelayScheduler"); return t; } } static final ScheduledThreadPoolExecutor delayer; static { delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); delayer.setRemoveOnCancelPolicy(true); } } }

                        
                      

Tài liệu tham khảo

  1. JEP 266: JDK 9 并发包更新提案

最后此篇关于JavaCompletableFuture异步超时实现探索的文章就讲到这里了,如果你想了解更多关于JavaCompletableFuture异步超时实现探索的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

26 4 0
tôi là một con chim nhỏ
Hồ sơ

Tôi là một lập trình viên xuất sắc, rất giỏi!

Nhận phiếu giảm giá taxi Didi miễn phí
Phiếu giảm giá taxi Didi
Chứng chỉ ICP Bắc Kinh số 000000
Hợp tác quảng cáo: 1813099741@qq.com 6ren.com
Xem sitemap của VNExpress