RocketMQ system busy

赵KK日常技术记录, published 2023/06/24 14:26:03

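Our production MQ has recently been losing messages repeatedly. Because of a disk-expansion issue, the mq broker was in the middle of an upgrade. Today we received a report that one order had not been settled and that no exception had been raised, so I started investigating.

I first looked for consumption records and found none. I then checked the document's status, which was a normal node state. Next I checked the step that sends the message: the document met the send conditions, and the message is sent normally after the transaction commits. Finally I looked up the send records and found the following error: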
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [REJECTREQUEST]system busy, start flow control for a while
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:556) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$000(MQClientAPIImpl.java:155) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:396) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:309) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
  at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
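We first re-pushed the affected documents to resolve the business problem, then looked for the cause. A global search for the exception message turned up the following: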
if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    return;
}

Its location in the source code:

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
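
To make the control flow concrete, here is a minimal, self-contained sketch of the reject check. It does not use the RocketMQ classes: `RequestProcessor`, `RejectCheckSketch`, and `dispatch` are hypothetical stand-ins, and only the response code 2 (the `CODE: 2` in the exception above) and the remark text are taken from the log and the source snippet.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a request processor; not the RocketMQ interface.
interface RequestProcessor {
    boolean rejectRequest();          // true when the broker cannot accept work right now
    String process(String request);   // normal handling path
}

class RejectCheckSketch {
    static final int SUCCESS = 0;
    static final int SYSTEM_BUSY = 2; // matches "CODE: 2" in the client-side exception

    // Simplified response: a code plus a remark, like the CODE/DESC pair in the log.
    static final class Response {
        final int code;
        final String remark;

        Response(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    // Same shape as the check above: reject before any work is done.
    static Response dispatch(RequestProcessor processor, String request) {
        if (processor.rejectRequest()) {
            return new Response(SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
        }
        return new Response(SUCCESS, processor.process(request));
    }

    // Builds a demo processor whose busy state comes from the given probe (an assumption).
    static RequestProcessor processorWith(BooleanSupplier busyProbe) {
        return new RequestProcessor() {
            @Override
            public boolean rejectRequest() {
                return busyProbe.getAsBoolean();
            }

            @Override
            public String process(String request) {
                return "stored:" + request;
            }
        };
    }

    public static void main(String[] args) {
        Response ok = dispatch(processorWith(() -> false), "order-1");
        Response busy = dispatch(processorWith(() -> true), "order-2");
        System.out.println(ok.code + " " + ok.remark);
        System.out.println(busy.code + " " + busy.remark);
    }
}
```

The point of the sketch is that a rejected request never reaches the normal handling path, so the sender only learns about it through the response code.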

Tracing up the call chain:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

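The broker's memory parameters had recently been adjusted frequently and disks were being removed. With that in mind I looked through some references and found the following code in the broker: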

org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequest

private void cleanExpiredRequest() {
    while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        try {
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }

                final RequestTask rt = castRunnable(runnable);
                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }

    cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

    cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

    cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

    cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
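
The body of `cleanExpiredRequestInQueue` is not shown here. As a rough model of what its arguments suggest (a queue plus a maximum wait time), the following self-contained sketch fails fast any task that has waited at the head of a queue for longer than a limit. `QueuedTask`, `ExpiredRequestCleaner`, and the 200 ms limit are illustrative assumptions, not the broker's classes or a value confirmed by this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical queued request: remembers when it was enqueued.
class QueuedTask {
    final String id;
    final long createTimestamp;

    QueuedTask(String id, long createTimestamp) {
        this.id = id;
        this.createTimestamp = createTimestamp;
    }
}

class ExpiredRequestCleaner {
    // Removes tasks from the head of the queue while they have waited at least maxWaitMillis.
    // Returns the ids that were failed fast; a real broker would answer each with a busy response.
    static List<String> cleanExpired(BlockingQueue<QueuedTask> queue, long maxWaitMillis, long now) {
        List<String> rejected = new ArrayList<>();
        while (true) {
            QueuedTask head = queue.peek();
            if (head == null) {
                break; // queue drained
            }
            long waited = now - head.createTimestamp;
            if (waited < maxWaitMillis) {
                break; // FIFO order: everything behind the head is younger
            }
            // remove(Object) returns false if another thread already took the head
            if (queue.remove(head)) {
                rejected.add(head.id);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<QueuedTask> queue = new LinkedBlockingQueue<>();
        long now = 10_000L;
        queue.add(new QueuedTask("a", now - 500)); // waited 500 ms
        queue.add(new QueuedTask("b", now - 300)); // waited 300 ms
        queue.add(new QueuedTask("c", now - 50));  // waited 50 ms
        System.out.println(cleanExpired(queue, 200, now)); // [a, b]
        System.out.println(queue.size());                  // 1
    }
}
```

Failing old requests early keeps the queue short while the broker is slow, at the cost of pushing the failure back to the producer.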

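The client's request waited in the broker for longer than the default wait time (`getWaitTimeMillsInSendQueue()` in the code above). Possible mitigations are to raise that limit or to increase the number of threads in the send-message thread pool, which defaults to 1; the related configuration was introduced in MQ 4.x. The client should also be configured to retry when a send fails. The main cause, however, is that the message backlog in MQ slowed memory writes until requests timed out. Hopefully this will ease as the cluster is scaled out. Note also that messages rejected this way are lost unless they are sent again.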
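
Because a rejected send is never stored by the broker, the producer side has to retry or record the failure. The sketch below is one possible application-level wrapper, not the RocketMQ client API: `BusyException`, `Sender`, and `ReliableSendSketch` are hypothetical names, and the backoff values are arbitrary. It retries a bounded number of times when the broker answers with code 2 and, if every attempt fails, keeps the message in a local list so it can be re-pushed later instead of being dropped silently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a broker rejection carrying a response code.
class BusyException extends Exception {
    final int responseCode;

    BusyException(int responseCode, String message) {
        super(message);
        this.responseCode = responseCode;
    }
}

// Hypothetical send function; a real producer call would sit behind this.
interface Sender {
    void send(String message) throws BusyException;
}

class ReliableSendSketch {
    static final int SYSTEM_BUSY = 2;

    final List<String> pendingRepush = new ArrayList<>(); // messages that still need delivery
    private final Sender sender;
    private final int maxAttempts;
    private final long baseBackoffMillis;

    ReliableSendSketch(Sender sender, int maxAttempts, long baseBackoffMillis) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
        this.baseBackoffMillis = baseBackoffMillis;
    }

    // Returns true if the message was accepted; otherwise records it for a later re-push.
    boolean sendWithRetry(String message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(message);
                return true;
            } catch (BusyException e) {
                if (e.responseCode != SYSTEM_BUSY) {
                    break; // not a flow-control rejection: do not retry blindly
                }
                if (attempt < maxAttempts) {
                    try {
                        // Linear backoff gives the broker time to drain its queue.
                        Thread.sleep(baseBackoffMillis * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep the interrupt and stop retrying
                        break;
                    }
                }
            }
        }
        pendingRepush.add(message);
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake broker: busy for the first two calls, then accepts.
        Sender flaky = msg -> {
            if (++calls[0] <= 2) {
                throw new BusyException(SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
            }
        };
        ReliableSendSketch producer = new ReliableSendSketch(flaky, 3, 10);
        System.out.println(producer.sendWithRetry("order-1")); // true, on the third attempt
        System.out.println(producer.pendingRepush);            // []
    }
}
```

With the real client, `DefaultMQProducer#setRetryTimesWhenSendFailed` controls the built-in retries; whether a given client version retries on this particular response code should be checked against that version's source.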

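[Disclaimer] This content comes from a blogger in the Huawei Cloud developer community and does not represent the views or positions of Huawei Cloud or the Huawei Cloud developer community. Reposts must state the source (Huawei Cloud community), the article link, the author, and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find content in this community that appears to be plagiarized, please report it by email with supporting evidence; once verified, the community will remove the infringing content immediately. Report email: cloudbbs@huaweicloud.com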