Springboot+RabbitMQ实现消息队列+发送邮箱

举报
写程序的小王叔叔 发表于 2022/05/19 08:34:30 2022/05/19
【摘要】 主页:小王叔叔的博客

主页小王叔叔的博客

支持:点赞👍关注✔️收藏💖


一、效果

二、RMQ可以实现的功能

【介绍】:集合了网上各种大佬的教学一起整理

2.1消息中间件:

2.2rmq安装

windows下 安装 rabbitMQ 及操作常用命令(操作创建用户密码 角色等)_wcy10086的博客-CSDN博客

windows10环境下的RabbitMQ安装步骤(图文) - 清明-心若淡定 - 博客园

2.3含义

2.4原理

编辑

三、SpringBoot +  RMQ集成项目消息队列及聊天功能

【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧

3.1RMQ配置

在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)

如下图:

3.1.1pom.xml(公共文件配置)

        <!--RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.1.2spring.xml(公共文件配置)

####################################RabbitMQ配置#################################################################
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100


3.2Java RMQ类代码

3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)

rmqpConfig.java-------rmqp基本配置

@Configuration
public class RabbitConfig {

	private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class);
	 
    @Value("${spring.rabbitmq.host}")
    private String host;
 
    @Value("${spring.rabbitmq.port}")
    private int port;
 
    @Value("${spring.rabbitmq.username}")
    private String username;
    
    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String vhost;

    public static final  String EXCHANGE_Member = "RabbitMQ_Exchange_Member";//邮件:注册+登录
   
	
    public static final  String QUEUE_Member = "RabbitMQ_Queue_Member";//邮件:注册+登录
   
    
    public static final  String ROUTINGKEY_Member = "RabbitMQ_RoutingKey_Member";//邮件:注册+登录
  
    

    
    //建立一个连接容器,类型数据库的连接池
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }
    
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMandatory(true);
        template.setEncoding("UTF-8");
        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
        template.setMandatory(true);
        template.setConfirmCallback((correlationData, ack, cause) -> {
        	if (ack) {
        		LOGGER.info("消息成功消费");
            } else {
            	LOGGER.info("消息消费失败:" + cause);
            }
        });
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
          });
        return template;
    }
    
    
    /**
     * 交换机针对消费者配置
     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     * DirectExchange:按照routingkey分发到指定队列,多关键字匹配
     */
    @Bean
    public DirectExchange  directExchange() {
    	 return new DirectExchange(EXCHANGE_Member, true,false);
    }
 
    /**
     * 队列
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue(QUEUE_Member, true); 
 
    }
 
    /**
     * 绑定
     *
     * @return
     */
    @Bean
    public Binding directBinding() {
    	 return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member);
    }
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    
    
}

RMQProducer.java ---------rmqp消息提供端/消息发送端

@Component
public class RMQProducer {
 
	private Logger LOGGER = LoggerFactory.getLogger(RMQProducer.class);
	
	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	 
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	 
    /***
     * 延迟消息队列信息
     * @param routingKeyName
     * @param msg
     */
     public void sendMsg(String routingKeyName,String msg) {
    	LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(new Date()));
         rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, new MessagePostProcessor() {
             @Override
             public Message postProcessMessage(Message message) throws AmqpException {
                 message.getMessageProperties().setContentEncoding("utf-8");
                 message.getMessageProperties().setExpiration("120000"); //设置消息存活时间
                 return message;
             }
         });
  		//rabbitTemplate.convertAndSend(routingKeyName, msg);
     }

}

RMQReceiver.java--------------rmqp消费端

/***
 * 消费者
 * @author Administrator
 *
 */
@Component
public class RMQReceiver implements ChannelAwareMessageListener{
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
    @RabbitListener(queues = RabbitConfig.QUEUE_Member )
    @RabbitHandler
    public void handler(Message message, Channel channel) throws IOException  {
    	logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) );
    	long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
    }
 

}

3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)

rmqpConfig.java------------------rmqp配置类

@Configuration
public class RabbitConfig {

