【JMS】JMS之ActiveMQ的使用

举报
程序员私房菜 发表于 2019/02/01 10:23:12 2019/02/01
【摘要】 这篇文章主要是简单介绍一下JMS和ActiveMQ,以及使用ActiveMQ来写两个demo。1. JMS是啥百度百科的解释:JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商...

这篇文章主要是简单介绍一下JMS和ActiveMQ,以及使用ActiveMQ来写两个demo。


1. JMS是啥

百度百科的解释:


JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。



JMS只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ就是其中之一,它支持JMS,是Apache推出的。JMS中有几个对象模型:


连接工厂:ConnectionFactory 
JMS连接:Connection 
JMS会话:Session 
JMS目的:Destination 
JMS生产者:Producer 
JMS消费者:Consumer 
JMS消息两种类型:点对点和发布/订阅。


可以看出JMS实际上和JDBC有点类似,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。本文主要使用ActiveMQ。


2. ActiveMQ

关于ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 

使用ActiveMQ首先需要去官网下载,我用的是apache-activemq-5.11.1,下载后解压缩,会看到里面有个activemq-all-5.11.1.jar,这个jar是接下来我们写程序的时候需要加进工程里的。 

在使用ActiveMQ之前,首先得先启动,刚才解压后的目录中有个bin目录,里面有win32和win64两个目录,根据自己电脑选择其中一个打开运行activemq.bat启动ActiveMQ。 

启动完成后,在浏览器中输入http://127.0.0.1:8161/admin/来访问ActiveMQ的服务器,用户名和密码是admin/admin。如下:


image.png


我们等会儿主要是看Queues和Topics这两个选项,因为这就是上面提到的点对点和发布/订阅的查看窗口。 

OK,准备工作做好了,下面开始针对这两中消息类型写两个demo来跑一下。


3. 点对点消息

首先写消息的生产者,也就是发送消息。


public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址
    private static final int SENDNUM = 10; // 定义发送的消息数量
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂,用来生产Connection
        Connection connection = null; // 连接
        Session session; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息发送者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取Session
            destination = session.createQueue("FirstQueue1"); // 创建消息队列,名为FirstQueue1
            messageProducer = session.createProducer(destination); // 创建消息生产者
            sendMessage(session, messageProducer); // 发送消息
            session.commit(); // 因为上面加了事务Boolean.TRUE表示有事务,所以要commit
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }           
        }
    }
    /**
     * @Description 发送消息
     * @param session
     * @param messageProducer
     * @throws JMSException 
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for(int i = 0; i < JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
            System.out.println("ActiveMQ 发送的消息" + i);
            messageProducer.send(message);
        }
    }
}



可以看出,主要的流程就是上面提到的JMS的一些对象模型,这个和JDBC很类似,接下来再写消息的消费者,也就是接收方。


public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂,用来生产Connection
        Connection connection = null; // 连接
        Session session; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取Session,不需要加事务了
            destination = session.createQueue("FirstQueue1"); // 创建消息队列,名为FirstQueue1
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            //注册消息监听
            messageConsumer.setMessageListener(new Listener());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}



从代码中可以看出,接收消息需要一个监听器,这个监听器可以自己来实现,需要实现MessageListener接口,如下:


public class Listener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到的消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}


ok,现在就可以运行这两个程序了,首先运行JMSProducer,然后运行JMSConsumer就可以看到控制台的消息输出。然后打开ActiveMQ的服务器,可以看到如下信息: 


image.png


可以看出,总共产生了20条消息,消费了20条消息,这就是点对点的模式。


4. 订阅/发布消息

下面写一个订阅/发布消息的demo。和上面点对点类似,不过代码有一些细节上的差异,另外订阅方用两个Consumer来模拟一下,发布方用一个Producer来模拟。 

发布消息:


public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址
    private static final int SENDNUM = 10; // 定义发送的消息数量
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂,用来生产Connection
        Connection connection = null; // 连接
        Session session; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息发送者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取Session
            destination = session.createTopic("FirstTopic1"); // 创建消息队列,名为FirstTopic1
            messageProducer = session.createProducer(destination); // 创建消息生产者
            sendMessage(session, messageProducer); // 发送消息
            session.commit(); // 因为上面加了事务Boolean.TRUE表示有事务,所以要commit
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }           
        }
    }
    /**
     * @Description 发布消息
     * @param session
     * @param messageProducer
     * @throws JMSException 
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for(int i = 0; i < JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发布的消息" + i);
            System.out.println("ActiveMQ 发布的消息" + i);
            messageProducer.send(message);
        }
    }
}


和点对点有个细微的区别,这里是用session.createTopic,注意一下即可。 

订阅消息:


public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂,用来生产Connection
        Connection connection = null; // 连接
        Session session; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取Session,不需要加事务了
            destination = session.createTopic("FirstTopic1"); // 创建消息队列,名为FirstTopic1
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            //注册消息监听
            messageConsumer.setMessageListener(new Listener());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}


同样地,这里也是使用createTopic,另一个订阅方的代码和这个一样,就不写了,然后运行所有的JMSConsumer,再运行JMSProducer,就会看到控制台会打印出消息。再看一下服务器上的状态: 


image.png


我们可以看到,有两个订阅者,发布了10条消息,消费了20条。以上就是ActiveMQ使用的简单demo。




【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。