消息中间件RabbitMQ操作实战

举报
tea_year 发表于 2025/08/27 15:52:31 2025/08/27
【摘要】 5.1 导入依赖<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-amqp</artifactId>    <version>2.5.6</version></dependency>5.2 在application.yml中增加配置spring: rabb...

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

1.1 导入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.5.6</version>
</dependency>

1.2 在application.yml中增加配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /yh
    username: guest
    password: guest

5.3 Hello-World 简单队列

一个生产者,一个默认的交换机,一个队列,一个消费者

结构图

1587715551088.png


1)创建配置类,用于创建队列对象

package com.it.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class SimpleQueueConfig {
    @Bean
    public Queue simple(){
        return new Queue("simpleQueue");
    }
}

2)创建生产者

package com.it.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
public class SimpleQueueProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    public void send(){
        System.out.println("SimpleQueueProducer");
        //发送消息,第一个参数为队列名称,第二参数为消息内容
        rabbitTemplate.convertAndSend("simpleQueue","简单模式");
    }
}

3)创建消费者

package com.it.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues="simpleQueue")       //监听指定的消息队列
public class SimpleQueueCustomer {    
    //@RabbitHandler修饰的方法中实现接受到消息后的处理逻辑
    @RabbitHandler
    public void receive(String content){
        System.out.println("SimpleQueueCustomer");
        System.out.println("来SimpleQueueProducer的信息:"+content);
    }
}

4)在src\test\java\com\it\Rabbitmq01ApplicationTests.java进行测试

package com.it;
import com.it.simple.SimpleQueueProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Rabbitmq01ApplicationTests {    
    @Test
    void contextLoads() {
    }
    
    @Autowired
    private SimpleQueueProducer simpleQueueProducer;

    @Test
    public void testSimpleQueueProducer(){
        simpleQueueProducer.send();
    }
}

如果传递的是 JavaBean 对象,该实体类需要实现序列化接口,具体流程如下:

  1. 导入lombok依赖,创建User类

package com.it.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private String password;
}
  1. 修改生产者中的代码

package com.it.simple;

import com.it.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

//生产者
@Component
public class SimplePublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        System.out.println("SimplePublisher...");
        //rabbitTemplate.convertAndSend("","simpleQueue","简单模式");
        rabbitTemplate.convertAndSend("","simpleQueue",new User("张三","123"));
    }
}
  1. 修改消费者中的代码

package com.it.simple;

import com.it.pojo.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//消费者
@Component
@RabbitListener(queues = "simpleQueue")
public class SimpleConsumer {
//    @RabbitHandler
//    public void receive(String content){
//        System.out.println("SimpleConsumer...");
//        System.out.println("来自SimplePublisher的消息:"+content);
//    }

    @RabbitHandler
    public void receive(User user){
        System.out.println("SimpleConsumer...");
        System.out.println("来自SimplePublisher的消息:"+user);
    }
}
  1. 运行测试类即可!


5.4 Work 工作队列

一个生产者,一个默认的交换机,一个队列,两个消费者,默认采用公平分发

结构图

1587718055260.png

1)创建配置类,用于创建队列对象

package com.it.work;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkQueueConfig {    
    @Bean
    public Queue work(){
        return new Queue("workQueue",true,false,false,null);
    }
}

2)创建生产者

package com.it.work;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WorkProducer {    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(){
        System.out.println("工作队列模式生产者");
        //参数1:目标队列名;参数2:消息内容
        rabbitTemplate.convertAndSend("workQueue","工作队列的消息");
    }
}

3)创建消费者,本案例创建两个消费者

package com.it.work;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues="workQueue")
public class WorkCustomer_01 {    
    @RabbitHandler
    public void receive(String content){
        System.out.println("工作队列消费者_01:"+content);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
package com.it.work;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues="workQueue")
public class WorkCustomer_02 {
    @RabbitHandler
    public void receive(String content){
        System.out.println("工作队列消费者_02:"+content);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

4)添加web层的Controller

@RestController
public class WorkController {
    @Autowired
    WorkProducer workProducer;

    @GetMapping("sendWork")
    public String send4(){
        workProducer.send();
        return "ok";
    }
}

5.启动类

@SpringBootApplication
public class MQApp {
    public static void main(String[] args) {
        SpringApplication.run((MQApp.class));
    }
}


5.5 Publish/Subscribe 发布订阅模式

一个生产者,一个交换机,两个队列,两个消费者

结构图

1587720941513.png


使用该模式需要借助交换机,生产者将消息发送到交换机,再通过交换机到达队列.

有四种交换机:direct/topic/headers/fanout,默认交换机是direct,发布与订阅的实现使用第四个交换器类型fanout

使用交换机时,每个消费者有自己的队列,生产者将消息发送到交换机(X),每个队列都要绑定到交换机

本例中:

创建2个消息队列

创建一个fanout交换机对象

Bind交换机和队列


image-20231118205749446.png


1)创建配置类

package com.it.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //创建两个队列
    @Bean
    public Queue getQueueOne(){
        return new Queue("fqueue1");
    }
   @Bean
    public Queue getQueueTwo(){
        return new Queue("fqueue2");
    }
    //创建一个交换机
    @Bean
    public FanoutExchange getExchange(){
        return new FanoutExchange("fExchange");
    }
    //将两个队列绑定到交换机上
    @Bean
    public Binding bindingFanoutQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding bindingFanoutQueue2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

2)创建生产者

