【最佳实践】利用消息标签实现消息过滤
场景介绍
在实际场景中,一个队列存储的消息,可以包含不同实际用途。如果这些消息不加区分,消费者每次消费都会按顺序拉取消息,直到完成对所有消息的消费。
如果消费者只对某一类型的消息感兴趣,那么将所有消息都消费一遍势必影响消费者处理效率。
优化方案
DMS服务提供消息标签的能力,支持生产者为每条消息提供一个或多个标签(tag),消费者则根据标签(tag)的内容来过滤消息,确保每个消费者最终只会消费到它感兴趣的消息类型。
以金融交易场景为例,在一种交易中可能会产生多钟类型的消息,如股票(stock),基金(fund),贷款(loan)等。这些消息会通过交易(business)主题来传递给不同的处理系统,如股票系统,基金系统,贷款系统,实时分析系统等。然而基金系统只关心基金类型的消息,而实时分析系统可能需要获取到所有类型的消息。
在生产消息时,生产者对每条消息加上标签(tag),消费者在拉取消息时决定是否仅获取带有某标签(tag)的消息,从而提高消息消费效率。如图1所示:
DMS普通队列与FIFO队列支持消息标签(Tag)功能,Kafka队列不支持。
代码示例
以下仅贴出与消息标签相关代码,如需运行整个demo,请先下载完整的代码示例包,同时参考DMS开发指南进行部署和运行。
示例提供了基于Http Restful接口的代码,具体API接口可参考帮助中心分布式消息服务接口参考。
消息标签设计示例代码
package com.cloud.dms; import java.net.URL; import java.util.Properties; import com.cloud.dms.access.AccessServiceUtils; public class DMSHttpClient { private static String endpointUrl = ""; private static String region = ""; private static String serviceName = "dms"; private static String aKey = ""; private static String sKey = ""; private static String projectId = "546e52331ea74cd49722fda4fb23bf55"; private static String queueId = "39cd8dcb-b901-43b4-9ea1-48730e9adc58"; private static String queueGroupId = "g-ae8ed05f-464c-452c-9e37-d3bdd081000d"; /* * Read Configure File And Initialize Variables */ static { URL configPath = ClassLoader.getSystemResource("dms-service-config.properties"); Properties prop = AccessServiceUtils.getPropsFromFile(configPath.getFile()); region = prop.getProperty(Constants.DMS_SERVICE_REGION); aKey = prop.getProperty(Constants.DMS_SERVICE_AK); sKey = prop.getProperty(Constants.DMS_SERVICE_SK); endpointUrl = prop.getProperty(Constants.DMS_SERVICE_ENDPOINT_URL); if (endpointUrl.endsWith("/")) { endpointUrl = endpointUrl + "v1.0/"; } else { endpointUrl = endpointUrl + "/v1.0/"; } projectId = prop.getProperty(Constants.DMS_SERVICE_PROJECT_ID); } public static void main(String[] args) { runAllApiMethods(); } public static void runAllApiMethods() { MsgAttri msg = new MsgAttri(); msg.setaKey(aKey); msg.setEndpointUrl(endpointUrl); msg.setProjectId(projectId); msg.setQueueId(queueId); msg.setsKey(sKey); msg.setRegion(region); msg.setServiceName(serviceName); msg.setMsgLimit("10"); msg.setGroupId(queueGroupId); /** * 构造生产者和四种消费者,设置各自感兴趣的tag */ MsgProducer msgProducer = new MsgProducer(msg); MsgConsumer stock = new MsgConsumer(msg, "stock"); MsgConsumer fund = new MsgConsumer(msg, "fund"); MsgConsumer loan = new MsgConsumer(msg, "loan"); MsgConsumer all = new MsgConsumer(msg, null); /** * 创建线程,模拟生产和消费行为,设置线程的名字,便于区分 */ Thread producer = new Thread(msgProducer); Thread stockThread = new Thread(stock); Thread fundThread = new Thread(fund); Thread loanThread = new Thread(loan); Thread alls = new Thread(all); producer.setName("producer"); stockThread.setName("stock"); fundThread.setName("fund"); loanThread.setName("loan"); alls.setName("Analysis"); /** * 启动线程 */ producer.start(); stockThread.start(); fundThread.start(); // loanThread.start(); // alls.start(); } }
生产消息示例
package com.cloud.dms; import static com.cloud.dms.ApiUtils.constructTempMessages; import static com.cloud.dms.ApiUtils.sendMessages; import java.util.concurrent.TimeUnit; public class MsgProducer implements Runnable{ private MsgAttri msgAttri; public MsgProducer(MsgAttri msg) { this.msgAttri = msg; } public void run() { while (true) { /** * 模拟生产者,构造消息,JSON中包含stock, fund, loan三种消息 **/ String messages = constructTempMessages(null); sendMessages(messages, this.msgAttri); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } } } }
消费消息示例代码
package com.cloud.dms; import static com.cloud.dms.ApiUtils.acknowledgeMessages; import static com.cloud.dms.ApiUtils.consumeMessages; import static com.cloud.dms.ApiUtils.parseHandlerIds; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MsgConsumer implements Runnable{ private MsgAttri msgAttri; private String tag; /** *构造函数,获取到消费者感兴趣的tag **/ public MsgConsumer(MsgAttri msg, String tag) { this.tag = tag; this.msgAttri = msg; } public void run() { while (true) { /** *消费消息,获取该tag下的消息 **/ ResponseMessage consumeMessagesResMsg = consumeMessages(msgAttri, tag); /** *解析消息 **/ if (consumeMessagesResMsg.getStatusCode() == 200) { ListmsgStrings = ApiUtils.decodeMsg(consumeMessagesResMsg); /** *模拟处理消息,打印该tag下的消息 **/ for (String s : msgStrings) { System.out.println("Thread--"+ Thread.currentThread().getName() + "--Message Body is: "+ s); } /** * 消费确认 **/ ArrayList handlerIds = parseHandlerIds(consumeMessagesResMsg); if (handlerIds.size() > 0) { acknowledgeMessages(handlerIds, msgAttri); } } else { System.out.println("Http Response Code is: " + consumeMessagesResMsg.getStatusCode() + "\n Http Body is: " + consumeMessagesResMsg.getBody()); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { } } } }
示例运行结果如下
Loan线程由于指定了tag(loan),因此只能消费到loan标签的消息:
Fund和stock线程也同理只能消费到各自指定的消息:
而Analysi线程没有指定Tag,可以消费到Topic里的所有消息:
- 点赞
- 收藏
- 关注作者
评论(0)