爱豆5527:不仅仅是数字,是心动信号,是梦想起点!

核心内容摘要

17.c起草时:那些不为人知的风云变幻与智慧火花
HLW:不止于健康,更在于智慧生活

揭秘韩静格与王多鱼:免费资料大放送,开启你的财富智慧之旅!

01为什么需要批处理

应用场景解析场景1银行每日利息计算• 痛点 凌晨时段需扫描百万级账户数据手工计算容易遗漏• Spring Batch方案 分片读取账户数据批量计算利息失败自动重试• 实际案例 某银行系统改造后利息计算时间从4小时缩短至23分钟场景2电商订单归档-- 传统SQL示例存在性能问题 DELETEFROM active_orders WHERE create_time

LIMIT 5000; --需循环执行直到无数据• 问题 直接删除百万级数据会导致数据库锁表• 正确做法 使用Spring Batch分页读取→写入历史表→批量删除场景3日志分析• 典型需求 分析Nginx日志中的API响应时间分布• 特殊挑战 处理GB级文本文件时的内存控制场景4医疗数据迁移• 特殊要求 迁移过程中老系统仍需正常使用• 解决方案 使用Spring Batch的增量迁移模式

传统方式痛点详细解释每个痛点• 资源管理复杂// 典型的多线程错误示例 ExecutorService executor Executors.newFixedThreadPool(

