centos7 下安装rabbitmq,spring-boot整合rabbitmq
你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。
消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)
一、下载erlang 和 rabbitmq 的rpm 包 到/usr/local/下,并安装
erlang rpm包下载地址:
rabbitmq 下载地址:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm
[root@localhost local]# yum -y install erlang-21.0.5-1.el7.centos.x86_64.rpm [root@localhost local]# yum -y install rabbitmq-server-3.7.7-1.el7.noarch.rpm
输入erl 命令验证erlang 是否安装成功
[root@localhost local]# erl Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe] Eshell V10.0.5 (abort with ^G) 1> BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
运行RabbitMQ
cd /usr/sbin ./rabbitmq-server start
出现以下信息,则表示启动成功:
RabbitMQ 3.7.7. Copyright (C) 2007-2016 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /var/log/rabbitmq/rabbit@mq01.log ###### ## /var/log/rabbitmq/rabbit@mq01-sasl.log ########## Starting broker... completed with 0 plugins.
后台启动可使用:
./rabbitmq-server -detached
启动rabbitmq 的时候可能会提示 /usr/local/rabbitmq-server/data 或者 /usr/local/rabbitmq-server/log 文件夹无权限,需要执行
chown -R rabbitmq:rabbitmq /usr/local/rabbitmq-server/data chown -R rabbitmq:rabbitmq /usr/local/rabbitmq-server/log
如果没有创建这两个文件夹,需要提前创建。
二、启用rabbitmq可视化管理界面
插件安装
(1)查看目前RabbitMQ已安装的插件
cd /usr/sbin ./rabbitmq-plugins list
安装web管理端
./rabbitmq-plugins enable rabbitmq_management
安装成功后,启动MQ
通过浏览器访问地址:http://192.168.42.85:15672/#/
由于guest用户被限制,只能通过localhost访问
我们可以编辑一下 vim /etc/rabbitmq/rabbitmq.config ,添加以下内容
[{rabbit, [{loopback_users, []}]}].
./rabbitmq-server -detached重启rabbitmq 服务即可使用 外网ip 登录。
(2)新建用户,设置权限,安装其他插件
新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限
./rabbitmqctl list_users ./rabbitmqctl add_user admin admin ./rabbitmqctl set_user_tags admin administrator ./rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
安装mqtt
./rabbitmq-plugins enable rabbitmq_mqtt
安装websocket
./rabbitmq-plugins enable rabbitmq_web_stomp ./rabbitmq-plugins enable rabbitmq_web_stomp_examples
配置
如果需要修改RabbitMQ的默认配置,先查找配置文件样例位置
find / -name "rabbitmq.config.example"
然后将样例配置复制到制定目录
cp /usr/share/doc/rabbitmq-server-3.6.8/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改样例文件,并保存,重启MQ生效。
权限
主要是set_permissions的使用,先看下命令的格式:
set_permissions [-p vhost] {user} {conf} {write} {read}
首先需要注意以下几点的理解:
1.这里的权限,只是针对一般用户的访问权限,注意和角色的区分。举个例子来说,非管理用户(普通用户),角色设置为none,然后在这里配置conf、write、read的权限。
2.conf、write、read采用正则表达式,这里的正则主要是针对exchange和queue。主要2种特殊的表达式:
^$:表示完全不匹配(即没有权限)
.*:表示匹配所有(即所有权限)
如果使用RabbitMQ作为消息推送服务,需要授予用户只读访问Stomp主题权限,同时授予另一个用户对同一主题的读写访问权限。
这样的话主题订阅者就不能发布消息,从而解决安全性问题。
但是如果直接这样授权:
rabbitmqctl set_permissions read-only-user '.*' '^$' '.*'
会报如下错误:
ERROR message:access_refused content-type:text/plain version:1.0,1.1,1.2 content-length:114 ACCESS_REFUSED - access to queue 'stomp-subscription-APK3zkvXFqxvSiZ9ztmxYQ' in vhost '/' refused for user 'test' Whoops! Lost connection to ws://221.0.200.202:15674/ws
这样配置存在一个问题,将阻止任何写入,阻止任何写入将导致没有权限连接到RabbitMQ的服务
需要对可读用户权限做如下调整:
rabbitmqctl set_permissions read-only-user '^stomp-subscription.*$' '^stomp-subscription.*$'
三、spring boot整合rabbitmq
添加rabbitmq依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml添加rabbitmq的配置信息:
spring: rabbitmq: host: 192.168.42.85 port: 5672 password: guest username: guest publisher-confirms: true virtual-host: /
RabbitMQ(Direct模式)示例:
配置类:
@Configuration public class SenderConf { //SpringBoot整合RabbitMQ(Direct模式) @Bean public Queue queue() { return new Queue("queue"); } //SpringBoot整合RabbitMQ(Topic转发模式) //定义两个消息队列 @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } //定义topic交换机 @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } //将topic.message和topic.messages消息队列绑定到上面的交换机上,并指定不同的主题 @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词 } //SpringBoot整合RabbitMQ(广播模式) //定义Amessage、Bmessage、Cmessage三个消息队列 @Bean(name="Amessage") public Queue AMessage() { return new Queue("fanout.A"); } @Bean(name="Bmessage") public Queue BMessage() { return new Queue("fanout.B"); } @Bean(name="Cmessage") public Queue CMessage() { return new Queue("fanout.C"); } //配置广播路由器 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //将Amessage、Bmessage、Cmessage三个消息队列绑定到广播路由器上 @Bean Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
生产者:
@Component public class HelloSender { @Autowired private AmqpTemplate amqpTemplate; public void sendMsg() { User user = new User(); user.setName("张三"); user.setPassword("123456dzx"); amqpTemplate.convertAndSend("queue", user); amqpTemplate.convertAndSend("exchange", "topic.message", user); //广播模式下指定队列名称不起作用,所有绑定到广播交换机上面的队列都会收到广播的消息 amqpTemplate.convertAndSend("fanoutExchange", "", user); } }
消费者:
@Component public class HelloRecevier { @RabbitListener(queues = "queue")//监听器监听指定的Queue public void reveMsg(User user) { System.out.println("Receive:" + user); } @RabbitListener(queues = "topic.message") //监听器监听指定的Queue public void process1(User user) { System.out.println("message:" + user); } @RabbitListener(queues = "topic.messages") //监听器监听指定的Queue public void process2(User user) { System.out.println("messages:" + user); } @RabbitListener(queues = "fanout.A")//监听器监听指定的Queue public void processA(User user) { System.out.println("ReceiveA:" + user); } @RabbitListener(queues = "fanout.B")//监听器监听指定的Queue public void processB(User user) { System.out.println("ReceiveB:" + user); } @RabbitListener(queues = "fanout.C")//监听器监听指定的Queue public void processC(User user) { System.out.println("ReceiveC:" + user); } }
编写测试类,启动应用,运行测试类看到控制台如下输出说明整合成功。
@RunWith(SpringRunner.class) @SpringBootTest public class HelloSenderTest{ @Autowired private HelloSender helloSender; @Test public void sendMsg() { helloSender.sendMsg(); } }
- 点赞
- 收藏
- 关注作者
评论(0)