一文搞懂Spring Boot整合RocketMQ

举报
海风极客 发表于 2022/10/16 19:08:57 2022/10/16
1.2k+ 0 0
【摘要】 8 Spring Boot整合RocketMQ 8.1 Maven依赖<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version></dependency> 8.2 配置文件...

8 Spring Boot整合RocketMQ

8.1 Maven依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

8.2 配置文件

server:
  port: 0
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my_mq_one  # 指定组名 必要参数

8.3 生产者代码

/**
 * @desc: 消息生产者
 * @author: YanMingXin
 * @create: 2021/9/15-12:20
 **/
@Service
public class MessageProvider {

    /**
     * 注入RocketMQTemplate
     */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息
     *
     * @param message
     * @return
     */
    public boolean sendMessage(String message) {
        rocketMQTemplate.convertAndSend("Topic1:TagA", message);
        return true;
    }

    /**
     * 发送Spring的消息
     *
     * @param message
     * @return
     */
    public boolean sendSpringMessage(String message) {
        rocketMQTemplate.send("Topic1:TagA", MessageBuilder.withPayload(message).build());
        return true;
    }

    /**
     * 发送异步消息
     *
     * @param message
     * @return
     */
    public boolean sendAsyncMessage(String message) {
        //发送异步消息
        rocketMQTemplate.asyncSend("Topic1:TagA", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                //发送成功
                return;
            }

            @Override
            public void onException(Throwable throwable) {
                //发送失败
                return;
            }
        });
        return true;
    }

    /**
     * 发送顺序消息
     * 注:需要加上synchronized,消费者多线程下会不保证顺序
     *
     * @param list
     * @return
     */
    public synchronized boolean sendAscMessage(List<String> list) {
        for (String str : list) {
            //发送顺序消息
            rocketMQTemplate.syncSendOrderly("Topic1", str, str + "hash");
        }
        return true;
    }

}

8.4 消费者代码

/**
 * @desc: 消息消费者
 * @author: YanMingXin
 * @create: 2021/9/15-12:20
 **/
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "my_mq_consumer_one")
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("Get message is " + s);
    }
}

8.5 测试

@SpringBootTest
class RocketmqSpringBootApplicationTests {

    @Autowired
    private MessageProvider provider;
    @Test
    void contextLoads() {
        provider.sendMessage("Hello World");
        provider.sendSpringMessage("Hello World By Spring");
        provider.sendAsyncMessage("Hello World Async Message");
        List<String> strings = Arrays.asList("A", "B", "C");
        provider.sendAscMessage(strings);
    }

}

测试结果:
在这里插入图片描述

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。