fix:定时任务初始化加锁

This commit is contained in:
jxxghp
2025-03-07 08:07:57 +08:00
parent a835d34a01
commit e1f82e338a

View File

@@ -30,6 +30,9 @@ from app.utils.singleton import Singleton
from app.utils.timer import TimerUtils
lock = threading.Lock()
class SchedulerChain(ChainBase):
pass
@@ -56,85 +59,6 @@ class Scheduler(metaclass=Singleton):
"""
初始化定时服务
"""
# 各服务的运行状态
self._jobs = {
"cookiecloud": {
"name": "同步CookieCloud站点",
"func": SiteChain().sync_cookies,
"running": False,
},
"mediaserver_sync": {
"name": "同步媒体服务器",
"func": MediaServerChain().sync,
"running": False,
},
"subscribe_tmdb": {
"name": "订阅元数据更新",
"func": SubscribeChain().check,
"running": False,
},
"subscribe_search": {
"name": "订阅搜索补全",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "R"
}
},
"new_subscribe_search": {
"name": "新增订阅搜索",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "N"
}
},
"subscribe_refresh": {
"name": "订阅刷新",
"func": SubscribeChain().refresh,
"running": False,
},
"subscribe_follow": {
"name": "关注的订阅分享",
"func": SubscribeChain().follow,
"running": False,
},
"transfer": {
"name": "下载文件整理",
"func": TransferChain().process,
"running": False,
},
"clear_cache": {
"name": "缓存清理",
"func": self.clear_cache,
"running": False,
},
"user_auth": {
"name": "用户认证检查",
"func": self.user_auth,
"running": False,
},
"scheduler_job": {
"name": "公共定时服务",
"func": SchedulerChain().scheduler_job,
"running": False,
},
"random_wallpager": {
"name": "壁纸缓存",
"func": TmdbChain().get_trending_wallpapers,
"running": False,
},
"sitedata_refresh": {
"name": "站点数据刷新",
"func": SiteChain().refresh_userdatas,
"running": False,
},
"recommend_refresh": {
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False,
}
}
# 停止定时服务
self.stop()
@@ -143,221 +67,302 @@ class Scheduler(metaclass=Singleton):
if settings.DEV:
return
# 创建定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
executors={
'default': ThreadPoolExecutor(100)
})
# CookieCloud定时同步
if settings.COOKIECLOUD_INTERVAL \
and str(settings.COOKIECLOUD_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"interval",
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
kwargs={
'job_id': 'cookiecloud'
with lock:
# 各服务的运行状态
self._jobs = {
"cookiecloud": {
"name": "同步CookieCloud站点",
"func": SiteChain().sync_cookies,
"running": False,
},
"mediaserver_sync": {
"name": "同步媒体服务器",
"func": MediaServerChain().sync,
"running": False,
},
"subscribe_tmdb": {
"name": "订阅元数据更新",
"func": SubscribeChain().check,
"running": False,
},
"subscribe_search": {
"name": "订阅搜索补全",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "R"
}
},
"new_subscribe_search": {
"name": "新增订阅搜索",
"func": SubscribeChain().search,
"running": False,
"kwargs": {
"state": "N"
}
},
"subscribe_refresh": {
"name": "订阅刷新",
"func": SubscribeChain().refresh,
"running": False,
},
"subscribe_follow": {
"name": "关注的订阅分享",
"func": SubscribeChain().follow,
"running": False,
},
"transfer": {
"name": "下载文件整理",
"func": TransferChain().process,
"running": False,
},
"clear_cache": {
"name": "缓存清理",
"func": self.clear_cache,
"running": False,
},
"user_auth": {
"name": "用户认证检查",
"func": self.user_auth,
"running": False,
},
"scheduler_job": {
"name": "公共定时服务",
"func": SchedulerChain().scheduler_job,
"running": False,
},
"random_wallpager": {
"name": "壁纸缓存",
"func": TmdbChain().get_trending_wallpapers,
"running": False,
},
"sitedata_refresh": {
"name": "站点数据刷新",
"func": SiteChain().refresh_userdatas,
"running": False,
},
"recommend_refresh": {
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False,
}
)
# 媒体服务器同步
if settings.MEDIASERVER_SYNC_INTERVAL \
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"interval",
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'mediaserver_sync'
}
)
# 新增订阅时搜索5分钟检查一次
self._scheduler.add_job(
self.start,
"interval",
id="new_subscribe_search",
name="新增订阅搜索",
minutes=5,
kwargs={
'job_id': 'new_subscribe_search'
}
)
# 检查更新订阅TMDB数据每隔6小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_tmdb",
name="订阅元数据更新",
hours=6,
kwargs={
'job_id': 'subscribe_tmdb'
}
)
# 创建定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
executors={
'default': ThreadPoolExecutor(100)
})
# 订阅状态每隔24小时搜索一次
if settings.SUBSCRIBE_SEARCH:
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_search",
name="订阅搜索补全",
hours=24,
kwargs={
'job_id': 'subscribe_search'
}
)
if settings.SUBSCRIBE_MODE == "spider":
# 站点首页种子定时刷新模式
triggers = TimerUtils.random_scheduler(num_executions=32)
for trigger in triggers:
# CookieCloud定时同步
if settings.COOKIECLOUD_INTERVAL \
and str(settings.COOKIECLOUD_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"cron",
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
name="订阅刷新",
hour=trigger.hour,
minute=trigger.minute,
"interval",
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
kwargs={
'job_id': 'cookiecloud'
}
)
# 媒体服务器同步
if settings.MEDIASERVER_SYNC_INTERVAL \
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit():
self._scheduler.add_job(
self.start,
"interval",
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'mediaserver_sync'
}
)
# 新增订阅时搜索5分钟检查一次
self._scheduler.add_job(
self.start,
"interval",
id="new_subscribe_search",
name="新增订阅搜索",
minutes=5,
kwargs={
'job_id': 'new_subscribe_search'
}
)
# 检查更新订阅TMDB数据每隔6小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_tmdb",
name="订阅元数据更新",
hours=6,
kwargs={
'job_id': 'subscribe_tmdb'
}
)
# 订阅状态每隔24小时搜索一次
if settings.SUBSCRIBE_SEARCH:
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_search",
name="订阅搜索补全",
hours=24,
kwargs={
'job_id': 'subscribe_search'
}
)
if settings.SUBSCRIBE_MODE == "spider":
# 站点首页种子定时刷新模式
triggers = TimerUtils.random_scheduler(num_executions=32)
for trigger in triggers:
self._scheduler.add_job(
self.start,
"cron",
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
name="订阅刷新",
hour=trigger.hour,
minute=trigger.minute,
kwargs={
'job_id': 'subscribe_refresh'
})
else:
# RSS订阅模式
if not settings.SUBSCRIBE_RSS_INTERVAL \
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit():
settings.SUBSCRIBE_RSS_INTERVAL = 30
elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
settings.SUBSCRIBE_RSS_INTERVAL = 5
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_refresh",
name="RSS订阅刷新",
minutes=int(settings.SUBSCRIBE_RSS_INTERVAL),
kwargs={
'job_id': 'subscribe_refresh'
})
else:
# RSS订阅模式
if not settings.SUBSCRIBE_RSS_INTERVAL \
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit():
settings.SUBSCRIBE_RSS_INTERVAL = 30
elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
settings.SUBSCRIBE_RSS_INTERVAL = 5
}
)
# 关注订阅分享每1小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_refresh",
name="RSS订阅刷新",
minutes=int(settings.SUBSCRIBE_RSS_INTERVAL),
id="subscribe_follow",
name="关注的订阅分享",
hours=1,
kwargs={
'job_id': 'subscribe_refresh'
'job_id': 'subscribe_follow'
}
)
# 关注订阅分享每1小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_follow",
name="关注的订阅分享",
hours=1,
kwargs={
'job_id': 'subscribe_follow'
}
)
# 下载器文件转移每5分钟
self._scheduler.add_job(
self.start,
"interval",
id="transfer",
name="下载文件整理",
minutes=5,
kwargs={
'job_id': 'transfer'
}
)
# 后台刷新TMDB壁纸
self._scheduler.add_job(
self.start,
"interval",
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'random_wallpager'
}
)
# 公共定时服务
self._scheduler.add_job(
self.start,
"interval",
id="scheduler_job",
name="公共定时服务",
minutes=10,
kwargs={
'job_id': 'scheduler_job'
}
)
# 缓存清理服务每隔24小时
self._scheduler.add_job(
self.start,
"interval",
id="clear_cache",
name="缓存清理",
hours=settings.CACHE_CONF["meta"] / 3600,
kwargs={
'job_id': 'clear_cache'
}
)
# 定时检查用户认证每隔10分钟
self._scheduler.add_job(
self.start,
"interval",
id="user_auth",
name="用户认证检查",
minutes=10,
kwargs={
'job_id': 'user_auth'
}
)
# 站点数据刷新
if settings.SITEDATA_REFRESH_INTERVAL:
# 下载器文件转移每5分钟
self._scheduler.add_job(
self.start,
"interval",
id="sitedata_refresh",
name="站点数据刷新",
minutes=settings.SITEDATA_REFRESH_INTERVAL * 60,
id="transfer",
name="下载文件整理",
minutes=5,
kwargs={
'job_id': 'sitedata_refresh'
'job_id': 'transfer'
}
)
# 推荐缓存
self._scheduler.add_job(
self.start,
"interval",
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'recommend_refresh'
}
)
# 后台刷新TMDB壁纸
self._scheduler.add_job(
self.start,
"interval",
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'random_wallpager'
}
)
# 初始化工作流服务
self.init_workflow_jobs()
# 初始化插件服务
self.init_plugin_jobs()
# 公共定时服务
self._scheduler.add_job(
self.start,
"interval",
id="scheduler_job",
name="公共定时服务",
minutes=10,
kwargs={
'job_id': 'scheduler_job'
}
)
# 打印服务
logger.debug(self._scheduler.print_jobs())
# 缓存清理服务每隔24小时
self._scheduler.add_job(
self.start,
"interval",
id="clear_cache",
name="缓存清理",
hours=settings.CACHE_CONF["meta"] / 3600,
kwargs={
'job_id': 'clear_cache'
}
)
# 启动定时服务
self._scheduler.start()
# 定时检查用户认证每隔10分钟
self._scheduler.add_job(
self.start,
"interval",
id="user_auth",
name="用户认证检查",
minutes=10,
kwargs={
'job_id': 'user_auth'
}
)
# 站点数据刷新
if settings.SITEDATA_REFRESH_INTERVAL:
self._scheduler.add_job(
self.start,
"interval",
id="sitedata_refresh",
name="站点数据刷新",
minutes=settings.SITEDATA_REFRESH_INTERVAL * 60,
kwargs={
'job_id': 'sitedata_refresh'
}
)
# 推荐缓存
self._scheduler.add_job(
self.start,
"interval",
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
kwargs={
'job_id': 'recommend_refresh'
}
)
# 初始化工作流服务
self.init_workflow_jobs()
# 初始化插件服务
self.init_plugin_jobs()
# 打印服务
logger.debug(self._scheduler.print_jobs())
# 启动定时服务
self._scheduler.start()
def start(self, job_id: str, *args, **kwargs):
"""
@@ -496,7 +501,6 @@ class Scheduler(metaclass=Singleton):
"""
if not self._scheduler:
return
# 移除该工作流的全部服务
self.remove_workflow_job(workflow)
# 添加工作流服务
@@ -625,17 +629,18 @@ class Scheduler(metaclass=Singleton):
"""
关闭定时服务
"""
try:
if self._scheduler:
logger.info("正在停止定时任务...")
self._event.set()
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
logger.info("定时任务停止完成")
except Exception as e:
logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}")
with lock:
try:
if self._scheduler:
logger.info("正在停止定时任务...")
self._event.set()
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
logger.info("定时任务停止完成")
except Exception as e:
logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}")
@staticmethod
def clear_cache():