php+redis实现延迟队列(订单超时未支付。会员时间过期)

举报
原来是咔咔 发表于 2022/03/27 02:45:59 2022/03/27
【摘要】 基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等 我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,   1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key) ...

基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
 

1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)


  
  1. namespace app\command;
  2. use app\common\lib\delayqueue\DelayQueue;
  3. use think\console\Command;
  4. use think\console\Input;
  5. use think\console\Output;
  6. use think\Db;
  7. class DelayQueueWorker extends Command
  8. {
  9. const COMMAND_ARGV_1 = 'queue';
  10. protected function configure()
  11. {
  12. $this->setName('delay-queue')->setDescription('延迟队列任务进程');
  13. $this->addArgument(self::COMMAND_ARGV_1);
  14. }
  15. protected function execute(Input $input, Output $output)
  16. {
  17. $queue = $input->getArgument(self::COMMAND_ARGV_1);
  18. //参数1 延迟队列表名,对应与redis的有序集key名
  19. while (true) {
  20. DelayQueue::getInstance($queue)->perform();
  21. usleep(300000);
  22. }
  23. }
  24. }

库类目录结构
clipboard.png

config.php 里是redis连接参数配置

RedisHandler.php只实现有序集的操作,重连机制还没有实现


  
  1. namespace app\common\lib\delayqueue;
  2. class RedisHandler
  3. {
  4. public $provider;
  5. private static $_instance = null;
  6. private function __construct() {
  7. $this->provider = new \Redis();
  8. //host port
  9. $config = require_once 'config.php';
  10. $this->provider->connect($config['redis_host'], $config['redis_port']);
  11. }
  12. final private function __clone() {}
  13. public static function getInstance() {
  14. if(!self::$_instance) {
  15. self::$_instance = new RedisHandler();
  16. }
  17. return self::$_instance;
  18. }
  19. /**
  20. * @param string $key 有序集key
  21. * @param number $score 排序值
  22. * @param string $value 格式化的数据
  23. * @return int
  24. */
  25. public function zAdd($key, $score, $value)
  26. {
  27. return $this->provider->zAdd($key, $score, $value);
  28. }
  29. /**
  30. * 获取有序集数据
  31. * @param $key
  32. * @param $start
  33. * @param $end
  34. * @param null $withscores
  35. * @return array
  36. */
  37. public function zRange($key, $start, $end, $withscores = null)
  38. {
  39. return $this->provider->zRange($key, $start, $end, $withscores);
  40. }
  41. /**
  42. * 删除有序集数据
  43. * @param $key
  44. * @param $member
  45. * @return int
  46. */
  47. public function zRem($key,$member)
  48. {
  49. return $this->provider->zRem($key,$member);
  50. }
  51. }

延迟队列类


  
  1. namespace app\common\lib\delayqueue;
  2. class DelayQueue
  3. {
  4. private $prefix = 'delay_queue:';
  5. private $queue;
  6. private static $_instance = null;
  7. private function __construct($queue) {
  8. $this->queue = $queue;
  9. }
  10. final private function __clone() {}
  11. public static function getInstance($queue = '') {
  12. if(!self::$_instance) {
  13. self::$_instance = new DelayQueue($queue);
  14. }
  15. return self::$_instance;
  16. }
  17. /**
  18. * 添加任务信息到队列
  19. *
  20. * demo DelayQueue::getInstance('test')->addTask(
  21. * 'app\common\lib\delayqueue\job\Test',
  22. * strtotime('2018-05-02 20:55:20'),
  23. * ['abc'=>111]
  24. * );
  25. *
  26. * @param $jobClass
  27. * @param int $runTime 执行时间
  28. * @param array $args
  29. */
  30. public function addTask($jobClass, $runTime, $args = null)
  31. {
  32. $key = $this->prefix.$this->queue;
  33. $params = [
  34. 'class' => $jobClass,
  35. 'args' => $args,
  36. 'runtime' => $runTime,
  37. ];
  38. RedisHandler::getInstance()->zAdd(
  39. $key,
  40. $runTime,
  41. serialize($params)
  42. );
  43. }
  44. /**
  45. * 执行job
  46. * @return bool
  47. */
  48. public function perform()
  49. {
  50. $key = $this->prefix.$this->queue;
  51. //取出有序集第一个元素
  52. $result = RedisHandler::getInstance()->zRange($key, 0 ,0);
  53. if (!$result) {
  54. return false;
  55. }
  56. $jobInfo = unserialize($result[0]);
  57. print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);
  58. $jobClass = $jobInfo['class'];
  59. if(!@class_exists($jobClass)) {
  60. print_r($jobClass.' undefined'. PHP_EOL);
  61. RedisHandler::getInstance()->zRem($key, $result[0]);
  62. return false;
  63. }
  64. // 到时间执行
  65. if (time() >= $jobInfo['runtime']) {
  66. $job = new $jobClass;
  67. $job->setPayload($jobInfo['args']);
  68. $jobResult = $job->preform();
  69. if ($jobResult) {
  70. // 将任务移除
  71. RedisHandler::getInstance()->zRem($key, $result[0]);
  72. return true;
  73. }
  74. }
  75. return false;
  76. }
  77. }

异步任务基类:


  
  1. namespace app\common\lib\delayqueue;
  2. class DelayJob
  3. {
  4. protected $payload;
  5. public function preform ()
  6. {
  7. // todo
  8. return true;
  9. }
  10. public function setPayload($args = null)
  11. {
  12. $this->payload = $args;
  13. }
  14. }

所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务

如:


  
  1. namespace app\common\lib\delayqueue\job;
  2. use app\common\lib\delayqueue\DelayJob;
  3. class Test extends DelayJob
  4. {
  5. public function preform()
  6. {
  7. // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
  8. print_r('test job'.PHP_EOL);
  9. return true;
  10. }
  11. }

使用方法:

假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:


  
  1. DelayQueue::getInstance('close_order')->addTask(
  2. 'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job
  3. strtotime('2018-05-02 20:55:20'), // 订单失效时间
  4. ['order_id'=>123456] // 传递给job的参数
  5. );

close_order 是有序集的key

命令行启动进程

php think delay-queue close_order

文章来源: blog.csdn.net,作者:咔咔-,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/fangkang7/article/details/82849040

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。