核心内容摘要
如何让淘宝任务自动化?淘金币助手的5大创新应用
在Java应用开发中为了提升系统性能和响应速度我们经常需要将一些耗时操作如调用外部API、查询数据库、复杂计算等进行异步并行处理。
当主流程需要等待所有这些并行任务执行完毕后再继续时我们通常会用到ExecutorService、CountDownLatch等并发工具。
然而直接使用这些原生工具往往意味着需要编写一些重复的、模式化的“胶水代码”这不仅增加了代码量也让核心业务逻辑显得不够清晰。
为了解决这个问题我封装了一个名为LatchUtils的轻量级工具类。
它能够以一种极其简洁的方式来组织和管理这一类异步任务。
详细代码其代码如下后面会有使用说明和示例以及和传统实现代码的对比import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; public class LatchUtils { private static final ThreadLocalListTaskInfo THREADLOCAL ThreadLocal.withInitial(LinkedList::new); public static void submitTask(Executor executor, Runnable runnable) { THREADLOCAL.get().add(new TaskInfo(executor, runnable)); } private static ListTaskInfo popTask() { ListTaskInfo taskInfos THREADLOCAL.get(); THREADLOCAL.remove(); return taskInfos; } public static boolean waitFor(long timeout, TimeUnit timeUnit) { ListTaskInfo taskInfos popTask(); if (taskInfos.isEmpty()) { return true; } CountDownLatch latch new CountDownLatch(taskInfos.size()); for (TaskInfo taskInfo : taskInfos) { Executor executor taskInfo.executor; Runnable runnable taskInfo.runnable; executor.execute(() - { try { runnable.run(); } finally { latch.countDown(); } }); } boolean await false; try { await latch.await(timeout, timeUnit); } catch (Exception ignored) { } return await; } private static final class TaskInfo { private final Executor executor; private final Runnable runnable; public TaskInfo(Executor executor, Runnable runnable) { this.executor executor; this.runnable runnable; } } }核心思想LatchUtils的设计哲学是多次提交一次等待。
•任务注册:在主流程代码中可以先通过LatchUtils.submitTask()提交Runnable任务和其对应的Executor该线程池用来执行这个Runnable。
•执行并等待:当并行任务都提交完毕后你只需调用一次LatchUtils.waitFor()。
关注工众号码猿技术专栏回复关键词1111 获取阿里内部Java性能调优手册该方法会立即触发所有已注册任务的执行并阻塞等待所有任务执行完成或超时。
API 概览这个工具类对外暴露的接口极其简单只有两个核心静态方法submitTask()public static void submitTask(Executor executor, Runnable runnable)功能:提交一个异步任务。
参数:•executor:java.util.concurrent.Executor- 指定执行此任务的线程池。
•runnable:java.lang.Runnable- 需要异步执行的具体业务逻辑。
waitFor()public static boolean waitFor(long timeout, TimeUnit timeUnit)功能:触发所有已提交任务的执行并同步等待它们全部完成。
参数:•timeout:long- 最长等待时间。
•timeUnit:java.util.concurrent.TimeUnit- 等待时间单位。
返回值:•true:如果所有任务在指定时间内成功完成。
•false:如果等待超时。
注意:该方法在执行后会自动清理当前线程提交的任务列表因此可以重复使用。
实战示例让我们来看一个典型的应用场景一个聚合服务需要同时调用用户服务、订单服务和商品服务拿到所有结果后再进行下一步处理。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { //
准备一个线程池 ExecutorService executorService Executors.newFixedThreadPool(
; System.out.println(主流程开始准备分发异步任务...); //
提交多个异步任务 // 任务一获取用户信息 LatchUtils.submitTask(executorService, () - { try { System.out.println(开始获取用户信息...); Thread.sleep(
; // 模拟耗时 System.out.println(获取用户信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 任务二获取订单信息 LatchUtils.submitTask(executorService, () - { try { System.out.println(开始获取订单信息...); Thread.sleep(
; // 模拟耗时 System.out.println(获取订单信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 任务三获取商品信息 LatchUtils.submitTask(executorService, () - { try { System.out.println(开始获取商品信息...); Thread.sleep(
; // 模拟耗时 System.out.println(获取商品信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); System.out.println(所有异步任务已提交主线程开始等待...); //
等待所有任务完成最长等待5秒 boolean allTasksCompleted LatchUtils.waitFor(5, TimeUnit.SECONDS); //
根据等待结果继续主流程 if (allTasksCompleted) { System.out.println(所有异步任务执行成功主流程继续...); } else { System.err.println(有任务执行超时主流程中断); } //
关闭线程池 executorService.shutdown(); } }输出结果:主流程开始准备分发异步任务... 所有异步任务已提交主线程开始等待... 开始获取商品信息... 开始获取用户信息... 开始获取订单信息... 获取商品信息成功 获取用户信息成功 获取订单信息成功 所有异步任务执行成功主流程继续...从这个例子中可以看到业务代码变得非常清晰。
我们只需要关注“提交任务”和“等待结果”这两个动作而无需关心CountDownLatch的初始化、countDown()的调用以及异常处理等细节。
对比如果不使用 LatchUtils为了更好地理解LatchUtils带来的价值让我们看看要实现与上面完全相同的功能用传统的Java并发API需要如何编写代码。
通常有两种主流方式使用CountDownLatch或使用CompletableFuture。
方式一直接使用 CountDownLatch这是最经典的方式开发者需要手动管理CountDownLatch的生命周期。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ManualCountDownLatchExample { public static void main(String[] args) { //
准备一个线程池 ExecutorService executorService Executors.newFixedThreadPool(
; //
手动初始化 CountDownLatch数量为任务数 CountDownLatch latch new CountDownLatch(
; System.out.println(主流程开始准备分发异步任务...); //
提交任务并在每个任务的 finally 块中手动调用 latch.countDown() // 任务一获取用户信息 executorService.execute(() - { try { System.out.println(开始获取用户信息...); Thread.sleep(
; System.out.println(获取用户信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); // 手动减一 } }); // 任务二获取订单信息 executorService.execute(() - { try { System.out.println(开始获取订单信息...); Thread.sleep(
; System.out.println(获取订单信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); // 手动减一 } }); // 任务三获取商品信息 executorService.execute(() - { try { System.out.println(开始获取商品信息...); Thread.sleep(
; System.out.println(获取商品信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); // 手动减一 } }); System.out.println(所有异步任务已提交主线程开始等待...); //
手动调用 latch.await() 进行等待 boolean allTasksCompleted false; try { allTasksCompleted latch.await(5, TimeUnit.SECONDS); } catch (InterruptedException e) { // 需要处理中断异常 Thread.currentThread().interrupt(); System.err.println(主线程在等待时被中断); } //
根据等待结果继续主流程 if (allTasksCompleted) { System.out.println(所有异步任务执行成功主流程继续...); } else { System.err.println(有任务执行超时主流程中断); } //
关闭线程池 executorService.shutdown(); } }方式二使用 CompletableFuture使用CompletableFuture实现其代码如下import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class CompletableFutureExample { public static void main(String[] args) { //
准备一个线程池 ExecutorService executorService Executors.newFixedThreadPool(
; System.out.println(主流程开始准备分发异步任务...); //
创建 CompletableFuture 任务 CompletableFutureVoid userFuture CompletableFuture.runAsync(() - { try { System.out.println(开始获取用户信息...); Thread.sleep(
; System.out.println(获取用户信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executorService); CompletableFutureVoid orderFuture CompletableFuture.runAsync(() - { try { System.out.println(开始获取订单信息...); Thread.sleep(
; System.out.println(获取订单信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executorService); CompletableFutureVoid productFuture CompletableFuture.runAsync(() - { try { System.out.println(开始获取商品信息...); Thread.sleep(
; System.out.println(获取商品信息成功); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executorService); System.out.println(所有异步任务已提交主线程开始等待...); //
使用 CompletableFuture.allOf 将所有任务组合起来 CompletableFutureVoid allFutures CompletableFuture.allOf(userFuture, orderFuture, productFuture); //
等待组合后的 Future 完成 try { allFutures.get(5, TimeUnit.SECONDS); System.out.println(所有异步任务执行成功主流程继续...); } catch (Exception e) { // 需要处理多种异常如 InterruptedException, ExecutionException, TimeoutException System.err.println(任务执行超时或出错主流程中断 e.getMessage()); } //
关闭线程池 executorService.shutdown(); } }对比分析特性LatchUtils手动CountDownLatchCompletableFuture.allOf代码简洁性极高。
业务逻辑和并发控制分离核心代码清晰。
中等。
需要在每个任务中嵌入latch.countDown()分散了关注点。
较高。
链式调用风格但需要创建多个Future对象。
状态管理自动。
工具类内部自动管理CountDownLatch。
手动。
需要自己创建、维护和传递CountDownLatch实例。
自动。
由CompletableFuture框架管理任务状态。
错误处理简化。
waitFor内部处理InterruptedException仅返回布尔值。
复杂。
需要显式地在finally中countDown()并为主线程的await()处理InterruptedException。
复杂。
get()方法会抛出多种受检异常需要统一处理。
关注点分离优秀。
开发者只需关注“提交”和“等待”两个动作。
一般。
并发控制逻辑countDown()侵入到了业务Runnable中。
良好。
任务的定义和组合是分开的但仍需处理组合后的Future。
易用性非常简单。
几乎没有学习成本。
需要理解CountDownLatch。
容易忘记countDown()或错误处理。
需要理解CompletableFuture。
API较为丰富有一定学习曲线。
结论很明显对于“分发一组并行任务然后等待它们全部完成”这一特定但常见的模式LatchUtils 通过适度的封装极大地简化了开发者的工作。
它隐藏了并发控制的复杂性让业务代码回归其本质从而提高了代码的可读性和可维护性。