核心内容摘要
荧幕之外的魅力:当“三上悠亚”遇上“狂揉”的艺术
目标WMS 发货后推送快递单号到外挂系统由外挂负责持续抓取并存储快递轨迹轨迹保留3 年并向业务方WMS/客服/用户提供稳定、可审计的查询与告警能力。
1 概览•痛点WMS 每次跳转第三方查询耦合强、无法长期保存轨迹需要统一管理并保存 3 年以供审计/客服查询。
•目标实现「一次推送、外挂持续跟踪直至签收、三年归档」的可扩展、可观测、可运维系统。
•关键特性异步解耦、按状态动态轮询Adaptive Polling、限流保护、幂等入库、分层存储与归档。
2 总体架构3 核心设计要点
1 接入模式•首选WMS 采用 HTTP POST 接入幂等。
•鉴权HMAC 签名或 OAuth2接口需限流与白名单。
2 调度模型关键•任务驱动每个单号在shipment_task中维护next_check_time、check_interval_seconds、status、no_update_count。
•批量扫描调度器按next_check_time now()批量读取分页/limit并按MOD(id, N)做分片推入 MQ。
•Adaptive Polling根据状态与最近是否有更新动态调整check_interval指数退避上限 6 小时。
•停止条件检测到DELIVERED签收停止轮询并进入归档计划。
3 执行策略•固定大小 Worker 池Worker 为常驻进程容器消费者组并发处理 MQ 任务避免为每单创建线程。
•限流按快递公司使用 Redisson RateLimiter 或令牌桶保护对方接口。
•幂等写入通过shipment_id event_time desc_hash做唯一约束或写入前存在性判断。
•异常处理失败重试指数退避失败超过阈值进入 DLQ 并告警。
4 存储与归档•热存最近 90/180 天的轨迹保存在 MySQL或 NoSQL便于快速查询。
•冷存超过 N 天如 90 天将原始 payload 与老事件归档到对象存储主库写入归档路径索引。
•分区/分表courier_event按月分区以便批量删除/迁移。
5 可观测性与告警• 指标队列深度、处理吞吐、QPS、每家快递的失败率、平均延迟发货→首次轨迹、未签收超过阈值数量。
• 日志存储原始请求/响应至少保留 30—90 天并脱敏。
• 告警连续 N 次抓取失败、队列长度异常增长、接口被限流。
4 数据模型示例 DDL-- 任务表负责调度 CREATE TABLE shipment_task ( id BIGINTPRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(
, tracking_no VARCHAR(
NOT NULL, courier_code VARCHAR(
, status VARCHAR(
DEFAULTNEW, last_event_time DATETIME, last_check_time DATETIME, next_check_time DATETIME, check_interval_seconds INTDEFAULT1800, no_update_count INTDEFAULT0, retry_count INTDEFAULT0, max_retry INTDEFAULT10, delivered BOOLEANDEFAULTFALSE, raw_meta JSON, created_at DATETIME DEFAULTCURRENT_TIMESTAMP, updated_at DATETIME DEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP, UNIQUE KEY uq_tracking (tracking_no, courier_code), INDEX idx_next_check (next_check_time), INDEX idx_status (status) ); -- 轨迹事件按月分区 CREATE TABLE courier_event ( id BIGINTPRIMARY KEY AUTO_INCREMENT, shipment_id BIGINTNOT NULL, event_time DATETIME NOT NULL, location VARCHAR(
, description TEXT, status_code VARCHAR(
, raw_payload JSON, created_at DATETIME DEFAULTCURRENT_TIMESTAMP, UNIQUE KEY uq_event (shipment_id, event_time, MD5(description(
)) ); -- 归档路径记录 CREATE TABLE courier_archive ( id BIGINTPRIMARY KEY AUTO_INCREMENT, shipment_id BIGINTNOT NULL, s3_path VARCHAR(
, archived_at DATETIME, note VARCHAR(
);5 核心代码
1 接入 Controller幂等 入队RestController RequestMapping(/api/v1/shipments) publicclassShipmentController { privatefinal ShipmentTaskService taskService; PostMapping public ResponseEntity? accept(RequestBody ShipmentDto dto, RequestHeader(X-Signature) String sig) { //
验证签名HMAC if (!AuthUtil.verify(sig, dto)) { return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build(); } //
幂等写入或更新任务表 ShipmentTasktask taskService.upsertFromDto(dto); //
推送到 MQ由调度器负责更完整的调度逻辑此处可简化为立即推一次 taskService.enqueueImmediateCheck(task.getId()); return ResponseEntity.accepted().body(Map.of(shipmentId, task.getId())); } }
2 调度器批量扫描 分片Component publicclassScheduler { Autowired private ShipmentTaskRepo repo; Autowired private MQProducer mqProducer; Scheduled(cron 0 * * * * *)// 每分钟触发一次 publicvoidscanAndEnqueue() { LocalDateTimenow LocalDateTime.now(); intshardCount Integer.parseInt(env.getProperty(app.shard.count,
); intmyShard Integer.parseInt(env.getProperty(app.shard.index,
); ListShipmentTask due repo.findDueTasks(now, myShard, shardCount,
; for (ShipmentTask t : due) { mqProducer.send(tracking_query_topic, t.getId()); // update next_check_time optimistically to avoid重复选中 t.setNextCheckTime(now.plusSeconds(t.getCheckIntervalSeconds())); repo.save(t); } } }findDueTasksSQL 示例SELECT * FROM shipment_task WHERE next_check_time :now AND status IN (NEW,IN_TRANSIT,PENDING_UPDATE) AND MOD(id, :shardCount) :shardIndex ORDER BY next_check_time LIMIT :limit
3 WorkerMQ 消费者RocketMQMessageListener(topic tracking_query_topic, consumerGroup tracking_fetchers) publicclassTrackingFetcherimplementsRocketMQListenerLong { Autowired private ShipmentTaskRepo repo; Autowired private CourierAdapterFactory adapterFactory; Autowired private RedissonClient redisson; Override publicvoidonMessage(Long taskId) { ShipmentTasktask repo.findById(taskId).orElse(null); if (task null) return; // rate limit key per courier RRateLimiterlimiter redisson.getRateLimiter(courier: task.getCourierCode()); if (!limiter.tryAcquire()) { // 未获取到令牌延迟重试 task.setNextCheckTime(LocalDateTime.now().plusSeconds(
); repo.save(task); return; } CourierClientclient adapterFactory.getClient(task.getCourierCode()); try { TrackingResponseresp client.query(task.getTrackingNo()); ListEvent events client.parse(resp); booleanchanged persistEvents(task, events, resp); if (resp.isDelivered()) { task.setStatus(DELIVERED); task.setDelivered(true); task.setNextCheckTime(null); } else { adjustIntervalAfterCheck(task, changed); } } catch (Exception ex) { task.setRetryCount(task.getRetryCount()
; if (task.getRetryCount() task.getMaxRetry()) task.setStatus(EXCEPTION); task.setNextCheckTime(LocalDateTime.now().plusSeconds(
); } finally { repo.save(task); } } }
4 适配器接口扩展快递方public interfaceCourierClient { TrackingResponse query(String trackingNo)throws IOException; ListEvent parse(TrackingResponse resp); } publicclassSFExpressClientimplementsCourierClient { public TrackingResponse query(String trackingNo) { /* HTTP 调用 SF API */ } public ListEvent parse(TrackingResponse resp) { /* 解析 SF 返回 */ } }6 性能估算与容量规划输入参数• 年度出库单量1,000,000 单/年• 单均轨迹事件数10 条• 平均事件 JSON 大小600 字节• 保留期限3 年存储估算• 每单事件数据10 * 600 6,000 bytes ≈ 6KB• 年度数据1,000,000 * 6KB 6,000,000 KB ≈
72 GB• 3 年数据 ≈
1
2 GB不含索引和原始 payload• 建议预留 3×—5× 因为索引/原始 payload/日志建议 100GB 存储预算吞吐估算高峰• 假设同时在途 100k 单号平均每小时查一次 → 每小时 100k 次 → 每秒 ~28 次。
• Worker 节点每节点 200 并发可以轻松处理故