添加调度任务
使用 BlockingScheduler()
新建一个调度器在当前进程的主线程中运行,会阻塞当前线程。该调度器一般用的比较多,可以看到当前调度任务运行的输出结果,推荐使用。
date
使用 date
触发器可实现指定时间点,为一次触发。如下示例:在2024年2月4日15时8分0秒执行一次。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from common.log import Logger
scheduler = BlockingScheduler()
@scheduler.scheduled_job('date', run_date=datetime.datetime(2024, 2, 4, 15, 8, 0))
def main():
for _ in range(5):
try:
print("This script is running!")
break
except Exception as e:
Logger.error(e)
if __name__ == '__main__':
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
cron
使用 cron
触发器可实现周期性执行。如下示例:每周日凌晨0点0分0秒执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from apscheduler.schedulers.blocking import BlockingScheduler
from common.log import Logger
scheduler = BlockingScheduler()
@scheduler.scheduled_job("cron", day_of_week='SUN', hour='0', minute='0', second='0')
def main():
for _ in range(5):
try:
print("This script is running!")
break
except Exception as e:
Logger.error(e)
if __name__ == '__main__':
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
相关链接:
一文详解Python定时任务触发