centos7 下安装rabbitmq,spring-boot整合rabbitmq
【摘要】 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。 消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)
一、下载erlang 和 rabbitmq 的rpm 包 到/usr/l...
你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。
消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)
一、下载erlang 和 rabbitmq 的rpm 包 到/usr/local/下,并安装
erlang rpm包下载地址:
rabbitmq 下载地址:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm
[root@localhost local]# yum -y install erlang-21.0.5-1.el7.centos.x86_64.rpm
[root@localhost local]# yum -y install rabbitmq-server-3.7.7-1.el7.noarch.rpm
输入erl 命令验证erlang 是否安装成功
[root@localhost local]# erl
Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Eshell V10.0.5 (abort with ^G)
1>
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
(v)ersion (k)ill (D)b-tables (d)istribution
运行RabbitMQ
cd /usr/sbin
./rabbitmq-server start
出现以下信息,则表示启动成功:
RabbitMQ 3.7.7. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit@mq01.log
###### ## /var/log/rabbitmq/rabbit@mq01-sasl.log
##########
Starting broker...
completed with 0 plugins.
后台启动可使用:
./rabbitmq-server -detached
启动rabbitmq 的时候可能会提示 /usr/local/rabbitmq-server/data 或者 /usr/local/rabbitmq-server/log 文件夹无权限,需要执行
chown -R rabbitmq:rabbitmq /usr/local/rabbitmq-server/data
chown -R rabbitmq:rabbitmq /usr/local/rabbitmq-server/log
如果没有创建这两个文件夹,需要提前创建。
二、启用rabbitmq可视化管理界面
插件安装
(1)查看目前RabbitMQ已安装的插件
cd /usr/sbin
./rabbitmq-plugins list
安装web管理端
./rabbitmq-plugins enable rabbitmq_management
安装成功后,启动MQ
通过浏览器访问地址:http://192.168.42.85:15672/#/
由于guest用户被限制,只能通过localhost访问
我们可以编辑一下 vim /etc/rabbitmq/rabbitmq.config ,添加以下内容
[{rabbit, [{loopback_users, []}]}].
./rabbitmq-server -detached重启rabbitmq 服务即可使用 外网ip 登录。
(2)新建用户,设置权限,安装其他插件
新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限
./rabbitmqctl list_users
./rabbitmqctl add_user admin admin
./rabbitmqctl set_user_tags admin administrator
./rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
安装mqtt
./rabbitmq-plugins enable rabbitmq_mqtt
安装websocket
./rabbitmq-plugins enable rabbitmq_web_stomp
./rabbitmq-plugins enable rabbitmq_web_stomp_examples
配置
如果需要修改RabbitMQ的默认配置,先查找配置文件样例位置
find / -name "rabbitmq.config.example"
然后将样例配置复制到制定目录
cp /usr/share/doc/rabbitmq-server-3.6.8/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改样例文件,并保存,重启MQ生效。
权限
主要是set_permissions的使用,先看下命令的格式:
set_permissions [-p vhost] {user} {conf} {write} {read}
首先需要注意以下几点的理解:
1.这里的权限,只是针对一般用户的访问权限,注意和角色的区分。举个例子来说,非管理用户(普通用户),角色设置为none,然后在这里配置conf、write、read的权限。
2.conf、write、read采用正则表达式,这里的正则主要是针对exchange和queue。主要2种特殊的表达式:
^$:表示完全不匹配(即没有权限)
.*:表示匹配所有(即所有权限)
如果使用RabbitMQ作为消息推送服务,需要授予用户只读访问Stomp主题权限,同时授予另一个用户对同一主题的读写访问权限。
这样的话主题订阅者就不能发布消息,从而解决安全性问题。
但是如果直接这样授权:
rabbitmqctl set_permissions read-only-user '.*' '^$' '.*'
会报如下错误:
ERROR message:access_refused content-type:text/plain version:1.0,1.1,1.2 content-length:114 ACCESS_REFUSED - access to queue 'stomp-subscription-APK3zkvXFqxvSiZ9ztmxYQ' in vhost '/' refused for user 'test' Whoops! Lost connection to ws://221.0.200.202:15674/ws
这样配置存在一个问题,将阻止任何写入,阻止任何写入将导致没有权限连接到RabbitMQ的服务
需要对可读用户权限做如下调整:
rabbitmqctl set_permissions read-only-user '^stomp-subscription.*$' '^stomp-subscription.*$'
三、spring boot整合rabbitmq
添加rabbitmq依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml添加rabbitmq的配置信息:
spring:
rabbitmq:
host: 192.168.42.85
port: 5672
password: guest
username: guest
publisher-confirms: true
virtual-host: /
RabbitMQ(Direct模式)示例:
配置类:
@Configuration
public class SenderConf {
//SpringBoot整合RabbitMQ(Direct模式)
@Bean
public Queue queue() {
return new Queue("queue");
}
//SpringBoot整合RabbitMQ(Topic转发模式)
//定义两个消息队列
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
//定义topic交换机
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
//将topic.message和topic.messages消息队列绑定到上面的交换机上,并指定不同的主题
@Bean
Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
}
//SpringBoot整合RabbitMQ(广播模式)
//定义Amessage、Bmessage、Cmessage三个消息队列
@Bean(name="Amessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean(name="Bmessage")
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean(name="Cmessage")
public Queue CMessage() {
return new Queue("fanout.C");
}
//配置广播路由器
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//将Amessage、Bmessage、Cmessage三个消息队列绑定到广播路由器上
@Bean
Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
生产者:
@Component
public class HelloSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg() {
User user = new User();
user.setName("张三");
user.setPassword("123456dzx");
amqpTemplate.convertAndSend("queue", user);
amqpTemplate.convertAndSend("exchange", "topic.message", user);
//广播模式下指定队列名称不起作用,所有绑定到广播交换机上面的队列都会收到广播的消息
amqpTemplate.convertAndSend("fanoutExchange", "", user);
}
}
消费者:
@Component
public class HelloRecevier {
@RabbitListener(queues = "queue")//监听器监听指定的Queue
public void reveMsg(User user) {
System.out.println("Receive:" + user);
}
@RabbitListener(queues = "topic.message") //监听器监听指定的Queue
public void process1(User user) {
System.out.println("message:" + user);
}
@RabbitListener(queues = "topic.messages") //监听器监听指定的Queue
public void process2(User user) {
System.out.println("messages:" + user);
}
@RabbitListener(queues = "fanout.A")//监听器监听指定的Queue
public void processA(User user) {
System.out.println("ReceiveA:" + user);
}
@RabbitListener(queues = "fanout.B")//监听器监听指定的Queue
public void processB(User user) {
System.out.println("ReceiveB:" + user);
}
@RabbitListener(queues = "fanout.C")//监听器监听指定的Queue
public void processC(User user) {
System.out.println("ReceiveC:" + user);
}
}
编写测试类,启动应用,运行测试类看到控制台如下输出说明整合成功。
@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloSenderTest{
@Autowired
private HelloSender helloSender;
@Test
public void sendMsg() {
helloSender.sendMsg();
}
}
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/81871940
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)