centos7 下安装rabbitmq,spring-boot整合rabbitmq

举报
小米粒-biubiubiu 发表于 2020/11/30 23:19:20 2020/11/30
【摘要】 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。 消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC) 一、下载erlang 和 rabbitmq 的rpm 包 到/usr/l...

你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。
消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)

一、下载erlang 和 rabbitmq 的rpm 包 到/usr/local/下,并安装

erlang rpm包下载地址:

https://bintray.com/rabbitmq/rpm/download_file?file_path=erlang%2F21%2Fel%2F7%2Fx86_64%2Ferlang-21.0.5-1.el7.centos.x86_64.rpm

rabbitmq 下载地址:

https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm


  
  1. [root@localhost local]# yum -y install erlang-21.0.5-1.el7.centos.x86_64.rpm
  2. [root@localhost local]# yum -y install rabbitmq-server-3.7.7-1.el7.noarch.rpm 

输入erl 命令验证erlang 是否安装成功 


  
  1. [root@localhost local]# erl
  2. Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
  3. Eshell V10.0.5 (abort with ^G)
  4. 1>
  5. BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
  6. (v)ersion (k)ill (D)b-tables (d)istribution

运行RabbitMQ


  
  1. cd /usr/sbin
  2. ./rabbitmq-server start

出现以下信息,则表示启动成功:


  
  1. RabbitMQ 3.7.7. Copyright (C) 2007-2016 Pivotal Software, Inc.
  2. ## ## Licensed under the MPL. See http://www.rabbitmq.com/
  3. ## ##
  4. ########## Logs: /var/log/rabbitmq/rabbit@mq01.log
  5. ###### ## /var/log/rabbitmq/rabbit@mq01-sasl.log
  6. ##########
  7. Starting broker...
  8. completed with 0 plugins.

后台启动可使用:

./rabbitmq-server -detached
 

启动rabbitmq 的时候可能会提示  /usr/local/rabbitmq-server/data 或者 /usr/local/rabbitmq-server/log 文件夹无权限,需要执行


  
  1. chown -R rabbitmq:rabbitmq  /usr/local/rabbitmq-server/data
  2. chown -R rabbitmq:rabbitmq  /usr/local/rabbitmq-server/log

如果没有创建这两个文件夹,需要提前创建。

二、启用rabbitmq可视化管理界面

插件安装

(1)查看目前RabbitMQ已安装的插件


  
  1. cd /usr/sbin
  2. ./rabbitmq-plugins list

安装web管理端

./rabbitmq-plugins enable rabbitmq_management

 

安装成功后,启动MQ

通过浏览器访问地址:http://192.168.42.85:15672/#/

由于guest用户被限制,只能通过localhost访问

我们可以编辑一下  vim  /etc/rabbitmq/rabbitmq.config   ,添加以下内容

[{rabbit, [{loopback_users, []}]}].

./rabbitmq-server -detached重启rabbitmq 服务即可使用 外网ip 登录。

(2)新建用户,设置权限,安装其他插件

新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限


  
  1. ./rabbitmqctl list_users
  2. ./rabbitmqctl add_user admin admin
  3. ./rabbitmqctl set_user_tags admin administrator
  4. ./rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

安装mqtt

./rabbitmq-plugins enable rabbitmq_mqtt

 

安装websocket


  
  1. ./rabbitmq-plugins enable rabbitmq_web_stomp
  2. ./rabbitmq-plugins enable rabbitmq_web_stomp_examples

配置
如果需要修改RabbitMQ的默认配置,先查找配置文件样例位置

find / -name "rabbitmq.config.example"

 

然后将样例配置复制到制定目录

cp /usr/share/doc/rabbitmq-server-3.6.8/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

 

修改样例文件,并保存,重启MQ生效。

权限
主要是set_permissions的使用,先看下命令的格式:

set_permissions [-p vhost] {user} {conf} {write} {read}

 

首先需要注意以下几点的理解:

1.这里的权限,只是针对一般用户的访问权限,注意和角色的区分。举个例子来说,非管理用户(普通用户),角色设置为none,然后在这里配置conf、write、read的权限。

2.conf、write、read采用正则表达式,这里的正则主要是针对exchange和queue。主要2种特殊的表达式:

^$:表示完全不匹配(即没有权限)

.*:表示匹配所有(即所有权限)

如果使用RabbitMQ作为消息推送服务,需要授予用户只读访问Stomp主题权限,同时授予另一个用户对同一主题的读写访问权限。

这样的话主题订阅者就不能发布消息,从而解决安全性问题。

但是如果直接这样授权:

rabbitmqctl set_permissions read-only-user '.*' '^$' '.*' 

 

会报如下错误:

 ERROR message:access_refused content-type:text/plain version:1.0,1.1,1.2 content-length:114 ACCESS_REFUSED - access to queue 'stomp-subscription-APK3zkvXFqxvSiZ9ztmxYQ' in vhost '/' refused for user 'test' Whoops! Lost connection to ws://221.0.200.202:15674/ws

 

