SpringBoot RocketMQ发布订阅

举报
程序员-上善若水 发表于 2022/06/24 00:59:56 2022/06/24
【摘要】 SpringBoot RocketMQ发布订阅 pom <!-- rocketMQ--> <dependency> &...

SpringBoot RocketMQ发布订阅

pom

  <!--        rocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>
        <!--        =======end======-->

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

application.properties

#RocketMQ
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=orderConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
apache.rocketmq.namesrvAddr=192.168.2.137:9876

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

发布订阅

@Component
public class MsgProdeucer {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private DefaultMQProducer producer ;

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    public boolean sendDetectionInfoMsg(String msg){
        try {
            Message messagea = new Message("Detection","info",msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult resulta = producer.send(messagea);
            return true;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        }
        return false;
    }

    @PostConstruct
    public void init() {
        log.info("RocketMQ 开始初始化");
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        try {
            producer.start();
            log.info("RocketMQ 初始化成功!");
        } catch (Exception e) {
            e.printStackTrace();
            log.info("RocketMQ 初始化失败!"+e.toString());
        }
    }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

Controller

@RestController
@RequestMapping("/RocketMq")
public class RocketTestController {
    @Autowired
    MsgProdeucer prodeucer;

    @GetMapping("/test")
    public String test(){
       boolean result = prodeucer.sendDetectionInfoMsg("你好");
       if (!result){
           return "err";
       }
       return "success";
    }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

监听订阅

@Component
public class MsgConsumer {

    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        //多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        
        try {
            consumer.subscribe("Detection", "*");
          
            //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
            //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            //MessageListenerOrderly 这个是有序的
            //MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        System.out.println("messageExt: " + messageExt);
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_43692950/article/details/107443773

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。