centos7 下安装rabbitmq,spring-boot整合rabbitmq

举报
小米粒-biubiubiu 发表于 2020/10/22 11:00:41 2020/10/22
【摘要】 centos7 下安装rabbitmq,spring-boot整合rabbitmq

你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。
消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)

一、下载erlang 和 rabbitmq 的rpm 包 到/usr/local/下,并安装

erlang rpm包下载地址:

https://bintray.com/rabbitmq/rpm/download_file?file_path=erlang%2F21%2Fel%2F7%2Fx86_64%2Ferlang-21.0.5-1.el7.centos.x86_64.rpm

rabbitmq 下载地址:

https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm

[root@localhost local]# yum -y install erlang-21.0.5-1.el7.centos.x86_64.rpm
[root@localhost local]# yum -y install rabbitmq-server-3.7.7-1.el7.noarch.rpm

输入erl 命令验证erlang 是否安装成功 

[root@localhost local]# erl
Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Eshell V10.0.5  (abort with ^G)
1> 
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution

运行RabbitMQ

cd /usr/sbin
./rabbitmq-server start

出现以下信息,则表示启动成功:

RabbitMQ 3.7.7. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit@mq01.log
###### ## /var/log/rabbitmq/rabbit@mq01-sasl.log
##########
Starting broker...
completed with 0 plugins.

后台启动可使用:

./rabbitmq-server -detached

启动rabbitmq 的时候可能会提示  /usr/local/rabbitmq-server/data 或者 /usr/local/rabbitmq-server/log 文件夹无权限,需要执行

chown -R rabbitmq:rabbitmq  /usr/local/rabbitmq-server/data
chown -R rabbitmq:rabbitmq  /usr/local/rabbitmq-server/log

如果没有创建这两个文件夹,需要提前创建。

二、启用rabbitmq可视化管理界面

插件安装

(1)查看目前RabbitMQ已安装的插件

cd /usr/sbin
./rabbitmq-plugins list

安装web管理端

./rabbitmq-plugins enable rabbitmq_management

安装成功后,启动MQ

通过浏览器访问地址:http://192.168.42.85:15672/#/

由于guest用户被限制,只能通过localhost访问

我们可以编辑一下  vim  /etc/rabbitmq/rabbitmq.config   ,添加以下内容

[{rabbit, [{loopback_users, []}]}].

./rabbitmq-server -detached重启rabbitmq 服务即可使用 外网ip 登录。

(2)新建用户,设置权限,安装其他插件

新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限

./rabbitmqctl list_users
./rabbitmqctl add_user admin admin
./rabbitmqctl set_user_tags admin administrator
./rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

安装mqtt

./rabbitmq-plugins enable rabbitmq_mqtt

安装websocket

./rabbitmq-plugins enable rabbitmq_web_stomp
./rabbitmq-plugins enable rabbitmq_web_stomp_examples

配置
如果需要修改RabbitMQ的默认配置,先查找配置文件样例位置

find / -name "rabbitmq.config.example"

然后将样例配置复制到制定目录

cp /usr/share/doc/rabbitmq-server-3.6.8/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

修改样例文件,并保存,重启MQ生效。

权限
主要是set_permissions的使用,先看下命令的格式:

set_permissions [-p vhost] {user} {conf} {write} {read}

首先需要注意以下几点的理解:

1.这里的权限,只是针对一般用户的访问权限,注意和角色的区分。举个例子来说,非管理用户(普通用户),角色设置为none,然后在这里配置conf、write、read的权限。

2.conf、write、read采用正则表达式,这里的正则主要是针对exchange和queue。主要2种特殊的表达式:

^$:表示完全不匹配(即没有权限)

.*:表示匹配所有(即所有权限)

如果使用RabbitMQ作为消息推送服务,需要授予用户只读访问Stomp主题权限,同时授予另一个用户对同一主题的读写访问权限。

这样的话主题订阅者就不能发布消息,从而解决安全性问题。

但是如果直接这样授权:

rabbitmqctl set_permissions read-only-user '.*' '^$' '.*'

会报如下错误:

 ERROR message:access_refused content-type:text/plain version:1.0,1.1,1.2 content-length:114 ACCESS_REFUSED - access to queue 'stomp-subscription-APK3zkvXFqxvSiZ9ztmxYQ' in vhost '/' refused for user 'test' Whoops! Lost connection to ws://221.0.200.202:15674/ws

这样配置存在一个问题,将阻止任何写入,阻止任何写入将导致没有权限连接到RabbitMQ的服务

需要对可读用户权限做如下调整:

rabbitmqctl set_permissions read-only-user '^stomp-subscription.*$' '^stomp-subscription.*$'

三、spring boot整合rabbitmq

添加rabbitmq依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml添加rabbitmq的配置信息: 

spring:
 rabbitmq:
      host: 192.168.42.85
      port: 5672
      password: guest
      username: guest
      publisher-confirms: true
      virtual-host: /

RabbitMQ(Direct模式)示例:

配置类:

@Configuration
public class SenderConf {

    //SpringBoot整合RabbitMQ(Direct模式)
   @Bean
    public Queue queue() {
       return new Queue("queue");
  }

    //SpringBoot整合RabbitMQ(Topic转发模式)
    //定义两个消息队列
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //定义topic交换机
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    //将topic.message和topic.messages消息队列绑定到上面的交换机上,并指定不同的主题
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
    }


    //SpringBoot整合RabbitMQ(广播模式)
    //定义Amessage、Bmessage、Cmessage三个消息队列
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //配置广播路由器
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    //将Amessage、Bmessage、Cmessage三个消息队列绑定到广播路由器上
    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
}

生产者:

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg() {
        User user = new User();
        user.setName("张三");
        user.setPassword("123456dzx");
        amqpTemplate.convertAndSend("queue", user);
        amqpTemplate.convertAndSend("exchange", "topic.message", user);
        //广播模式下指定队列名称不起作用,所有绑定到广播交换机上面的队列都会收到广播的消息
        amqpTemplate.convertAndSend("fanoutExchange", "", user);
    }
}

消费者:

@Component
public class HelloRecevier {


    @RabbitListener(queues = "queue")//监听器监听指定的Queue
    public void reveMsg(User user) {
        System.out.println("Receive:" + user);
    }

    @RabbitListener(queues = "topic.message")    //监听器监听指定的Queue
    public void process1(User user) {
        System.out.println("message:" + user);
    }

    @RabbitListener(queues = "topic.messages")    //监听器监听指定的Queue
    public void process2(User user) {
        System.out.println("messages:" + user);
    }


    @RabbitListener(queues = "fanout.A")//监听器监听指定的Queue
    public void processA(User user) {
        System.out.println("ReceiveA:" + user);
    }

    @RabbitListener(queues = "fanout.B")//监听器监听指定的Queue
    public void processB(User user) {
        System.out.println("ReceiveB:" + user);
    }

    @RabbitListener(queues = "fanout.C")//监听器监听指定的Queue
    public void processC(User user) {
        System.out.println("ReceiveC:" + user);
    }

}

编写测试类,启动应用,运行测试类看到控制台如下输出说明整合成功。

@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloSenderTest{

    @Autowired
    private  HelloSender  helloSender;

    @Test
    public void sendMsg() {
        helloSender.sendMsg();
    }
}






【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。