centos7 下安装rabbitmq,spring-boot整合rabbitmq
你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。
消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)
一、下载erlang 和 rabbitmq 的rpm 包 到/usr/local/下,并安装
erlang rpm包下载地址:
rabbitmq 下载地址:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm
-
[root@localhost local]# yum -y install erlang-21.0.5-1.el7.centos.x86_64.rpm
-
[root@localhost local]# yum -y install rabbitmq-server-3.7.7-1.el7.noarch.rpm
输入erl 命令验证erlang 是否安装成功
-
[root@localhost local]# erl
-
Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
-
Eshell V10.0.5 (abort with ^G)
-
1>
-
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
-
(v)ersion (k)ill (D)b-tables (d)istribution
运行RabbitMQ
-
cd /usr/sbin
-
./rabbitmq-server start
出现以下信息,则表示启动成功:
-
RabbitMQ 3.7.7. Copyright (C) 2007-2016 Pivotal Software, Inc.
-
## ## Licensed under the MPL. See http://www.rabbitmq.com/
-
## ##
-
########## Logs: /var/log/rabbitmq/rabbit@mq01.log
-
###### ## /var/log/rabbitmq/rabbit@mq01-sasl.log
-
##########
-
Starting broker...
-
completed with 0 plugins.
后台启动可使用:
./rabbitmq-server -detached
启动rabbitmq 的时候可能会提示 /usr/local/rabbitmq-server/data 或者 /usr/local/rabbitmq-server/log 文件夹无权限,需要执行
-
chown -R rabbitmq:rabbitmq /usr/local/rabbitmq-server/data
-
chown -R rabbitmq:rabbitmq /usr/local/rabbitmq-server/log
如果没有创建这两个文件夹,需要提前创建。
二、启用rabbitmq可视化管理界面
插件安装
(1)查看目前RabbitMQ已安装的插件
-
cd /usr/sbin
-
./rabbitmq-plugins list
安装web管理端
./rabbitmq-plugins enable rabbitmq_management
安装成功后,启动MQ
通过浏览器访问地址:http://192.168.42.85:15672/#/
由于guest用户被限制,只能通过localhost访问
我们可以编辑一下 vim /etc/rabbitmq/rabbitmq.config ,添加以下内容
[{rabbit, [{loopback_users, []}]}].
./rabbitmq-server -detached重启rabbitmq 服务即可使用 外网ip 登录。
(2)新建用户,设置权限,安装其他插件
新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限
-
./rabbitmqctl list_users
-
./rabbitmqctl add_user admin admin
-
./rabbitmqctl set_user_tags admin administrator
-
./rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
安装mqtt
./rabbitmq-plugins enable rabbitmq_mqtt
安装websocket
-
./rabbitmq-plugins enable rabbitmq_web_stomp
-
./rabbitmq-plugins enable rabbitmq_web_stomp_examples
配置
如果需要修改RabbitMQ的默认配置,先查找配置文件样例位置
find / -name "rabbitmq.config.example"
然后将样例配置复制到制定目录
cp /usr/share/doc/rabbitmq-server-3.6.8/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改样例文件,并保存,重启MQ生效。
权限
主要是set_permissions的使用,先看下命令的格式:
set_permissions [-p vhost] {user} {conf} {write} {read}
首先需要注意以下几点的理解:
1.这里的权限,只是针对一般用户的访问权限,注意和角色的区分。举个例子来说,非管理用户(普通用户),角色设置为none,然后在这里配置conf、write、read的权限。
2.conf、write、read采用正则表达式,这里的正则主要是针对exchange和queue。主要2种特殊的表达式:
^$:表示完全不匹配(即没有权限)
.*:表示匹配所有(即所有权限)
如果使用RabbitMQ作为消息推送服务,需要授予用户只读访问Stomp主题权限,同时授予另一个用户对同一主题的读写访问权限。
这样的话主题订阅者就不能发布消息,从而解决安全性问题。
但是如果直接这样授权:
rabbitmqctl set_permissions read-only-user '.*' '^$' '.*'
会报如下错误:
ERROR message:access_refused content-type:text/plain version:1.0,1.1,1.2 content-length:114 ACCESS_REFUSED - access to queue 'stomp-subscription-APK3zkvXFqxvSiZ9ztmxYQ' in vhost '/' refused for user 'test' Whoops! Lost connection to ws://221.0.200.202:15674/ws
这样配置存在一个问题,将阻止任何写入,阻止任何写入将导致没有权限连接到RabbitMQ的服务
需要对可读用户权限做如下调整:
rabbitmqctl set_permissions read-only-user '^stomp-subscription.*$' '^stomp-subscription.*$'
三、spring boot整合rabbitmq
添加rabbitmq依赖:
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
在application.yml添加rabbitmq的配置信息:
-
spring:
-
rabbitmq:
-
host: 192.168.42.85
-
port: 5672
-
password: guest
-
username: guest
-
publisher-confirms: true
-
virtual-host: /
RabbitMQ(Direct模式)示例:
配置类:
-
@Configuration
-
public class SenderConf {
-
-
//SpringBoot整合RabbitMQ(Direct模式)
-
@Bean
-
public Queue queue() {
-
return new Queue("queue");
-
}
-
-
//SpringBoot整合RabbitMQ(Topic转发模式)
-
//定义两个消息队列
-
@Bean(name="message")
-
public Queue queueMessage() {
-
return new Queue("topic.message");
-
}
-
-
@Bean(name="messages")
-
public Queue queueMessages() {
-
return new Queue("topic.messages");
-
}
-
//定义topic交换机
-
@Bean
-
public TopicExchange exchange() {
-
return new TopicExchange("exchange");
-
}
-
//将topic.message和topic.messages消息队列绑定到上面的交换机上,并指定不同的主题
-
@Bean
-
Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
-
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
-
}
-
-
@Bean
-
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
-
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
-
}
-
-
-
//SpringBoot整合RabbitMQ(广播模式)
-
//定义Amessage、Bmessage、Cmessage三个消息队列
-
@Bean(name="Amessage")
-
public Queue AMessage() {
-
return new Queue("fanout.A");
-
}
-
-
-
@Bean(name="Bmessage")
-
public Queue BMessage() {
-
return new Queue("fanout.B");
-
}
-
-
@Bean(name="Cmessage")
-
public Queue CMessage() {
-
return new Queue("fanout.C");
-
}
-
//配置广播路由器
-
@Bean
-
FanoutExchange fanoutExchange() {
-
return new FanoutExchange("fanoutExchange");
-
}
-
-
//将Amessage、Bmessage、Cmessage三个消息队列绑定到广播路由器上
-
@Bean
-
Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
-
return BindingBuilder.bind(AMessage).to(fanoutExchange);
-
}
-
-
@Bean
-
Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
-
return BindingBuilder.bind(BMessage).to(fanoutExchange);
-
}
-
-
@Bean
-
Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
-
return BindingBuilder.bind(CMessage).to(fanoutExchange);
-
}
-
}
生产者:
-
@Component
-
public class HelloSender {
-
-
@Autowired
-
private AmqpTemplate amqpTemplate;
-
-
public void sendMsg() {
-
User user = new User();
-
user.setName("张三");
-
user.setPassword("123456dzx");
-
amqpTemplate.convertAndSend("queue", user);
-
amqpTemplate.convertAndSend("exchange", "topic.message", user);
-
//广播模式下指定队列名称不起作用,所有绑定到广播交换机上面的队列都会收到广播的消息
-
amqpTemplate.convertAndSend("fanoutExchange", "", user);
-
}
-
}
消费者:
-
@Component
-
public class HelloRecevier {
-
-
-
@RabbitListener(queues = "queue")//监听器监听指定的Queue
-
public void reveMsg(User user) {
-
System.out.println("Receive:" + user);
-
}
-
-
@RabbitListener(queues = "topic.message") //监听器监听指定的Queue
-
public void process1(User user) {
-
System.out.println("message:" + user);
-
}
-
-
@RabbitListener(queues = "topic.messages") //监听器监听指定的Queue
-
public void process2(User user) {
-
System.out.println("messages:" + user);
-
}
-
-
-
@RabbitListener(queues = "fanout.A")//监听器监听指定的Queue
-
public void processA(User user) {
-
System.out.println("ReceiveA:" + user);
-
}
-
-
@RabbitListener(queues = "fanout.B")//监听器监听指定的Queue
-
public void processB(User user) {
-
System.out.println("ReceiveB:" + user);
-
}
-
-
@RabbitListener(queues = "fanout.C")//监听器监听指定的Queue
-
public void processC(User user) {
-
System.out.println("ReceiveC:" + user);
-
}
-
-
}
编写测试类,启动应用,运行测试类看到控制台如下输出说明整合成功。
-
@RunWith(SpringRunner.class)
-
@SpringBootTest
-
public class HelloSenderTest{
-
-
@Autowired
-
private HelloSender helloSender;
-
-
@Test
-
public void sendMsg() {
-
helloSender.sendMsg();
-
}
-
}
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/81871940
- 点赞
- 收藏
- 关注作者
评论(0)