消息中间件RabbitMQ操作实战
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
1.1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.6</version>
</dependency>
1.2 在application.yml中增加配置
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /yh
username: guest
password: guest
5.3 Hello-World 简单队列
一个生产者,一个默认的交换机,一个队列,一个消费者
结构图 |
---|
|
1)创建配置类,用于创建队列对象
package com.it.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SimpleQueueConfig {
@Bean
public Queue simple(){
return new Queue("simpleQueue");
}
}
2)创建生产者
package com.it.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SimpleQueueProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("SimpleQueueProducer");
//发送消息,第一个参数为队列名称,第二参数为消息内容
rabbitTemplate.convertAndSend("simpleQueue","简单模式");
}
}
3)创建消费者
package com.it.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="simpleQueue") //监听指定的消息队列
public class SimpleQueueCustomer {
//@RabbitHandler修饰的方法中实现接受到消息后的处理逻辑
@RabbitHandler
public void receive(String content){
System.out.println("SimpleQueueCustomer");
System.out.println("来SimpleQueueProducer的信息:"+content);
}
}
4)在src\test\java\com\it\Rabbitmq01ApplicationTests.java进行测试
package com.it;
import com.it.simple.SimpleQueueProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class Rabbitmq01ApplicationTests {
@Test
void contextLoads() {
}
@Autowired
private SimpleQueueProducer simpleQueueProducer;
@Test
public void testSimpleQueueProducer(){
simpleQueueProducer.send();
}
}
如果传递的是 JavaBean 对象,该实体类需要实现序列化接口,具体流程如下:
导入lombok依赖,创建User类
package com.it.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private String username;
private String password;
}
修改生产者中的代码
package com.it.simple;
import com.it.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
//生产者
@Component
public class SimplePublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("SimplePublisher...");
//rabbitTemplate.convertAndSend("","simpleQueue","简单模式");
rabbitTemplate.convertAndSend("","simpleQueue",new User("张三","123"));
}
}
修改消费者中的代码
package com.it.simple;
import com.it.pojo.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//消费者
@Component
@RabbitListener(queues = "simpleQueue")
public class SimpleConsumer {
// @RabbitHandler
// public void receive(String content){
// System.out.println("SimpleConsumer...");
// System.out.println("来自SimplePublisher的消息:"+content);
// }
@RabbitHandler
public void receive(User user){
System.out.println("SimpleConsumer...");
System.out.println("来自SimplePublisher的消息:"+user);
}
}
运行测试类即可!
5.4 Work 工作队列
一个生产者,一个默认的交换机,一个队列,两个消费者,默认采用公平分发
结构图 |
---|
|
1)创建配置类,用于创建队列对象
package com.it.work;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class WorkQueueConfig {
@Bean
public Queue work(){
return new Queue("workQueue",true,false,false,null);
}
}
2)创建生产者
package com.it.work;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("工作队列模式生产者");
//参数1:目标队列名;参数2:消息内容
rabbitTemplate.convertAndSend("workQueue","工作队列的消息");
}
}
3)创建消费者,本案例创建两个消费者
package com.it.work;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="workQueue")
public class WorkCustomer_01 {
@RabbitHandler
public void receive(String content){
System.out.println("工作队列消费者_01:"+content);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
package com.it.work;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="workQueue")
public class WorkCustomer_02 {
@RabbitHandler
public void receive(String content){
System.out.println("工作队列消费者_02:"+content);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
4)添加web层的Controller
@RestController
public class WorkController {
@Autowired
WorkProducer workProducer;
@GetMapping("sendWork")
public String send4(){
workProducer.send();
return "ok";
}
}
5.启动类
@SpringBootApplication
public class MQApp {
public static void main(String[] args) {
SpringApplication.run((MQApp.class));
}
}
5.5 Publish/Subscribe 发布订阅模式
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
|
使用该模式需要借助交换机,生产者将消息发送到交换机,再通过交换机到达队列.
有四种交换机:direct/topic/headers/fanout,默认交换机是direct,发布与订阅的实现使用第四个交换器类型fanout
使用交换机时,每个消费者有自己的队列,生产者将消息发送到交换机(X),每个队列都要绑定到交换机
本例中:
创建2个消息队列
创建一个fanout交换机对象
Bind交换机和队列
1)创建配置类
package com.it.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
//创建两个队列
@Bean
public Queue getQueueOne(){
return new Queue("fqueue1");
}
@Bean
public Queue getQueueTwo(){
return new Queue("fqueue2");
}
//创建一个交换机
@Bean
public FanoutExchange getExchange(){
return new FanoutExchange("fExchange");
}
//将两个队列绑定到交换机上
@Bean
public Binding bindingFanoutQueue1(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutQueue2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
2)创建生产者
package com.it.fanout;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("FanoutProducer");
//第一个参数是交换机的名称
//第二个参数是routerKey 这里设置为空字符串即可
//第三个参数是要发送的消息
rabbitTemplate.convertAndSend("fanoutExchange","","fanout的消");
}
}
3)创建消费者,本案例创建两个消费者
package com.it.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fqueue1")
public class FanoutCustomer_01 {
@RabbitHandler
public void receive(String content){
System.out.println("Fanout消费者_01:"+content);
}
}
package com.it.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fqueue1")
public class FanoutCustomer_02 {
@RabbitHandler
public void receive(String content){
System.out.println("Fanout消费者_02:"+content);
}
}
4)在Controller类代码
@RestController
public class FanoutController {
@Autowired
FanoutProducer fanoutProducer;
@GetMapping("sendFanout")
public String sendFanout(){
fanoutProducer.sendMsg();
return "ok";
}
}
5.6 Routing 路由模式
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者将消息发送到direct交换机(路由模式需要借助直连交换机实现),在绑定队列和交换机的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。
本例中:
创建2个消息队列
创建一个direct交换机对象
Bind交换机和队列
1)创建配置类
package com.it.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean
public Queue directQueue1(){
return new Queue("directQueue1");
}
@Bean
public Queue directQueue2(){
return new Queue("directQueue2");
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
@Bean
public Binding bingDirectQueue1(){
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("zhangsan");
}
@Bean
public Binding bingDirectQueue2(){
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("lisi");
}
}
2)创建生产者
package com.it.direct;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectExchange directExchange;
public void send(){
System.out.println("Routing模式生产者");
rabbitTemplate.convertAndSend(directExchange.getName(),"zhangsan","zhangsanContent");
rabbitTemplate.convertAndSend(directExchange.getName(),"lisi","lisiContent");
}
}
3)创建两个消费者
package com.it.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "directQueue1")
public class DirectCustomer_01 {
@RabbitHandler
public void receive(String content){
System.out.println("路由消费者_01:"+content);
}
}
package com.it.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "directQueue2")
public class DirectCustomer_02 {
@RabbitHandler
public void receive(String content){
System.out.println("DirectCustomer_02:"+content);
}
}
4)在测试类中添加对象和方法进行测试
@RestController
public class RoutingController {
@Autowired
RouterProducer routerProducer;
@GetMapping("sendRouting")
public String sendRouting(){
routerProducer.sendRouting();
return "ok";
}
}
5.7 Topic 主题模式
一个生产者,一个交换机,两个队列,两个消费者
目标**:编写生产者、消费者代码并测试了解Topics通配符模式的特点
模式说明
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert 通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例: item.#:能够匹配 item.insert.abc 或者 item.insert item.*:只能匹配 item.insert
结构图 |
---|
|
又称通配符模式(可以理解为模糊匹配,路由模式相当于精确匹配)
使用直连交换机可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由。
在消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源。这时候可以使用topic交换机。
使用主题交换机时不能采用任意写法的路由键,路由键的形式应该是由点分割的有意义的单词。例如"goods.stock.info"等。路由key最多255字节。
*号代表一个单词
#号代表0个或多个单词
1)创建配置类
package com.it.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueue1(){
return new Queue("topicQueue1");
}
@Bean
public Queue topicQueue2(){
return new Queue("topicQueue2");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding bingTopicQueue1(){ // wangwu.aaa
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("wangwu.*");
}
@Bean
public Binding bingTopicQueue2(){//zhanoliu.aaa.bbb.cc
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("zhaoliu.#");
}
}
2)创建生产者
package com.it.topic;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange topicExchange;
public void send(){
System.out.println("TopicProducer");
rabbitTemplate.convertAndSend(topicExchange.getName(),"wangwu.abc","wangwuContent"); rabbitTemplate.convertAndSend(topicExchange.getName(),"zhaoliu.xyz.qwer","zhaoliuContent");
}
}
3)创建两个消费者
package com.it.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topicQueue1")
public class TopicCustomer_01 {
@RabbitHandler
public void receive(String content){
System.out.println("TopicCustomer_01:"+content);
}
}
package com.it.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topicQueue2")
public class TopicCustomer_02 {
@RabbitHandler
public void receive(String content){
System.out.println("TopicCustomer_02:"+content);
}
}
4)在web层类中
@RestController
public class TopicController {
@Autowired
TopicProducer topicProducer;
@GetMapping("sendTopic")
public String sendTopic(){
topicProducer.send();
return "ok";
}
}
- 点赞
- 收藏
- 关注作者
评论(0)