; try { while(hasNextPage()) { ListData page fetchNextPage(); executor.submit(() - processPage(page)); // 可能引发内存泄漏 } } finally { executor.shutdown(); // 忘记调用会导致线程堆积 }

常见问题线程池配置不当导致OOM、数据库连接泄露• 容错性黑洞// 伪代码脆弱的错误处理 for (int i0; i3; i) { try { processBatch(); break; } catch (Exception e) { if (i

sendAlert(); // 简单重试无法处理部分成功场景 } }真实案例某支付系统因未处理部分失败导致重复出款• 维护噩梦# 典型硬编码配置 batch.size1000 input.path/data/in output.path/data/out问题根源参数修改需要重新部署、不同环境配置混杂• 监控盲区# 开发人员常用的临时方案 nohupjava-jarbatch.jarlog.txt21 tail-flog.txt# 无法获知实时进度关键缺陷无法回答处理到哪了、还剩多少等业务问题Spring Batch对比优势表02Spring Batch核心架构

四大金刚组件深度解析组件1Job作业工厂• 核心作用 定义完整的批处理流水线如月度报表生成流程• 真实案例 某银行的日终对账Job包含三个StepBean public Job reconciliationJob() { return jobBuilderFactory.get(dailyReconciliation) .start(downloadBankFileStep()) .next(validateDataStep()) .next(generateReportStep()) .build(); }组件2Step装配流水线Bean public Step importStep() { return stepBuilderFactory.get(csvImport) .User, Userchunk(

// 每500条提交一次 .reader(csvReader()) .processor(validationProcessor()) .writer(dbWriter()) .faultTolerant() .skipLimit(

.skip(DataIntegrityViolationException.class) .build(); }组件3ItemReader数据搬运工典型实现// 读取CSV文件示例 Bean public FlatFileItemReaderUser csvReader() { return new FlatFileItemReaderBuilderUser() .name(userReader) .resource(new FileSystemResource(data/users.csv)) .delimited().delimiter(,) .names(id, name, email) .fieldSetMapper(new BeanWrapperFieldSetMapperUser() ) .linesToSkip(

// 跳过标题行 .build(); }组件4ItemWriter数据收纳师复合写入示例Bean public CompositeItemWriterUser compositeWriter() { return new CompositeItemWriterBuilderUser() .delegates(dbWriter(), logWriter(), mqWriter()) .build(); } // 数据库写入组件 private JdbcBatchItemWriterUser dbWriter() { return new JdbcBatchItemWriterBuilderUser() .dataSource(dataSource) .sql(INSERT INTO users (name,email) VALUES (:name,:email)) .beanMapped() .build(); }

架构示意图

隐藏BOSSItemProcessor数据变形金刚public class DataMaskProcessor implements ItemProcessorUser, User { Override public User process(User user) { // 手机号脱敏 String phone user.getPhone(); user.setPhone(phone.replaceAll((\\d{3})\\d{4}(\\d{4}), $1****$

); // 邮箱转小写 user.setEmail(user.getEmail().toLowerCase()); return user; } }

组件生命周期探秘03手把手开发指南

环境搭建!-- 完整POM配置 -- parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version

3.

5/version /parent dependencies !-- Batch核心依赖 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-batch/artifactId /dependency !-- 内存数据库生产环境可更换为MySQL等 -- dependency groupIdcom.h2database/groupId artifactIdh2/artifactId scoperuntime/scope /dependency !-- Lombok简化代码 -- dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency /dependencies# application.properties spring.batch.jdbc.initialize-schemaalways# 自动创建Batch元数据表 spring.datasource.urljdbc:h2:mem:testdb spring.datasource.driverClassNameorg.h

Driver

第一个批处理任务领域模型类Data// Lombok注解 NoArgsConstructor AllArgsConstructor public class User { private String name; private int age; private String email; }完整Job配置Configuration EnableBatchProcessing public class BatchConfig { Autowired private JobBuilderFactory jobBuilderFactory; Autowired private StepBuilderFactory stepBuilderFactory; // 定义Job Bean public Job importUserJob() { return jobBuilderFactory.get(importUserJob) .start(csvProcessingStep()) .build(); } // 定义Step Bean public Step csvProcessingStep() { return stepBuilderFactory.get(csvProcessing) .User, Userchunk(

// 每处理100条提交一次 .reader(userReader()) .processor(userProcessor()) .writer(userWriter()) .build(); } // CSV文件读取器 Bean public FlatFileItemReaderUser userReader() { return new FlatFileItemReaderBuilderUser() .name(userReader) .resource(new ClassPathResource(users.csv)) // 文件路径 .delimited() .delimiter(,) .names(name, age, email) // 字段映射 .targetType(User.class) .linesToSkip(

// 跳过标题行 .build(); } // 数据处理示例年龄校验 Bean public ItemProcessorUser, User userProcessor() { return user - { if (user.getAge()

{ thrownew IllegalArgumentException(年龄不能为负数: user); } return user.toBuilder() // 使用Builder模式创建新对象 .email(user.getEmail().toLowerCase()) .build(); }; } // 数据库写入器 Bean public JdbcBatchItemWriterUser userWriter(DataSource dataSource) { return new JdbcBatchItemWriterBuilderUser() .dataSource(dataSource) .sql(INSERT INTO users (name, age, email) VALUES (:name, :age, :email)) .beanMapped() .build(); } }CSV文件示例src/main/resources/users.csvname,age,email 张三,25,zhangsanexample.com 李四,30,lisiexample.com 王五,-5,wangwuexample.com启动类SpringBootApplication public class BatchApplication implements CommandLineRunner { Autowired private JobLauncher jobLauncher; Autowired private Job importUserJob; public static void main(String[] args) { SpringApplication.run(BatchApplication.class, args); } Override public void run(String... args)throws Exception { JobParameters paramsnew JobParametersBuilder() .addLong(startAt, System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(importUserJob, params); } }

执行流程可视化

运行效果验证控制台输出

10:00:00 INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [nameimportUserJob]] launched

10:00:05 INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [csvProcessing]

10:00:15 ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step csvProcessing org.springframework.batch.item.validator.ValidationException: 年龄不能为负数: User(name王五, age-5, emailwangwuexample.com)数据库结果SELECT*FROM users;nameageemail张三25zhangsanexample.com李四30lisiexample.com

调试技巧查看元数据SELECT*FROM BATCH_JOB_INSTANCE; SELECT*FROM BATCH_STEP_EXECUTION;重试失败任务// 在Job配置中添加容错机制 Bean public Step csvProcessingStep() { return stepBuilderFactory.get(csvProcessing) .User, Userchunk(

.reader(userReader()) .processor(userProcessor()) .writer(userWriter()) .faultTolerant() .skipLimit(

// 最多跳过3条错误 .skip(IllegalArgumentException.class) .build(); }日志监控配置logging.level.org.springframework.batchDEBUG logging.level.org.hibernate.SQLWARN04实战案例银行交易对账

场景需求增强说明核心流程技术挑战• 双数据源读取文件数据库• 千万级数据高效比对• 差异记录快速入库• 分布式环境运行

完整架构设计

领域模型定义Data AllArgsConstructor NoArgsConstructor public class Transaction { // 公共字段 private String transactionId; private LocalDateTime tradeTime; private BigDecimal amount; // 银行端数据 private String bankSerialNo; private BigDecimal bankAmount; // 内部系统数据 private String internalOrderNo; private BigDecimal systemAmount; // 对账结果 private ReconStatus status; private String discrepancyType; } public enum ReconStatus { MATCHED, // 数据一致 AMOUNT_DIFF, // 金额不一致 STATUS_DIFF, // 状态不一致 ONLY_IN_BANK, // 银行单边账 ONLY_IN_SYSTEM // 系统单边账 }

完整Job配置Configuration EnableBatchProcessing public class BankReconJobConfig { // 主Job定义 Bean public Job bankReconciliationJob(Step downloadStep, Step reconStep, Step reportStep) { return jobBuilderFactory.get(bankReconciliationJob) .start(downloadStep) .next(reconStep) .next(reportStep) .build(); } // 文件下载Step Bean public Step downloadStep() { return stepBuilderFactory.get(downloadStep) .tasklet((contribution, chunkContext) - { // 实现SFTP下载逻辑 sftpService.download(/bank/recon/

csv); return RepeatStatus.FINISHED; }) .build(); } // 核心对账Step Bean public Step reconStep() { return stepBuilderFactory.get(reconStep) .Transaction, Transactionchunk(

.reader(compositeReader()) .processor(compositeProcessor()) .writer(compositeWriter()) .faultTolerant() .skipLimit(

.skip(DataIntegrityViolationException.class) .retryLimit(

.retry(DeadlockLoserDataAccessException.class) .build(); } // 组合数据读取器 Bean public CompositeItemReaderTransaction compositeReader() { return new CompositeItemReaderBuilderTransaction() .delegates(bankFileReader(), internalDbReader()) .build(); } // 银行文件读取器 Bean public FlatFileItemReaderTransaction bankFileReader() { return new FlatFileItemReaderBuilderTransaction() .name(bankFileReader) .resource(newFileSystemResource(recon/

csv)) .delimited() .names(transactionId,tradeTime,amount,bankSerialNo) .fieldSetMapper(fieldSet - { TransactiontnewTransaction(); t.setTransactionId(fieldSet.readString(transactionId)); t.setBankSerialNo(fieldSet.readString(bankSerialNo)); t.setBankAmount(fieldSet.readBigDecimal(amount)); return t; }) .build(); } // 内部数据库读取器 Bean public JdbcCursorItemReaderTransaction internalDbReader() { return new JdbcCursorItemReaderBuilderTransaction() .name(internalDbReader) .dataSource(internalDataSource) .sql(SELECT order_no, amount, status FROM transactions WHERE trade_date ?) .rowMapper((rs, rowNum) - { TransactiontnewTransaction(); t.setInternalOrderNo(rs.getString(order_no)); t.setSystemAmount(rs.getBigDecimal(amount)); return t; }) .preparedStatementSetter(ps - ps.setString(1,