这样配置存在一个问题,将阻止任何写入,阻止任何写入将导致没有权限连接到RabbitMQ的服务

需要对可读用户权限做如下调整:

rabbitmqctl set_permissions read-only-user '^stomp-subscription.*$' '^stomp-subscription.*$' 
 

三、spring boot整合rabbitmq

添加rabbitmq依赖:


  
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

在application.yml添加rabbitmq的配置信息: 


  
  1. spring:
  2. rabbitmq:
  3. host: 192.168.42.85
  4. port: 5672
  5. password: guest
  6. username: guest
  7. publisher-confirms: true
  8. virtual-host: /

RabbitMQ(Direct模式)示例:

配置类:


  
  1. @Configuration
  2. public class SenderConf {
  3. //SpringBoot整合RabbitMQ(Direct模式)
  4. @Bean
  5. public Queue queue() {
  6. return new Queue("queue");
  7. }
  8. //SpringBoot整合RabbitMQ(Topic转发模式)
  9. //定义两个消息队列
  10. @Bean(name="message")
  11. public Queue queueMessage() {
  12. return new Queue("topic.message");
  13. }
  14. @Bean(name="messages")
  15. public Queue queueMessages() {
  16. return new Queue("topic.messages");
  17. }
  18. //定义topic交换机
  19. @Bean
  20. public TopicExchange exchange() {
  21. return new TopicExchange("exchange");
  22. }
  23. //将topic.message和topic.messages消息队列绑定到上面的交换机上,并指定不同的主题
  24. @Bean
  25. Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
  26. return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
  27. }
  28. @Bean
  29. Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
  30. return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
  31. }
  32. //SpringBoot整合RabbitMQ(广播模式)
  33. //定义Amessage、Bmessage、Cmessage三个消息队列
  34. @Bean(name="Amessage")
  35. public Queue AMessage() {
  36. return new Queue("fanout.A");
  37. }
  38. @Bean(name="Bmessage")
  39. public Queue BMessage() {
  40. return new Queue("fanout.B");
  41. }
  42. @Bean(name="Cmessage")
  43. public Queue CMessage() {
  44. return new Queue("fanout.C");
  45. }
  46. //配置广播路由器
  47. @Bean
  48. FanoutExchange fanoutExchange() {
  49. return new FanoutExchange("fanoutExchange");
  50. }
  51. //将Amessage、Bmessage、Cmessage三个消息队列绑定到广播路由器上
  52. @Bean
  53. Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
  54. return BindingBuilder.bind(AMessage).to(fanoutExchange);
  55. }
  56. @Bean
  57. Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
  58. return BindingBuilder.bind(BMessage).to(fanoutExchange);
  59. }
  60. @Bean
  61. Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
  62. return BindingBuilder.bind(CMessage).to(fanoutExchange);
  63. }
  64. }

生产者:


  
  1. @Component
  2. public class HelloSender {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void sendMsg() {
  6. User user = new User();
  7. user.setName("张三");
  8. user.setPassword("123456dzx");
  9. amqpTemplate.convertAndSend("queue", user);
  10. amqpTemplate.convertAndSend("exchange", "topic.message", user);
  11. //广播模式下指定队列名称不起作用,所有绑定到广播交换机上面的队列都会收到广播的消息
  12. amqpTemplate.convertAndSend("fanoutExchange", "", user);
  13. }
  14. }

消费者:


  
  1. @Component
  2. public class HelloRecevier {
  3. @RabbitListener(queues = "queue")//监听器监听指定的Queue
  4. public void reveMsg(User user) {
  5. System.out.println("Receive:" + user);
  6. }
  7. @RabbitListener(queues = "topic.message") //监听器监听指定的Queue
  8. public void process1(User user) {
  9. System.out.println("message:" + user);
  10. }
  11. @RabbitListener(queues = "topic.messages") //监听器监听指定的Queue
  12. public void process2(User user) {
  13. System.out.println("messages:" + user);
  14. }
  15. @RabbitListener(queues = "fanout.A")//监听器监听指定的Queue
  16. public void processA(User user) {
  17. System.out.println("ReceiveA:" + user);
  18. }
  19. @RabbitListener(queues = "fanout.B")//监听器监听指定的Queue
  20. public void processB(User user) {
  21. System.out.println("ReceiveB:" + user);
  22. }
  23. @RabbitListener(queues = "fanout.C")//监听器监听指定的Queue
  24. public void processC(User user) {
  25. System.out.println("ReceiveC:" + user);
  26. }
  27. }

编写测试类,启动应用,运行测试类看到控制台如下输出说明整合成功。


  
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class HelloSenderTest{
  4. @Autowired
  5. private HelloSender helloSender;
  6. @Test
  7. public void sendMsg() {
  8. helloSender.sendMsg();
  9. }
  10. }

 

 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/81871940

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。