91糖心:那一抹心动的甜蜜,融化你我之间的距离

核心内容摘要

经典永流传:BrazzersJuliaAnn电影生涯深度回顾
“心糖”Logo与御梦子饼干

OnlyOne,一个就够了:致敬韩寒,重温那个敢说真话的时代

什么是死信队列DLQ死信队列Dead Letter Queue简称DLQ不是一种特殊的队列类型而是被赋予「兜底存储失败消息」用途的普通队列核心是接收那些「无法正常被投递或消费」的无效消息避免消息丢失或无限循环占用系统资源。

补充说明配套组件RabbitMQ中需搭配「死信交换机DLX」使用负责路由死信到DLQKafka中无原生DLQ概念通过「死信主题DLT」实现等效功能。

核心定位「消息兜底仓库」「问题排查样本库」不参与正常业务流转仅用于留存失败消息。

核心价值避免消息丢失、隔离无效消息、支撑事后排查、保障核心业务不受影响。

消息成为死信的3大核心场景通用无论RabbitMQ还是Kafka消息成为死信的核心场景一致仅存在少量中间件特有细节

消息处理超时未被正常确认通俗解释消息被投递后在指定「存活/处理超时时间」内未得到消费者的确认ACK也未被拒绝NACK被中间件判定为「处理失败」转为死信。

细节补充RabbitMQ支持「消息TTL存活时间」和「队列TTL」超时未消费则转为死信消费者获取消息后长时间未返回basic.ack也会触发。

Kafka消费者长时间未提交offset且超过消息留存时间或消费端挂死导致消息无法处理最终转为死信。

场景示例消费者服务宕机、消费逻辑卡死、数据库连接超时导致无法完成业务处理无法返回ACK。

消息被消费者主动拒绝且不允许重试通俗解释消费者处理消息时发现「无法修复」的问题如格式错误、数据非法主动向中间件发送「拒绝消费」指令且明确「不允许重新入队重试」消息直接转为死信。

细节补充RabbitMQ调用basic.reject()或basic.nack()且参数requeuefalse核心不重新入队。

Kafka消费端抛出「不可重试异常」并配置不重新提交offset手动/自动转发到死信主题。

场景示例消息缺少必填字段、用户ID不存在、订单号非法这类问题重试也无法解决直接拒绝入死信。

业务队列/主题达到最大容量消息溢出通俗解释业务队列/主题配置了「最大消息存储上限」当消息数量达到该上限新消息无法入队中间件将按配置把最早的消息或新消息转为死信避免直接丢失。

细节补充RabbitMQ队列通过x-max-length配置最大消息数溢出消息自动转为死信。

Kafka通过主题的「最大分区消息数」「最大存储容量」配置溢出/过期消息定向到死信主题。

场景示例秒杀活动引发消息量暴增业务队列达到存储上限多余消息转为死信留存。

实际项目配置RabbitMQ 死信队列RabbitMQ实现DLQ的核心逻辑是「业务队列绑定死信交换机→死信交换机绑定死信队列」下面以Spring Boot Spring AMQP为例提供可直接落地的配置。

前置依赖pom.xml!-- Spring AMQP 操作 RabbitMQ --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency

