Rabbitmq插件实现延迟队列

举报
yd_249383650 发表于 2023/03/27 14:46:44 2023/03/27
【摘要】 ​ 原因:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。操作前需要安装延时队列插件  在官网上下载https://www.rabbitmq.com/community-plugins.html,下载rabbitmq_delayed_message_exchange插件,然后解压放置到...

 原因:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

操作前需要安装延时队列插件  


在官网上下载https://www.rabbitmq.com/community-plugins.html,下载

rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。

进入RabbitMQ的安装目录下的plgins目录,执行下面命令让该插件生效,然后重启RabbitMQ /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

编辑

代码架构图

在这里新增了一个队列delayed.queue,一个自定义交换机delayed.exchange,绑定关系如下:

 编辑

配置文件类代码  

 在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

/**
 * 基于插件的延迟队列
 */
@Configuration
public class DelayQueueConfig {
    public static  final  String DELAYED_QUEUE_NAME="delayed.queue";
    public static  final  String DELAYED_EXCHANGE_NAME="delayed.exchange";
    public static  final  String DELAYED_ROUTING_KEY="delayed.routingkey";
    @Bean
    public Queue delayedQueue()
    {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //自定义交换机,再这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange()
    {
        Map<String, Object> map = new HashMap<>();
        //自定义交换机的类型
        map.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,map);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedExchange") CustomExchange customExchange,@Qualifier("delayedQueue") Queue queue){
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

消息生产者代码  

@RestController
@Slf4j
@RequestMapping("ttl")
public class DelayedProduce {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String delayTime)
    {

        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列C:{}", new Date(),delayTime, message);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME,DelayQueueConfig.DELAYED_ROUTING_KEY,message, correlationData ->{
            correlationData.getMessageProperties().setExpiration(delayTime);
            return correlationData;
        });
    }
}

消息消费者代码 

@Slf4j
@Component
public class DelayConsumer {
    @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
    public  void receiveDelayedQueue(Message message)
    {
        String s = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}" ,new Date().toString(), s);
    }

}

发起请求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000

http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

 编辑

编辑 第二个消息被先消费掉了,符合预期

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用

RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。


当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz

或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。