【金蝶天燕鲲鹏云最佳实践 二】ApusicMQ v9消息中间件实践系列(二)客户端开发

举报
davidaer 发表于 2020/02/06 18:44:32 2020/02/06
【摘要】 金蝶天燕Apusic消息中间件完全实现了 JMS1.1 规范,应用程序可以使用 JMS 接口连接到 AMQ 服务器进行消息的发送和接收。提供java、c、c++等多种客户端API。为了方便应用原件调用提供了restful方式api进行收发消息;实现了与spring开发框架的集成。

一、 JMS介绍

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,金蝶天燕Apusic消息中间件完全实现了 JMS1.1 规范。

1、 消息模型

点对点类型(PTP):生产者发送消息到队列中,消费者从消息队列中取走消息。消息一旦被一个消费者取走,该消息就从消息队列中删除。也就是说,一条消息仅能被一个消费者消费一次。

发布/订阅类型(Pub/Sub):发布者发布的一条消息可以被该消息的所有订阅者消费一次,即每个订阅者都有一次消费该消息的机会。

2、 消息结构

一个消息对象一般分成三个部分:消息头(Header)、消息属性(Properties)、消息体(Body)。

消息头:是一组标准键值字段,客户端和提供者都用它来标识和路由消息。Message 接口为获取和设置 Header 字段提供了相关方法,如 getJMSPriority()setJMSReplyTo 等。

消息属性:消息属性是消息头的补充,它可以定义更多的信息,根据属性名和值对应的关系,支持的类型有:Stringboolean byte double int long float

消息体:消息正文,包含了发送给其他程序的消息内容,根据消息体类型的不同,JMS 拥有 5 个消息类型,并分别通过 Message 5 个子类接口来描述。

3、调用编程模型

     图片.png

父接口

PTP 子接口

Pub/Sub 子接口

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber


二、 开发前准备

编写AMQ java 客户端程序需要的相关依赖 jar 包:

     APUSIC_MQ_HOME\lib\amq-client.jar

     APUSIC_MQ_HOME\li\ext\commons-net-3.5.jar

     APUSIC_MQ_HOME\li\ext\bcprov-jdk15on-1.59.jar

     APUSIC_MQ_HOME\li\ext\geronimo-*.jar

     APUSIC_MQ_HOME\li\ext\hawtbuf-1.11.jar

     APUSIC_MQ_HOME\lib\ext\netty-all-4.1.13.Final.jar

     APUSIC_MQ_HOME\li\ext\slf4j-api-1.7.12.jar

   将以上jar包加载到应用的 CLASS PATH 路径即可。


三、 JMS客户端开发

AMQ JMS 客户端开发的一般流程,适用于 PTP 模型和 Pub/Sub 模型。

 1、 获取 ConnectionFactory 对象

   ConnectionFactory 用于创建 Connection 对象,直接示例化 ConnectionFactory

 

ApusicMQConnectionFactory cf = new ApusicMQConnectionFactory(url);

urlAMQ服务的地址,如tcp://127.0.0.1:5888?jms.blobTransferPolicy.defaultUploadUrl=sftp://127.0.0.1:8026的连接串,参数后面为指定的用于文件传输的服务器。

 

2、 创建 Connection 对象

Connection 对象封装了一个和 AMQ 服务器之间的连接,通常是一个 Socket 连接。它主要用来创建Session,此外,它还可以用来唯一标识一个客户端,以及在创建的时候执行客户端认证工作。

Connection 对象是线程安全的,是重量级的对象,通常一个客户端只需要一个该对象。

获取方法:

Connection connection = connectionFactory.createConnection();

当Connection 创建完成,它默认处于关闭状态,此时消息生产者可以发送消息,但消费者不能消费消息,需要调用 start()方法后才能消费。

 

当使用完连接的时候,你需要关闭连接,否则该连接会一直保持,调用close方法:

connection.close();

 

3、 创建 Session 对象

Session 对象用来创建消息发送者和消息接受者的上下文,它是轻量级对象,并且不是线程安全的。

获取方法:

Session session = connection.createSession(false,   Session.AUTO_ACKNOWLEDGE);


 

第一个参数表示是否启动本地事务。如果为 true,则使用该 Session 进行的消息操作将一直工作在事务环境下,AMQ 服务器跟踪各个发送和接收过程,并在 JMS 客户端发出 commit 调用时完成这些操作。如果事务中某个操作出现异常,客户端代码可以通过忽略异常重试或 rollback 整个事务来处理异常。每当调用 commit rollback 后,结束当前事务,同时自动开始一个新的事务。

第二个参数表示消息的确认模式,取值范围是 Session.AUTO_ACKNOWLEDGESession.CLIENT_ ACKNOWLEDGESession.DUPS_OK_ACKNOWLEDGE

当使用完 session 后,一个良好的习惯是立刻关闭该 session:session.close();

connection 关闭的时候也会同时关闭该连接产生的所有 session

 

4、 获取 Destination 对象

Destination 对象是指消息发送客户端的消息目标和消息接收客户端的消息来源。对于 PTP 类型,De stination 对应的类型为 Queue,对于 Pub/Sub 类型,Destination 对应的类型为 Topic

标准 JMS API 创建:

//创建队列

Destination   myQueue = session.createQueue("Your queue name");

//创建主题

Destination myTopic =   session.createTopic("Your topic name");


5、 发送消息

 

创建 MessageProducer

// destination为队列或主题名称

MessageProducer   producer = session.createProducer(destination);

  创建并发送消息:

//创建文本消息对象

TextMessage message =   session.createTextMessage();

//消息体赋值

message.setText("hello");

//发送消息

producer.send(message);

6、 接收消息

 

创建 MessageConsumer

// destination为队列或主题名称

MessageConsumer consumer   = session.createConsumer(destination);

同步方式接收消息,该方法会导致当前线程阻塞,一直到有消息可以消费:

Message m = consumer.receive();

异步方式接收消息:

consumer.setMessageListener(new   MessageListener(){

public void onMessage(Message msg){

//消息处理

}

});


 

四、 Restful调用API

     

AMQ实现了RESTful API,它允许任何拥有web能力的设备通过HTTP请求进行消息的发送与消费。 发送消息时使用HTTP POST请求。消费消息时使用HTTP GET或者DELETE请求。

发送示例:http://127.0.0.1:7888/api/message/myqueue?type=queue&body=helloamq

Myqueue:为队列或主题名称

type:指定目的地类型, queue或者topic

body:指定发送的消息内容

   接收示例:http://127.0.0.1:7888/api/message/myqueue?type=queue


五、 Spring框架集成

 AMQ实现了与spring开发框架的集成,方便开发者进行快速开发。

   在spring的xml配置文件中加入AMQ的工程类配置:

<bean   id="amqConnectionFactory"   class="com.apusic.amq.ApusicMQConnectionFactory">

<property   name="brokerURL" value="tcp://127.0.0.1:5888)"/>

</bean>

<bean   id="cachedConnectionFactory"   class="org.springframework.jms.connection.CachingConnectionFactory"

    >

<property   name="targetConnectionFactory"   ref="amqConnectionFactory"/>

<property   name="sessionCacheSize" value="10"/>

</bean>


程序使用代码:

ConnectionFactory cf =   (ConnectionFactory)context.getBean("cachedConnectionFactory");

Connection   conn = cf.createConnection();

conn.start();

Session   session = conn.createSession(false, 1);

MessageProducer   producer =   session.createProducer(session.createQueue("testQueue"));

 

String   msg = "hello";

producer.send(session.createTextMessage(msg));

 



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。