14 rocketmq整合SpringCloudStream

举报
周杰伦本人 发表于 2022/05/17 17:33:44 2022/05/17
【摘要】 14 rocketmq整合SpringCloudStream 发送消息 消费消息: Spring Cloud Stream 14 rocketmq整合SpringCloudStream 发送消息<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web<...

14 rocketmq整合SpringCloudStream

发送消息

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        output:
          destination: TopicTest
          group: PRODUCER_GROUP_TOPIC_TEST
@SpringBootApplication
@EnableBinding({ Source.class })
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
@Component
public class ProduceController {

    @Autowired
    private Source source;

    @PostConstruct
    private void init() throws InterruptedException {
        MessageBuilder builder = MessageBuilder.withPayload("init...");
        Message message = builder.build();
        source.output().send(message);
        System.out.println("init...");
    }
}

@EnableBinding({ Source.class })表示绑定配置文件中名称为output的消息通道Binding,Source类中定义的消息通道名称为output。

消费消息:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: TopicTest2
          group: CONSUER_GROUP_DEMO_1

name-server是RocketMq的NameServer地址,destination指定Topic名称,指定名称为input的Binding接收TopicTest的消息

消息监听:

@EnableBinding({ Sink.class})
@SpringBootApplication
public class Application {

    @StreamListener(value = InputChannel.ORDER_INPUT)
    public void receive(String receiveMsg) {
        System.out.println("receive: " + receiveMsg);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@EnableBinding({ Sink.class})表示绑定配置文件名称为input的消息通道Binding,Sink类中定义的消息通道名称为input,@StreamListener表示定义一个消息监听器,接收RocketMQ中的消息。

Spring Cloud Stream

Spring Cloud Stream是构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,目的是简化消息业务在Spring Cloud应用程序中的开发。

通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定中间件绑定器Binder实现连接到外部代理。

Spring Cloud Stream实现基于发布/订阅机制,核心四个部分组成:Spring Framework中的Spring Messaging和Spring Integration,Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的统一消息编程模型

  • Message:消息对象,包含消息头Header和消息体Payload
  • MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。
  • MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration:支持企业集成的扩展机制,提供简单的模型来构建企业集成解决方案,对Spring Messaging进行扩展。

  • MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器
  • MessageRouter:消息路由接口,定义默认的输出消息通道。
  • Filter:消息过滤注解,用于配置消息过滤表达式
  • Aggregator:消息的聚合注解,用于将一条消息拆分成多条。
  • Splitter:消息分割,用于将一条消息拆分成多条。

Binders:目标绑定器,负责与外部消息中间件系统集成的组件。

  • doBindProducer:绑定消息中间件客户端发送消息模块。
  • doBindConsumer:绑定消息中间件客户端接收消息模块。

Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。


Spring Cloud Alibaba RocketMQ架构图

  • MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口
  • MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口
  • Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器
  • Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。