【金蝶天燕鲲鹏云最佳实践 二】ApusicMQ v9消息中间件实践系列(二)客户端开发
一、 JMS介绍
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,金蝶天燕Apusic消息中间件完全实现了 JMS1.1 规范。
1、 消息模型
点对点类型(PTP):生产者发送消息到队列中,消费者从消息队列中取走消息。消息一旦被一个消费者取走,该消息就从消息队列中删除。也就是说,一条消息仅能被一个消费者消费一次。
发布/订阅类型(Pub/Sub):发布者发布的一条消息可以被该消息的所有订阅者消费一次,即每个订阅者都有一次消费该消息的机会。
2、 消息结构
一个消息对象一般分成三个部分:消息头(Header)、消息属性(Properties)、消息体(Body)。
消息头:是一组标准键值字段,客户端和提供者都用它来标识和路由消息。Message 接口为获取和设置 Header 字段提供了相关方法,如 getJMSPriority()、setJMSReplyTo 等。
消息属性:消息属性是消息头的补充,它可以定义更多的信息,根据属性名和值对应的关系,支持的类型有:String、boolean、 byte、 double、 int、 long、 float。
消息体:消息正文,包含了发送给其他程序的消息内容,根据消息体类型的不同,JMS 拥有 5 个消息类型,并分别通过 Message 的 5 个子类接口来描述。
3、调用编程模型
父接口 | PTP 子接口 | Pub/Sub 子接口 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
二、 开发前准备
编写AMQ的 java 客户端程序需要的相关依赖 jar 包:
APUSIC_MQ_HOME\lib\amq-client.jar
APUSIC_MQ_HOME\li\ext\commons-net-3.5.jar
APUSIC_MQ_HOME\li\ext\bcprov-jdk15on-1.59.jar
APUSIC_MQ_HOME\li\ext\geronimo-*.jar
APUSIC_MQ_HOME\li\ext\hawtbuf-1.11.jar
APUSIC_MQ_HOME\lib\ext\netty-all-4.1.13.Final.jar
APUSIC_MQ_HOME\li\ext\slf4j-api-1.7.12.jar
将以上jar包加载到应用的 CLASS PATH 路径即可。
三、 JMS客户端开发
AMQ JMS 客户端开发的一般流程,适用于 PTP 模型和 Pub/Sub 模型。
1、 获取 ConnectionFactory 对象
ConnectionFactory 用于创建 Connection 对象,直接示例化 ConnectionFactory:
ApusicMQConnectionFactory cf = new ApusicMQConnectionFactory(url); |
url为AMQ服务的地址,如tcp://127.0.0.1:5888?jms.blobTransferPolicy.defaultUploadUrl=sftp://127.0.0.1:8026的连接串,参数后面为指定的用于文件传输的服务器。
2、 创建 Connection 对象
Connection 对象封装了一个和 AMQ 服务器之间的连接,通常是一个 Socket 连接。它主要用来创建Session,此外,它还可以用来唯一标识一个客户端,以及在创建的时候执行客户端认证工作。
Connection 对象是线程安全的,是重量级的对象,通常一个客户端只需要一个该对象。
获取方法:
Connection connection = connectionFactory.createConnection(); 当Connection 创建完成,它默认处于关闭状态,此时消息生产者可以发送消息,但消费者不能消费消息,需要调用 start()方法后才能消费。
当使用完连接的时候,你需要关闭连接,否则该连接会一直保持,调用close方法: connection.close(); |
3、 创建 Session 对象
Session 对象用来创建消息发送者和消息接受者的上下文,它是轻量级对象,并且不是线程安全的。
获取方法:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
第一个参数表示是否启动本地事务。如果为 true,则使用该 Session 进行的消息操作将一直工作在事务环境下,AMQ 服务器跟踪各个发送和接收过程,并在 JMS 客户端发出 commit 调用时完成这些操作。如果事务中某个操作出现异常,客户端代码可以通过忽略异常重试或 rollback 整个事务来处理异常。每当调用 commit 或 rollback 后,结束当前事务,同时自动开始一个新的事务。
第二个参数表示消息的确认模式,取值范围是 Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ ACKNOWLEDGE、Session.DUPS_OK_ACKNOWLEDGE
当使用完 session 后,一个良好的习惯是立刻关闭该 session:session.close();
connection 关闭的时候也会同时关闭该连接产生的所有 session。
4、 获取 Destination 对象
Destination 对象是指消息发送客户端的消息目标和消息接收客户端的消息来源。对于 PTP 类型,De stination 对应的类型为 Queue,对于 Pub/Sub 类型,Destination 对应的类型为 Topic。
标准 JMS API 创建:
//创建队列 Destination myQueue = session.createQueue("Your queue name"); //创建主题 Destination myTopic = session.createTopic("Your topic name"); |
5、 发送消息
创建 MessageProducer:
// destination为队列或主题名称 MessageProducer producer = session.createProducer(destination); |
创建并发送消息:
//创建文本消息对象 TextMessage message = session.createTextMessage(); //消息体赋值 message.setText("hello"); //发送消息 producer.send(message); |
6、 接收消息
创建 MessageConsumer
// destination为队列或主题名称 MessageConsumer consumer = session.createConsumer(destination); |
同步方式接收消息,该方法会导致当前线程阻塞,一直到有消息可以消费:
Message m = consumer.receive(); |
异步方式接收消息:
consumer.setMessageListener(new MessageListener(){ public void onMessage(Message msg){ //消息处理 } }); |
四、 Restful调用API
AMQ实现了RESTful API,它允许任何拥有web能力的设备通过HTTP请求进行消息的发送与消费。 发送消息时使用HTTP POST请求。消费消息时使用HTTP GET或者DELETE请求。
发送示例:http://127.0.0.1:7888/api/message/myqueue?type=queue&body=helloamq
Myqueue:为队列或主题名称
type:指定目的地类型, queue或者topic
body:指定发送的消息内容
接收示例:http://127.0.0.1:7888/api/message/myqueue?type=queue
五、 Spring框架集成
AMQ实现了与spring开发框架的集成,方便开发者进行快速开发。
在spring的xml配置文件中加入AMQ的工程类配置:
<bean id="amqConnectionFactory" class="com.apusic.amq.ApusicMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:5888)"/> </bean> <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" > <property name="targetConnectionFactory" ref="amqConnectionFactory"/> <property name="sessionCacheSize" value="10"/> </bean> |
程序使用代码:
ConnectionFactory cf = (ConnectionFactory)context.getBean("cachedConnectionFactory"); Connection conn = cf.createConnection(); conn.start(); Session session = conn.createSession(false, 1); MessageProducer producer = session.createProducer(session.createQueue("testQueue"));
String msg = "hello"; producer.send(session.createTextMessage(msg));
|
- 点赞
- 收藏
- 关注作者
评论(0)