ActiveMQ发布订阅模式
【摘要】
Topic主题发布和订阅消息
前面讲的案例都是点对点的消息,即一个生产者发送的一条消息只能被一个消费者消费,然后就移除了。
而topic模式一条消息可以被多个消费者订阅,关系如下:
定义生产者 ...
Topic主题发布和订阅消息
前面讲的案例都是点对点的消息,即一个生产者发送的一条消息只能被一个消费者消费,然后就移除了。
而topic模式一条消息可以被多个消费者订阅,关系如下:
定义生产者
package com.dpb.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ中的生产者(Producer)
* @author dengp
*
*/
public class MyProducer {
public void sendhello2ActiveMq(String messageText) {
TopicSession session = null;
TopicConnection conn = null;
try {
TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
conn = factory.createTopicConnection();
conn.start();
session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息队列
Topic topic = session.createTopic("test-topic");
// 创建消息发送者
TopicPublisher publisher = session.createPublisher(topic);
// 设置持久化模式 NON_PERSISTENT不开启 PERSISTENT 开启 默认是开启
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "波波烤鸭");
mapMessage.setString("address", "深圳");
publisher.send(mapMessage);
// 提交会话
session.commit();
} catch (Exception e) {
e.printStackTrace();
System.out.println("访问ActiveMQ服务发生错误!!");
} finally {
try {
// 回收会话资源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收链接资源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
定义消费者
package com.dpb.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ中的消费者(Consumer)
* @author dengp
*
*/
public class MyConsumer {
public void reciveHelloFormActiveMq() {
TopicSession session = null;
TopicConnection conn = null;
try {
TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
conn = factory.createTopicConnection();
conn.start();
session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息队列
Topic topic = session.createTopic("test-topic");
// 创建消息接受者
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getString("name") + "接收#" + map.getString("address"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再关闭
Thread.sleep(1000 * 100);
// 提交会话
session.commit();
} catch (Exception e) {
e.printStackTrace();
System.out.println("访问ActiveMQ服务发生错误!!");
} finally {
try {
// 回收会话资源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收链接资源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
测试
先启动消费者,可以开启多个
public static void main(String[] args) {
MyConsumer con = new MyConsumer();
con.reciveHelloFormActiveMq();
}
- 1
- 2
- 3
- 4
启动生产者
public static void main(String[] args) {
MyProducer pro = new MyProducer();
pro.sendhello2ActiveMq("你好啊...topic");
}
- 1
- 2
- 3
- 4
好了本文介绍到此,下篇介绍ActiveMQ和Spring的整合
文章来源: dpb-bobokaoya-sm.blog.csdn.net,作者:波波烤鸭,版权归原作者所有,如需转载,请联系作者。
原文链接:dpb-bobokaoya-sm.blog.csdn.net/article/details/88625448
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)