package com.it.fanout;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        System.out.println("FanoutProducer");
        //第一个参数是交换机的名称
        //第二个参数是routerKey 这里设置为空字符串即可
        //第三个参数是要发送的消息
        rabbitTemplate.convertAndSend("fanoutExchange","","fanout的消");
    }
}

3)创建消费者,本案例创建两个消费者

package com.it.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fqueue1")
public class FanoutCustomer_01 {
    @RabbitHandler
    public void receive(String content){
        System.out.println("Fanout消费者_01:"+content);
    }
}
package com.it.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fqueue1")
public class FanoutCustomer_02 {
    @RabbitHandler
    public void receive(String content){
        System.out.println("Fanout消费者_02:"+content);
    }
}

4)在Controller类代码

@RestController
public class FanoutController {
    @Autowired
    FanoutProducer fanoutProducer;
    @GetMapping("sendFanout")
    public String sendFanout(){
        fanoutProducer.sendMsg();
        return "ok";
    }
}


5.6 Routing 路由模式

一个生产者,一个交换机,两个队列,两个消费者

结构图


1587724886159.png



生产者将消息发送到direct交换机(路由模式需要借助直连交换机实现),在绑定队列和交换机的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。

本例中:

创建2个消息队列

创建一个direct交换机对象

Bind交换机和队列

1)创建配置类

package com.it.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    @Bean
    public Queue directQueue1(){
        return new Queue("directQueue1");
    }
    @Bean
    public Queue directQueue2(){
        return new Queue("directQueue2");
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }
    @Bean
    public Binding bingDirectQueue1(){
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("zhangsan");
    }
    @Bean
    public Binding bingDirectQueue2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("lisi");
    }
}

2)创建生产者

package com.it.direct;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private DirectExchange directExchange;

    public void send(){
        System.out.println("Routing模式生产者");
        rabbitTemplate.convertAndSend(directExchange.getName(),"zhangsan","zhangsanContent");
        rabbitTemplate.convertAndSend(directExchange.getName(),"lisi","lisiContent");
    }
}

3)创建两个消费者

package com.it.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueue1")
public class DirectCustomer_01 {
    @RabbitHandler
    public void receive(String content){
        System.out.println("路由消费者_01:"+content);
    }
}
package com.it.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueue2")
public class DirectCustomer_02 {

    @RabbitHandler
    public void receive(String content){
        System.out.println("DirectCustomer_02:"+content);
    }
}


4)在测试类中添加对象和方法进行测试

@RestController
public class RoutingController {
    @Autowired
    RouterProducer routerProducer;
    @GetMapping("sendRouting")
    public String sendRouting(){
        routerProducer.sendRouting();
        return "ok";
    }
}


5.7 Topic 主题模式

一个生产者,一个交换机,两个队列,两个消费者

目标**:编写生产者、消费者代码并测试了解Topics通配符模式的特点

模式说明

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert 通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例: item.#:能够匹配 item.insert.abc 或者 item.insert item.*:只能匹配 item.insert

结构图

1587727436898.png


又称通配符模式(可以理解为模糊匹配,路由模式相当于精确匹配)

使用直连交换机可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由。

在消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源。这时候可以使用topic交换机。

使用主题交换机时不能采用任意写法的路由键,路由键的形式应该是由点分割的有意义的单词。例如"goods.stock.info"等。路由key最多255字节。

*号代表一个单词

#号代表0个或多个单词


1)创建配置类

package com.it.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {
    @Bean
    public Queue topicQueue1(){
        return new Queue("topicQueue1");
    }
    @Bean
    public Queue topicQueue2(){
        return new Queue("topicQueue2");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    public Binding bingTopicQueue1(){           // wangwu.aaa
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("wangwu.*");
    }
    @Bean
    public Binding bingTopicQueue2(){//zhanoliu.aaa.bbb.cc
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("zhaoliu.#");
    }
}


2)创建生产者

package com.it.topic;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private TopicExchange topicExchange;

    public void send(){
        System.out.println("TopicProducer");
 rabbitTemplate.convertAndSend(topicExchange.getName(),"wangwu.abc","wangwuContent");    rabbitTemplate.convertAndSend(topicExchange.getName(),"zhaoliu.xyz.qwer","zhaoliuContent");
    }
}

3)创建两个消费者

package com.it.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueue1")
public class TopicCustomer_01 {
    @RabbitHandler
    public void receive(String content){
        System.out.println("TopicCustomer_01:"+content);
    }
}
package com.it.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueue2")
public class TopicCustomer_02 {
    @RabbitHandler
    public void receive(String content){
        System.out.println("TopicCustomer_02:"+content);
    }
}


4)在web层类中

@RestController
public class TopicController {
    @Autowired
    TopicProducer topicProducer;
    @GetMapping("sendTopic")
    public String sendTopic(){
        topicProducer.send();
        return "ok";
    }
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。