sprinbBoot整合rabbitMQ-订阅发布模式-发送消息服务(邮件和钉钉通知)

举报
ksh1998 发表于 2022/01/11 19:56:41 2022/01/11
【摘要】 rabbitmq订阅发布模式-发送消息demo-springBoot整合

订阅发布模式

订阅发布模式是一个生产者对应多个消费者(fanout-exchange)模式,可以理解为广播模式,会给这个交换机绑定的所有队列推送消息

image.png

生产者

配置类

package com.example.springbootorderrabbitmqproducer.Config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author 康世行
* @Title:
* @Package com.example.springbootorderrabbitmqproducer.Config
* @Description: rabbitmmq配置类
* @date 2021-12-25 19:37
*/
@Configuration
public class RabbitMqConfiguration {
 //1:声明注册fancout模式的交换机
 @Bean
 public FanoutExchange fanoutExchange(){
     return new FanoutExchange("fanout-exchange",true,false);
 }
 //2:声明队列
 /**
   * @Description: 发送短息队列
   * @param ${tags}
   * @return ${return_type}
   * @throws
   * @author 康世行
   * @date 2021-12-25 19:59
   */
 @Bean
 public Queue smsQueue(){
      return new Queue("sms.fanout.queue",true);
 }
 /**
   * @Description: 发送邮件消息队列
   * @param ${tags}
   * @return ${return_type}
   * @throws
   * @author 康世行
   * @date 2021-12-25 20:00
   */
 @Bean
 public Queue emailQueue(){
     return new Queue("email.fanout.queue",true);
 }
 //3:完成队列和交换机的绑定关系
 /**
   * @Description: 发送短息消息队列和fanout交换机进行绑定
   * @param ${tags}
   * @return ${return_type}
   * @throws
   * @author 康世行
   * @date 2021-12-25 20:02
   */
 @Bean
 public Binding smsBinding(){
     return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
 }
 /**
   * @Description: 发送邮件消息队列和fanout交换机进行绑定
   * @param ${tags}
   * @return ${return_type}
   * @throws
   * @author 康世行
   * @date 2021-12-25 20:03
   */
 @Bean
 public Binding emailBinding(){
     return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
 }
}

消息生产代码

** controller**

package com.example.springbootorderrabbitmqproducer.controller;

import com.example.springbootorderrabbitmqproducer.service.OrderRabbitmqProducerimpl;
import io.swagger.annotations.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
* @author 康世行
* @Title:
* @Package com.example.springbootorderrabbitmqproducer.controller
* @Description: 测试日志controller
* @date 2021-12-09 8:27
*/
@RestController
@RequestMapping("/send")
@Api("测试swagger接口")
public class OrderRabbitmqProducerController {
 @Autowired
 OrderRabbitmqProducerimpl orderRabbitmqProducerimpl;

  

 /**
   * @Description: 发送订单,生产者
   * @param ${tags}
   * @return ${return_type}
   * @throws
   * @author 康世行
   * @date 2021-12-11 20:25
   */
 @GetMapping("/sendOrder/{DingId}/{content}")
 @ApiOperation("发送订单")
 @ApiImplicitParams({
         @ApiImplicitParam(name="DingId",value="接收消息的钉钉Id",dataType="String", paramType = "path",required = false),
         @ApiImplicitParam(name="content",value="要发送的消息",dataType="String", paramType = "path",required = false)
 })
 @ApiResponse(code = 400,message = "请求参数没填好")
 public String sendOrder(@PathVariable(value = "DingId",required = true) String DingId, @PathVariable(value = "content",required = true)String content){
     //发送订单,消息
     String relust= orderRabbitmqProducerimpl.sendOrder(DingId, content);
     return relust;
 }
}

** service**

package com.example.springbootorderrabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
* @author 康世行
* @Title:
* @Package com.example.springbootorderrabbitmqproducer.service
* @Description: 发送订单业务
* @date 2021-12-11 20:27
*/
@Service
public class OrderRabbitmqProducerimpl {
   @Autowired
   RabbitTemplate rabbitTemplate;
   /**
     * @Description: 发送订单,生产者
     * @param ${tags}
     * @return ${return_type}
     * @throws
     * @author 康世行
     * @date 2021-12-11 20:29
     */
   public String sendOrder(String dingId,String content){
       //组装数据
       Map<String, String> stringMap=new HashMap<>();
       stringMap.put("dingId",dingId);
       stringMap.put("content",content);
       //消息投递到队列
       //交换机,使用发布订阅模式
       String exchangeName="fanout-exchange";
       //路由key
       String routingKey="";
      rabbitTemplate.convertAndSend(exchangeName,routingKey,stringMap);
       return "消息投递成功!";
   }
}

结果

image.png
在图中可以看到sms 消息队列和email消息队列里都有两天未消费的消息

消费者

消费者就是一致监听其中一个队列,只要队列里有消息立刻消费。比如:emali这个队列被email 消费者监听,只要已有消息这个消费者立刻就消费。