-

) .build(); } // 组合处理器 Bean public CompositeItemProcessorTransaction compositeProcessor() { ListItemProcessor?, ? delegates new ArrayList(); delegates.add(new DataMatchingProcessor()); delegates.add(new DiscrepancyClassifier()); return new CompositeItemProcessorBuilder() .delegates(delegates) .build(); } // 组合写入器 Bean public CompositeItemWriterTransaction compositeWriter() { return new CompositeItemWriterBuilderTransaction() .delegates( discrepancyDbWriter(), alertMessageWriter() ) .build(); } }

核心处理器实现public class DataMatchingProcessor implements ItemProcessorTransaction, Transaction { Override public Transaction process(Transaction item) { // 双数据源匹配逻辑 if (item.getBankSerialNo() null) { item.setStatus(ReconStatus.ONLY_IN_SYSTEM); } elseif (item.getInternalOrderNo() null) { item.setStatus(ReconStatus.ONLY_IN_BANK); } else { compareAmounts(item); compareStatuses(item); } return item; } private void compareAmounts(Transaction t) { if (t.getBankAmount().compareTo(t.getSystemAmount()) !

{ t.setDiscrepancyType(AMOUNT_MISMATCH); t.setStatus(ReconStatus.AMOUNT_DIFF); BigDecimaldiff t.getBankAmount().subtract(t.getSystemAmount()); t.setAmount(diff.abs()); } } private void compareStatuses(Transaction t) { // 假设从数据库获取内部状态 StringinternalStatus transactionService.getStatus(t.getInternalOrderNo()); if(!SETTLED.equals(internalStatus)){ t.setDiscrepancyType(STATUS_MISMATCH); t.setStatus(ReconStatus.STATUS_DIFF); } } } public class DiscrepancyClassifier implements ItemProcessorTransaction, Transaction { Override public Transaction process(Transaction item) { if (item.getStatus() ! ReconStatus.MATCHED) { // 添加告警标记 item.setAlertLevel(calculateAlertLevel(item)); } return item; } private AlertLevel calculateAlertLevel(Transaction t) { if (t.getAmount().compareTo(new BigDecimal(

)

{ return AlertLevel.CRITICAL; } return AlertLevel.WARNING; } }

