想试试能不能把mq搞死

举报
赵KK日常技术记录 发表于 2023/06/30 23:45:21 2023/06/30
【摘要】 下午想试试能不能把mq搞死,就试了下模拟并发10000个请求,最高时等待队列数量为53,也就是说高并发时必然会出现实际业务与数据库的数据不一致的情况,搭配redis使用时,尽可能的应当(必须)从redis里面取数据。图片此时发现一条死信队列的消息,具体产生原因不详,分析和解决下此问题。死信队列的产生activemq默认使用异步发送模式,如果设置了持久化但没开启事务的话,会发生消息丢失的情况,...

下午想试试能不能把mq搞死,就试了下模拟并发10000个请求,最高时等待队列数量为53,也就是说高并发时必然会出现实际业务与数据库的数据不一致的情况,搭配redis使用时,尽可能的应当(必须)从redis里面取数据。

图片

此时发现一条死信队列的消息,具体产生原因不详,分析和解决下此问题。

死信队列的产生

activemq默认使用异步发送模式,如果设置了持久化但没开启事务的话,会发生消息丢失的情况,mq宕机也会丢失。

2.具体哪些情况会引发消息重发?

①Client用了transactions且在session中调用了rollback

②Client用了transactions且在调用commit之前关闭或者没有commit

③Client在CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover(允许消息重发模式下)

消息的重发时间间隔为1秒,重发次数默认最高6次。

一个消息被redelivedred超过6次,消费端会给mq一个"poison ack",表示这个消息有毒,此时这个消息会被放入DLQ死信队列。

图片

图片

图片

由于我是本地mq,我可能搞下测试环境的mq,本地是重试4次后就进入死信队列了。

在哪里改?

redeliveryPolicy.setMaximumRedeliveries(3);

public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerurl);
//信任所有包下的序列化对象,解决无法发送对象消息
factory.setTrustAllPackages(true);
factory.setUserName(userName);
factory.setPassword(password);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);// 修改重发次数为3次
factory.setRedeliveryPolicy(redeliveryPolicy);
return factory;
}
也可在配置文件中修改

如何处理死信队列中的消息?

1:人工处理(太累)

2:定时任务(太耗性能)

3:监听死信队列

4:死信队列写库

如何监听死信队列?

思路:死信队列也是一个队列,创建一个监听器,监听死信队列ActiveMQ.DLQ队列是否有消息,有消息就进行消费。

/**

  • @author zhaokkstart

  • @create 2020-09-07 16:27
    */
    import javax.jms.BytesMessage;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    import javax.jms.StreamMessage;
    import javax.jms.TextMessage;
    import org.apache.activemq.command.ActiveMQDestination;;
    public class QueueMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
    try {
    ActiveMQDestination queues=(ActiveMQDestination)message.getJMSDestination();
    if(queues.getPhysicalName().equalsIgnoreCase(“ActiveMQ.DLQ”)){
    System.out.println(“监听队列:”+queues.getPhysicalName()+“消费了消息:”);

             if (message instanceof TextMessage) {
                 TextMessage tm = (TextMessage) message;
                 try {
                     System.out.println("from get textMessage:\t" + tm.getText());
                 } catch (JMSException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }
    
           
             if (message instanceof MapMessage) {
                 MapMessage mm = (MapMessage) message;
                 try {
                     System.out.println("from get MapMessage:\t" + mm.getString("msgId"));
                 } catch (JMSException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }
         }
         // 如果是Object消息
         if (message instanceof ObjectMessage) {
             ObjectMessage om = (ObjectMessage) message;
             System.out.println("from get ObjectMessage:\t");
         }
     } catch (JMSException e) {
         e.printStackTrace();
     }
    

    }
    }

使用监听还需要启动消费者吗?

可以不启动消费者,需要启动生产者

消息监听器MessageListener

1.MessageListener
2.SessionAwareMessageListener,
SessionAwareMessageListener是Spring为我们提供的,
它不是标准的JMS MessageListener。
3.MessageListenerAdapter,
MessageListenerAdapter类实现了MessageListener接口
和SessionAwareMessageListener接口,
它的主要作用是将接收到的消息进行类型转换,
然后通过反射的形式把它交给一个普通的Java类进行处理。
其另外一个主要的功能是可以自动的发送返回消息。
ActiveMQ队列消息积压问题
1.队列长时间无人监听时,自动删除该队列,在ActiveMQ官方提供的功能列表中,有这样一项功能:Delete Inactive Destination。它可以删除“没有未处理消息、并且没有消费者的Destination”。我就觉得我们的业务队列太多了,没计算过,但至少实在50个以上的,没有必要~,不常用的可以手动去启停。

本文参考资料

ActiveMQ队列消息积压问题 https://blog.51cto.com/winters1224/2049432
如何监听死信队列?https://www.cnblogs.com/zhoading/p/12009116.html
activemq 中如何消费已经在死信队列中的消息 https://q.cnblogs.com/q/90513/
推荐博文 https://blog.csdn.net/vop444/article/details/85222353

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。