基于Java的工程规范智慧管理系统的设计与实现全方位解析:附毕设论文+源代码
一ã€�æ ¸å¿ƒæ¦‚å¿µç�†è§£äº‹åŠ¡æ¶ˆæ�¯è§£å†³ä»€ä¹ˆé—®é¢˜javaå¤�制下载// 分布å¼�事务典å�‹é—®é¢˜æœ¬åœ°äº‹åŠ¡ä¸�消æ�¯å�‘é€�的一致性 // ä¼ ç»Ÿæ–¹å¼�å˜åœ¨çš„问题
先�消��执行本地事务 → 消����功但本地事务失败
先执行本地事务å��å�‘消æ�¯ → 本地事务æˆ�功但消æ�¯å�‘é€�失败RocketMQ事务消æ�¯çš„æ ¸å¿ƒæœºåˆ¶textå¤�制下载Producerå�‘é€�Half消æ�¯ → Brokerå˜å‚¨Half消æ�¯ → 执行本地事务 ↓ Brokerç‰å¾…事务状æ€�å›�查 â†� Producerè¿”å›�本地事务结æ�œ ↓ æ ¹æ�®ç»“æ�œæ��交或å›�滚消æ�¯äºŒã€�两阶段æ��交详细æµ�程第一阶段å�‘é€�Half消æ�¯javaå¤�制下载public class TransactionProducer { public static void main(String[] args) throws Exception { //
创建事务消�生产者 TransactionMQProducer producer new TransactionMQProducer(TransactionProducerGroup); producer.setNamesrvAddr(
127.
0.
1:
; //
设置事务监å�¬å™¨æ ¸å¿ƒ producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * param msg Half消æ�¯ * param arg 业务å�‚æ•° * return 本地事务状æ€� */ Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地数æ�®åº“事务 boolean success doLocalBusinessTransaction(msg, arg); if (success) { System.out.println(本地事务执行æˆ�功æ��交消æ�¯); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println(本地事务执行失败å›�滚消æ�¯); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { System.out.println(本地事务执行异常å›�查); return LocalTransactionState.UNKNOW; } } /** * 事务å›�查Broker主动查询事务状æ€� * param msg Half消æ�¯ * return 事务状æ€� */ Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // æ ¹æ�®ä¸šåŠ¡ID查询本地事务状æ€� String transactionId msg.getTransactionId(); boolean status queryLocalTransactionStatus(transactionId); if (status) { System.out.println(事务å›�查本地事务已æ��交); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println(事务å›�查本地事务已å›�滚); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); //
�动生产者 producer.start(); //
��事务消� Message msg new Message(TransactionTopic, TagA, Order-
getBytes(StandardCharsets.UTF_
); // 设置事务ID关键 msg.setKeys(TXN- System.currentTimeMillis()); // å�‘é€�Half消æ�¯ç¬¬ä¸€é˜¶æ®µ SendResult sendResult producer.sendMessageInTransaction(msg, // 业务å�‚数会在executeLocalTransactionä¸ä¼ 递 new BusinessParam(orderId, 123456,
100.
); System.out.println(Half消æ�¯å�‘é€�结æ�œ: sendResult.getSendStatus()); } }篇幅é™�制下é�¢å°±å�ªèƒ½ç»™å¤§å®¶å±•示å°�册部分内容了。整ç�†äº†ä¸€ä»½æ ¸å¿ƒé�¢è¯•笔记包括了Javaé�¢è¯•ã€�Springã€�JVMã€�MyBatisã€�Redisã€�MySQLã€�å¹¶å�‘编程ã€�å¾®æœ�务ã€�Linuxã€�Springbootã€�SpringCloudã€�MQã€�Kafc需è¦�全套é�¢è¯•笔记å�Šç”案ã€�点击æ¤å¤„å�³å�¯/å…�è´¹è�·å�–】三ã€�完整执行时åº�图textå¤�制下载┌─────────â”� ┌────────â”� ┌────────â”� │ Producer│ │ Broker │ │ 本地DB │ └────┬────┘ └───┬────┘ └────┬───┘ │
��Half消� │ │ │───────────────│ │ │ │ │ │ │
å˜å‚¨Half消æ�¯ │ │ │───────────────│ │ │ │ │
返�Half�功 │ │ │───────────────│ │ │ │ │ │
执行本地事务 │ │ │────────────────────────────────│ │ │ │ │
返�事务状� │ │ │───────────────│ │ │ │ │ │ │
�交/�滚消� │ │ │───────────────│ │ │ │ │(�能)
事务�查 │ │ │───────────────│ │ │ │ │ │
返��查结� │ │ │───────────────│ │ │ │ │ │ │
最终æ��交/å›�滚 │ │ │───────────────│四ã€�关键é…�ç½®å�‚æ•°yamlå¤�制下载# Broker端é…�ç½® broker.conf: transactionCheckMax: 15 # 最大å›�查次数 transactionCheckInterval: 60000 # å›�查间隔(ms) transactionTimeOut: 6000 # è¶…æ—¶æ—¶é—´(ms) # Producer端é…�ç½® producer: checkThreadPoolMinSize: 1 # å›�æŸ¥çº¿ç¨‹æ± æœ€å°� checkThreadPoolMaxSize: 1 # å›�æŸ¥çº¿ç¨‹æ± æœ€å¤§ checkRequestHoldMax: 2000 # å›�查请求队列大å°�五ã€�代ç �å®�ç�°æœ€ä½³å®�è·µ
完整的订å�•事务示例javaå¤�制下载Service public class OrderTransactionService { Resource private OrderMapper orderMapper; Resource private TransactionMQProducer transactionMQProducer; /** * 创建订å�•事务消æ�¯ */ public void createOrderWithTransaction(OrderDTO orderDTO) { // æ�„建消æ�¯ Message msg new Message(ORDER_TOPIC, CREATE, JSON.toJSONBytes(orderDTO)); // è®¾ç½®ä¸šåŠ¡æ ‡è¯† msg.setKeys(ORDER_ orderDTO.getOrderNo()); msg.putUserProperty(businessType, ORDER_CREATE); // å�‘é€�事务消æ�¯ SendResult sendResult transactionMQProducer.sendMessageInTransaction( msg, new OrderTransactionArg(orderDTO) ); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException(Half消æ�¯å�‘é€�失败); } } /** * 事务监å�¬å™¨å®�ç�° */ Component public class OrderTransactionListener implements TransactionListener { Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { OrderTransactionArg transactionArg (OrderTransactionArg) arg; OrderDTO orderDTO transactionArg.getOrderDTO(); try { //
ä¿�å˜è®¢å�•到数æ�®åº“ Order order convertToOrder(orderDTO); orderMapper.insert(order); //
扣å‡�库å˜è°ƒç”¨åº“å˜æœ�务 boolean deductResult inventoryService.deductStock( orderDTO.getProductId(), orderDTO.getQuantity() ); if (!deductResult) { // 库å˜ä¸�è¶³å›�滚本地事务 orderMapper.deleteById(order.getId()); return LocalTransactionState.ROLLBACK_MESSAGE; } //
记录事务日志用äº�å›�查 transactionLogService.saveTransactionLog( msg.getTransactionId(), ORDER_CREATE, order.getId(), LocalTransactionState.COMMIT_MESSAGE.name() ); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error(订å�•本地事务执行异常, e); // 记录异常状æ€� transactionLogService.saveTransactionLog( msg.getTransactionId(), ORDER_CREATE, null, LocalTransactionState.UNKNOW.name() ); return LocalTransactionState.UNKNOW; } } Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // æ ¹æ�®äº‹åŠ¡ID查询事务日志 String transactionId msg.getTransactionId(); TransactionLog log transactionLogService.getByTransactionId(transactionId); if (log null) { // 没有事务记录需è¦�å›�滚 return LocalTransactionState.ROLLBACK_MESSAGE; } if (COMMIT_MESSAGE.equals(log.getStatus())) { // 事务已æ��交 return LocalTransactionState.COMMIT_MESSAGE; } else { // 事务需è¦�å›�滚 return LocalTransactionState.ROLLBACK_MESSAGE; } } } /** * 事务å�‚æ•°å°�装 */ Data AllArgsConstructor public static class OrderTransactionArg { private OrderDTO orderDTO; } }
消费端幂ç‰å¤„ç�†javaå¤�制下载Component RocketMQMessageListener( topic ORDER_TOPIC, consumerGroup ORDER_CONSUMER_GROUP ) public class OrderConsumer implements RocketMQListenerMessageExt { Override public void onMessage(MessageExt message) { //
检查消æ�¯å¹‚ç‰æ€§ String messageId message.getMsgId(); if (redisTemplate.hasKey(MSG_ messageId)) { log.info(消æ�¯å·²å¤„ç�†è·³è¿‡: {}, messageId); return; } //
解�消� OrderDTO orderDTO JSON.parseObject(message.getBody(), OrderDTO.class); //
业务处� try { // 更新订�状�为已确认 orderService.confirmOrder(orderDTO.getOrderNo()); //
记录已处ç�†æ¶ˆæ�¯ redisTemplate.opsForValue().set( MSG_ messageId, PROCESSED, 1, TimeUnit.HOURS ); } catch (Exception e) { log.error(订å�•处ç�†å¤±è´¥å°†é‡�试, e); throw new RuntimeException(e); } } }å…ã€�é�¢è¯•问题å›�ç”è¦�点问题RocketMQ事务消æ�¯å¦‚何å®�ç�°äºŒé˜¶æ®µæ��交å›�ç”结æ�„概念解释RocketMQ事务消æ�¯é€šè¿‡äºŒé˜¶æ®µæ��交ä¿�è¯�分布å¼�äº‹åŠ¡çš„æœ€ç»ˆä¸€è‡´æ€§æ ¸å¿ƒæ€�想将本地事务和消æ�¯å�‘é€�绑定通过Half消æ�¯å’Œçжæ€�å›�查机制å®�ç�°ç¬¬ä¸€é˜¶æ®µHalf消æ�¯é˜¶æ®µProducerå�‘é€�Half消æ�¯åˆ°BrokerBrokerå˜å‚¨ä½†ä¸�对Consumerå�¯è§�Half消æ�¯å�‘é€�æˆ�功å��执行本地事务本地事务执行结æ�œè¿”å›�ç»™BrokerCOMMITã€�ROLLBACK或UNKNOWN篇幅é™�制下é�¢å°±å�ªèƒ½ç»™å¤§å®¶å±•示å°�册部分内容了。整ç�†äº†ä¸€ä»½æ ¸å¿ƒé�¢è¯•笔记包括了Javaé�¢è¯•ã€�Springã€�JVMã€�MyBatisã€�Redisã€�MySQLã€�å¹¶å�‘编程ã€�å¾®æœ�务ã€�Linuxã€�Springbootã€�SpringCloudã€�MQã€�Kafc需è¦�全套é�¢è¯•笔记å�Šç”案ã€�点击æ¤å¤„å�³å�¯/å…�è´¹è�·å�–】​​​第二阶段状æ€�确认阶段如æ�œæœ¬åœ°äº‹åŠ¡è¿”å›�COMMIT/ROLLBACKBrokerç«‹å�³æ��交/å›�滚消æ�¯å¦‚æ�œè¿”å›�UNKNOWNBroker会å�‘起事务状æ€�å›�查Producerå®�ç�°TransactionListener.checkLocalTransaction()进行状æ€�查询关键机制事务状æ€�å›�查解决网络超时或生产者宕机问题消æ�¯å¹‚ç‰æ€§æ¶ˆè´¹ç«¯éœ€è¦�处ç�†é‡�å¤�消æ�¯è¶…时机制超过é…�置时间未确认的消æ�¯ä¼šè‡ªåЍå›�滚代ç �示例javaå¤�制 下载// 简è¦�å±•ç¤ºæ ¸å¿ƒä»£ç �结æ�„ producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(...) { // 执行本地业务 } public LocalTransactionState checkLocalTransaction(...) { // 状æ€�å›�查 } });适用场景订å�•åˆ›å»ºé€šçŸ¥åº“å˜æ”¯ä»˜æˆ�功å�‘é€�通知任何需è¦�ä¿�è¯�本地事务和消æ�¯å�‘é€�一致性的场景注æ„�事项事务消æ�¯ä¸�支æŒ�定时和批é‡�消æ�¯ç¡®ä¿�checkLocalTransactionæ–¹æ³•çš„å¹‚ç‰æ€§å�ˆç�†é…�ç½®å›�查次数和间隔é�¢è¯•åŠ åˆ†é¡¹æ��到最大努力通知å�‹äº‹åŠ¡å¯¹æ¯”TCCã€�Sagaç‰åˆ†å¸ƒå¼�事务方案强调消æ�¯å¹‚ç‰å¤„ç�†çš„é‡�è¦�性æ��å�ŠRocketMQ
3的事务消æ�¯ä¼˜åŒ–è¿™æ ·çš„å›�ç”æ—¢å±•示了ç�†è®ºçŸ¥è¯†å�ˆä½“ç�°äº†å®�é™…ç¼–ç �能力适å�ˆä¸é«˜çº§Javaå²—ä½�é�¢è¯•。
9.1樱花漫画免费动漫高清版-9.1樱花漫画免费动漫高清版应用