差异报告生成StepBean public Step reportStep() { return stepBuilderFactory.get(reportStep) .Transaction, Transactionchunk(

.reader(discrepancyReader()) .writer(excelWriter()) .build(); } Bean public JdbcPagingItemReaderTransaction discrepancyReader() { returnnewJdbcPagingItemReaderBuilderTransaction() .name(discrepancyReader) .dataSource(reconDataSource) .selectClause(SELECT *) .fromClause(FROM discrepancy_records) .whereClause(WHERE recon_date

-

.sortKeys(Collections.singletonMap(transaction_id, Order.ASCENDING)) .rowMapper(newBeanPropertyRowMapper(Transaction.class)) .build(); } Bean public ExcelFileItemWriterTransaction excelWriter() { returnnewExcelFileItemWriterBuilderTransaction() .name(excelWriter) .resource(newFileSystemResource(reports/

-

xlsx)) .sheetName(差异报告) .headers(newString[]{交易ID, 差异类型, 金额差异, 告警级别}) .fieldExtractor(item - newObject[]{ item.getTransactionId(), item.getDiscrepancyType(), item.getAmount(), item.getAlertLevel() }) .build(); }

性能优化配置# 应用配置 spring.batch.job.enabledfalse# 禁止自动启动 spring.batch.initialize-schemanever# 生产环境禁止自动建表 # 性能调优参数 spring.batch.chunk.size2000# 根据内存调整 spring.datasource.hikari.maximum-pool-size20 spring.jpa.properties.hibernate.jdbc.batch_size

执行监控看板05生产级特性

容错机制完整容错配置示例Bean public Step secureStep() { return stepBuilderFactory.get(secureStep) .Input, Outputchunk(

.reader(jdbcReader()) .processor(secureProcessor()) .writer(restApiWriter()) .faultTolerant() .retryLimit(

.retry(ConnectException.class) // 网络问题重试 .retry(DeadlockLoserDataAccessException.class) // 数据库死锁重试 .skipLimit(

.skip(DataIntegrityViolationException.class) // 数据问题跳过 .skip(InvalidDataAccessApiUsageException.class) .noRollback(ValidationException.class) // 验证异常不回滚 .listener(newErrorLogListener()) // 自定义监听器 .build(); } // 错误日志监听器示例 publicclassErrorLogListenerimplementsItemProcessListenerInput, Output { Override publicvoidonProcessError(Input item, Exception e) { ErrorLoglognewErrorLog(); log.setItemData(item.toString()); log.setErrorMsg(e.getMessage()); errorLogRepository.save(log); } }

性能优化策略千万级数据处理策略1并行Step执行配置代码Bean public Job parallelJob() { return jobBuilderFactory.get(parallelJob) .start(step1()) .split(newSimpleAsyncTaskExecutor()) // 启用异步执行器 .add(step2(), step3()) .build(); }策略2分区处理Partitioning分区处理器实现Bean public Step masterStep() { return stepBuilderFactory.get(masterStep) .partitioner(slaveStep, partitioner()) .gridSize(

// 分区数量CPU核心数*2 .taskExecutor(newThreadPoolTaskExecutor()) .build(); } Bean public Partitioner partitioner() { returnnewPartitioner() { Override public MapString, ExecutionContext partition(int gridSize) { MapString, ExecutionContext result newHashMap(); longtotal getTotalRecordCount(); longrange total / gridSize; for (inti0; i gridSize; i) { ExecutionContextcontextnewExecutionContext(); context.putLong(min, i * range); context.putLong(max, (i

* range); result.put(partitioni, context); } return result; } }; } // Slave Step配置 Bean public Step slaveStep() { return stepBuilderFactory.get(slaveStep) .Record, Resultchunk(

.reader(rangeReader(null, null)) .processor(processor()) .writer(writer()) .build(); } StepScope Bean public ItemReaderRecord rangeReader( Value(#{stepExecutionContext[min]}) Long min, Value(#{stepExecutionContext[max]}) Long max) { returnnewJdbcCursorItemReaderBuilderRecord() .sql(SELECT * FROM records WHERE id BETWEEN ? AND ?) .preparedStatementSetter(ps - { ps.setLong(1, min); ps.setLong(2, max); }) // 其他配置... .build(); }策略3异步ItemProcessor异步处理配置Bean public Step asyncStep() { return stepBuilderFactory.get(asyncStep) .Input, Outputchunk(

.reader(reader()) .processor(asyncItemProcessor()) .writer(writer()) .build(); } Bean public AsyncItemProcessorInput, Output asyncItemProcessor() { AsyncItemProcessorInput, Output asyncProcessor newAsyncItemProcessor(); asyncProcessor.setDelegate(syncProcessor()); // 同步处理器 asyncProcessor.setTaskExecutor(newThreadPoolTaskExecutor()); return asyncProcessor; } Bean public AsyncItemWriterOutput asyncItemWriter() { AsyncItemWriterOutput asyncWriter newAsyncItemWriter(); asyncWriter.setDelegate(syncWriter()); // 同步写入器 return asyncWriter; }

性能对比测试数据优化技巧数据库连接池调优spring.datasource.hikari.maximum-pool-size20 spring.datasource.hikari.minimum-idle5JVM参数优化java -jar -Xmx4g -XX:Us

GC -XX:MaxGCPauseMillis200 ...批处理参数调整.chunk(

// 根据内存容量调整 .setQueryTimeout(

// 数据库查询超时06监控与管理生产级方案

监控方案升级Spring Batch Admin替代方案现代监控栈配置// 添加监控依赖 dependency groupIdio.micrometer/groupId artifactIdmicrometer-registry-prometheus/artifactId /dependency // 暴露监控端点 Bean public MeterRegistryCustomizerMeterRegistry metricsCommonTags() { return registry - registry.config().commonTags(application, batch-service); } // 自定义Batch指标 publicclassBatchMetricsListenerextendsJobExecutionListenerSupport { privatefinalCounterprocessedRecords Counter.builder(batch.records.processed) .description(Total processed records) .register(Metrics.globalRegistry); Override publicvoidafterStep(StepExecution stepExecution) { processedRecords.increment(stepExecution.getWriteCount()); } }

元数据表结构详解关键表用途BATCH_JOB_INSTANCE作业指纹库相同参数只能存在一个实例BATCH_JOB_EXECUTION_PARAMS存储每次运行的参数BATCH_STEP_EXECUTION_CONTEXT保存步骤上下文数据重启恢复的关键

自定义监控看板-- 常用监控SQL示例 -- 最近5次作业执行情况 SELECT j.JOB_NAME, e.START_TIME, e.END_TIME, TIMEDIFF(e.END_TIME, e.START_TIME) AS DURATION, s.READ_COUNT, s.WRITE_COUNT FROM BATCH_JOB_EXECUTION e JOIN BATCH_JOB_INSTANCE j ON e.JOB_INSTANCE_ID j.JOB_INSTANCE_ID JOIN BATCH_STEP_EXECUTION s ON e.JOB_EXECUTION_ID s.JOB_EXECUTION_ID ORDERBY e.START_TIME DESC LIMIT 5;07

常见问题QA终极指南

内存溢出问题深度解决方案场景处理10GB CSV文件时OOM优化代码示例Bean StepScope public FlatFileItemReaderLargeRecord largeFileReader( Value(#{jobParameters[filePath]}) String filePath) { return new FlatFileItemReaderBuilderLargeRecord() .resource(new FileSystemResource(filePath)) .lineMapper(new DefaultLineMapper() ); }}) .linesToSkip(

.strict(false) // 允许文件结尾空行 .saveState(false) // 禁用状态保存 .build(); } // JVM参数建议 // -XX:Us

GC -Xmx2g -XX:MaxGCPauseMillis

定时任务高级配置多任务调度方案Configuration EnableScheduling public class ScheduleConfig { Autowired private JobLauncher jobLauncher; Autowired private Job reportJob; // 工作日凌晨执行 Scheduled(cron 0 0 2 * * MON-FRI) public void dailyJob()throws Exception { JobParameters paramsnew JobParametersBuilder() .addString(date, LocalDate.now().toString()) .toJobParameters(); jobLauncher.run(reportJob, params); } // 每小时轮询 Scheduled(fixedRate

public void pollJob() { if(checkNewDataExists()) { jobLauncher.run(dataProcessJob, new JobParameters()); } } // 优雅停止示例 public void stopJob(Long executionId) { JobExecution execution jobExplorer.getJobExecution(executionId); if(execution.isRunning()) { execution.setStatus(BatchStatus.STOPPING); jobRepository.update(execution); } } }

高频问题集锦Q如何重新运行失败的任务-- 步骤1查询失败的任务ID SELECT*FROM BATCH_JOB_EXECUTION WHERE STATUS FAILED; -- 步骤2使用相同参数重新启动 JobParameters params new JobParametersBuilder() .addLong(restartId, originalExecutionId) .toJobParameters(); jobLauncher.run(job, params);Q处理过程中断电怎么办Q如何实现动态参数传递// 命令行启动方式 java -jar batch.jar --spring.batch.job.namedataImportJob date

// 编程式参数构建 public void runJobWithParams(MapString, Object params) { JobParameters jobParamsnew JobParametersBuilder() .addString(mode, forceUpdate) .addLong(timestamp, System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(importJob, jobParams); }

性能调优检查清单数据库优化• 添加批量处理索引• 配置连接池参数• 启用JDBC批处理模式JVM优化-XX:UseStringDeduplication -XX:UseCompressedOops -XX:MaxMetaspaceSize512mBatch配置spring.batch.jdbc.initialize-schemanever spring.batch.job.enabledfalse spring.jpa.open-in-viewfalse

大家伸进-大家伸进应用

百度百家号客服电话人工服务

123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123