在Python中实现定时任务,有几种常见的方法:
使用`time.sleep()`函数
import time
while True:
print("执行定时任务")
time.sleep(10) 等待10秒后继续执行下一次任务
使用`threading.Timer`类
from threading import Timer
def task():
print("执行定时任务")
Timer(10, task).start() 每隔10秒执行一次任务
使用`sched`模块
import sched
import time
def print_time():
print("执行定时任务")
s = sched.scheduler(time.time, time.sleep)
s.enter(10, 1, print_time) 每隔10秒执行一次任务
使用`APScheduler`库
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print("执行定时任务")
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=10) 每隔10秒执行一次任务
scheduler.start()
使用`Celery`库
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def my_periodic_task():
print("This task runs periodically.")
app.conf.beat_schedule = {
'run-every-10-seconds': {
'task': 'tasks.my_periodic_task',
'schedule': 10.0,
},
}
app.start()
选择哪种方法取决于你的具体需求,例如是否需要分布式处理、任务复杂度、是否需要持久化任务等。如果你需要更复杂的调度功能,APScheduler是一个很好的选择;如果你需要分布式任务处理,Celery可能更适合你。