Schedule—简单实用的 Python 周期任务调度工具
注意:Schedule
主要作用是替代linux系统自带的Crontab
命令,用于周期性
的执行某个命令。
科普一下任务调度系统
任务调度系统的地位
任务调度系统在数据平台
中算是非常核心的组件,相当于指挥部,指挥各个平台上各个组件的运行,并监督任务的运行情况。大数据平台中通常会有很多任务运行,有一个好的任务调度系统可以极大提高效率。
最简单的任务调度系统莫过于linux系统自带的Crontab工具。随着调度任务的增多
,相互之间又有着依赖
,crontab就远远满足不了开发的需求了。
调度系统架构种类
-
定时分片类作业调度系统:数据分片,将大任务
拆分
成小任务,分布式运行。定时准时
的触发。例如:Elastic-Job、Easy Scheduler -
DAG工作流类调度系统:主要解决了任务之间的
依赖关系
,要有丰富灵活的任务触发机制,例如时间、外部任务完成度等, 注意工作流调度需要注意失败是否会丢失整个工作流。例如:Oozie、Azkaban、Apache Airflow
Schedule与Crontab对比
两者都属于定时周期任务调度, 没有分片功能, 也没有工作流功能
Crontab 具有以下缺点:
- 不方便执行
秒级任务
- 任务几百上千个时,管理不方便
Schedule 具有以下优点:
- 解决crontab的缺点,且容纳了crontab的所有基本功能
- 轻量级、不依赖外部库
基本使用教程
# 基本使用教程
import schedule
import time
# 任务内容
def job1():
print("I'm working...")
def job2(name):
print("I'm working..." + name)
# 每十分钟执行任务
schedule.every(10).minutes.do(job1)
# 每个小时执行任务,并在2030-01-01 18:33后停止执行作业
schedule.every().hour.until("2030-01-01 18:33").do(job1)
# 每天的10:30执行任务, 给任务传递参数
schedule.every().day.at("10:30").do(job2, name="longfei")
# 每个月执行任务
schedule.every().monday.do(job1)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job1)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job1)
# 死循环调用
while True:
schedule.run_pending()
time.sleep(1)
# 如果你想只运行一次任务的话,可以这么配
import schedule
import time
def job_that_executes_once():
# 此处编写的任务只会执行一次...
return schedule.CancelJob # 关键代码
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
# 标签功能
import schedule
def greet(name):
print('Hello {}'.format(name))
# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('任务标签1', '标签1')
schedule.every().hour.do(greet, 'John').tag('任务标签2', '标签1')
schedule.every().hour.do(greet, 'Monica').tag('任务标签1', '标签2')
schedule.every().day.do(greet, 'Derek').tag('任务标签3', '标签3')
# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('标签1')
# 取消所有 daily-tasks 标签的任务
schedule.clear('任务标签1')
# 多线程并行执行
import threading
import time
import schedule
# 任务列表
def job1():
print("I'm running on thread %s" % threading.current_thread())
def job2():
print("I'm running on thread %s" % threading.current_thread())
def job3():
print("I'm running on thread %s" % threading.current_thread())
# 创建线程
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
# 定时任务
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
# 循环
while True:
schedule.run_pending()
time.sleep(1)
开源地址
最后更新于 2022-10-28 11:33:27 并被添加「」标签,已有 461 位童鞋阅读过。
本站使用「署名 4.0 国际」创作共享协议,可自由转载、引用,但需署名作者且注明文章出处
此处评论已关闭