From e1f82e338a50e3e30d859726dc07b48f2e411676 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 7 Mar 2025 08:07:57 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=88=9D=E5=A7=8B=E5=8C=96=E5=8A=A0=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/scheduler.py | 567 ++++++++++++++++++++++++----------------------- 1 file changed, 286 insertions(+), 281 deletions(-) diff --git a/app/scheduler.py b/app/scheduler.py index 464c2e6e..ddf17bb6 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -30,6 +30,9 @@ from app.utils.singleton import Singleton from app.utils.timer import TimerUtils +lock = threading.Lock() + + class SchedulerChain(ChainBase): pass @@ -56,85 +59,6 @@ class Scheduler(metaclass=Singleton): """ 初始化定时服务 """ - # 各服务的运行状态 - self._jobs = { - "cookiecloud": { - "name": "同步CookieCloud站点", - "func": SiteChain().sync_cookies, - "running": False, - }, - "mediaserver_sync": { - "name": "同步媒体服务器", - "func": MediaServerChain().sync, - "running": False, - }, - "subscribe_tmdb": { - "name": "订阅元数据更新", - "func": SubscribeChain().check, - "running": False, - }, - "subscribe_search": { - "name": "订阅搜索补全", - "func": SubscribeChain().search, - "running": False, - "kwargs": { - "state": "R" - } - }, - "new_subscribe_search": { - "name": "新增订阅搜索", - "func": SubscribeChain().search, - "running": False, - "kwargs": { - "state": "N" - } - }, - "subscribe_refresh": { - "name": "订阅刷新", - "func": SubscribeChain().refresh, - "running": False, - }, - "subscribe_follow": { - "name": "关注的订阅分享", - "func": SubscribeChain().follow, - "running": False, - }, - "transfer": { - "name": "下载文件整理", - "func": TransferChain().process, - "running": False, - }, - "clear_cache": { - "name": "缓存清理", - "func": self.clear_cache, - "running": False, - }, - "user_auth": { - "name": "用户认证检查", - "func": self.user_auth, - "running": False, - }, - "scheduler_job": { - "name": "公共定时服务", - "func": SchedulerChain().scheduler_job, - "running": False, - }, - "random_wallpager": { - "name": "壁纸缓存", - "func": TmdbChain().get_trending_wallpapers, - "running": False, - }, - "sitedata_refresh": { - "name": "站点数据刷新", - "func": SiteChain().refresh_userdatas, - "running": False, - }, - "recommend_refresh": { - "name": "推荐缓存", - "func": RecommendChain().refresh_recommend, - "running": False, - } - } # 停止定时服务 self.stop() @@ -143,221 +67,302 @@ class Scheduler(metaclass=Singleton): if settings.DEV: return - # 创建定时服务 - self._scheduler = BackgroundScheduler(timezone=settings.TZ, - executors={ - 'default': ThreadPoolExecutor(100) - }) - - # CookieCloud定时同步 - if settings.COOKIECLOUD_INTERVAL \ - and str(settings.COOKIECLOUD_INTERVAL).isdigit(): - self._scheduler.add_job( - self.start, - "interval", - id="cookiecloud", - name="同步CookieCloud站点", - minutes=int(settings.COOKIECLOUD_INTERVAL), - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1), - kwargs={ - 'job_id': 'cookiecloud' + with lock: + # 各服务的运行状态 + self._jobs = { + "cookiecloud": { + "name": "同步CookieCloud站点", + "func": SiteChain().sync_cookies, + "running": False, + }, + "mediaserver_sync": { + "name": "同步媒体服务器", + "func": MediaServerChain().sync, + "running": False, + }, + "subscribe_tmdb": { + "name": "订阅元数据更新", + "func": SubscribeChain().check, + "running": False, + }, + "subscribe_search": { + "name": "订阅搜索补全", + "func": SubscribeChain().search, + "running": False, + "kwargs": { + "state": "R" + } + }, + "new_subscribe_search": { + "name": "新增订阅搜索", + "func": SubscribeChain().search, + "running": False, + "kwargs": { + "state": "N" + } + }, + "subscribe_refresh": { + "name": "订阅刷新", + "func": SubscribeChain().refresh, + "running": False, + }, + "subscribe_follow": { + "name": "关注的订阅分享", + "func": SubscribeChain().follow, + "running": False, + }, + "transfer": { + "name": "下载文件整理", + "func": TransferChain().process, + "running": False, + }, + "clear_cache": { + "name": "缓存清理", + "func": self.clear_cache, + "running": False, + }, + "user_auth": { + "name": "用户认证检查", + "func": self.user_auth, + "running": False, + }, + "scheduler_job": { + "name": "公共定时服务", + "func": SchedulerChain().scheduler_job, + "running": False, + }, + "random_wallpager": { + "name": "壁纸缓存", + "func": TmdbChain().get_trending_wallpapers, + "running": False, + }, + "sitedata_refresh": { + "name": "站点数据刷新", + "func": SiteChain().refresh_userdatas, + "running": False, + }, + "recommend_refresh": { + "name": "推荐缓存", + "func": RecommendChain().refresh_recommend, + "running": False, } - ) - - # 媒体服务器同步 - if settings.MEDIASERVER_SYNC_INTERVAL \ - and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit(): - self._scheduler.add_job( - self.start, - "interval", - id="mediaserver_sync", - name="同步媒体服务器", - hours=int(settings.MEDIASERVER_SYNC_INTERVAL), - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5), - kwargs={ - 'job_id': 'mediaserver_sync' - } - ) - - # 新增订阅时搜索(5分钟检查一次) - self._scheduler.add_job( - self.start, - "interval", - id="new_subscribe_search", - name="新增订阅搜索", - minutes=5, - kwargs={ - 'job_id': 'new_subscribe_search' } - ) - # 检查更新订阅TMDB数据(每隔6小时) - self._scheduler.add_job( - self.start, - "interval", - id="subscribe_tmdb", - name="订阅元数据更新", - hours=6, - kwargs={ - 'job_id': 'subscribe_tmdb' - } - ) + # 创建定时服务 + self._scheduler = BackgroundScheduler(timezone=settings.TZ, + executors={ + 'default': ThreadPoolExecutor(100) + }) - # 订阅状态每隔24小时搜索一次 - if settings.SUBSCRIBE_SEARCH: - self._scheduler.add_job( - self.start, - "interval", - id="subscribe_search", - name="订阅搜索补全", - hours=24, - kwargs={ - 'job_id': 'subscribe_search' - } - ) - - if settings.SUBSCRIBE_MODE == "spider": - # 站点首页种子定时刷新模式 - triggers = TimerUtils.random_scheduler(num_executions=32) - for trigger in triggers: + # CookieCloud定时同步 + if settings.COOKIECLOUD_INTERVAL \ + and str(settings.COOKIECLOUD_INTERVAL).isdigit(): self._scheduler.add_job( self.start, - "cron", - id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}", - name="订阅刷新", - hour=trigger.hour, - minute=trigger.minute, + "interval", + id="cookiecloud", + name="同步CookieCloud站点", + minutes=int(settings.COOKIECLOUD_INTERVAL), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1), + kwargs={ + 'job_id': 'cookiecloud' + } + ) + + # 媒体服务器同步 + if settings.MEDIASERVER_SYNC_INTERVAL \ + and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit(): + self._scheduler.add_job( + self.start, + "interval", + id="mediaserver_sync", + name="同步媒体服务器", + hours=int(settings.MEDIASERVER_SYNC_INTERVAL), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5), + kwargs={ + 'job_id': 'mediaserver_sync' + } + ) + + # 新增订阅时搜索(5分钟检查一次) + self._scheduler.add_job( + self.start, + "interval", + id="new_subscribe_search", + name="新增订阅搜索", + minutes=5, + kwargs={ + 'job_id': 'new_subscribe_search' + } + ) + + # 检查更新订阅TMDB数据(每隔6小时) + self._scheduler.add_job( + self.start, + "interval", + id="subscribe_tmdb", + name="订阅元数据更新", + hours=6, + kwargs={ + 'job_id': 'subscribe_tmdb' + } + ) + + # 订阅状态每隔24小时搜索一次 + if settings.SUBSCRIBE_SEARCH: + self._scheduler.add_job( + self.start, + "interval", + id="subscribe_search", + name="订阅搜索补全", + hours=24, + kwargs={ + 'job_id': 'subscribe_search' + } + ) + + if settings.SUBSCRIBE_MODE == "spider": + # 站点首页种子定时刷新模式 + triggers = TimerUtils.random_scheduler(num_executions=32) + for trigger in triggers: + self._scheduler.add_job( + self.start, + "cron", + id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}", + name="订阅刷新", + hour=trigger.hour, + minute=trigger.minute, + kwargs={ + 'job_id': 'subscribe_refresh' + }) + else: + # RSS订阅模式 + if not settings.SUBSCRIBE_RSS_INTERVAL \ + or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit(): + settings.SUBSCRIBE_RSS_INTERVAL = 30 + elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5: + settings.SUBSCRIBE_RSS_INTERVAL = 5 + self._scheduler.add_job( + self.start, + "interval", + id="subscribe_refresh", + name="RSS订阅刷新", + minutes=int(settings.SUBSCRIBE_RSS_INTERVAL), kwargs={ 'job_id': 'subscribe_refresh' - }) - else: - # RSS订阅模式 - if not settings.SUBSCRIBE_RSS_INTERVAL \ - or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit(): - settings.SUBSCRIBE_RSS_INTERVAL = 30 - elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5: - settings.SUBSCRIBE_RSS_INTERVAL = 5 + } + ) + + # 关注订阅分享(每1小时) self._scheduler.add_job( self.start, "interval", - id="subscribe_refresh", - name="RSS订阅刷新", - minutes=int(settings.SUBSCRIBE_RSS_INTERVAL), + id="subscribe_follow", + name="关注的订阅分享", + hours=1, kwargs={ - 'job_id': 'subscribe_refresh' + 'job_id': 'subscribe_follow' } ) - # 关注订阅分享(每1小时) - self._scheduler.add_job( - self.start, - "interval", - id="subscribe_follow", - name="关注的订阅分享", - hours=1, - kwargs={ - 'job_id': 'subscribe_follow' - } - ) - - # 下载器文件转移(每5分钟) - self._scheduler.add_job( - self.start, - "interval", - id="transfer", - name="下载文件整理", - minutes=5, - kwargs={ - 'job_id': 'transfer' - } - ) - - # 后台刷新TMDB壁纸 - self._scheduler.add_job( - self.start, - "interval", - id="random_wallpager", - name="壁纸缓存", - minutes=30, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3), - kwargs={ - 'job_id': 'random_wallpager' - } - ) - - # 公共定时服务 - self._scheduler.add_job( - self.start, - "interval", - id="scheduler_job", - name="公共定时服务", - minutes=10, - kwargs={ - 'job_id': 'scheduler_job' - } - ) - - # 缓存清理服务,每隔24小时 - self._scheduler.add_job( - self.start, - "interval", - id="clear_cache", - name="缓存清理", - hours=settings.CACHE_CONF["meta"] / 3600, - kwargs={ - 'job_id': 'clear_cache' - } - ) - - # 定时检查用户认证,每隔10分钟 - self._scheduler.add_job( - self.start, - "interval", - id="user_auth", - name="用户认证检查", - minutes=10, - kwargs={ - 'job_id': 'user_auth' - } - ) - - # 站点数据刷新 - if settings.SITEDATA_REFRESH_INTERVAL: + # 下载器文件转移(每5分钟) self._scheduler.add_job( self.start, "interval", - id="sitedata_refresh", - name="站点数据刷新", - minutes=settings.SITEDATA_REFRESH_INTERVAL * 60, + id="transfer", + name="下载文件整理", + minutes=5, kwargs={ - 'job_id': 'sitedata_refresh' + 'job_id': 'transfer' } ) - # 推荐缓存 - self._scheduler.add_job( - self.start, - "interval", - id="recommend_refresh", - name="推荐缓存", - hours=24, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3), - kwargs={ - 'job_id': 'recommend_refresh' - } - ) + # 后台刷新TMDB壁纸 + self._scheduler.add_job( + self.start, + "interval", + id="random_wallpager", + name="壁纸缓存", + minutes=30, + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3), + kwargs={ + 'job_id': 'random_wallpager' + } + ) - # 初始化工作流服务 - self.init_workflow_jobs() - - # 初始化插件服务 - self.init_plugin_jobs() + # 公共定时服务 + self._scheduler.add_job( + self.start, + "interval", + id="scheduler_job", + name="公共定时服务", + minutes=10, + kwargs={ + 'job_id': 'scheduler_job' + } + ) - # 打印服务 - logger.debug(self._scheduler.print_jobs()) + # 缓存清理服务,每隔24小时 + self._scheduler.add_job( + self.start, + "interval", + id="clear_cache", + name="缓存清理", + hours=settings.CACHE_CONF["meta"] / 3600, + kwargs={ + 'job_id': 'clear_cache' + } + ) - # 启动定时服务 - self._scheduler.start() + # 定时检查用户认证,每隔10分钟 + self._scheduler.add_job( + self.start, + "interval", + id="user_auth", + name="用户认证检查", + minutes=10, + kwargs={ + 'job_id': 'user_auth' + } + ) + + # 站点数据刷新 + if settings.SITEDATA_REFRESH_INTERVAL: + self._scheduler.add_job( + self.start, + "interval", + id="sitedata_refresh", + name="站点数据刷新", + minutes=settings.SITEDATA_REFRESH_INTERVAL * 60, + kwargs={ + 'job_id': 'sitedata_refresh' + } + ) + + # 推荐缓存 + self._scheduler.add_job( + self.start, + "interval", + id="recommend_refresh", + name="推荐缓存", + hours=24, + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3), + kwargs={ + 'job_id': 'recommend_refresh' + } + ) + + # 初始化工作流服务 + self.init_workflow_jobs() + + # 初始化插件服务 + self.init_plugin_jobs() + + # 打印服务 + logger.debug(self._scheduler.print_jobs()) + + # 启动定时服务 + self._scheduler.start() def start(self, job_id: str, *args, **kwargs): """ @@ -496,7 +501,6 @@ class Scheduler(metaclass=Singleton): """ if not self._scheduler: return - # 移除该工作流的全部服务 self.remove_workflow_job(workflow) # 添加工作流服务 @@ -625,17 +629,18 @@ class Scheduler(metaclass=Singleton): """ 关闭定时服务 """ - try: - if self._scheduler: - logger.info("正在停止定时任务...") - self._event.set() - self._scheduler.remove_all_jobs() - if self._scheduler.running: - self._scheduler.shutdown() - self._scheduler = None - logger.info("定时任务停止完成") - except Exception as e: - logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}") + with lock: + try: + if self._scheduler: + logger.info("正在停止定时任务...") + self._event.set() + self._scheduler.remove_all_jobs() + if self._scheduler.running: + self._scheduler.shutdown() + self._scheduler = None + logger.info("定时任务停止完成") + except Exception as e: + logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}") @staticmethod def clear_cache():