Redis实现延迟任务 + RedisUtil升级
【摘要】 1.MassMailTask.java 延迟任务bean对象package com.sunxiansheng.user.delayQueue;import lombok.Data;import java.util.Date;/** * Description: 延迟任务bean * @Author sun * @Create 2024/7/25 12:32 * @Version 1.0 *...
1.MassMailTask.java 延迟任务bean对象
package com.sunxiansheng.user.delayQueue;
import lombok.Data;
import java.util.Date;
/**
* Description: 延迟任务bean
* @Author sun
* @Create 2024/7/25 12:32
* @Version 1.0
*/
@Data
public class MassMailTask {
private Long taskId;
private Date startTime;
}
2.MassMailTaskService.java
package com.sunxiansheng.user.delayQueue;
import com.alibaba.fastjson.JSON;
import com.sunxiansheng.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
* Description:
* @Author sun
* @Create 2024/7/25 12:33
* @Version 1.0
*/
@Slf4j
@Service
public class MassMailTaskService {
/**
* 延时任务的key
*/
public static final String MASS_TASK_KEY = "massTaskMail";
@Resource
private RedisUtil redisUtil;
/**
* 将延时任务放到Redis中,注意:只有在当前时间之后的任务才能放进去
* @param massMailTask
*/
public void pushMassMailTaskQueue(MassMailTask massMailTask) {
Date startTime = massMailTask.getStartTime();
if (startTime == null) {
return;
}
if (startTime.compareTo(new Date()) <= 0) {
return;
}
log.info("定时任务加入队列,massTask:{}", JSON.toJSONString(massMailTask));
// 将延时任务的信息放到Redis的zset中
redisUtil.zAdd(MASS_TASK_KEY, massMailTask.getTaskId(), startTime.getTime());
}
/**
* 从redis中获取目前时间之前的任务信息,然后删除任务
* @return
*/
public Set<Long> poolMassTaskQueue() {
// 获取Zset中指定范围的任务信息
Set<Object> set = redisUtil.zRangeByScore(MASS_TASK_KEY, 0, System.currentTimeMillis());
if (CollectionUtils.isEmpty(set)) {
return Collections.emptySet();
}
redisUtil.zRemoveBySet(MASS_TASK_KEY, set);
return set.stream().map(n -> {
String string = n.toString();
return Long.parseLong(string);
}).collect(Collectors.toSet());
}
}
3.RedisUtil.java
package com.sunxiansheng.redis.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* Description: RedisUtil工具类
* @Author sun
* @Create 2024/6/5 14:17
* @Version 1.0
*/
@Component
public class RedisUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);
@Resource
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_KEY_SEPARATOR = ".";
/**
* 构建缓存key
* @param strObjs 多个字符串拼接成缓存key
* @return 拼接后的缓存key
*/
public String buildKey(String... strObjs) {
return String.join(CACHE_KEY_SEPARATOR, strObjs);
}
// =============================Common============================
/**
* 是否存在key
* @param key Redis中的key
* @return true如果key存在,否则false
*/
public boolean exists(String key) {
return execute(() -> redisTemplate.hasKey(key));
}
/**
* 删除key
* @param key Redis中的key
* @return true如果删除成功,否则false
*/
public boolean delete(String key) {
return execute(() -> redisTemplate.delete(key));
}
// =============================String============================
/**
* 设置key-value对
* @param key Redis中的key
* @param value 要设置的值
*/
public void set(String key, Object value) {
execute(() -> {
redisTemplate.opsForValue().set(key, value);
return null;
});
}
/**
* 设置key-value对,并设置过期时间
* @param key Redis中的key
* @param value 要设置的值
* @param timeout 过期时间
* @param unit 时间单位
*/
public void set(String key, Object value, long timeout, TimeUnit unit) {
execute(() -> {
redisTemplate.opsForValue().set(key, value, timeout, unit);
return null;
});
}
/**
* 设置key-value对,如果key不存在,则设置成功,并指定过期时间
* @param key Redis中的key
* @param value 要设置的值
* @param timeout 过期时间
* @param unit 时间单位
* @return true如果设置成功,否则false
*/
public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {
return execute(() -> redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit));
}
/**
* 获取指定key的值
* @param key Redis中的key
* @param clazz 值的类型
* @return key对应的值
*/
public <T> T get(String key, Class<T> clazz) {
return execute(() -> castValue(redisTemplate.opsForValue().get(key), clazz));
}
/**
* 递增
* @param key Redis中的key
* @param delta 增量
*/
public void increment(String key, long delta) {
execute(() -> {
redisTemplate.opsForValue().increment(key, delta);
return null;
});
}
// =============================Hash============================
/**
* 向hash中存入数据
* @param key Redis中的key
* @param hashKey hash中的小key
* @param value hash中的小value
*/
public void hPut(String key, String hashKey, Object value) {
execute(() -> {
redisTemplate.opsForHash().put(key, hashKey, value);
return null;
});
}
/**
* 获取hash中的数据
* @param key Redis中的key
* @param hashKey hash中的小key
* @param clazz 值的类型
* @return hash中的小value
*/
public <T> T hGet(String key, String hashKey, Class<T> clazz) {
return execute(() -> castValue(redisTemplate.opsForHash().get(key, hashKey), clazz));
}
/**
* 获取hash中的所有数据
* @param key Redis中的key
* @return hash中的所有数据
*/
public Map<Object, Object> hGetAll(String key) {
return execute(() -> redisTemplate.opsForHash().entries(key));
}
/**
* 删除hash中的指定字段
* @param key Redis中的key
* @param hashKey hash中的小key
*/
public void hDelete(String key, Object... hashKey) {
execute(() -> {
redisTemplate.opsForHash().delete(key, hashKey);
return null;
});
}
/**
* 获取并删除hash中的所有数据
* @param key Redis中的key
* @return hash中的所有数据
*/
public Map<Object, Object> hGetAndDelete(String key) {
Map<Object, Object> map = new HashMap<>();
try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, ScanOptions.NONE)) {
while (cursor.hasNext()) {
Map.Entry<Object, Object> entry = cursor.next();
Object hashKey = entry.getKey();
Object hashValue = entry.getValue();
map.put(hashKey, hashValue);
redisTemplate.opsForHash().delete(key, hashKey);
}
} catch (Exception e) {
logger.error("Redis hGetAndDelete error: key={}", key, e);
}
return map;
}
// =============================List============================
/**
* 向list中左侧推入数据
* @param key Redis中的key
* @param value list中的值
*/
public void lPush(String key, Object value) {
execute(() -> {
redisTemplate.opsForList().leftPush(key, value);
return null;
});
}
/**
* 向list中右侧推入数据
* @param key Redis中的key
* @param value list中的值
*/
public void rPush(String key, Object value) {
execute(() -> {
redisTemplate.opsForList().rightPush(key, value);
return null;
});
}
/**
* 从list中左侧弹出数据
* @param key Redis中的key
* @param clazz 值的类型
* @return list中的值
*/
public <T> T lPop(String key, Class<T> clazz) {
return execute(() -> castValue(redisTemplate.opsForList().leftPop(key), clazz));
}
/**
* 从list中右侧弹出数据
* @param key Redis中的key
* @param clazz 值的类型
* @return list中的值
*/
public <T> T rPop(String key, Class<T> clazz) {
return execute(() -> castValue(redisTemplate.opsForList().rightPop(key), clazz));
}
/**
* 获取list中的指定范围的数据
* @param key Redis中的key
* @param start 起始位置
* @param end 结束位置
* @return list中的值
*/
public List<Object> lRange(String key, long start, long end) {
return execute(() -> redisTemplate.opsForList().range(key, start, end));
}
// =============================Set============================
/**
* 向set中添加数据
* @param key Redis中的key
* @param values set中的值
*/
public void sAdd(String key, Object... values) {
execute(() -> {
redisTemplate.opsForSet().add(key, values);
return null;
});
}
/**
* 获取set中的所有数据
* @param key Redis中的key
* @return set中的所有值
*/
public Set<Object> sMembers(String key) {
return execute(() -> redisTemplate.opsForSet().members(key));
}
/**
* 判断set中是否存在指定的值
* @param key Redis中的key
* @param value set中的值
* @return true如果存在,否则false
*/
public boolean sIsMember(String key, Object value) {
return execute(() -> redisTemplate.opsForSet().isMember(key, value));
}
/**
* 从set中随机弹出一个值
* @param key Redis中的key
* @return set中的值
*/
public Object sPop(String key) {
return execute(() -> redisTemplate.opsForSet().pop(key));
}
/**
* 获取set的大小
* @param key Redis中的key
* @return set的大小
*/
public Long sCard(String key) {
return execute(() -> redisTemplate.opsForSet().size(key));
}
// =============================ZSet============================
/**
* 向有序集合中添加元素
* @param key Redis中的key
* @param value 元素的值
* @param score 元素的分数
* @return true如果添加成功,否则false
*/
public boolean zAdd(String key, Object value, double score) {
return execute(() -> redisTemplate.opsForZSet().add(key, value, score));
}
/**
* 获取有序集合的元素数量
* @param key Redis中的key
* @return 元素数量
*/
public Long zCard(String key) {
return execute(() -> redisTemplate.opsForZSet().size(key));
}
/**
* 获取有序集合指定范围内的元素
* @param key Redis中的key
* @param start 起始位置
* @param end 结束位置
* @return 指定范围内的元素集合
*/
public Set<Object> zRange(String key, long start, long end) {
return execute(() -> redisTemplate.opsForZSet().range(key, start, end));
}
/**
* 删除有序集合中的指定元素
* @param key Redis中的key
* @param value 要删除的元素
* @return 被删除的元素数量
*/
public Long zRemove(String key, Object value) {
return execute(() -> redisTemplate.opsForZSet().remove(key, value));
}
/**
* 删除有序集合中的指定多个元素
* @param key Redis中的key
* @param values 要删除的元素列表
* @return 被删除的元素数量
*/
public Long zRemoveByList(String key, List<Object> values) {
return execute(() -> {
Long removedCount = 0L;
for (Object value : values) {
removedCount += redisTemplate.opsForZSet().remove(key, value);
}
return removedCount;
});
}
/**
* 删除有序集合中的指定多个元素
* @param key Redis中的key
* @param values 要删除的元素集合
* @return 被删除的元素数量
*/
public Long zRemoveBySet(String key, Set<Object> values) {
return execute(() -> {
Long removedCount = 0L;
for (Object value : values) {
removedCount += redisTemplate.opsForZSet().remove(key, value);
}
return removedCount;
});
}
/**
* 获取有序集合中指定元素的分数
* @param key Redis中的key
* @param value 元素的值
* @return 元素的分数
*/
public Double zScore(String key, Object value) {
return execute(() -> redisTemplate.opsForZSet().score(key, value));
}
/**
* 获取有序集合中指定分数范围内的元素
* @param key Redis中的key
* @param start 起始分数
* @param end 结束分数
* @return 指定分数范围内的元素集合
*/
public Set<Object> zRangeByScore(String key, double start, double end) {
return execute(() -> redisTemplate.opsForZSet().rangeByScore(key, start, end));
}
/**
* 增加有序集合中指定元素的分数
* @param key Redis中的key
* @param value 元素的值
* @param score 增加的分数
* @return 增加后的分数
*/
public Double zIncrementScore(String key, Object value, double score) {
return execute(() -> redisTemplate.opsForZSet().incrementScore(key, value, score));
}
/**
* 获取有序集合中指定元素的排名
* @param key Redis中的key
* @param value 元素的值
* @return 元素的排名
*/
public Long zRank(String key, Object value) {
return execute(() -> redisTemplate.opsForZSet().rank(key, value));
}
/**
* 获取有序集合中指定成员及其分数
* @param key Redis中的key
* @param start 起始位置(包含)
* @param end 结束位置(包含)
* @return Set<ZSetOperations.TypedTuple < Object>> 每个TypedTuple对象包含以下内容:value: 集合中的成员,score: 成员的分数。
*/
public Set<ZSetOperations.TypedTuple<Object>> zRangeWithScores(String key, long start, long end) {
return execute(() -> redisTemplate.opsForZSet().rangeWithScores(key, start, end));
}
/**
* 获取有序集合中指定分数范围内的成员及其分数
* @param key Redis中的key
* @param min 最小分数
* @param max 最大分数
* @return Set<ZSetOperations.TypedTuple < Object>> 每个TypedTuple对象包含以下内容:value: 集合中的成员,score: 成员的分数。
*/
public Set<ZSetOperations.TypedTuple<Object>> zRangeByScoreWithScores(String key, double min, double max) {
return execute(() -> redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max));
}
/**
* 获取有序集合中指定成员的分数范围排名
* @param key Redis中的key
* @param value 成员的值
* @return 成员的分数排名
*/
public Long zRevRank(String key, Object value) {
return execute(() -> redisTemplate.opsForZSet().reverseRank(key, value));
}
/**
* 获取有序集合中指定分数范围内的元素数量
* @param key Redis中的key
* @param min 最小分数
* @param max 最大分数
* @return 元素数量
*/
public Long zCount(String key, double min, double max) {
return execute(() -> redisTemplate.opsForZSet().count(key, min, max));
}
/**
* 移除有序集合中指定分数范围内的元素
* @param key Redis中的key
* @param min 最小分数
* @param max 最大分数
* @return 移除的元素数量
*/
public Long zRemoveByScore(String key, double min, double max) {
return execute(() -> redisTemplate.opsForZSet().removeRangeByScore(key, min, max));
}
/**
* 移除有序集合中指定排名范围内的元素
* @param key Redis中的key
* @param start 起始排名
* @param end 结束排名
* @return 移除的元素数量
*/
public Long zRemoveByRank(String key, long start, long end) {
return execute(() -> redisTemplate.opsForZSet().removeRange(key, start, end));
}
private <T> T execute(RedisOperation<T> operation) {
try {
return operation.execute();
} catch (Exception e) {
logger.error("Redis operation error", e);
return null;
}
}
@FunctionalInterface
private interface RedisOperation<T> {
T execute();
}
/**
* 转换类型:会对Long类型的特殊处理
*
* @param value
* @param clazz
* @return
* @param <T>
*/
public <T> T castValue(Object value, Class<T> clazz) {
if (value == null) {
return null;
}
if (clazz == Long.class && value instanceof Integer) {
return clazz.cast(((Integer) value).longValue());
}
return clazz.cast(value);
}
}
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)