秒发4000万数据的缓存方案以及消息优先消费

举报
赵KK日常技术记录 发表于 2023/06/29 23:08:41 2023/06/29
【摘要】 诱发问题:MQ是否设置了消息消费顺序?我发现在mq并发进入消费时并不能保证消息的消费顺序,此时如果同时一万线程对一个生产者一个消费者的一个队列业务互斥进行消费,此时的消费顺序是无序的,同一时刻会造成互斥数据同时存在多份,且发生率高达10%,而目前能想到的解决方案是,redis的消息是存取很快,且有顺序的,所以把mq消费的方法加了分布式锁,但是这效率能不能再次保证呢?不能的,因为你mq存在的意...

诱发问题:MQ是否设置了消息消费顺序?

我发现在mq并发进入消费时并不能保证消息的消费顺序,此时如果同时一万线程对一个生产者一个消费者的一个队列业务互斥进行消费,此时的消费顺序是无序的,同一时刻会造成互斥数据同时存在多份,且发生率高达10%,而目前能想到的解决方案是,redis的消息是存取很快,且有顺序的,所以把mq消费的方法加了分布式锁,但是这效率能不能再次保证呢?不能的,因为你mq存在的意义就是异步消费,如果不是互斥线程,那锁起来反而影响了性能,此时,就要看此时的数据显示重不重要?如果数据重要就锁,如果不重要,最终数据一致就可以了。

那么当每秒4000万数据进行消费,且此时需要对数据进行过滤时如何设计?

图片

L1缓存:Springcache+j2cache

L2缓存:redis

由于大量的缓存读取会导致 L2 的网络成为整个系统的瓶颈,因此 L1 的目标是降低对 L2 的读取次数,

读取顺序 -> L1 -> L2 -> DB

J2Cache 默认使用 Caffeine 作为一级缓存,使用 Redis 作为二级缓存

但是由于此类数据中是存在部分黑名单数据的,此时需要将数据过滤后才能缓存然后消费。

过滤大key数据

放入缓存key分片

队列+mq根据消息优先级去消费

二级缓存

过滤大key数据

由于瞬时数据将达到4000万,一开始的想法是能不能用bitmap去存放数据,不仅不占内存,且基于redis标识位速度也快,但是此时的单个对象过大,且不适合bitmap存储,所以引入布隆过滤。但注意布隆过滤是存在误伤率的。

另外布隆存储的占用空间也是bit的占用空间非常小。

/**

  • 判断字符串是否在单片位数组中存在
  • @param set 单片位数组
  • @param mod 某个hash函数的值
  • @param hash 字符串转化后的hashcode
    */
    public static boolean setBitMap(int[] set, long mod, long hash){
    boolean isExist = false;
    /*字符串在位数组中的位置/
    long bit = hash%mod;
    /*int类型的位置/
    long site = bit >> 5;
    /*int类型中32位的第mod1位/
    long mod1 = bit & 31;
    /**位数组中该字符串的位置是否为1
    • 如果是则退出时exits+1,否则将该位置变成1
    • */
      try {
      if(((set[(int) site] >> mod1) & 1) == 1){
      isExist = true;
      }
      else {
      set[(int) site] = set[(int) site] | (1 << mod1);
      }
      } catch (Exception e) {
      }
      return isExist;
      }
      而这里的黑名单规则则由业务规则去设置

key分片

由于数据的特殊性,将大量数据按着尾号的后两位进行分片,00-99共计100片,此时每片的数据如果平均分配后,先过滤后处理的原则,则会更快的处理完。

队列+mq根据消息优先级去消费

一开始的想法是用priorty_queue去指定优先级,搭配线程池去消费,而小伙伴是按通道优先级去取的,此时给消息mq设置优先级,搭配线程池去拉

那么此时的消息如何进行插队呢?

有一条或N条消息,现在想发送,但是优先级最高的队列已经有5000万条消息在消费了?

