Pulsar消息积压topic级别策略老化的两种方案

举报
张俭 发表于 2023/12/11 11:01:42 2023/12/11
【摘要】 Pulsar像大多数消息中间件一样,支持按时间和大小对消息积压进行老化。但是默认的策略只能在namespace级别配置。本文将介绍如何在topic级别实现老化策略的两种方案。 方案一:开启 TopicLevelPolicy 来实现默认的策略配置通过在Zookeeper上配置对应的策略,可以通过./pulsar zookeeper-shell命令来登录zookeeper集群查询。但是如果将这一...

Pulsar像大多数消息中间件一样,支持按时间和大小对消息积压进行老化。但是默认的策略只能在namespace级别配置。本文将介绍如何在topic级别实现老化策略的两种方案。

方案一:开启 TopicLevelPolicy 来实现

默认的策略配置通过在Zookeeper上配置对应的策略,可以通过./pulsar zookeeper-shell命令来登录zookeeper集群查询。但是如果将这一实现方式扩展到topic级别,将会产生大量的(百万、千万级别)的ZooKeeper节点,这对于ZooKeeper集群来说几乎是不可接受的。因此,Pulsar提供了一种新的实现方式,即通过Topic来存储策略配置,而不是通过ZooKeeper来存储。

Pulsar,从2.7.0版本开始,引入了SystemTopic,用于存储Topic的元数据信息,包括Topic的策略配置。主题级策略使用户可以更灵活地管理主题,并不会给 ZooKeeper 带来额外负担。

您可以通过如下配置来开启TopicLevelPolicy

systemTopicEnabled=true
topicLevelPoliciesEnabled=true

然后通过set-backlog-quota命令来设置您想要的老化时间和老化大小

PS: 完整的一些功能,如命令行set-backlog-quota,在3.0.0版本中支持

方案二:通过自定义代码来实现

PulsarTopicLevelPolicy实现需要通过topic存储策略配置,而不是通过ZooKeeper来存储。在实际的极端场景下,Topic中存储的内容可能会丢失(因为未开启Bookkeeper立即落盘或磁盘文件损坏等原因),这将导致策略配置丢失,从而导致策略失效。因此,我们可以通过自定义代码来实现topic级别的策略配置,这样可以避免策略配置丢失的问题。

举个例子,业务可以将策略存放在Mysql中,然后通过PulsarAdmin API来让策略生效

自定义代码实现Backlog时间策略

ClientBrokerexpire-messages-all-subscriptions Requestexpire-messages-all-subscriptions ResponseloopClientBroker

自定义代码实现Backlog大小策略

ClientBrokerstats-internal Requeststats-internal Responseestimate the backlog positionget-message-by-id Requestget-message-by-idget the timestamp of the messageexpire-messages-all-subscriptions Requestexpire-messages-all-subscriptions Responsealt[messageBacklogSize <maxMessageBacklogSize][messageBacklogSize >=maxMessageBacklogSize]loopClientBroker
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。