SpringBoot整合RabbitMQ及其原理分析

举报
LoneWalker、 发表于 2023/08/15 14:51:34 2023/08/15
【摘要】 SpringBoot整合RabbitMQ及其原理分析

上一篇:RabbitMQ基础知识

1、相关依赖

这里无需指定版本号,让其跟着SpringBoot版本走。本示例使用SpringBoot版本号为2.7.10。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

2、生产者、消费者

创建两个SpringBoot应用,模拟消息生产者与消费者【publisher、consumer】。

2-1生产者

编写配置文件,用户名和密码等自行修改 这里虚拟机的名称是上一篇文章中新建的。

server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=用户名
#密码
spring.rabbitmq.password=密码
#配置虚拟机
spring.rabbitmq.virtual-host=demo

声明交换机、队列并绑定:

@Configuration
public class RabbitMqConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange getExchange(){
        return new DirectExchange("directExchange",false,false);
    }

    @Bean
    public Queue getQueue(){
        return new Queue("publisher.addUser",true,false,false);
    }

    @Bean
    public Binding getBinding(DirectExchange exchange,Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }
}

新建User实体类

@Data
public class User {
    private Long id;
    private String name;
    private String desc;
}

在方法中使用RabbitTemplate来发送消息:

public interface PublisherService {

    /**
     * 添加用户
     * @param user 用户信息
     */
    void addUser(User user);

}
@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void addUser(User user) {
        rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user);
    }
}

以上需要注意的就是交换机的名称队列名routingKey。示例中使用的是直连交换机,routingKey需要和队列名保持一致。不懂的可以查看上一篇文章。

controller:

@RequiredArgsConstructor
@RestController
@RequestMapping("/user")
public class UserController {


    private final PublisherService publisherService;

    @PostMapping("/add")
    public void add(){
        User user = new User();
        user.setId(1000L);
        user.setName("黄忠");
        user.setDesc("老兵不死,只是逐渐凋零");
        publisherService.addUser(user);
    }

}

2-2消费者

消费者的配置和生产者一样,不赘述了,直接看代码:

@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="publisher.addUser")
    public void addUser(String userStr){
        User user = JSONObject.parseObject(userStr,User.class);
        log.info(user.toString());
    }
}

@RabbitListener 注解是指定某方法作为消息消费的方法,指定队列名称。@RabbitListener 如果标注在类上,需配合 @RabbitHandler 注解一起使用,根据接受的参数类型进入具体的方法中

2-3测试

消费端在启动时可能会报找不到交换机或队列,只需要让生产者发送一次消息,从控制台就可以看到相关的交换机和队列等信息了。

可以看到消费者成功消费了消息:

3、消费流程

通过上述操作,我们已经会简单地使用RabbitMQ了,接下来了解一下它的整个流程。如此可以让我们掌握的更牢固。

生产者:

  • 生产者连接到Message Broker【也就是RabbitMQ服务】,建立一个连接( Connection)开启一个信道(Channel)。
  • 生产者声明一个交换机,并设置相关属性,比如交换机类型、是否持久化等。
  • 生产者声明一个队列并设置相关属性。
  • 生产者通过路由键【Routing Key】将交换机和队列绑定。
  • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
  • 相应的交换机根据接收到的路由键查找相匹配的队列。
  • 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • 关闭信道。
  • 关闭连接。

消费者:

  • 消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel) 。
  • 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
  • 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
  • 消费者确认(ack) 接收到的消息。
  • RabbitMQ 从队列中删除相应己经被确认的消息。
  • 关闭信道。
  • 关闭连接。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。