核心配置类声明DLX、DLQ、业务队列importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.QueueBuilder;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;ConfigurationpublicclassRabbitMQDLQConfig{//

定义常量队列/交换机/路由键名称规范命名便于维护// 业务相关publicstaticfinalStringBUSINESS_QUEUEbiz_order_queue;publicstaticfinalStringBUSINESS_EXCHANGEbiz_order_exchange;publicstaticfinalStringBUSINESS_ROUTING_KEYbiz.order.key;// 死信相关publicstaticfinalStringDLQ_QUEUEdlq_order_queue;publicstaticfinalStringDLX_EXCHANGEdlq_order_exchange;publicstaticfinalStringDLX_ROUTING_KEYdlq.order.key;//

声明死信交换机DLX普通Direct交换机持久化避免重启丢失BeanpublicDirectExchangedlxExchange(){returnDirectExchange.builder(DLX_EXCHANGE).durable(true).build();}//

声明死信队列DLQ普通持久化队列仅用于存储死信BeanpublicQueuedlqQueue(){returnQueueBuilder.durable(DLQ_QUEUE).build();}//

绑定死信交换机 → 死信队列指定死信路由键BeanpublicBindingdlqBinding(){returnBindingBuilder.bind(dlqQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}//

声明业务交换机普通Direct交换机持久化BeanpublicDirectExchangebusinessExchange(){returnDirectExchange.builder(BUSINESS_EXCHANGE).durable(true).build();}//

声明业务队列核心配置死信相关参数指定DLX和死信路由键BeanpublicQueuebusinessQueue(){returnQueueBuilder.durable(BUSINESS_QUEUE)// 核心配置1指定当前队列的死信要发送到的DLX.withArgument(x-dead-letter-exchange,DLX_EXCHANGE)// 核心配置2指定死信发送到DLX时的路由键.withArgument(x-dead-letter-routing-key,DLX_ROUTING_KEY)// 可选配置3消息TTL存活时间10秒超时未消费转为死信单位毫秒.withArgument(x-message-ttl,

// 可选配置4队列最大容量最多存储1000条消息溢出转为死信.withArgument(x-max-length,

.build();}//

绑定业务交换机 → 业务队列BeanpublicBindingbusinessBinding(){returnBindingBuilder.bind(businessQueue()).to(businessExchange()).with(BUSINESS_ROUTING_KEY);}}

消费端示例主动拒绝消息入死信importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.ChannelAwareMessageListener;importorg.springframework.stereotype.Component;importcom.rabbitmq.client.Channel;ComponentpublicclassOrderConsumer{// 监听业务队列开启手动ACK需在yml中配置RabbitListener(queuesRabbitMQDLQConfig.BUSINESS_QUEUE)publicvoidconsumeOrderMessage(Stringmessage,Channelchannel,MessagerawMessage)throwsException{longdeliveryTagrawMessage.getMessageProperties().getDeliveryTag();try{System.out.println(处理订单消息message);// 模拟消息格式非法包含invalid关键字if(message.contains(invalid)){thrownewIllegalArgumentException(订单消息格式非法缺少订单号);}// 处理成功手动发送ACK确认channel.basicAck(deliveryTag,false);}catch(Exceptione){System.out.println(消息处理失败送入死信队列e.getMessage());// 核心主动拒绝消息requeuefalse不重新入队直接转为死信channel.basicReject(deliveryTag,false);}}// 可选监听死信队列用于后续排查和归档实际项目可暂不监听先留存RabbitListener(queuesRabbitMQDLQConfig.DLQ_QUEUE)publicvoidconsumeDlqMessage(Stringmessage){System.out.println(接收到死信消息留存排查message);// 落地操作存入数据库、记录详细日志、推送告警通知}}

补充yml配置开启手动ACKspring:rabbitmq:host:localhostport:5672username:guestpassword:guestvirtual-host:/listener:simple:# 开启手动ACK才能主动拒绝消息并送入死信ack-mode:manual# 消费者线程数根据业务调整concurrency:1max-concurrency:5

实际项目配置Kafka 死信主题DLTKafka无原生「死信交换机/队列」机制通过「自定义死信主题」实现兜底功能推荐使用Spring Kafka的DeadLetterPublishingRecoverer实现「消费失败自动转发死信」下面提供落地配置。

前置依赖pom.xml!-- Spring Kafka 操作 Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency

核心配置类声明主题、死信转发器importorg.apache.kafka.clients.admin.NewTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.listener.DeadLetterPublishingRecoverer;importorg.springframework.kafka.listener.SeekToCurrentErrorHandler;importorg.springframework.util.backoff.FixedBackOff;ConfigurationpublicclassKafkaDLTConfig{//

定义主题名称规范死信主题业务主题_dlqpublicstaticfinalStringBUSINESS_TOPICbiz_order_topic;publicstaticfinalStringDLQ_TOPICbiz_order_topic_dlq;//

声明业务主题分区数1副本数1测试/生产可调整BeanpublicNewTopicbusinessTopic(){returnnewNewTopic(BUSINESS_TOPIC,1,(short)

;}//

声明死信主题等效于RabbitMQ的DLQ普通主题BeanpublicNewTopicdlqTopic(){returnnewNewTopic(DLQ_TOPIC,1,(short)

;}//

配置死信转发器消费失败后自动转发消息到死信主题BeanpublicDeadLetterPublishingRecovererdeadLetterPublishingRecoverer(KafkaTemplateString,ObjectkafkaTemplate){returnnewDeadLetterPublishingRecoverer(kafkaTemplate,(record,exception)-{System.out.println(消息消费失败转发到死信主题exception.getMessage());// 指定死信主题和分区默认分区0生产可根据业务分片returnneworg.apache.kafka.common.TopicPartition(DLQ_TOPIC,

;});}//

配置错误处理器重试3次后仍失败转发到死信主题BeanpublicSeekToCurrentErrorHandlererrorHandler(DeadLetterPublishingRecovererrecoverer){// FixedBackOff固定间隔重试1秒/次最多3次生产可改用指数退避重试FixedBackOffbackOffnewFixedBackOff(1000L,

;returnnewSeekToCurrentErrorHandler(recoverer,backOff);}}

消费端示例失败后自动转发死信importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;ComponentpublicclassKafkaOrderConsumer{// 监听业务主题自动应用上面配置的错误处理器KafkaListener(topicsKafkaDLTConfig.BUSINESS_TOPIC,groupIdbiz_order_consumer_group)publicvoidconsumeOrderMessage(Stringmessage){System.out.println(处理Kafka订单消息message);// 模拟消息格式非法触发异常if(message.contains(invalid)){thrownewIllegalArgumentException(Kafka订单消息格式非法缺少订单号);}// 处理成功自动提交offsetyml中默认开启}// 可选监听死信主题用于排查和归档KafkaListener(topicsKafkaDLTConfig.DLQ_TOPIC,groupIddlq_order_consumer_group)publicvoidconsumeDlqMessage(Stringmessage){System.out.println(接收到Kafka死信消息留存排查message);// 落地操作存入数据库、记录详细日志、推送告警通知}}

补充yml配置spring:kafka:bootstrap-servers:localhost:9092producer:key-serializer:org.apache.kafka.common.serialization.StringSerializervalue-serializer:org.apache.kafka.common.serialization.StringSerializerconsumer:group-id:biz_order_consumer_groupkey-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializer# 自动提交offset生产可改为手动提交提高可靠性enable-auto-commit:trueauto-commit-interval:1000

死信队列与重试机制的核心区别两者都是处理「消息消费失败」的机制但定位、时机、逻辑完全不同实际项目中通常「组合使用」先重试重试失败再入死信。

对比维度死信队列DLQ重试机制核心定位事后兜底、留存失败消息、避免丢失事前补救、尝试重新处理、争取正常消费执行时机重试机制执行完毕若配置仍失败或无需重试如非法消息消息消费失败后立即/间隔执行优先于DLQ执行逻辑消息转入专用队列留存不再参与正常业务流转消息重新入队/投递再次被消费者获取并处理资源消耗低仅存储不重复执行业务逻辑中高重复执行消费逻辑无限重试会耗尽资源适用场景

重试无法解决的问题格式错误、数据非法

超时未处理的消息

需事后排查的失败消息

临时性问题网络抖动、服务短暂不可用

重试后可能成功的场景数据库连接超时最终结果消息留存等待人工/批量处理大概率成功消费少数仍失败转入DLQ最佳实践采用「指数退避重试 有限次数 死信队列」的组合重试策略选择「指数退避」重试间隔逐渐增加如1s→2s→4s→8s避免短时间内大量重试占用资源。

重试次数限制最大重试次数

次为宜避免无限循环。

兜底策略重试失败后自动转入死信队列留存消息以便后续排查。

如何处理死信堆积问题实际项目痛点死信堆积的核心原因「死信产生速度处理速度」「未及时处理死信」「业务逻辑有缺陷导致大量无效消息」处理方案分「紧急处理」和「长期优化」。

紧急处理解决当前堆积临时扩容死信消费端增加死信消费者实例数、提高消费者线程数加快死信的读取和落地如存入数据库释放中间件资源。

批量导出与离线处理通过中间件工具RabbitMQ的rabbitmqadmin、Kafka的kafka-console-consumer.sh将死信批量导出到文件/数据库离线分析处理不占用线上资源。

# Kafka示例将死信主题消息导出到文件kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic biz_order_topic_dlq --from-beginning --timeout-ms5000dlq_order_messages.txt筛选无效死信批量清理排查死信内容删除「完全无效、无修复价值」的消息如重复垃圾消息减少堆积。

长期优化避免后续堆积优化前置业务逻辑减少死信产生消息发送前校验格式、必填字段、数据合法性避免发送无效消息配置合理路由规则避免路由失败导致死信。

配置合理的重试策略减少「可修复」死信的产生避免这类消息流入DLQ。

建立死信监控与告警机制通过PrometheusGrafana监控死信数量、新增速度当超过阈值如1000条时触发邮件/短信告警及时介入。

建立死信自动处理与归档机制对有规律的死信如某类数据缺失消息编写专用程序自动修复并重新投递对无修复价值的死信配置每月归档到冷存储释放中间件资源。

死信队列在保障系统高可用和消息可靠性中的核心作用保障消息可靠性实现「最后一道兜底」避免失败消息丢失消息中间件的核心诉求之一是「可靠投递」死信队列将无法正常处理的消息留存避免了「消息消失无影无踪」的问题对于金融、订单等核心业务可后续通过死信排查补单避免数据不一致和资金损失。

保障系统高可用隔离无效消息避免拖垮核心业务若无死信队列失败消息可能无限循环重试或堆积在业务队列大量占用消费者线程、数据库连接等资源最终导致核心业务队列无法正常投递/消费引发系统雪崩。

死信队列将无效消息隔离让核心业务专注于正常消息处理保障系统平稳运行。

提升问题排查效率提供失败消息样本快速定位根因死信队列中留存了失败消息的完整内容、异常信息开发人员可通过分析这些样本快速定位问题根因如生产端漏传字段、依赖服务不可用大幅缩短排障时间。

支持系统容错与降级为系统提供「缓冲容错能力」当依赖服务如数据库、第三方支付接口出现故障时消息消费失败重试后入死信队列系统不会因依赖服务故障而阻塞实现了降级容错当依赖服务恢复后可将死信消息重新投递实现系统恢复后的补处理保证业务连续性。

总结死信队列是「兜底队列」本质是普通队列/主题用于留存无法正常处理的消息核心搭配重试机制使用。

消息成为死信的三大场景超时未确认、主动拒绝且不重试、队列/主题溢出。

RabbitMQ通过「DLXDLQ」配置Kafka通过「死信主题自动转发」实现实际项目需配置监控和归档机制避免死信堆积。

死信队列的

核心价值保障消息不丢失、隔离无效消息、支撑系统高可用和容错降级。

黄片APP下载-黄片APP下载应用

百度百家号客服电话人工服务

123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123