Springboot+RabbitMQ实现消息队列+发送邮箱
主页:小王叔叔的博客
支持:点赞👍关注✔️收藏💖
一、效果
二、RMQ可以实现的功能
【介绍】:集合了网上各种大佬的教学一起整理
2.1消息中间件:
2.2rmq安装
windows下 安装 rabbitMQ 及操作常用命令(操作创建用户密码 角色等)_wcy10086的博客-CSDN博客
windows10环境下的RabbitMQ安装步骤(图文) - 清明-心若淡定 - 博客园
2.3含义
2.4原理
三、SpringBoot + RMQ集成项目消息队列及聊天功能
【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧
3.1RMQ配置
在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)
如下图:
3.1.1pom.xml(公共文件配置)
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.1.2spring.xml(公共文件配置)
####################################RabbitMQ配置#################################################################
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
3.2Java RMQ类代码
3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)
rmqpConfig.java-------rmqp基本配置
@Configuration
public class RabbitConfig {
private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vhost;
public static final String EXCHANGE_Member = "RabbitMQ_Exchange_Member";//邮件:注册+登录
public static final String QUEUE_Member = "RabbitMQ_Queue_Member";//邮件:注册+登录
public static final String ROUTINGKEY_Member = "RabbitMQ_RoutingKey_Member";//邮件:注册+登录
//建立一个连接容器,类型数据库的连接池
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMandatory(true);
template.setEncoding("UTF-8");
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
template.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息成功消费");
} else {
LOGGER.info("消息消费失败:" + cause);
}
});
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationIdString();
LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
});
return template;
}
/**
* 交换机针对消费者配置
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* DirectExchange:按照routingkey分发到指定队列,多关键字匹配
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_Member, true,false);
}
/**
* 队列
*
* @return
*/
@Bean
public Queue directQueue() {
return new Queue(QUEUE_Member, true);
}
/**
* 绑定
*
* @return
*/
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
RMQProducer.java ---------rmqp消息提供端/消息发送端
@Component
public class RMQProducer {
private Logger LOGGER = LoggerFactory.getLogger(RMQProducer.class);
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/***
* 延迟消息队列信息
* @param routingKeyName
* @param msg
*/
public void sendMsg(String routingKeyName,String msg) {
LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setContentEncoding("utf-8");
message.getMessageProperties().setExpiration("120000"); //设置消息存活时间
return message;
}
});
//rabbitTemplate.convertAndSend(routingKeyName, msg);
}
}
RMQReceiver.java--------------rmqp消费端
/***
* 消费者
* @author Administrator
*
*/
@Component
public class RMQReceiver implements ChannelAwareMessageListener{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues = RabbitConfig.QUEUE_Member )
@RabbitHandler
public void handler(Message message, Channel channel) throws IOException {
logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) );
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
}
}
3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)
rmqpConfig.java------------------rmqp配置类
@Configuration
public class RabbitConfig {
private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vhost;
/***
* 交换机
*/
public static final String EXCHANGE_Order = "RabbitMQ_Exchange_Order";// 下单
/**
* 队列
*/
public static final String QUEUE_Order = "RabbitMQ_Queue_Order";// 下单
public static final String QUEUE_Pay = "RabbitMQ_Queue_Pay";// 支付
/***
* 路由
*/
public String ROUTINGKEY_Order = "RabbitMQ_RoutingKey_Order";// 下单
//建立一个连接容器,类型数据库的连接池
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMandatory(true);
template.setEncoding("UTF-8");
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
template.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息成功消费");
} else {
LOGGER.info("消息消费失败:" + cause);
}
});
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationIdString();
LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
});
return template;
}
/////////////////////////////////////////Fanout广播配置///////////////////////////////////////////////////////////////////////
/***
* 交换机配置
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_Order);
}
/**
* 队列配置
* @return
*/
@Bean
public Queue fanoutQueueOrder() {
return new Queue(QUEUE_Order);
}
@Bean
public Queue fanoutQueuePay() {
return new Queue(QUEUE_Pay);
}
/***
* 绑定交换机和队列
* @return
*/
@Bean
public Binding bindFanoutExchangeOrder() {
return BindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange());
}
@Bean
public Binding bindFanoutExchangePay() {
return BindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange());
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
RMQProducer.java ---------rmqp消息提供端/消息发送端
RMQReceiver.java--------------rmqp消费端
/***
* 消费者
* @author Administrator
*
*/
@Component
public class RMQReceiver implements ChannelAwareMessageListener{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues = RabbitConfig.QUEUE_Order )
@RabbitHandler
public void handlerOrder(Message message, Channel channel) throws IOException {
logger.info("接收处理队列QUEUE_Order当中的消息: " + new String(message.getBody()) );
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
}
@RabbitListener(queues = RabbitConfig.QUEUE_Pay )
@RabbitHandler
public void handlerPay(Message message, Channel channel) throws IOException {
logger.info("接收处理队列QUEUE_Pay当中的消息: " + new String(message.getBody()) );
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
}
/**
* @param trim
* @return
*/
private Map<String, String> mapStringToMap(String str) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",");
Map<String, String> map = new HashMap<String, String>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
3.2.3使用交换机 topicExchange: 通配符方式分发消息-(订阅)
后期补充。。。。
四、解决
4.1 工具安装好后,guest/guest登录失败如何解决:
解决办法:执行如下命令
命令1:rabbitmqctl set_user_tags guest administrator
命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'
重启rabbitmq即可。
重启服务:
------------------------ 我是愉快的分割线 -----------------------------
停止:service rabbitmq-server stop
启动:service rabbitmq-server start
查看状态:service rabbitmq-server status
4.2什么时间让它消费,什么时间手动消费(手动消费:不消费永远都在rmqp中保留)
操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp的交换机-队列-路由中
/***
* 消费者
* @author Administrator
*
*/
@Component
public class RMQReceiver implements ChannelAwareMessageListener{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//@RabbitListener(queues = RabbitConfig.QUEUE_Member )
//@RabbitHandler
public void handler(Message message, Channel channel) throws IOException {
logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) );
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
}
}
如下图:
五、SpringBoot + Mail集成实现邮件发送
5.1QQ邮箱授权码获取
5.2邮箱配置
pom.xml
<!-- Mail -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
spring.xml--》配置邮箱
###############################Mail配置####################################################################
##QQ smtp.qq.com
##sina smtp.sina.cn
##aliyun smtp.aliyun.com
##163 smtp.163.com
#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994
#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994
#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994
##发送方
spring.mail.host=smtp.qq.com
##邮件地址
spring.mail.from=1247622527@qq.com
#用户名
spring.mail.username=1247622527@qq.com
##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)
spring.mail.password=----------------》》》邮箱的授权码
#端口号465或587
spring.properties.mail.smtp.port: 25
##编码格式
spring.mail.default-encoding=UTF-8
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.ssl.enable=true
sendMailUtil.java
@Component
public class SendMailUtil {
private static final Logger LOGGER = LogManager.getLogger(SendMailUtil.class);
@Autowired
private JavaMailSender mailSender;
//发送方邮件的发送地址
@Value("${spring.mail.host}")
public static String sendMailHost;
//发送方发送邮件的账号
@Value("${spring.mail.username}")
public static String sendMailUsername;
//发送方发送邮件的客户端授权码
@Value("${pring.mail.password}")
public static String sendMailPassword;
//发送方发送邮件的端口
@Value("${spring.properties.mail.smtp.port}")
public static String sendMailPort;
@Value("${spring.mail.from")
public static String sendMailFrom;
public static void sendSimpleMail(String to, String subject, String content) throws Exception{
//创建连接对象 连接到邮件服务器
Properties properties = new Properties();
//设置发送邮件的基本参数
//发送邮件服务器
properties.put("mail.smtp.host", sendMailHost);
//发送端口
properties.put("mail.smtp.port", sendMailPort);
properties.put("mail.smtp.auth", "true");
//设置发送邮件的账号和密码
Session session = Session.getInstance(properties, new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
//两个参数分别是发送邮件的账户和密码
return new PasswordAuthentication(sendMailUsername,sendMailPassword);
}
});
//创建邮件对象
Message message = new MimeMessage(session);
//设置发件人
message.setFrom(new InternetAddress(sendMailUsername));
//设置收件人
message.setRecipient(Message.RecipientType.TO,new InternetAddress(to));
//设置主题
message.setSubject(subject);
// 设置邮件内容
message.setContent(content,"text/html;charset=UTF-8");
//发送一封邮件
Transport.send(message);
}
public void sendHtmlMail(String to, String subject, String content) {
//获取MimeMessage对象
MimeMessage message = mailSender.createMimeMessage();
MimeMessageHelper messageHelper;
try {
messageHelper = new MimeMessageHelper(message, true);
//邮件发送人
messageHelper.setFrom(sendMailFrom);
//邮件接收人
messageHelper.setTo(subject);
//邮件主题
message.setSubject(subject);
//邮件内容,html格式
messageHelper.setText(content, true);
//发送
mailSender.send(message);
} catch (MessagingException e) {
}
}
public void sendAttachmentsMail(String to, String subject, String content, String filePath) {
MimeMessage message = mailSender.createMimeMessage();
try {
MimeMessageHelper helper = new MimeMessageHelper(message, true);
helper.setFrom(sendMailFrom);
helper.setTo(to);
helper.setSubject(subject);
helper.setText(content, true);
FileSystemResource file = new FileSystemResource(new File(filePath));
String fileName = filePath.substring(filePath.lastIndexOf(File.separator));
helper.addAttachment(fileName, file);
mailSender.send(message);
} catch (MessagingException e) {
//日志信息
}
}
public static void main(String[] args) throws Exception {
//sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;
//System.exit(0);
}
}
业务代码.Java
@Override
public Object userSendNewMailCode(String memberId , String newMemberEmail) {
Map<String , Object> mapMemberInfo = new HashMap<String, Object>();
if(!"".equals(memberId) && !"".equals(newMemberEmail)) {
Member member = memberService.getOnlyOneMemberInfoByMemberId(memberId);
String validEmail = IdGenerate.random2FiveId();
//邮箱登录注册后,可绑定更改新邮箱
if (member!=null && newMemberEmail.equals(member.getMemberEmail())) {
try {
SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");
} catch (Exception e) {
e.printStackTrace();
mapMemberInfo.put("exception", e.getMessage());
}
//邮箱未登录注册后,任意邮箱可绑定
}else if(member!=null && !newMemberEmail.equals(member.getMemberEmail())){
try {
SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");
} catch (Exception e) {
e.printStackTrace();
mapMemberInfo.put("exception", e.getMessage());
}
}
List<Object> memberInfo = new ArrayList<Object>();
mapMemberInfo.put("newMemberEmail", newMemberEmail);
mapMemberInfo.put("validEmail", validEmail);
memberInfo.add(mapMemberInfo);
return mapMemberInfo;
}
return mapMemberInfo;
}
RabbitMQ教程(安装与使用详解,Spring集成)_程猿薇茑的博客
消息中间件(一)MQ详解及四大MQ比较_jcpp9527的博客
Springboot 整合RabbitMq ,用心看完这一篇就够了_小目标青年的博客
Java SpringBoot集成RabbitMq实战和总结 - 远方789 - 博客园
以上是自己整理的,并测试过,可以直接用
转载声明:本文为博主原创文章,未经博主允许不得转载
⚠️注意 ~
💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!
如有疑问❓可以在评论区留言💬或私信留言💬,尽我最大能力🏃♀️帮大家解决👨🏫!
如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~
华为开发者空间发布
让每位开发者拥有一台云主机
- 点赞
- 收藏
- 关注作者
评论(0)