schedule 用于周期性执行 Python 函数,可以设定天、周、时、分、秒计划运行函数
安装 1 pip3 install schedule -i https://pypi.tuna.tsinghua.edu.cn/simple
方法 every()
every(interval: int = 1)
安排一个新的定期工作
每2s执行
1 every(2).seconds.do(job)
每10分钟执行
1 every(10).minutes.do(job)
每小时执行
每天 10:30:01
执行
1 every().day.at("10:30:01" ).do (job)
每星期几的 13:15:01
执行
1 2 3 4 5 6 7 every().monday.at("13:15:01" ).do (job) every().tuesday.at("13:15:01" ).do (job) every().wednesday.at("13:15:01" ).do (job) every().thursday.at("13:15:01" ).do (job) every().friday.at("13:15:01" ).do (job) every().saturday.at("13:15:01" ).do (job) every().sunday.at("13:15:01" ).do (job)
每隔5到10天执行一次job()函数
1 every(5).to(10).days.do(job)
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 from schedule import every, run_pendingdef job_1 (): print ("job_1 working..." ) every(2 ).seconds.do(job_1) while True : run_pending()
输出
1 2 3 4 5 job_1 working... job_1 working... job_1 working... ... ...
repeat()
repeat(job, *args, **kwargs)
装饰函数来安排一个新的定期工作
任何额外的参数都会在作业运行时传递给装饰的函数
每2s执行
1 @repeat(every(2).seconds)
每10分钟执行
1 @repeat(every(10).minutes)
每小时执行
每天 10:30:01
执行
1 @repeat(every().day.at("10:30:01" ))
每星期几的 13:15:01
执行
1 2 3 4 5 6 7 @repeat(every().monday.at("13:15:01" )) @repeat(every().tuesday.at("13:15:01" )) @repeat(every().wednesday.at("13:15:01" )) @repeat(every().thursday.at("13:15:01" )) @repeat(every().friday.at("13:15:01" )) @repeat(every().saturday.at("13:15:01" )) @repeat(every().sunday.at("13:15:01" ))
每隔5到10天执行一次job()函数
1 @repeat(every(5).to(10).days)
一个例子
1 2 3 4 5 6 7 8 9 10 11 from schedule import every, repeat, run_pending@repeat(every(2 ).seconds ) def job_2 (): print ("job_2 working..." ) while True : run_pending()
输出
1 2 3 4 5 job_2 working... job_2 working... job_2 working... ... ...
run_pending()
run_pending()
运行所有计划中的工作
请注意,run_pending()的预期行为是不运行错过的作业
比如说,如果你注册了一个应该每分钟运行的作业,而你只在一小时内调用run_pending(),那么你的作业就不会在中间运行60次,而只会运行一次
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 from schedule import every, run_pendingdef job_1 (): print ("job_1 working..." ) every(2 ).seconds.do(job_1) while True : run_pending()
输出
1 2 3 4 5 job_1 working... job_1 working... job_1 working... ... ...
run_all()
run_all(delay_seconds: int = 0)
运行所有工作,不管它们是否被安排运行
每个作业之间会有一个 delay
秒的延迟。这有助于将作业产生的系统负载更均匀地分布在一段时间内
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 from schedule import every, run_alldef job_1 (): print ("job_1 working..." ) every(2 ).seconds.do(job_1) run_all(2 ) run_all()
输出
1 2 job_1 working... job_1 working...
get_jobs()
get_jobs(tag: Optional[Hashable] = None)
获取标有给定标签的计划工作,如果省略标签,则获取所有工作
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from schedule import every, get_jobsdef job_1 (): print ("job_1 working..." ) def job_2 (): print ("job_2 working..." ) every(5 ).seconds.do(job_1) every(20 ).seconds.do(job_2).tag('秒' , 'seconds' ) print (get_jobs('秒' )) print (get_jobs('seconds' )) print (get_jobs())
输出
1 2 3 [Every 20 seconds do job_2() (last run: [never], next run: 2022-04-18 22:44:24)] [Every 20 seconds do job_2() (last run: [never], next run: 2022-04-18 22:44:24)] [Every 5 seconds do job_1() (last run: [never], next run: 2022-04-18 22:44:09), Every 20 seconds do job_2() (last run: [never], next run: 2022-04-18 22:44:24)]
clear()
clear(tag: Optional[Hashable] = None)
删除标有给定标签的计划工作,如果省略标签,则删除所有工作
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from schedule import every, clear, get_jobsdef job_1 (): print ("job_1 working..." ) def job_2 (): print ("job_2 working..." ) every(5 ).seconds.do(job_1) every(20 ).seconds.do(job_2).tag('秒' , 'seconds' ) print (get_jobs()) print (clear('秒' )) print (get_jobs()) print (clear()) print (get_jobs())
输出
1 2 3 4 5 [Every 5 seconds do job_1() (last run: [never], next run: 2022-04-18 22:50:07), Every 20 seconds do job_2() (last run: [never], next run: 2022-04-18 22:50:22)] None [Every 5 seconds do job_1() (last run: [never], next run: 2022-04-18 22:50:07)] None []
cancel_job()
cancel_job(job: Job)
删除一个预定的工作
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from schedule import every, cancel_job, get_jobsdef job_1 (): print ("job_1 working..." ) def job_2 (): print ("job_2 working..." ) every(5 ).seconds.do(job_1) every(20 ).seconds.do(job_2).tag('秒' , 'seconds' ) print (get_jobs()) print (cancel_job(job_1)) print (get_jobs())
输出
1 2 3 [Every 5 seconds do job_1() (last run: [never], next run: 2022-04-18 22:54:09), Every 20 seconds do job_2() (last run: [never], next run: 2022-04-18 22:54:24)] None [Every 5 seconds do job_1() (last run: [never], next run: 2022-04-18 22:54:09), Every 20 seconds do job_2() (last run: [never], next run: 2022-04-18 22:54:24)]
next_run()
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timefrom schedule import every, next_run, run_pendingdef job_1 (): print ("job_1 working..." ) every(3 ).seconds.do(job_1) while True : run_pending() print (next_run()) time.sleep(1 )
输出
1 2 3 4 5 6 7 8 2022-04-18 23:00:20.936598 2022-04-18 23:00:20.936598 2022-04-18 23:00:20.936598 job_1 working... 2022-04-18 23:00:23.960987 2022-04-18 23:00:23.960987 ... ...
idle_seconds()
idle_seconds()
还有多少秒即将开始运行任务
一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timefrom schedule import every, idle_seconds, run_pendingdef job_1 (): print ("job_1 working..." ) every(3 ).seconds.do(job_1) while True : run_pending() print (idle_seconds()) time.sleep(1 )
输出
1 2 3 4 5 6 7 8 9 3.0 1.989047 0.974167 job_1 working... 3.0 1.995853 0.98298 ... ...
资料