原理
在Redis中,Zset结构中的每个元素都是由一个成员和一个成员相关联的分值组成,其中成员以字符串方式存储,分值(sorce)以64位双精度浮点数格式存储。从而利用Redis的Zset类型的分值(score)来实现延迟队列,主要将时间戳作为score,用于进行延迟任务的查询。
主要使用RedisTemplate的 *** 如下:
//加入队列使用: redisTemplate.opsForZSet().add()
//从队列中取出任务: Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate .opsForZSet() .rangeWithScores(key, 0, now);
使用场景
如:依赖接口存在读取数据延迟,之一次请求时会失败,这时可以考虑使用延迟队列,将失败任务放入延迟队列中,等过几秒再重试,以保证数据的成功
代码示例
将任务主键放入zset***中,将当前时间+1s作为score存入zset***中,意思是延迟1s
public boolean addToDelayQueue(String bussinessId, String key ,Integer countInt) { if (countInt < LIMIT_INVOKE_SIZE) { Calendar instance = Calendar.getInstance(); instance.add(Calendar.SECOND, 1); long delaySeconds = instance.getTimeInMillis() / 1000; Integer newCount = countInt + 1; String bussinessCountNew = bussinessId + "_" + newCount; redisTemplate.opsForZSet() .add(key, bussinessCountNew, delaySeconds); } }
从队列中取出任务
public void doDelayQueueWork(){ // 获取当前时间的时间戳 long now = System.currentTimeMillis() / 1000; //long now = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")); // 获取当前时间前的任务列表 Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeWithScores(key, 0, now); if (CollectionUtils.isNotEmpty(tuples)) { for (ZSetOperations.TypedTuple<String> tuple : tuples) { String bussinessIdCount = tuple.getValue(); if (StringUtils.isNotBlank(bussinessIdCount)) { String bussinessId = StringUtils.split(bussinessIdCount, "_")[0]; String count = StringUtils.split(bussinessIdCount, "_")[1]; Integer countInt = Integer.valueOf(count); try { if (redisTemplate.opsForZSet().remove(key, bussinessIdCount) > 0 && countInt <= LIMIT_INVOKE_SIZE) { //业务 *** boolean successFlag = doBusiness(); if (!successFlag) { addToDelayQueue(key, idBill, countInt); } } } catch (Exception e) { log.info("从队列中取出数据错误:{}", *** ONObject.to *** ONString(tuple)); // 重新加入队列中 addToDelayQueue(key, bussinessId, countInt); } } } } }