❤️【Python从入门到精通】(二十四)Python定时执行任务的姿势❤️

举报
码农飞哥 发表于 2021/09/20 17:20:04 2021/09/20
【摘要】 schedule模块在定时任务中还是有用的

您好,我是码农飞哥,感谢您阅读本文,欢迎一键三连哦
本文将介绍Python定时执行任务的几种方式。
干货满满,建议收藏,需要用到时常看看。 小伙伴们如有问题及需要,欢迎踊跃留言哦~ ~ ~。

@[TOC]

前言

实际开发中我们经常要定时执行一些任务。比如:在一个支付系统中要定时执行账单清算任务,定时关闭未支付的订单等等。针对这些场景Python应付起来也是游刃有余的。这里主要有三个模块来执行。这三个模块或者类分别如下:

Timer类

在threading模块中有Timer类,这个类可以用于控制指定函数在特定时间内执行一次。如下下面示例中就是在2秒后执行一次say_hello函数:

from threading import Timer
import threading
def say_hello():
    print(threading.current_thread().name + '码农飞哥,你好')

t = Timer(2, say_hello)
t.start()
print(threading.current_thread().name + '主线程执行完')

运行结果是:MainThread主线程执行完 Thread-1码农飞哥,你好。 从运行结果可以看出Timer是另起线程执行方法的,不会阻塞主线程的执行。
Timer类的弊端就是太简单了,只能执行一次任务,如果要重复执行任务的话,必须要通过循环来实现。针对这种情况在threading模块中提供了scheduler类。

scheduler类

sched模块的scheduler类就可以实现在指定间隔内多次执行任务。其语法结构是:scheduler(timefunc=time.monotonic,delayfunc=time.sleep) 可以向该构造方法中传入2个参数(当然也可以不指定),其中timefunc:指定生成时间戳的函数,默认使用time.monotonic来生成时间戳。
delayfunc: 在未到达指定时间前,通过该参数可以指定阻塞任务的函数,默认采用time.sleep()函数来阻塞程序。
这个类有三个方法,分别是:
1.scheduler.enter(delay, priority, action, argument=(), kwargs=_sentinel) 这个方法的作用是在time规定的时间后,执行action参数指定的函数,其中argument和kwargs负责action指定的函数传参,priority参数指定要执行任务的等级,当同一时间点多个任务要执行时,等级越高(priority值越小)的任务会优先执行,该函数会返回一个event,可用来取消该任务。
2.scheduler.cancel(event): 取消event任务。需要注意的是如果event参数执行的任务不存在时,则会报ValueError错误。
3.scheduler.run(blocking=True):运行所有需要调度的任务,如果调用该方法的blocking参数为True。该方法将会阻塞线程,直到所有被调度的任务都执行完成。
举个例子说明下吧:

import threading
from sched import scheduler


def action(arg):
    print(threading.current_thread().name + ' ' + arg)


def thread_action(*add):
    # 创建任务调度对象
    sche = scheduler()
    # 定义优先级
    i = 1

    for arc in add:
        # 指定1秒后执行action函数
        sche.enter(1, i, action, argument=(arc,))
        i = i + 1
    # 执行所有调度的任务
    sche.run()


# 定义为线程方法传入的参数
my_tuple = ("等级1的任务", '等级2的任务', '等级3的任务')
thread = threading.Thread(target=thread_action, args=my_tuple)
thread.start()

运行结果是:

Thread-1 等级1的任务
Thread-1 等级2的任务
Thread-1 等级3的任务

这种虽然可以在指定间隔的时间内重复执行任务,但是还是不够好使,对于一些比较复杂的定时任务,比如指定每天10点半执行某个任务。每周一执行某个任务则束手无策了。这时候就需要请出schedule模块了。

schedule模块

首先通过 pip install schedule 来安装schedule模块。这个模块的使用也很简单。可以用函数式的方式来调用方法。比如:要每分钟执行方法add,那么它的写法是:
schedule.every(1).minutes.do(add): 其各个方法的解释如下:
every(interval: int = 1): 表示时间间隔,默认是1。
minutes: 表示时间单位是分钟,如果是秒的话则是seconds,小时的话就是hours,天的话则是days。如果指定周一的话就是monday
do:表示需要指定的方法。do(self, job_func: Callable, *args, **kwargs) 其中 job_func表示要调用的方法,*args传入关键字参数,传入的是元组,**kwargs 传入的是字典。
语法格式基本大同小异。

import schedule
import time


def job(name):
    print("他的名字是:", name)


name = "码农飞哥"
# 每隔1分钟执行一次任务
schedule.every(1).minutes.do(job, name)
# 每隔一小时执行一次任务
schedule.every().hour.do(job, name)
# 每天10:30执行一次任务
schedule.every().day.at("10:30").do(job, name)
# 每隔5到10天执行一次任务
schedule.every(5).to(10).days.do(job, name)
# 每周一执行一次任务
schedule.every().monday.do(job, name)
# 每周三13:15执行一次任务
schedule.every().wednesday.at("13:15").do(job, name)

while True:
    schedule.run_pending()
    time.sleep(1)

schedule 执行任务是串行的。如下图,有两个定时任务,都是每隔1秒执行一次,分别执行job1函数和job2函数。

import schedule
import time
def job1():
    print("任务1开始执行")
    time.sleep(1)
    print("任务1执行完成")


def job2():
    print("任务2开始执行")
    time.sleep(1)
    print("任务2执行完成")


# 每隔1分钟执行一次任务
schedule.every(1).seconds.do(job1)
schedule.every(1).seconds.do(job2)

while True:
    schedule.run_pending()
    time.sleep(1)

其运行结果是:

任务1开始执行
任务1执行完成
任务2开始执行
任务2执行完成
任务1开始执行
任务1执行完成
任务2开始执行
任务2执行完成
......

可以看出任务1和任务2是完美的串行执行的。也就是说只有任务1执行完了才会执行任务2。那么在并发的情况下是怎样的呢?

import schedule
import time
import threading


def job1():
    print("任务1开始执行\n")
    time.sleep(1)
    print("任务1执行完成\n")


def job2():
    print("任务2开始执行\n")
    time.sleep(1)
    print("任务2执行完成\n")


def thread_job(fun):
    threading.Thread(target=fun).start()


# 每隔1分钟执行一次任务
schedule.every(1).seconds.do(thread_job, job1)
schedule.every(1).seconds.do(thread_job, job2)

while True:
    schedule.run_pending()
    time.sleep(1)

运行结果是:

任务1开始执行
任务2开始执行
任务1执行完成
任务2执行完成
任务1开始执行
任务2开始执行
任务1执行完成
任务1开始执行

在并发情况下,任务2不需要等任务1执行完才能执行。

总结

本文详细介绍了Python执行定时任务的各种模块和类。

我是码农飞哥,再次感谢您读完本文

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。