消费者发送消息

钉钉通知

package com.example.springbootorderrabbitmqconsumer.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @author 康世行
* @Title:
* @Package com.example.springbootorderrabbitmqproducer.consumer
* @Description: 订单消费者
* @date 2021-12-11 20:55
*/
@Component
//监听指定队列
@RabbitListener(queues = {"sms.fanout.queue"})
public class OrderRabbimqConsumerDingDing {
 @Autowired
 RestTemplate restTemplate;
 @RabbitHandler
 /**
   * @Description: 发送钉钉消息
   * @param  DingId 钉钉id ,content 要发送的内容
   * @return ${return_type}
   * @throws
   * @author 康世行
   * @date 2021-12-11 21:02
   */
 public void messagerevice(Map<String,String> maps){
     //获取队列消息,发送钉钉消息
     String dingId = maps.get("dingId");
     String content = maps.get("content");
     System.out.println("dingId"+dingId);
     System.out.println("内容"+content);
     //data体
     Map<String, Object> map=new HashMap<>();
     List<String> dingIdlist=new ArrayList<>();
     dingIdlist.add(dingId);
     map.put("dingIds",dingIdlist);
     map.put("groupName","测试");
     map.put("messageContent",content);
     map.put("messageTitle","测试消息队列发送钉钉消息");
     map.put("messageUrl","ddd");
     map.put("picUrl","ddd");
     //htttp请求头,设置请求头信息
     HttpHeaders headers=new HttpHeaders();
     headers.setContentType(MediaType.parseMediaType("application/json"));
     //http请求实体,请求头设置和data存入http请求实体中
     HttpEntity parms=new HttpEntity(map, headers);
             //发送http请求, 参数1: 接口地址,参数2 请求的数据体(data+headers) 参数3 返回值类型
     ResponseEntity<String> stringResponseEntity = restTemplate.postForEntity("http://msg.dmsd.tech:8002/dingmessage/send/groupTextMsg", parms, String.class);
     System.out.println(stringResponseEntity);
 }
}

结果

1641891673374_6AEECDEA-477D-436a-B796-D238F9352D8F.png

邮件通知

package com.example.springbootorderrabbitmqconsumer.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.*;

/**
* @author 康世行
* @Title:
* @Package com.example.springbootorderrabbitmqproducer.consumer
* @Description: email消费者
* @date 2021-12-12 9:45
*/
@Component
@Slf4j
//监听指定队列
@RabbitListener(queues = {"email.fanout.queue"})
public class OrderRabbimqConsumerEmail {
 @Autowired
 JavaMailSender javaMailSender;
 @RabbitHandler
 /**
    * @Description: 发送email消息
    * @param  DingId 钉钉id ,content 要发送的内容
    * @return ${return_type}
    * @throws
    * @author 康世行
    * @date 2021-12-11 21:02
    */
 public void messagerevice(Map<String,String> maps){
     //获取队列消息,发送 email消息
     String dingId = maps.get("dingId");
     String content = maps.get("content");
     log.info("dingId"+dingId);
     log.info("内容"+content);
     // 设置邮件发送内容
     SimpleMailMessage mailMessage = new SimpleMailMessage();
     // 发件人: setFrom处必须填写自己的邮箱地址,否则会报553错误
     mailMessage.setFrom("1547403415@qq.com");
     // 收件人
     mailMessage.setTo("18332371417@163.com");
     // 抄送收件人:网易邮箱要指定抄送收件人,不然会报 554(发送内容错误)
     mailMessage.setCc("18332371417@163.com");
     // 主题
     mailMessage.setSubject("测试rabitMq发送邮费服务");
     // 内容
     mailMessage.setText(content);
     try {
         javaMailSender.send(mailMessage);
         System.out.println("发送简单文本邮件成功,主题是:" + content);
     } catch (Exception e) {
         System.out.println("-----发送简单文本邮件失败!-------" + e.toString());
         e.printStackTrace();
     }

 }
}
结果

1641891666618_2E8B96E4-CFCD-4b8f-ABAE-CF9836ECA5CD.png

yml

消费者

server:
  port: 8088
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.77.130
    port: 5672
  mail:
    # 配置 SMTP 服务器地址
    host: smtp.qq.com
    # 发送者邮箱
    username: 1547403415@qq.com
    # 配置密码,注意不是真正的密码,而是刚刚申请到的授权码
    password: ###########
    # 端口号465587
    port: 587
    # 默认的邮件编码为UTF-8
    default-encoding: UTF-8
    # 配置SSL 加密工厂
    properties:
      mail:
        smtp:
          socketFactoryClass: javax.net.ssl.SSLSocketFactory
        #表示开启 DEBUG 模式,这样,邮件发送过程的日志会在控制台打印出来,方便排查错误
        debug: true

生产者

# 服务端口
server:
  port: 8089
# 配置rabbitmq服务  
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.77.130
    port: 5672


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。