一文搞懂Spring Boot整合RocketMQ
【摘要】 8 Spring Boot整合RocketMQ 8.1 Maven依赖<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version></dependency> 8.2 配置文件...
8 Spring Boot整合RocketMQ
8.1 Maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
8.2 配置文件
server:
port: 0
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: my_mq_one # 指定组名 必要参数
8.3 生产者代码
/**
* @desc: 消息生产者
* @author: YanMingXin
* @create: 2021/9/15-12:20
**/
@Service
public class MessageProvider {
/**
* 注入RocketMQTemplate
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*
* @param message
* @return
*/
public boolean sendMessage(String message) {
rocketMQTemplate.convertAndSend("Topic1:TagA", message);
return true;
}
/**
* 发送Spring的消息
*
* @param message
* @return
*/
public boolean sendSpringMessage(String message) {
rocketMQTemplate.send("Topic1:TagA", MessageBuilder.withPayload(message).build());
return true;
}
/**
* 发送异步消息
*
* @param message
* @return
*/
public boolean sendAsyncMessage(String message) {
//发送异步消息
rocketMQTemplate.asyncSend("Topic1:TagA", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//发送成功
return;
}
@Override
public void onException(Throwable throwable) {
//发送失败
return;
}
});
return true;
}
/**
* 发送顺序消息
* 注:需要加上synchronized,消费者多线程下会不保证顺序
*
* @param list
* @return
*/
public synchronized boolean sendAscMessage(List<String> list) {
for (String str : list) {
//发送顺序消息
rocketMQTemplate.syncSendOrderly("Topic1", str, str + "hash");
}
return true;
}
}
8.4 消费者代码
/**
* @desc: 消息消费者
* @author: YanMingXin
* @create: 2021/9/15-12:20
**/
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "my_mq_consumer_one")
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("Get message is " + s);
}
}
8.5 测试
@SpringBootTest
class RocketmqSpringBootApplicationTests {
@Autowired
private MessageProvider provider;
@Test
void contextLoads() {
provider.sendMessage("Hello World");
provider.sendSpringMessage("Hello World By Spring");
provider.sendAsyncMessage("Hello World Async Message");
List<String> strings = Arrays.asList("A", "B", "C");
provider.sendAscMessage(strings);
}
}
测试结果:
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)