每个通道都按优先级分组了,都有对应的队列,消费者这边会优先取优先级高的队列数据,所以相同优先级的消息是依次消费的。如果实在想发,就相当于组合索引都命中一样,定义组合优先级。

另一种写法

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.support.collections.DefaultRedisList;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

@Component
public class RedisQueue implements ApplicationListener<ContextRefreshedEvent> {

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {

// addTestData();
new Thread(() -> {
//优化成线程池方式异步执行,千万不要阻塞主线程
while (true){
try {
handler((message -> {
System.out.println(message);
return null;
}),(message -> {
try {
//理论是永远不会执行
System.out.println(“no message wait”);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}),maxAll,max,middle,min);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public void handler(Function<Message,Object> callBack,Function<Message,Object> noMessageCallBack,DefaultRedisList<Message> …queues) throws Exception {
notify.take();
for(DefaultRedisList<Message> queue : queues){
if(queue.size() > 0){
Message take = queue.take();
callBack.apply(take);
return;
}
}
noMessageCallBack.apply(null);
}

private void addTestData() {
    for(int i = 0 ;i < 2 ; i ++){
        maxAll.add(new Message("maxAll:" + UUID.randomUUID().toString(),"maxAll"));
    }

    for(int i = 0 ;i < 100 ; i ++){
        max.add(new Message("max:" + UUID.randomUUID().toString(),"max"));
    }

    for(int i = 0 ;i < 1000 ; i ++){
        middle.add(new Message("middle:" + UUID.randomUUID().toString(),"middle"));
    }

    for(int i = 0 ;i < 10000 ; i ++){
        min.add(new Message("min:" + UUID.randomUUID().toString(),"min"));
    }
}

static class Message implements Serializable {
    private String msg;
    private String type;

    @Override
    public String toString() {
        return new StringJoiner(", ", Message.class.getSimpleName() + "[", "]")
                .add("msg='" + msg + "'")
                .add("type='" + type + "'")
                .toString();
    }

    public Message(){
    }

    public Message(String msg, String type) {
        this.msg = msg;
        this.type = type;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }
}

public static final String PRIORIT_MAX_ALL = "MAX_ALL";
public static final String PRIORIT_MAX = "MAX";
public static final String PRIORIT_MIDDLE = "MIDDLE";
public static final String PRIORIT_MIN = "MIN";

public static final String NOTIFY = "notify";

private NotifyDefaultRedisList<Message> maxAll,max,min,middle;
private DefaultRedisList<String> notify;

public RedisQueue(@Autowired ApplicationContext context, @Autowired RedisTemplate redisTemplate){
    notify = new DefaultRedisList(NOTIFY,redisTemplate);
    maxAll = new NotifyDefaultRedisList(notify,PRIORIT_MAX_ALL,redisTemplate);
    max = new NotifyDefaultRedisList(notify,PRIORIT_MAX,redisTemplate);
    middle = new NotifyDefaultRedisList(notify,PRIORIT_MIDDLE,redisTemplate);
    min = new NotifyDefaultRedisList(notify,PRIORIT_MIN,redisTemplate);
}

static class NotifyDefaultRedisList<T> extends DefaultRedisList<T> {

    private DefaultRedisList<String> notify;

    public NotifyDefaultRedisList(DefaultRedisList<String> notify,String key, RedisOperations<String, T> operations) {
        super(key, operations);
        this.notify = notify;
    }

    @Override
    public boolean add(T value) {
        this.notify.add(UUID.randomUUID().toString());
        return super.add(value);
    }

}

}
此demo可直接引入项目随项目启动测试,想测试无任务时需要清理redis数据,目前未压测,待优化

二级缓存

Springcache注解整合J2cache减轻redis的压力,如果是ehcache的话会发生ehcache在get的时候为什么会触发缓存超时,会把Redis二级缓存给清除掉

解决方案

https://www.oschina.net/question/8729_224337
以上都来源于真实案例,想法不一,解决方案不一,仅供参考,待优化

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。