核心内容摘要
一起草影视
集成到 Spring Boot
添加依赖在项目的pom.xml文件中添加asyncTool的依赖dependency groupIdcom.jd.platform/groupId artifactIdasyncTool/artifactId version版本号/version /dependency
配置线程池虽然asyncTool内部会管理线程池但为了更好地控制线程的使用可以自定义线程池。
以下是两种配置方式1自定义线程池Configuration EnableAsync // 开启线程池 public class TaskExecutePool { Autowired private TaskThreadPoolConfig config; Bean public Executor myTaskAsyncPool() { ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); executor.setCorePoolSize(config.getCorePoolSize()); // 核心线程池大小 executor.setMaxPoolSize(config.getMaxPoolSize()); // 最大线程数 executor.setQueueCapacity(config.getQueueCapacity()); // 队列容量 executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); // 活跃时间 executor.setThreadNamePrefix(MyExecutor-); // 线程名字前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略 executor.initialize(); return executor; } }2修改原生 Spring 异步线程池的装配Configuration EnableAsync // 开启线程池 public class NativeAsyncTaskExecutePool implements AsyncConfigurer { Autowired private TaskThreadPoolConfig config; Bean public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); executor.setCorePoolSize(config.getCorePoolSize()); executor.setMaxPoolSize(config.getMaxPoolSize()); executor.setQueueCapacity(config.getQueueCapacity()); executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); executor.setThreadNamePrefix(MyExecutor2-); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, objects) - { log.error( ex.getMessage() , ex); log.error(exception method: method.getName()); }; } }
核心方法说明
IWorker 接口•action(T object, MapString, WorkerWrapper allWrappers)任务的具体执行逻辑。
object 是任务的输入参数allWrappers 是所有任务的包装类集合可用于获取其他任务的结果。
•defaultValue()任务超时或异常时的默认返回值。
ICallback 接口•begin()任务开始时的回调。
•result(boolean success, T param, WorkResultV workResult)任务执行结果的回调。
success 表示任务是否成功param 是任务的输入参数workResult 是任务的执行结果。
WorkerWrapper 类•id任务的唯一标识。
•param任务的输入参数。
•worker任务的具体实现。
•callback任务的回调实现。
•depend任务的依赖关系定义任务的执行顺序。
•next任务的后续任务用于定义任务的执行顺序。
详细使用方式及示例
串行任务任务按顺序依次执行。
以下是一个串行任务的示例// 定义任务 A WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer() .id(workerA) .worker(new WorkerA()) .callback(new WorkerA()) .param(
.build(); // 定义任务 B依赖于任务 A WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer() .id(workerB) .worker(new WorkerB()) .callback(new WorkerB()) .param(
.depend(wrapperA) .build(); // 定义任务 C依赖于任务 B WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer() .id(workerC) .worker(new WorkerC()) .callback(new WorkerC()) .param(
.depend(wrapperB) .build(); // 提交任务 Async.beginWork(1000, wrapperA);
并行任务多个任务同时执行。
以下是一个并行任务的示例// 定义任务 A WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer() .id(workerA) .worker(new WorkerA()) .callback(new WorkerA()) .param(
.build(); // 定义任务 B WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer() .id(workerB) .worker(new WorkerB()) .callback(new WorkerB()) .param(
.build(); // 定义任务 C WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer() .id(workerC) .worker(new WorkerC()) .callback(new WorkerC()) .param(
.build(); // 提交任务 Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
阻塞等待 - 先串行后并行先执行任务 A然后任务 B 和任务 C 并行执行// 定义任务 A WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer() .id(workerA) .worker(new WorkerA()) .callback(new WorkerA()) .param(
.build(); // 定义任务 B依赖于任务 A WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer() .id(workerB) .worker(new WorkerB()) .callback(new WorkerB()) .param(
.depend(wrapperA) .build(); // 定义任务 C依赖于任务 A WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer() .id(workerC) .worker(new WorkerC()) .callback(new WorkerC()) .param(
.depend(wrapperA) .build(); // 提交任务 Async.beginWork(1000, wrapperA);
阻塞等待 - 先并行后串行任务 B 和任务 C 并行执行完成后任务 A 执行// 定义任务 A WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer() .id(workerA) .worker(new WorkerA()) .callback(new WorkerA()) .param(null) // 参数为任务 B 和任务 C 的结果 .build(); // 定义任务 B WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer() .id(workerB) .worker(new WorkerB()) .callback(new WorkerB()) .param(
.next(wrapperA) .build(); // 定义任务 C WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer() .id(workerC) .worker(new WorkerC()) .callback(new WorkerC()) .param(
.next(wrapperA) .build(); // 提交任务 Async.beginWork(1000, wrapperB, wrapperC);
主要作用说明
任务编排灵活的并行与串行组合asyncTool 支持任意组合多线程的并行和串行任务开发者可以根据业务需求灵活定义任务的执行顺序。
任务依赖管理它允许任务之间存在强依赖和弱依赖关系。
例如某些任务必须在其他任务完成后才能执行而另一些任务则可以在依赖任务中的任意一个或多个完成后执行。
执行监控与回调全链路回调机制每个任务在执行过程中无论成功、失败、超时还是异常都会触发回调函数。
这使得开发者可以实时监控任务的执行状态。
任务跳过回调即使某些任务被跳过未执行asyncTool 也会提供回调方便开发者进行日志记录或异常处理。
异常处理与容错异常与超时处理每个任务可以设置超时时间和默认值当任务执行失败或超时时会返回默认值确保整个任务链的稳定性。
独立任务容错单个任务的失败不会影响其他任务的回调和最终结果的获取但如果任务依赖的上游任务失败则当前任务也会失败并返回默认值。
性能优化低线程设计asyncTool 采用低线程设计减少线程的创建和销毁开销。
例如在多个任务依赖关系中后续任务可以复用前一个任务的线程。
无锁机制整个框架全程无锁避免了锁带来的性能开销提高了并发性能。
结果管理按顺序返回结果任务执行完成后asyncTool 可以按任务添加的顺序返回结果列表方便开发者进行后续处理。
支持异步回调除了同步阻塞返回结果外还支持整个任务组的异步回调避免阻塞主线程。
线程池管理线程池共享与独占支持为每个任务组独享线程池也可以让所有任务组共享一个线程池灵活配置资源。
简化开发封装复杂逻辑asyncTool 封装了复杂的并发逻辑使得开发者可以更专注于业务逻辑的实现而无需深入了解底层的并发机制。
五、
注意事项•任务的线程安全由于任务可能在多个线程中并发执行需要确保任务的线程安全性。
•任务的异常处理在任务执行过程中可能会出现异常需要合理地处理异常避免影响整个应用的运行。
•任务的超时设置合理设置任务的超时时间避免任务长时间未完成导致资源浪费。
•任务的依赖关系正确配置任务的依赖关系确保任务按预期顺序执行。
通过以上详细说明和代码示例你可以在 Spring Boot 项目中灵活使用asyncTool实现复杂的多线程任务编排。