	private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class);
	 
    @Value("${spring.rabbitmq.host}")
    private String host;
 
    @Value("${spring.rabbitmq.port}")
    private int port;
 
    @Value("${spring.rabbitmq.username}")
    private String username;
    
    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String vhost;

    /***
     * 交换机
     */
	 
    public static final String EXCHANGE_Order = "RabbitMQ_Exchange_Order";// 下单
	
    /**
     * 队列
     */
  
    public static final String QUEUE_Order = "RabbitMQ_Queue_Order";// 下单
    public static final String QUEUE_Pay = "RabbitMQ_Queue_Pay";// 支付
    
    /***
     * 路由
     */
     
    public String ROUTINGKEY_Order = "RabbitMQ_RoutingKey_Order";// 下单
    

    
    //建立一个连接容器,类型数据库的连接池
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }
    
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMandatory(true);
        template.setEncoding("UTF-8");
        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
        template.setMandatory(true);
        template.setConfirmCallback((correlationData, ack, cause) -> {
        	if (ack) {
        		LOGGER.info("消息成功消费");
            } else {
            	LOGGER.info("消息消费失败:" + cause);
            }
        });
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
	        String correlationId = message.getMessageProperties().getCorrelationIdString();
	        LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
	    });
        return template;
    }
 
    /////////////////////////////////////////Fanout广播配置///////////////////////////////////////////////////////////////////////
    /***
     * 交换机配置
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
    	 return new FanoutExchange(EXCHANGE_Order);
    }
    
    /**
     * 队列配置
     * @return
     */
    @Bean
    public Queue fanoutQueueOrder() {
    	 return new Queue(QUEUE_Order);
    }
    
    @Bean
    public Queue fanoutQueuePay() {
    	 return new Queue(QUEUE_Pay);
    }
    
    /***
     * 绑定交换机和队列
     * @return
     */
    @Bean
    public Binding bindFanoutExchangeOrder() {   
        return BindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange());
    }
    
    @Bean
    public Binding bindFanoutExchangePay() {  
        return BindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange());
    }
    
    //////////////////////////////////////////////////////////////////////////////////////////////////////////////
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    
    
}

RMQProducer.java ---------rmqp消息提供端/消息发送端


RMQReceiver.java--------------rmqp消费端

/***
 * 消费者
 * @author Administrator
 *
 */
@Component
public class RMQReceiver implements ChannelAwareMessageListener{
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
	
    @RabbitListener(queues = RabbitConfig.QUEUE_Order )
    @RabbitHandler
    public void handlerOrder(Message message, Channel channel) throws IOException  {
    	    logger.info("接收处理队列QUEUE_Order当中的消息: " + new String(message.getBody()) );
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
    }
    
   @RabbitListener(queues = RabbitConfig.QUEUE_Pay )
   @RabbitHandler
   public void handlerPay(Message message, Channel channel) throws IOException  {
	   	logger.info("接收处理队列QUEUE_Pay当中的消息: " + new String(message.getBody()) );
	   	long deliveryTag = message.getMessageProperties().getDeliveryTag();
	        channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
   }
	
	

	/**
	 * @param trim
	 * @return
	 */
	private Map<String, String> mapStringToMap(String str) {
		str = str.substring(1, str.length() - 1);
        String[] strs = str.split(",");
        Map<String, String> map = new HashMap<String, String>();
        for (String string : strs) {
            String key = string.split("=")[0].trim();
            String value = string.split("=")[1];
            map.put(key, value);
        }

		return map;
	}

}


3.2.3使用交换机 topicExchange: 通配符方式分发消息-(订阅)

后期补充。。。。

四、解决

4.1 工具安装好后,guest/guest登录失败如何解决:

解决办法:执行如下命令

命令1:rabbitmqctl set_user_tags guest administrator

命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'

重启rabbitmq即可。


 重启服务:


------------------------  我是愉快的分割线 -----------------------------

停止:service rabbitmq-server stop

启动:service rabbitmq-server start

查看状态:service rabbitmq-server status


4.2什么时间让它消费,什么时间手动消费(手动消费:不消费永远都在rmqp中保留)

操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp交换机-队列-路由

/***
 * 消费者
 * @author Administrator
 *
 */
@Component
public class RMQReceiver implements ChannelAwareMessageListener{
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
    //@RabbitListener(queues = RabbitConfig.QUEUE_Member )
    //@RabbitHandler
    public void handler(Message message, Channel channel) throws IOException  {
    	logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) );
    	long deliveryTag = message.getMessageProperties().getDeliveryTag();
       // channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定
    }

    
     
	 

}

如下图:

五、SpringBoot +  Mail集成实现邮件发送

5.1QQ邮箱授权码获取

5.2邮箱配置

pom.xml

               <!-- Mail -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-mail</artifactId>
		</dependency>

spring.xml--》配置邮箱

###############################Mail配置####################################################################
##QQ smtp.qq.com
##sina smtp.sina.cn
##aliyun smtp.aliyun.com
##163 smtp.163.com

#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994
#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994
#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994

##发送方
spring.mail.host=smtp.qq.com
	##邮件地址
spring.mail.from=1247622527@qq.com
	#用户名
spring.mail.username=1247622527@qq.com
	##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)
spring.mail.password=----------------》》》邮箱的授权码
	#端口号465或587
spring.properties.mail.smtp.port: 25
	##编码格式
spring.mail.default-encoding=UTF-8
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.ssl.enable=true

sendMailUtil.java

@Component
public class SendMailUtil {

	
	
	private static final Logger LOGGER = LogManager.getLogger(SendMailUtil.class);
	
    @Autowired
    private JavaMailSender mailSender;

   //发送方邮件的发送地址
  	@Value("${spring.mail.host}")
  	public static String sendMailHost;
  	
  	//发送方发送邮件的账号
  	@Value("${spring.mail.username}")
  	public static String sendMailUsername;
  	
  	//发送方发送邮件的客户端授权码
  	@Value("${pring.mail.password}")
  	public static String sendMailPassword;
  	
