定时任务库的详解与魅力应用:探索schedule的无尽可能性
@[toc]
摘要
schedule
是一个 Python 库,用于在 Python 程序中方便地安排定时任务。它提供了一种简单的方式来安排周期性任务,例如定期执行函数或方法。
以下是 schedule
库的基本用法和示例:
- 安装
schedule
库:
pip install schedule
- 导入
schedule
库:
import schedule
import time
- 定义要定期执行的任务。可以是任何可调用的对象,例如函数或方法。
def job():
print("I'm working...")
- 使用
schedule.every()
方法创建定时任务,并指定任务执行的时间间隔。例如,以下代码将创建一个每隔 5 秒执行一次的任务:
schedule.every(5).seconds.do(job)
你也可以使用其他时间单位,例如 minutes
、hours
和 days
。以下是一些示例:
- 每隔 10 分钟执行一次任务:
schedule.every(10).minutes.do(job)
- 每隔 2 小时执行一次任务:
schedule.every(2).hours.do(job)
- 每周一的早上 8 点执行一次任务:
schedule.every().monday.at("08:00").do(job)
- 调用
schedule.run_pending()
方法来启动定时任务。通常,你可以在程序的主循环中定期调用此方法,以使任务按照指定的时间间隔执行。以下是一个简单的示例:
while True:
schedule.run_pending()
time.sleep(1)
这个循环将每秒钟检查一次是否有定时任务需要执行,并立即执行它们。你可以根据需要调整时间间隔。
以上是 schedule
库的基本用法。它还提供了其他功能和选项,例如暂停和恢复定时任务、取消定时任务等。你可以查看官方文档以获取更多详细信息和示例。
官网:https://schedule.readthedocs.io/en/stable/examples.html
一些示例
官网的一些例子,我做了翻译!方便大家理解:
import schedule
import time
def job():
print("I'm working...")
# Run job every 3 second/minute/hour/day/week,
# Starting 3 second/minute/hour/day/week from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)
# 在每分钟的23秒执行job
schedule.every().minute.at(":23").do(job)
# Run job every hour at the 42nd minute/在每个小时的42分执行任务
schedule.every().hour.at(":42").do(job)
#每隔5小时、20分钟和30秒运行一次任务
#如果当前时间是02:00,则第一次执行将在06:20:30进行
schedule.every(5).hours.at("20:30").do(job)
#每天在特定的HH:MM和下一个HH:MM:SS运行任务
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job)
#在一周的特定某一天运行任务
#在周一执行job
schedule.every().monday.do(job)
#在周三的13:15执行job
schedule.every().wednesday.at("13:15").do(job)
while True:
schedule.run_pending()
time.sleep(1)
也可以使用装饰器来安排任务
使用@repeat来安排函数。通过与上面相同的语法传递一个间隔,同时省略.do()。代码如下:
from schedule import every, repeat, run_pending
import time
@repeat(every(10).minutes)
def job():
print("I am a scheduled job")
while True:
run_pending()
time.sleep(1)
注:@repeat装饰器不能用于非静态类方法。
其他的示例
向任务传递参数
.do()方法可以将额外的参数传递给任务函数,代码如下:
import schedule
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
装饰器的写法
from schedule import every, repeat
@repeat(every().second, "World")
@repeat(every().day, "Mars")
def hello(planet):
print("Hello", planet)
取消任务
要从调度器中删除任务,请使用schedule.cancel_job(job)方法,代码如下:
import schedule
def some_task():
print('Hello world')
job = schedule.every().day.at('22:30').do(some_task)
schedule.cancel_job(job)
定义了一个任务(函数some_task
),该任务会打印出"Hello world"。然后,它使用schedule.every().day.at('22:30').do(some_task)
将这个任务安排在每天的22:30执行。最后,它使用schedule.cancel_job(job)
取消了这个任务。
运行一次任务
从任务返回schedule.CancelJob以将其从调度器中移除。
import schedule
import time
def job_that_executes_once():
# Do some work that only needs to happen once...
return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
这段代码的目的是安排一个任务在每天的22:30执行一次,然后从调度器中取消。
下面是代码的详细解释:
- 导入
schedule
和time
模块,以便使用定时任务和时间相关的功能。 - 定义一个名为
job_that_executes_once
的函数,它执行一些只需要执行一次的工作,并返回schedule.CancelJob
。 - 使用
schedule.every().day.at('22:30').do(job_that_executes_once)
来安排任务。这将在每天的22:30执行job_that_executes_once
函数。 - 进入一个无限循环,每次循环都检查是否有需要执行的任务,通过调用
schedule.run_pending()
来实现。 - 在每次循环后,使用
time.sleep(1)
来暂停1秒钟,然后再次检查是否有任务需要执行。
这样,代码将保持运行状态,并在每天的22:30执行一次job_that_executes_once
函数,然后取消该任务。
获取所有任务
要从调度器检索所有任务,请使用schedule.get_jobs(),代码如下;
import schedule
def hello():
print('Hello world')
schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()
取消所有任务
要从调度器中删除所有任务,请使用schedule.clear()
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().second.do(greet)
schedule.clear()
按标签获取多个任务
您可以从调度器检索一组任务,以唯一标识符进行筛选。
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
定义了一个名为greet
的函数,该函数接收一个name
参数并打印出问候语。
接下来,代码使用schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
和类似的语句来安排定时任务。它们分别在每天和每小时执行greet
函数,并传递不同的名字作为参数。这些任务还使用tag()
方法添加了标签,以便后续筛选。
最后,代码使用schedule.get_jobs('friend')
来从调度器中检索具有friend
标签的所有任务。
以随机间隔运行作业
def my_job():
print('Foo')
# Run every 5 to 10 seconds.
schedule.every(5).to(10).seconds.do(my_job)
使用schedule.every(5).to(10).seconds.do(my_job)
来安排定时任务。这个语句表示每隔5到10秒执行一次my_job
函数。
运行一项作业,直到某个时间
import schedule
from datetime import datetime, timedelta, time
def job():
print('Boo')
# run job until a 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# run job until a 2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Schedule a job to run for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run my_job until today 11:33:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# run job until a specific datetime
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
距离下次执行的时间
使用schedule. idle_seconds ()来获取到下一个任务被计划运行的秒数。如果下一个计划作业已安排在过去运行,则返回值为负。如果未计划任何作业,则返回None。
import schedule
import time
def job():
print('Hello')
schedule.every(5).seconds.do(job)
while 1:
n = schedule.idle_seconds()
if n is None:
# no more jobs
break
elif n > 0:
# sleep exactly the right amount of time
time.sleep(n)
schedule.run_pending()
立即运行所有作业,无论其计划如何
要运行所有作业,无论它们是否计划运行,请使用schedule.run_all()。在作业完成后,它们会被重新安排,就像使用run_pending()执行它们一样。
import schedule
def job_1():
print('Foo')
def job_2():
print('Bar')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
# Add the delay_seconds argument to run the jobs with a number
# of seconds delay in between.
schedule.run_all(delay_seconds=10)
导入了schedule
库,这是一个Python库,可以用来安排周期性的任务。
然后,定义了两个函数job_1
和job_2
。这两个函数都只是简单地打印出字符串'Foo'
和'Bar'
。
接下来,您使用schedule.every().monday.at("12:40").do(job_1)
和schedule.every().tuesday.at("16:40").do(job_2)
来安排周期性的任务。这表示job_1
将在每个周一的12:40执行,而job_2
将在每个周二的16:40执行。
使用schedule.run_all()
来运行所有已安排的任务。这意味着所有已安排的job_1
和job_2
都将立即执行。
最后一行代码schedule.run_all(delay_seconds=10)
,这个函数会等待指定的时间(这里是10秒)后再执行任务。
在后台运行
不能直接在后台运行,可以通过创建一个线程实现,代码如下:
import threading
import time
import schedule
def run_continuously(interval=1):
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
def background_job():
print('Hello from the background thread')
schedule.every().second.do(background_job)
# Start the background thread
stop_run_continuously = run_continuously()
# Do some other things...
time.sleep(10)
# Stop the background thread
stop_run_continuously.set()
多个调度器
您可以根据需要从单个调度器运行任意多的作业。但是,对于较大的安装,可能希望使用多个调度器。代码如下:
import time
import schedule
def fooJob():
print("Foo")
def barJob():
print("Bar")
# Create a new scheduler
scheduler1 = schedule.Scheduler()
# Add jobs to the created scheduler
scheduler1.every().hour.do(fooJob)
scheduler1.every().hour.do(barJob)
# Create as many schedulers as you need
scheduler2 = schedule.Scheduler()
scheduler2.every().second.do(fooJob)
scheduler2.every().second.do(barJob)
while True:
# run_pending needs to be called on every scheduler
scheduler1.run_pending()
scheduler2.run_pending()
time.sleep(1)
记录日志
将消息记录到Python名为schedule的记录器,其级别为DEBUG。要接收来自Schedule的日志,请将记录级别设置为DEBUG。
import schedule
import logging
logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
schedule_logger.setLevel(level=logging.DEBUG)
def job():
print("Hello, Logs")
schedule.every().second.do(job)
schedule.run_all()
schedule.clear()
自定义日志记录
向任务添加可重用日志记录的最简单方法是实现一个处理日志记录的装饰器。作为一个例子,下面的代码添加了print_elapsed_time装饰器:
import functools
import time
import schedule
# This decorator can be applied to any job function to log the elapsed time of each job
def print_elapsed_time(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_timestamp = time.time()
print('LOG: Running job "%s"' % func.__name__)
result = func(*args, **kwargs)
print('LOG: Job "%s" completed in %d seconds' % (func.__name__, time.time() - start_timestamp))
return result
return wrapper
@print_elapsed_time
def job():
print('Hello, Logs')
time.sleep(5)
schedule.every().second.do(job)
schedule.run_all()
并行执行
我正在尝试每10秒执行50个项目,但是从我的日志中可以看出,它按顺序每10秒执行一个项目。有解决方案吗?
默认情况下,schedule按顺序执行所有任务。这样做的原因是很难找到一种使所有人都满意的并行执行模型。
您可以通过在每个线程中运行每个作业来绕过此限制:
import threading
import time
import schedule
def job():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
while 1:
schedule.run_pending()
time.sleep(1)
如果你想更严格地控制线程的数量,使用共享作业队列和一个或多个工作线程:
import time
import threading
import schedule
import queue
def job():
print("I'm working")
def worker_main():
while 1:
job_func = jobqueue.get()
job_func()
jobqueue.task_done()
jobqueue = queue.Queue()
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
worker_thread = threading.Thread(target=worker_main)
worker_thread.start()
while 1:
schedule.run_pending()
time.sleep(1)
该模型也适用于分布式应用程序,其中工作进程是从分布式工作队列接收作业的独立进程。
局限性
说句公道话,Schedule并不是一个万能的调度库。这个库是为了解决简单的调度问题而设计的。如果需要以下功能,Schedule可能无法满足:
- 作业持久性(在重新启动之间记住计划)
- 确切的时间(秒的精度执行)
- 并发执行(多个线程)
- 本地化(工作日或节假日)
Schedule不计算作业函数执行所需的时间。为了确保稳定的执行计划,您需要将长时间运行的作业从主线程(调度器运行的地方)中移动出去。
- 点赞
- 收藏
- 关注作者
评论(0)