RabbitMq从入门到精通- PublisherCallbackChannel is closed
消费失败:[org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
rabbitmq: PublisherCallbackChannel is closed
做的一个报表项目,需要每天定时生成报表并推送,设计的方案是每天凌晨生成分片(分片,即所有报表的一部分,全部分片就是报表的全量数据,每个分片50个报表中首尾报表的id);计算出来的分片发到rabbitmq A队列(主角以路人甲的面孔出现),为了分流到下游的各个服务实例(k8s中的pod),每个服务实例再根据分片取出报表进行生成;生成结束,再将分片发送到rabbitmq实现的延迟队列B,实现延迟发送,刚完成方案时,反复测试一切正常
在某一个luck day, 当把分片调到10个时,大奖出现了:报表被重复推送,在延迟队列B中可以看到重复的分片数据,第一个怀疑的就是生成分片有问题 ,生成分片经过数不清的细节修改之后,感觉已经随时可能有问题,但一路排查下来,反复查看分片日志没有发现问题,源码看了又看,也没有发现问题(太不自信导致浪费了大把时间),又往下游排查:根据分片生成报表,从这个流程里的日志可以看到接受到了来着A队列的重复分片,那就是上游:生成分片有问题了??又往后排查,怎么都看不出问题,排查了大半天没有一点进展,一切都是怀疑,就是拿不出证据(这时候除了吃个苹果,喝个水没有任何活动,整个人木在屏幕前);可能是这份真诚感动了上苍,突然一个念头闪过,为啥不看看rabbitmq日志,说不定有线索,(还好之前有输出spring amqp的日志文件,不然不知道猴年马月才能找到背后凶手);more springamqp.log, 一大堆exception闪闪发光,这是猿生以来,第一次感觉exception这么可爱,这一堆exception中,”PublisherCallbackChannel is closed“瞬间抓住了我的眼(这家伙好像之前碰到过,但是后面突然有消失了)
顺着线索,从stack overflow一路摸到了官网;一边看人家的api guide, 一边擦汗;感觉专门就是针对我的情况发的手册(nb,大牛不光得会写代码,还得会预测):
Channels and Concurrency Considerations (Thread Safety) As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire, double acknowledgements and so on.
Concurrent publishing on a shared channel can result in incorrect frame interleaving on the wire, triggering a connection-level protocol exception and immediate connection closure by the broker. It therefore requires explicit synchronization in application code (Channel#basicPublish must be invoked in a critical section). Sharing channels between threads will also interfere with Publisher Confirms. Concurrent publishing on a shared channel is best avoided entirely, e.g. by using a channel per thread.
It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.
Channels consume resources and in most cases applications very rarely need more than a few hundreds open channels in the same JVM process. If we assume that the application has a thread for each channel (as channel shouldn't be used concurrently), thousands of threads for a single JVM is already a fair amount of overhead that likely can be avoided. Moreover a few fast publishers can easily saturate a network interface and a broker node: publishing involves less work than routing, storing and delivering messages.
A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.
Consuming in one thread and publishing in another thread on a shared channel can be safe.
Server-pushed deliveries (see the section below) are dispatched concurrently with a guarantee that per-channel ordering is preserved. The dispatch mechanism uses a java.util.concurrent.ExecutorService, one per connection. It is possible to provide a custom executor that will be shared by all connections produced by a single ConnectionFactory using the ConnectionFactory#setSharedExecutor setter.
When manual acknowledgements are used, it is important to consider what thread does the acknowledgement. If it's different from the thread that received the delivery (e.g. Consumer#handleDelivery delegated delivery handling to a different thread), acknowledging with the multiple parameter set to true is unsafe and will result in double-acknowledgements, and therefore a channel-level protocol exception that closes the channel. Acknowledging a single message at a time can be safe.
一大堆英文,看的真是头疼,没办法,自己挖的坑,一边查牛津一边脑海中重演事故现场: rabbtimq channel不是线程安全的,多个线程共享一个channel,某些操作会导致并发问题;所以官网的建议是:by using a channel per thread.(总算有认识的单词了:每个线程使用一个channel)。回到项目中,可以在rabbitmq管理页面看到队列A情况:一个服务实例只有一个channel,而prefetch是250,当多个分片消息发送问题就来了
consumer.PNG
每个实例能够fetch多达250条消息,但是每个实例只有一个channel,一个线程处理一条消息,导致了多个线程共用了这个channel,结果就是在线程处理完生成报表,channel.basicAck时出现了PublisherCallbackChannel is closed异常,但是在发生异常之前,已经将分片发送到延迟队列B;而这个异常发生后重新创建channel, 新的channel重新发送没有成功ack的消息,所以出现了重复的分片消息
这次事故的导火索是channel只有一个,而prefetch为250,其实项目之前配置的是prefetch是8,MessageListenerContainer.concurrency是4-8;但是后面升级了spring-boot(2.0.5 --> 2.1.4),spring-amqp和rabbit client也跟着升级,而新版客户端配置覆盖了项目之前的配置(估计删除队列让服务重新声明队列能够恢复之前的配置),为了保险起见,将客户端版本调回老版本,并且在@RabbitListener注解中指明concurrency='8',
并且官网建议:channel.basicAck不要批量,即第二个参数false
保证一个线程使用一个信道,问题完美解决
但是rabbit配置的是10个线程
文章来源: blog.csdn.net,作者:隔壁老瓦,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/wxb880114/article/details/105978614
- 点赞
- 收藏
- 关注作者
评论(0)