RabbitMQ场景观察

举报
Amrf 发表于 2019/08/30 15:48:53 2019/08/30
【摘要】 搜索一下很容易得到如下的结果:异步处理应用解耦流量削峰为何用消息队列从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作...

搜索一下很容易得到如下的结果:

异步处理
应用解耦
流量削峰
为何用消息队列
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。

那如何和自己的项目场景产生联系呢,上面的描述还是笼统了些,在看看一些别人描述的一些比较具体的场景:

消息中间件系列五:RabbitMQ的使用场景(异步处理、应用解耦)

用户注册场景:

  • 串行

saveUser.saveUser(user);
sendEmail.sendEmail(user.getEmail());
sendSms.sendSms(user.getPhoneNumber());
return true;
  • 并行

saveUser.saveUser(user);
new Thread(sendEmailFuture).start();            
new Thread(sendSmsFuture).start();
sendEmailFuture.get();//获取邮件发送的结果
sendSmsFuture.get();//获取短信发送的结果
return true;
  • rapbitrq

saveUser.saveUser(user);
rabbitTemplate.send("user-reg-exchange","email",  new Message(user.getEmail().getBytes(),new MessageProperties()));
rabbitTemplate.send("user-reg-exchange","sms", new Message(user.getEmail().getBytes(),new MessageProperties()));

然后测试结果:

串行模式  ************spend time : 251ms(这个慢好理解100+100+50)

并行模式  ************spend time : 153ms(这个也好理解100+50)

消息队列模式:************spend time : 59ms (这个是快,但是具体的email和sms其实是没有确认的,只是解了耦合加快了消息响应,但是如果需要等待一些返回的时候,那么时间会和并行模式的相似)

订单场景:

用户下订单买商品,订单成功了,去扣减库存,库存必须扣减完成,没有库存,库存低于某个阈值,可以扣减成功,要通知其他系统(如采购系统尽快采购,用户订单系统我们尽快调货)

RPC实现。库存系统失败,订单系统也无法成功,订单系统和库存系统耦合了。所以要缓存消息中间件来解耦。发送一个扣减库存的消息,保证消息必须被库存系统处理。

三个问题要解决:

1)订单系统发给MQ服务器的消息,必须被MO服务器接收到(事物、发送者确认)

2)MQ服务器拿到消息以后,消息被正常处理以前必须保存住(持久化)

3)某个库存服务出现了异常,消息要能够被其他库存系统处理(消费者确认,消息监听类要实现ChannelAwareMessageListener)

订单系统一定要知道库存系统是否处理成功怎么办?

库存系统和订单系统之间建立一个消息通道,库存系统去通知订单系统

 演示的代码挺多的,我摘了一些:

<property name="publisherConfirms" value="true"/>
<property name="confirmCallback" ref="confirmCallback"/>
<property name="returnCallback" ref="sendReturnCallback"/>
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);

--->回想一下这种场景,如果不是有rabbitRq会怎么处理呢,非分布式的场景下,库存失败订单失败,通过数据库事务可以达到一定程度的效果,但是库存系统和订单系统势必是非完全隔离的;通过数据库事件或者定时任务不断检测库存来通知其他系统,分布式的场景下也可以采用类似的逻辑,这样处理即时性不好,而且当检测周期设置的比较短的时也会占用系统资源影响效率--->额,但是功能上应该没问题,非身临其境不好把握问题的关键啊

假设预设前提,库存系统和订单系统是完全各自独立运行的包括数据,通过rpc或者rest或者soap之类的进行一些同步的交流,和上面一样同步串行导致部分耦合,而rabbitrq是异步的消息队列

 参考:

RabbitMQ 的应用场景以及基本原理介绍

摘一部分:

RabbitMQ 特性

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

 - 可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

 - 灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

 - 消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

 - 高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

 - 多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

 - 多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

 - 管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

 - 跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

 - 插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ 基本概念

- Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

 - Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

 - Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

 - Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

 - Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

 - Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

 - Connection

网络连接,比如一个TCP连接。

 - Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

 - Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

 - Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

 - Broker
表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,

RPC支持

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ 中实现RPC 的机制是:
客户端发送请求(消息)时,在消息的属性(MessageProperties ,在AMQP 协议中定义了14中properties ,这些属性会随着消息一起发送)中设置两个值replyTo (一个Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue 中)和correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
服务器端收到消息并处理
服务器端处理完消息后,将生成一条应答消息到replyTo 指定的Queue ,同时带上correlationId 属性
客户端之前已订阅replyTo 指定的Queue ,从中收到服务器的应答消息后,根据其中的correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理


其他:

https://blog.csdn.net/anzhsoft/article/details/19563091

https://www.cnblogs.com/vipstone/p/9275256.html

https://www.jianshu.com/p/79ca08116d57

https://zhuanlan.zhihu.com/p/48230422+&cd=4&hl=zh-CN&ct=clnk&gl=sg

https://github.com/lemon-china/lemon-rabbitmq

https://www.jianshu.com/p/b5d3a1d6934b

https://blog.csdn.net/zheng_lan_fang/article/details/78612870

https://juejin.im/post/5d39101bf265da1bd3059da6

http://webcache.googleusercontent.com/search?q=cache:CTgcssVhOOMJ:coderec.cn/2016/04/10/%25E4%25BD%25BF%25E7%2594%25A8%25E6%25B6%2588%25E6%2581%25AF%25E9%2598%259F%25E5%2588%2597%25E8%25BF%259B%25E8%25A1%258C%25E8%25A7%25A3%25E8%2580%25A6/+&cd=5&hl=zh-CN&ct=clnk&gl=sg

https://github.com/leeSmall/MessageMiddleware

https://webcache.googleusercontent.com/search?q=cache:xcTN_cT_EegJ:https://zhuanlan.zhihu.com/p/66022936+&cd=1&hl=zh-CN&ct=clnk&gl=sg


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。