  	//发送方发送邮件的端口
  	@Value("${spring.properties.mail.smtp.port}")
  	public static String sendMailPort;
  	
	
  	@Value("${spring.mail.from")
  	public static String sendMailFrom;
  	
  	
  	public static void sendSimpleMail(String to, String subject, String content) throws Exception{
        
        //创建连接对象 连接到邮件服务器
        Properties properties = new Properties();
        //设置发送邮件的基本参数
        //发送邮件服务器
        properties.put("mail.smtp.host", sendMailHost);
        //发送端口
        properties.put("mail.smtp.port", sendMailPort);
        properties.put("mail.smtp.auth", "true");
        //设置发送邮件的账号和密码
        Session session = Session.getInstance(properties, new Authenticator() {
            @Override
            protected PasswordAuthentication getPasswordAuthentication() {
                //两个参数分别是发送邮件的账户和密码
                return new PasswordAuthentication(sendMailUsername,sendMailPassword);
            }
        });

        //创建邮件对象
        Message message = new MimeMessage(session);
        //设置发件人
        message.setFrom(new InternetAddress(sendMailUsername));
        //设置收件人
        message.setRecipient(Message.RecipientType.TO,new InternetAddress(to));
        //设置主题
        message.setSubject(subject);
        // 设置邮件内容
        message.setContent(content,"text/html;charset=UTF-8"); 
        //发送一封邮件
        Transport.send(message);
	}

	public void sendHtmlMail(String to, String subject, String content) {
		  //获取MimeMessage对象
        MimeMessage message = mailSender.createMimeMessage();
        MimeMessageHelper messageHelper;
        try {
            messageHelper = new MimeMessageHelper(message, true);
            //邮件发送人
            messageHelper.setFrom(sendMailFrom);
            //邮件接收人
            messageHelper.setTo(subject);
            //邮件主题
            message.setSubject(subject);
            //邮件内容,html格式
            messageHelper.setText(content, true);
            //发送
            mailSender.send(message);
        } catch (MessagingException e) {
        	
        }
		
	}

	public void sendAttachmentsMail(String to, String subject, String content, String filePath) {
		 MimeMessage message = mailSender.createMimeMessage();
	        try {
	            MimeMessageHelper helper = new MimeMessageHelper(message, true);
	            helper.setFrom(sendMailFrom);
	            helper.setTo(to);
	            helper.setSubject(subject);
	            helper.setText(content, true);

	            FileSystemResource file = new FileSystemResource(new File(filePath));
	            String fileName = filePath.substring(filePath.lastIndexOf(File.separator));
	            helper.addAttachment(fileName, file);
	            mailSender.send(message);
	           
	        } catch (MessagingException e) {
	        	//日志信息  
	        }
	}
	
	
	public static void main(String[] args) throws Exception {
		//sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;
		//System.exit(0);
		
	}
}

业务代码.Java

	@Override
	public Object userSendNewMailCode(String memberId , String newMemberEmail) {
		Map<String , Object> mapMemberInfo = new HashMap<String, Object>(); 
		if(!"".equals(memberId) && !"".equals(newMemberEmail)) {
			Member member = memberService.getOnlyOneMemberInfoByMemberId(memberId);
			String validEmail = IdGenerate.random2FiveId();
			//邮箱登录注册后,可绑定更改新邮箱
			if (member!=null && newMemberEmail.equals(member.getMemberEmail())) {
				try {
					SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");
				} catch (Exception e) {
					e.printStackTrace();
					mapMemberInfo.put("exception", e.getMessage());
				}
			//邮箱未登录注册后,任意邮箱可绑定
			}else if(member!=null && !newMemberEmail.equals(member.getMemberEmail())){
				try {
					SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");
				} catch (Exception e) {
					e.printStackTrace();
					mapMemberInfo.put("exception", e.getMessage());
				}
			}
			List<Object>  memberInfo = new ArrayList<Object>();
			mapMemberInfo.put("newMemberEmail", newMemberEmail);
			mapMemberInfo.put("validEmail", validEmail);
			memberInfo.add(mapMemberInfo);
			return mapMemberInfo;
		}
		return mapMemberInfo;
	}

RabbitMQ教程(安装与使用详解,Spring集成)_程猿薇茑的博客

消息中间件(一)MQ详解及四大MQ比较_jcpp9527的博客

RabbitMQ核心概念以及工作原理 - 简书

Springboot 整合RabbitMq ,用心看完这一篇就够了_小目标青年的博客

Java SpringBoot集成RabbitMq实战和总结 - 远方789 - 博客园


以上是自己整理的,并测试过,可以直接用


转载声明:本文为博主原创文章,未经博主允许不得转载

⚠️注意 ~

💯本期内容就结束了,如果内容有误,麻烦大家评论区指出

如有疑问❓可以在评论区留言💬或私信留言💬,尽我最大能力🏃‍♀️帮大家解决👨‍🏫!

如果我的文章有帮助,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。