SpringCloud实战---第十八篇:消息驱动SpringCloudStream
系列文章目录
SpringCloud快速入门到精通各组件原理
专栏传送门
@TOC
前言
我们学习的知识框架图
说起来容易做起来难,一步一步都干完!!!
学习一定要自己动手搞一搞,不能只眼会。
学习笔记是跟着尚硅谷的视频学的:https://www.bilibili.com/video/BV18E411x7eT?p=1
一、SpringCloudStream消息驱动是什么
- 解决的痛点:各种消息中间件如Kafka、RabbitMQ、ActiveMQ、RocketMQ等,我们使用时如果技术栈中存在两种及以上的中间件切换起来十分麻烦,CloudStream解决了这个问题,他整合了常用的消息中间件,我们学会CloudStream就可以不用关注底层的去使用Kafka等消息中间件。
- 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
- 官网:https://spring.io/projects/spring-cloud-stream#overview
我们真正使用Cloud Stream框架使用的是binder这个对象,应用程序通过inputs或者outputs来与binder对象交互。
- 当前CloudStream支持RabbitMQ和Kafka
- 官方API:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
- Stream中使用的是binder操作消息队列,类似于Hiburnate和Mybatis框架,通用的框架却可以操作多种数据库如mysql、Oracle等。
- 中文指导手册:https://m.wang1314.com/doc/webapp/topic/20971999.html
二、SpringCloudStream的设计思想
1. 标准MQ
生产者/消费者之间靠消息媒介传递信息内容: Message
消息必须走特定的通道: 消息通道MessageChannel
消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
2. 为什么用Cloud Stream
如果我们用了RabbitMQ做消息队列,而后来我们技术转型改为了Kafka,那么这种底层消息队列的转型无疑灾难性的,所有的相关业务都要推到重新做,因为RabbitMQ消息队列的收发和我们的系统高度耦合,而CloudStream提供了一种解耦的方式。
3. Stream如何屏蔽底层差异?
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
- INPUT对应于消费者。
- OUTPUT对应于生产者。
4. Stream中的消息通信方式遵循了发布-订阅模式
Topic主题进行广播:在RabbitMQ就是Exchange,在Kakfa中就是Topic。
5. Spring Cloud Stream标准流程套路
- Binder: 很方便的连接中间件,屏蔽差异。
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
6. Spring Cloud Stream常用注解及类
- Middleware:中间件,目前只支持RabbitMQ和Kafka。
- Binder:是应用和消息中间件之间的封装。
- @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
- @Output:注解标识输出通道,发布的消息通过该通道发送到消息中间件。
- @StreamListener:监听队列,用于消费者队列的消息接收。
- @EnableBinding:指信道channel和exchange绑定在一起。
三、搭建环境
1. 搭建RabbitMQ环境
见第《SpringCloud实战—第十七篇:消息总线SpringCloudBus》环境搭建
2. 先构建好基础工程(一篇一篇看过来的不用重新构建)
构建基础父工程:构建基础父工程
Rest风格微服务:Rest风格微服务
传统分布式方法:传统分布式方法
改造工程,抽取公共模块:改造工程,抽取公共模块
使用Eureka:使用Eureka
Eureka集群: Eureka集群
想偷懒的请下载;gitee上我上传的代码:
https://gitee.com/xiaoZ1712/cloud2021
基础工程构建完成的目录结构:
启动所有模块,访问
localhost:7001
显示如下,代表基础工程没问题
话不多说,立马开干
四、创建消息生产者模块8801
1. 创建模块
模块名
cloud-stream-rabbitmq-provider8801
pom:主要的依赖是spring-cloud-starter-stream-rabbit
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
重要的配置是bindings
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
包名
com.atguigu.springcloud
主启动类
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: Daisen.Z
* @Date: 2022/1/10 17:06
* @Version: 1.0
* @Description:
*/
@SpringBootApplication
public class StreamMQMain8801
{
public static void main(String[] args)
{
SpringApplication.run(StreamMQMain8801.class,args);
}
}
2. 编写业务类
新建发送消息接口service.IMessageProvider
package com.atguigu.springcloud.service;
/**
* @Author: Daisen.Z
* @Date: 2022/1/10 17:12
* @Version: 1.0
* @Description:
*/
public interface IMessageProvider {
public String send();
}
编写发送消息接口的实现类MessageProviderImpl,注意:各个类包不要引错!!!
package com.atguigu.springcloud.service.impl;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @Author: Daisen.Z
* @Date: 2022/1/10 17:14
* @Version: 1.0
* @Description:
*/
@EnableBinding({Source.class}) // 定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
// 使用MessageBuilder构建消息发送格式
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("****** serial: "+serial);
return null;
}
}
编写Controller调用消息发送
package com.atguigu.springcloud.controller;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: Daisen.Z
* @Date: 2022/1/10 17:22
* @Version: 1.0
* @Description:
*/
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping(value="/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
3. 启动测试
启动RabbitMQ,进入到RabbitMQ的安装目录,进入sbin目录,打开cmd,执行启动命令
rabbitmq-server start
进入RabbitMQ的可视化界面,默认账号密码都是guest
http://localhost:15672/#/
启动7001、8801工程,访问8801的接口,看是否可以成功向RabbitMQ发送消息。
启动之后会发现RabbitMQ可视化界面上多了一个exchanges,这个就是我们的exchange
访问发送消息的接口,多刷新几次。
http://localhost:8801/sendMessage
查看后台,发现已经发送了几次
查看exchange的详细信息
发现exchange是有波峰的,证明RabbitMQ已经成功收到消息。
五、创建消息消费者8802
1. 创建模块
模块名
cloud-stream-rabbitmq-consumer8802
pom依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml,与发送方不同的是output改为了input
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
包名
com.atguigu.springcloud
主启动类
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @auther zzyy
* @create 2020-02-22 11:56
*/
@SpringBootApplication
public class StreamMQMain8802
{
public static void main(String[] args)
{
SpringApplication.run(StreamMQMain8802.class,args);
}
}
2. 编写业务类
名称
controller.ReceiveMessageListenerController
package com.atguigu.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @Author: Daisen.Z
* @Date: 2022/1/11 15:26
* @Version: 1.0
* @Description:
*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
// @StreamListener注解表示这是一个接收方的类,作为一个输入源来接收队列消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);
}
}
3. 启动测试
先启动rabitmq,再启动7001,8801,8802,测试从8801发送消息,8802进行接收
访问8801发送消息
http://localhost:8801/sendMessage
访问完成后,查看8801和8802后台日志
六、Stream分组消费和持久化
1. 依照8802创建出8803模块
除端口号、启动类外,其余都一样
2. 重复消费问题
启动rabitmq、7001、8801、8802、8803,我们的模式种8002和8003会是一个集群,但是当7001发送一条消息是,8802和8803会重复的进行消费处理,这就相当于如果我们通过stream进行通信,kafka发送了一条入库的数据,我们有几台集群就会重复的入几条消息,这就是重复消费的问题,这显然不合理,当然解决起来也很简单,我们使用“组”就可以来解决这个问题,同一条消息同一个组只能消费一次,我们让8002和8003配置为同一个组就可以了(不同的组会重复消费)。
给8802和8803都配置相同的消费组。
group: atguiguA
重启一下8802和8803
再次访问8801发送消息,发现无论如何发8802和8803只有一个会收到消息(始终不会重复)
3. 消息持久化问题
这里不多说,大家自行测试下。
stream中如果没有配置组(group),我们会发现每次启动时会从当前消息开始消费,也就是说如果程序重启了,rabbitmq中在这期间发送的消息就收不到了。
配置上group之后,我们能消费到完整的消息,启动时会从已消费的消息开始消费新的消息。
总结
- CloudStream屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
- 官方API:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
- Stream中使用的是binder操作消息队列,类似于Hiburnate和Mybatis框架,通用的框架却可以操作多种数据库如mysql、Oracle等。
- 中文指导手册:https://m.wang1314.com/doc/webapp/topic/20971999.html
- Middleware:中间件,目前只支持RabbitMQ和Kafka。
- Binder:是应用和消息中间件之间的封装。
- @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
- @Output:注解标识输出通道,发布的消息通过该通道发送到消息中间件。
- @StreamListener:监听队列,用于消费者队列的消息接收。
- @EnableBinding:指信道channel和exchange绑定在一起。
发送消息代码:
@EnableBinding({Source.class}) // 定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
// 使用MessageBuilder构建消息发送格式
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("****** serial: "+serial);
return null;
}
}
接收消息代码:
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
// @StreamListener注解表示这是一个接收方的类,作为一个输入源来接收队列消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);
}
}
- 点赞
- 收藏
- 关注作者
评论(0)