php+redis实现延迟队列(订单超时未支付。会员时间过期)
        【摘要】 
                    基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等 我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,   
1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key) ...
    
    
    
    基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
 我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
  
1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)
  
   - 
    
     
    
    
     
      namespace app\command;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      use app\common\lib\delayqueue\DelayQueue;
     
    
 
   - 
    
     
    
    
     
      use think\console\Command;
     
    
 
   - 
    
     
    
    
     
      use think\console\Input;
     
    
 
   - 
    
     
    
    
     
      use think\console\Output;
     
    
 
   - 
    
     
    
    
     
      use think\Db;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      class DelayQueueWorker extends Command
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
         const COMMAND_ARGV_1 = 'queue';
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         protected function configure()
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             $this->setName('delay-queue')->setDescription('延迟队列任务进程');
     
    
 
   - 
    
     
    
    
             $this->addArgument(self::COMMAND_ARGV_1);
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         protected function execute(Input $input, Output $output)
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             $queue = $input->getArgument(self::COMMAND_ARGV_1);
     
    
 
   - 
    
     
    
    
             //参数1 延迟队列表名,对应与redis的有序集key名
     
    
 
   - 
    
     
    
    
             while (true) {
     
    
 
   - 
    
     
    
    
     
                  DelayQueue::getInstance($queue)->perform();
     
    
 
   - 
    
     
    
    
     
                  usleep(300000);
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
库类目录结构
config.php 里是redis连接参数配置
RedisHandler.php只实现有序集的操作,重连机制还没有实现
  
   - 
    
     
    
    
     
      namespace app\common\lib\delayqueue;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      class RedisHandler
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
         public $provider;
     
    
 
   - 
    
     
    
    
         private static $_instance = null;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         private function __construct() {
     
    
 
   - 
    
     
    
    
             $this->provider = new \Redis();
     
    
 
   - 
    
     
    
    
             //host port
     
    
 
   - 
    
     
    
    
             $config = require_once 'config.php';
     
    
 
   - 
    
     
    
    
             $this->provider->connect($config['redis_host'], $config['redis_port']);
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         final private function __clone() {}
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         public static function getInstance() {
     
    
 
   - 
    
     
    
    
             if(!self::$_instance) {
     
    
 
   - 
    
     
    
    
                 self::$_instance = new RedisHandler();
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
             return self::$_instance;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         /**
     
    
 
   - 
    
     
    
    
     
       * @param string $key 有序集key
     
    
 
   - 
    
     
    
    
     
       * @param number $score 排序值
     
    
 
   - 
    
     
    
    
     
       * @param string $value 格式化的数据
     
    
 
   - 
    
     
    
    
     
       * @return int
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
         public function zAdd($key, $score, $value)
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             return $this->provider->zAdd($key, $score, $value);
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         /**
     
    
 
   - 
    
     
    
    
     
       * 获取有序集数据
     
    
 
   - 
    
     
    
    
     
       * @param $key
     
    
 
   - 
    
     
    
    
     
       * @param $start
     
    
 
   - 
    
     
    
    
     
       * @param $end
     
    
 
   - 
    
     
    
    
     
       * @param null $withscores
     
    
 
   - 
    
     
    
    
     
       * @return array
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
         public function zRange($key, $start, $end, $withscores = null)
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             return $this->provider->zRange($key, $start, $end, $withscores);
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         /**
     
    
 
   - 
    
     
    
    
     
       * 删除有序集数据
     
    
 
   - 
    
     
    
    
     
       * @param $key
     
    
 
   - 
    
     
    
    
     
       * @param $member
     
    
 
   - 
    
     
    
    
     
       * @return int
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
         public function zRem($key,$member)
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             return $this->provider->zRem($key,$member);
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
延迟队列类
  
   - 
    
     
    
    
     
      namespace app\common\lib\delayqueue;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      class DelayQueue
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         private $prefix = 'delay_queue:';
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         private $queue;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         private static $_instance = null;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         private function __construct($queue) {
     
    
 
   - 
    
     
    
    
             $this->queue = $queue;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         final private function __clone() {}
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         public static function getInstance($queue = '') {
     
    
 
   - 
    
     
    
    
             if(!self::$_instance) {
     
    
 
   - 
    
     
    
    
                 self::$_instance = new DelayQueue($queue);
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
             return self::$_instance;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         /**
     
    
 
   - 
    
     
    
    
     
       * 添加任务信息到队列
     
    
 
   - 
    
     
    
    
     
       *
     
    
 
   - 
    
     
    
    
     
       * demo DelayQueue::getInstance('test')->addTask(
     
    
 
   - 
    
     
    
    
     
       * 'app\common\lib\delayqueue\job\Test',
     
    
 
   - 
    
     
    
    
     
       * strtotime('2018-05-02 20:55:20'),
     
    
 
   - 
    
     
    
    
     
       * ['abc'=>111]
     
    
 
   - 
    
     
    
    
     
       * );
     
    
 
   - 
    
     
    
    
     
       *
     
    
 
   - 
    
     
    
    
     
       * @param $jobClass
     
    
 
   - 
    
     
    
    
     
       * @param int $runTime 执行时间
     
    
 
   - 
    
     
    
    
     
       * @param array $args
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
         public function addTask($jobClass, $runTime, $args = null)
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             $key = $this->prefix.$this->queue;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             $params = [
     
    
 
   - 
    
     
    
    
                 'class' => $jobClass,
     
    
 
   - 
    
     
    
    
                 'args'  => $args,
     
    
 
   - 
    
     
    
    
                 'runtime' => $runTime,
     
    
 
   - 
    
     
    
    
     
              ];
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              RedisHandler::getInstance()->zAdd(
     
    
 
   - 
    
     
    
    
                 $key,
     
    
 
   - 
    
     
    
    
                 $runTime,
     
    
 
   - 
    
     
    
    
     
                  serialize($params)
     
    
 
   - 
    
     
    
    
     
              );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         /**
     
    
 
   - 
    
     
    
    
     
       * 执行job
     
    
 
   - 
    
     
    
    
     
       * @return bool
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
         public function perform()
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             $key = $this->prefix.$this->queue;
     
    
 
   - 
    
     
    
    
             //取出有序集第一个元素
     
    
 
   - 
    
     
    
    
             $result = RedisHandler::getInstance()->zRange($key, 0 ,0);
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             if (!$result) {
     
    
 
   - 
    
     
    
    
                 return false;
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             $jobInfo = unserialize($result[0]);
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             $jobClass = $jobInfo['class'];
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             if(!@class_exists($jobClass)) {
     
    
 
   - 
    
     
    
    
     
                  print_r($jobClass.' undefined'. PHP_EOL);
     
    
 
   - 
    
     
    
    
     
                  RedisHandler::getInstance()->zRem($key, $result[0]);
     
    
 
   - 
    
     
    
    
                 return false;
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             // 到时间执行
     
    
 
   - 
    
     
    
    
             if (time() >= $jobInfo['runtime']) {
     
    
 
   - 
    
     
    
    
                 $job = new $jobClass;
     
    
 
   - 
    
     
    
    
                 $job->setPayload($jobInfo['args']);
     
    
 
   - 
    
     
    
    
                 $jobResult = $job->preform();
     
    
 
   - 
    
     
    
    
                 if ($jobResult) {
     
    
 
   - 
    
     
    
    
                     // 将任务移除
     
    
 
   - 
    
     
    
    
     
                      RedisHandler::getInstance()->zRem($key, $result[0]);
     
    
 
   - 
    
     
    
    
                     return true;
     
    
 
   - 
    
     
    
    
     
                  }
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             return false;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
异步任务基类:
  
   - 
    
     
    
    
     
      namespace app\common\lib\delayqueue;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      class DelayJob
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         protected $payload;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         public function preform ()
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             // todo
     
    
 
   - 
    
     
    
    
             return true;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         public function setPayload($args = null)
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             $this->payload = $args;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务
如:
  
   - 
    
     
    
    
     
      namespace app\common\lib\delayqueue\job;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      use app\common\lib\delayqueue\DelayJob;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      class Test extends DelayJob
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         public function preform()
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
     
    
 
   - 
    
     
    
    
     
              print_r('test job'.PHP_EOL);
     
    
 
   - 
    
     
    
    
             return true;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
使用方法:
假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:
  
   - 
    
     
    
    
     
      DelayQueue::getInstance('close_order')->addTask(
     
    
 
   - 
    
     
    
    
          'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job
     
    
 
   - 
    
     
    
    
     
           strtotime('2018-05-02 20:55:20'), // 订单失效时间
     
    
 
   - 
    
     
    
    
     
           ['order_id'=>123456] // 传递给job的参数
     
    
 
   - 
    
     
    
    
     
       );
     
    
 
  
 
close_order 是有序集的key
命令行启动进程
php think delay-queue close_order
文章来源: blog.csdn.net,作者:咔咔-,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/fangkang7/article/details/82849040
        【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
            cloudbbs@huaweicloud.com
        
        
        
        
        
        
        - 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)