SpringBoot学习笔记-10:第十章-SpringBoot 与消息
第十章-SpringBoot 与消息
JMS&AMQP 简介
消息服务中间件可以提升系统异步通信,扩展解耦能力
两个重要概念:
- 消息代理 message broker
- 目的地 destination
- 队列 queue :
- 点对点消息通信 point-to-point
- 唯一的发送者和接收者
- 主体 topic
- 发布 publish/订阅 subscribe 消息通信
- 多接收者
- 队列 queue :
协议 | JMS | AMQP |
---|---|---|
英文 | Java Message Service Java | Advanced Message Queuing Protocol |
中文 | 消息服务 | 高级消息队列协议 |
实现 | ActiveMQ、HornetMQ | RabbitMQ |
定义 | JAVA API | 网络线级协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
消息模型 | peer-2-peer、Pub/Sub | 5 种 |
支持消息类型 | 多消息类型 | byte[] |
支持 | spring-jms | spring-rabbit |
发送消息 | @JmsTemplate | @RabbitTemplate |
监听消息 | @JmsListener | @RabbitListener |
开启支持 | @EnableJms | @EnableRabbit |
自动配置 | JmsAutoConfiguration | RabbitAutoConfiguration |
RabbitMQ 基本概念简介
https://www.rabbitmq.com/
RabbitMQ 是由 erlang 开发的 AMQP(Advanced Message Queuing Protocol) 实现
核心概念
- Message 消息(消息头+消息体)
- Publisher 消息发布者
- Exchange 交换器 4 种类型
- direct(默认)
- fanout
- topic
- headers
- Queue 消息队列
- Binding 绑定
- Connection 网络连接
- Channel 信道
- Consumer 消费者
- Virtual Host 虚拟主机 默认/
- Broker 消息队列服务器实体
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qul9BCQd-1597024380528)(img/rabbitmq.jpg)]
RabbitMQ 运行机制
生产者把消息发布到 Exchange 上,
消息最终达到队列并被消费者接收,
而 Binding 决定交换器的消息应该发送到哪个队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8gnBrRxr-1597024380530)(img/rabbitmq-2.jpg)]
RabbitMQ 安装测试
安装启动 RabbitMQ
# 安装带有管理界面
docker pull rabbitmq:management
# 客户端:5672 管理界面:15672
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq:management
- 1
- 2
- 3
- 4
- 5
管理界面
http://localhost:15672/
账号:guest
密码:guest
RabbitTemplate 发送接受消息&序列化机制
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 1
- 2
- 3
- 4
自动配置类:RabbitAutoConfiguration
配置:RabbitProperties
给 Rabbit 发送和接收消息:RabbitTemplate
系统管理组件:AmqpAdmin
RabbitMQ 中新建:
queue: message
exchange: exchange.message
Routing key : message
- 1
- 2
- 3
自定义对象序列化规则
package com.example.demo.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig { // 以json的方式序列化对象 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
点对点发送消息测试
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; // 单播:发送数据 @Test public void testSendRabbitMQ() { Map<String, Object> map = new HashMap<>(); map.put("name", "Tom"); map.put("age", 23); rabbitTemplate.convertAndSend("exchange.message", "message", map); } // 单播:接收数据 @Test public void testReceiveRabbitMQ() { Object obj = rabbitTemplate.receiveAndConvert("message"); System.out.println(obj); // {name=Tom, age=23} }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
@RabbitListener&@EnableRabbit
开启 RabbitMQ
package com.example.demo;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableRabbit
public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
监听队列数据
package com.example.demo.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class MessageService { // 接收消息数据 @RabbitListener(queues={"message"}) public void listenMessage(Map<String, Object> map){ System.out.println("收到消息: " + map); } // 接收消息头和消息体 @RabbitListener(queues={"message"}) public void receiveMessage(Message message){ System.out.println("收到消息: " + message.getMessageProperties()); System.out.println("收到消息: " + message.getBody()); }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
AmqpAdmin 管理组件的使用
使用示例
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
public class RabbitMQTest { @Autowired AmqpAdmin amqpAdmin; @Test public void createExchange(){ // 创建交换器 amqpAdmin.declareExchange(new DirectExchange("exchange.admin")); // 创建queque amqpAdmin.declareQueue(new Queue("queue.admin")); // 创建绑定规则 amqpAdmin.declareBinding(new Binding("queue.admin", Binding.DestinationType.QUEUE, "exchange.admin", "admin", null)); System.out.println("创建成功"); }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
文章来源: pengshiyu.blog.csdn.net,作者:彭世瑜,版权归原作者所有,如需转载,请联系作者。
原文链接:pengshiyu.blog.csdn.net/article/details/107905638
- 点赞
- 收藏
- 关注作者
评论(0)