mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-02 18:22:39 +08:00
822 lines
30 KiB
Python
822 lines
30 KiB
Python
import asyncio
|
||
import gc
|
||
import inspect
|
||
import multiprocessing
|
||
import threading
|
||
import traceback
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Optional
|
||
|
||
import pytz
|
||
from apscheduler.executors.pool import ThreadPoolExecutor
|
||
from apscheduler.jobstores.base import JobLookupError
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
|
||
from app import schemas
|
||
from app.chain import ChainBase
|
||
from app.chain.mediaserver import MediaServerChain
|
||
from app.chain.recommend import RecommendChain
|
||
from app.chain.site import SiteChain
|
||
from app.chain.subscribe import SubscribeChain
|
||
from app.chain.transfer import TransferChain
|
||
from app.chain.workflow import WorkflowChain
|
||
from app.core.config import settings, global_vars
|
||
from app.core.event import eventmanager
|
||
from app.core.plugin import PluginManager
|
||
from app.db.systemconfig_oper import SystemConfigOper
|
||
from app.helper.message import MessageHelper
|
||
from app.helper.sites import SitesHelper # noqa
|
||
from app.helper.image import WallpaperHelper
|
||
from app.log import logger
|
||
from app.schemas import Notification, NotificationType, Workflow
|
||
from app.schemas.types import EventType, SystemConfigKey
|
||
from app.utils.gc import get_memory_usage
|
||
from app.utils.mixins import ConfigReloadMixin
|
||
from app.utils.singleton import SingletonClass
|
||
from app.utils.timer import TimerUtils
|
||
|
||
lock = threading.Lock()
|
||
|
||
|
||
class SchedulerChain(ChainBase):
|
||
pass
|
||
|
||
|
||
class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
|
||
"""
|
||
定时任务管理
|
||
"""
|
||
CONFIG_WATCH = {
|
||
"DEV",
|
||
"COOKIECLOUD_INTERVAL",
|
||
"MEDIASERVER_SYNC_INTERVAL",
|
||
"SUBSCRIBE_SEARCH",
|
||
"SUBSCRIBE_SEARCH_INTERVAL",
|
||
"SUBSCRIBE_MODE",
|
||
"SUBSCRIBE_RSS_INTERVAL",
|
||
"SITEDATA_REFRESH_INTERVAL",
|
||
}
|
||
|
||
def __init__(self):
|
||
# 定时服务
|
||
self._scheduler = None
|
||
# 退出事件
|
||
self._event = threading.Event()
|
||
# 锁
|
||
self._lock = threading.RLock()
|
||
# 各服务的运行状态
|
||
self._jobs = {}
|
||
# 用户认证失败次数
|
||
self._auth_count = 0
|
||
# 用户认证失败消息发送
|
||
self._auth_message = False
|
||
# 初始化
|
||
self.init()
|
||
|
||
def on_config_changed(self):
|
||
self.init()
|
||
|
||
def get_reload_name(self):
|
||
return "定时服务"
|
||
|
||
def init(self):
|
||
"""
|
||
初始化定时服务
|
||
"""
|
||
|
||
# 停止定时服务
|
||
self.stop()
|
||
|
||
# 调试模式不启动定时服务
|
||
if settings.DEV:
|
||
return
|
||
|
||
with lock:
|
||
# 各服务的运行状态
|
||
self._jobs = {
|
||
"cookiecloud": {
|
||
"name": "同步CookieCloud站点",
|
||
"func": SiteChain().sync_cookies,
|
||
"running": False
|
||
},
|
||
"mediaserver_sync": {
|
||
"name": "同步媒体服务器",
|
||
"func": MediaServerChain().sync,
|
||
"running": False
|
||
},
|
||
"subscribe_tmdb": {
|
||
"name": "订阅元数据更新",
|
||
"func": SubscribeChain().check,
|
||
"running": False
|
||
},
|
||
"subscribe_search": {
|
||
"name": "订阅搜索补全",
|
||
"func": SubscribeChain().search,
|
||
"running": False,
|
||
"kwargs": {
|
||
"state": "R"
|
||
}
|
||
},
|
||
"new_subscribe_search": {
|
||
"name": "新增订阅搜索",
|
||
"func": SubscribeChain().search,
|
||
"running": False,
|
||
"kwargs": {
|
||
"state": "N"
|
||
}
|
||
},
|
||
"subscribe_refresh": {
|
||
"name": "订阅刷新",
|
||
"func": SubscribeChain().refresh,
|
||
"running": False
|
||
},
|
||
"subscribe_follow": {
|
||
"name": "关注的订阅分享",
|
||
"func": SubscribeChain().follow,
|
||
"running": False
|
||
},
|
||
"transfer": {
|
||
"name": "下载文件整理",
|
||
"func": TransferChain().process,
|
||
"running": False
|
||
},
|
||
"clear_cache": {
|
||
"name": "缓存清理",
|
||
"func": self.clear_cache,
|
||
"running": False
|
||
},
|
||
"user_auth": {
|
||
"name": "用户认证检查",
|
||
"func": self.user_auth,
|
||
"running": False
|
||
},
|
||
"scheduler_job": {
|
||
"name": "公共定时服务",
|
||
"func": SchedulerChain().scheduler_job,
|
||
"running": False
|
||
},
|
||
"random_wallpager": {
|
||
"name": "壁纸缓存",
|
||
"func": WallpaperHelper().get_wallpapers,
|
||
"running": False
|
||
},
|
||
"sitedata_refresh": {
|
||
"name": "站点数据刷新",
|
||
"func": SiteChain().refresh_userdatas,
|
||
"running": False
|
||
},
|
||
"recommend_refresh": {
|
||
"name": "推荐缓存",
|
||
"func": RecommendChain().refresh_recommend,
|
||
"running": False
|
||
},
|
||
"plugin_market_refresh": {
|
||
"name": "插件市场缓存",
|
||
"func": PluginManager().async_get_online_plugins,
|
||
"running": False,
|
||
"kwargs": {
|
||
"force": True
|
||
}
|
||
},
|
||
"subscribe_calendar_cache": {
|
||
"name": "订阅日历缓存",
|
||
"func": SubscribeChain().cache_calendar,
|
||
"running": False
|
||
},
|
||
"full_gc": {
|
||
"name": "主动内存回收",
|
||
"func": self.full_gc,
|
||
"running": False
|
||
}
|
||
}
|
||
|
||
# 创建定时服务
|
||
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
|
||
executors={
|
||
'default': ThreadPoolExecutor(settings.CONF.scheduler)
|
||
})
|
||
|
||
# CookieCloud定时同步
|
||
if settings.COOKIECLOUD_INTERVAL \
|
||
and str(settings.COOKIECLOUD_INTERVAL).isdigit():
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="cookiecloud",
|
||
name="同步CookieCloud站点",
|
||
minutes=int(settings.COOKIECLOUD_INTERVAL),
|
||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
|
||
kwargs={
|
||
'job_id': 'cookiecloud'
|
||
}
|
||
)
|
||
|
||
# 媒体服务器同步
|
||
if settings.MEDIASERVER_SYNC_INTERVAL \
|
||
and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit():
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="mediaserver_sync",
|
||
name="同步媒体服务器",
|
||
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
|
||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10),
|
||
kwargs={
|
||
'job_id': 'mediaserver_sync'
|
||
}
|
||
)
|
||
|
||
# 新增订阅时搜索(5分钟检查一次)
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="new_subscribe_search",
|
||
name="新增订阅搜索",
|
||
minutes=5,
|
||
kwargs={
|
||
'job_id': 'new_subscribe_search'
|
||
}
|
||
)
|
||
|
||
# 检查更新订阅TMDB数据(每隔6小时)
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="subscribe_tmdb",
|
||
name="订阅元数据更新",
|
||
hours=6,
|
||
kwargs={
|
||
'job_id': 'subscribe_tmdb'
|
||
}
|
||
)
|
||
|
||
# 订阅状态每隔24小时搜索一次
|
||
if settings.SUBSCRIBE_SEARCH:
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="subscribe_search",
|
||
name="订阅搜索补全",
|
||
hours=settings.SUBSCRIBE_SEARCH_INTERVAL,
|
||
kwargs={
|
||
'job_id': 'subscribe_search'
|
||
}
|
||
)
|
||
|
||
if settings.SUBSCRIBE_MODE == "spider":
|
||
# 站点首页种子定时刷新模式
|
||
triggers = TimerUtils.random_scheduler(num_executions=32)
|
||
for trigger in triggers:
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"cron",
|
||
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
|
||
name="订阅刷新",
|
||
hour=trigger.hour,
|
||
minute=trigger.minute,
|
||
kwargs={
|
||
'job_id': 'subscribe_refresh'
|
||
})
|
||
else:
|
||
# RSS订阅模式
|
||
if not settings.SUBSCRIBE_RSS_INTERVAL \
|
||
or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit():
|
||
settings.SUBSCRIBE_RSS_INTERVAL = 30
|
||
elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
|
||
settings.SUBSCRIBE_RSS_INTERVAL = 5
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="subscribe_refresh",
|
||
name="RSS订阅刷新",
|
||
minutes=int(settings.SUBSCRIBE_RSS_INTERVAL),
|
||
kwargs={
|
||
'job_id': 'subscribe_refresh'
|
||
}
|
||
)
|
||
|
||
# 关注订阅分享(每1小时)
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="subscribe_follow",
|
||
name="关注的订阅分享",
|
||
hours=1,
|
||
kwargs={
|
||
'job_id': 'subscribe_follow'
|
||
}
|
||
)
|
||
|
||
# 下载器文件转移(每5分钟)
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="transfer",
|
||
name="下载文件整理",
|
||
minutes=5,
|
||
kwargs={
|
||
'job_id': 'transfer'
|
||
}
|
||
)
|
||
|
||
# 后台刷新TMDB壁纸
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="random_wallpager",
|
||
name="壁纸缓存",
|
||
minutes=30,
|
||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1),
|
||
kwargs={
|
||
'job_id': 'random_wallpager'
|
||
}
|
||
)
|
||
|
||
# 公共定时服务
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="scheduler_job",
|
||
name="公共定时服务",
|
||
minutes=10,
|
||
kwargs={
|
||
'job_id': 'scheduler_job'
|
||
}
|
||
)
|
||
|
||
# 缓存清理服务,每隔24小时
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="clear_cache",
|
||
name="缓存清理",
|
||
hours=settings.CONF.meta / 3600,
|
||
kwargs={
|
||
'job_id': 'clear_cache'
|
||
}
|
||
)
|
||
|
||
# 定时检查用户认证,每隔10分钟
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="user_auth",
|
||
name="用户认证检查",
|
||
minutes=10,
|
||
kwargs={
|
||
'job_id': 'user_auth'
|
||
}
|
||
)
|
||
|
||
# 站点数据刷新
|
||
if settings.SITEDATA_REFRESH_INTERVAL:
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="sitedata_refresh",
|
||
name="站点数据刷新",
|
||
minutes=settings.SITEDATA_REFRESH_INTERVAL * 60,
|
||
kwargs={
|
||
'job_id': 'sitedata_refresh'
|
||
}
|
||
)
|
||
|
||
# 推荐缓存
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="recommend_refresh",
|
||
name="推荐缓存",
|
||
hours=24,
|
||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5),
|
||
kwargs={
|
||
'job_id': 'recommend_refresh'
|
||
}
|
||
)
|
||
|
||
# 插件市场缓存
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="plugin_market_refresh",
|
||
name="插件市场缓存",
|
||
minutes=30,
|
||
kwargs={
|
||
'job_id': 'plugin_market_refresh'
|
||
}
|
||
)
|
||
|
||
# 订阅日历缓存
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="subscribe_calendar_cache",
|
||
name="订阅日历缓存",
|
||
hours=6,
|
||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2),
|
||
kwargs={
|
||
'job_id': 'subscribe_calendar_cache'
|
||
}
|
||
)
|
||
|
||
# 主动内存回收
|
||
if settings.MEMORY_GC_INTERVAL:
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
"interval",
|
||
id="full_gc",
|
||
name="主动内存回收",
|
||
minutes=settings.MEMORY_GC_INTERVAL,
|
||
kwargs={
|
||
'job_id': 'full_gc'
|
||
}
|
||
)
|
||
|
||
# 初始化工作流服务
|
||
self.init_workflow_jobs()
|
||
|
||
# 初始化插件服务
|
||
self.init_plugin_jobs()
|
||
|
||
# 启动定时服务
|
||
self._scheduler.start()
|
||
|
||
def __prepare_job(self, job_id: str) -> Optional[dict]:
|
||
"""
|
||
准备定时任务
|
||
"""
|
||
with self._lock:
|
||
job = self._jobs.get(job_id)
|
||
if not job:
|
||
return None
|
||
if job.get("running"):
|
||
logger.warning(f"定时任务 {job_id} - {job.get('name')} 正在运行 ...")
|
||
return None
|
||
self._jobs[job_id]["running"] = True
|
||
return job
|
||
|
||
def __finish_job(self, job_id: str):
|
||
"""
|
||
完成定时任务
|
||
"""
|
||
with self._lock:
|
||
try:
|
||
self._jobs[job_id]["running"] = False
|
||
except KeyError:
|
||
pass
|
||
|
||
def start(self, job_id: str, *args, **kwargs):
|
||
"""
|
||
启动定时服务
|
||
"""
|
||
|
||
def __start_coro(coro):
|
||
"""
|
||
启动协程
|
||
"""
|
||
return asyncio.run_coroutine_threadsafe(coro, global_vars.loop)
|
||
|
||
# 获取定时任务
|
||
job = self.__prepare_job(job_id)
|
||
if not job:
|
||
return
|
||
# 开始运行
|
||
try:
|
||
if not kwargs:
|
||
kwargs = job.get("kwargs") or {}
|
||
func = job.get("func")
|
||
if not func:
|
||
return
|
||
# 是否多进程运行
|
||
run_in_process = job.get("run_in_process", False)
|
||
if inspect.iscoroutinefunction(func):
|
||
# 协程函数
|
||
__start_coro(func(*args, **kwargs))
|
||
elif run_in_process:
|
||
# 多进程运行
|
||
p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
|
||
p.start()
|
||
p.join()
|
||
else:
|
||
# 普通函数
|
||
job["func"](*args, **kwargs)
|
||
except Exception as e:
|
||
logger.error(f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}")
|
||
MessageHelper().put(title=f"{job.get('name')} 执行失败",
|
||
message=str(e),
|
||
role="system")
|
||
eventmanager.send_event(
|
||
EventType.SystemError,
|
||
{
|
||
"type": "scheduler",
|
||
"scheduler_id": job_id,
|
||
"scheduler_name": job.get('name'),
|
||
"error": str(e),
|
||
"traceback": traceback.format_exc()
|
||
}
|
||
)
|
||
# 运行结束
|
||
self.__finish_job(job_id)
|
||
|
||
def init_plugin_jobs(self):
|
||
"""
|
||
初始化插件定时服务
|
||
"""
|
||
for pid in PluginManager().get_running_plugin_ids():
|
||
self.update_plugin_job(pid)
|
||
|
||
def init_workflow_jobs(self):
|
||
"""
|
||
初始化工作流定时服务
|
||
"""
|
||
for workflow in WorkflowChain().get_timer_workflows() or []:
|
||
self.update_workflow_job(workflow)
|
||
|
||
def remove_workflow_job(self, workflow: Workflow):
|
||
"""
|
||
移除工作流服务
|
||
"""
|
||
if not self._scheduler:
|
||
return
|
||
with self._lock:
|
||
job_id = f"workflow-{workflow.id}"
|
||
service = self._jobs.pop(job_id, {})
|
||
if not service:
|
||
return
|
||
try:
|
||
# 在调度器中查找并移除对应的 job
|
||
job_removed = False
|
||
for job in list(self._scheduler.get_jobs()):
|
||
if job_id == job.id:
|
||
try:
|
||
self._scheduler.remove_job(job.id)
|
||
job_removed = True
|
||
except JobLookupError:
|
||
pass
|
||
break
|
||
if job_removed:
|
||
logger.info(f"移除工作流服务:{service.get('name')}")
|
||
except Exception as e:
|
||
logger.error(f"移除工作流服务失败:{str(e)} - {job_id}: {service}")
|
||
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务移除失败",
|
||
message=str(e),
|
||
role="system")
|
||
|
||
def remove_plugin_job(self, pid: str, job_id: Optional[str] = None):
|
||
"""
|
||
移除定时服务,可以是单个服务(包括默认服务)或整个插件的所有服务
|
||
:param pid: 插件 ID
|
||
:param job_id: 可选,指定要移除的单个服务的 job_id。如果不提供,则移除该插件的所有服务,当移除单个服务时,默认服务也包含在内
|
||
"""
|
||
if not self._scheduler:
|
||
return
|
||
with self._lock:
|
||
if job_id:
|
||
# 移除单个服务
|
||
service = self._jobs.pop(job_id, None)
|
||
if not service:
|
||
return
|
||
jobs_to_remove = [(job_id, service)]
|
||
else:
|
||
# 移除插件的所有服务
|
||
jobs_to_remove = [
|
||
(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid
|
||
]
|
||
for job_id, _ in jobs_to_remove:
|
||
self._jobs.pop(job_id, None)
|
||
if not jobs_to_remove:
|
||
return
|
||
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
|
||
# 遍历移除任务
|
||
for job_id, service in jobs_to_remove:
|
||
try:
|
||
# 在调度器中查找并移除对应的 job
|
||
job_removed = False
|
||
for job in list(self._scheduler.get_jobs()):
|
||
job_id_from_service = job.id.split("|")[0]
|
||
if job_id == job_id_from_service:
|
||
try:
|
||
self._scheduler.remove_job(job.id)
|
||
job_removed = True
|
||
except JobLookupError:
|
||
pass
|
||
if job_removed:
|
||
logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") # noqa
|
||
except Exception as e:
|
||
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
|
||
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败",
|
||
message=str(e),
|
||
role="system")
|
||
|
||
def update_workflow_job(self, workflow: Workflow):
|
||
"""
|
||
更新工作流定时服务
|
||
"""
|
||
if not self._scheduler:
|
||
return
|
||
# 移除该工作流的全部服务
|
||
self.remove_workflow_job(workflow)
|
||
# 添加工作流服务
|
||
with self._lock:
|
||
try:
|
||
job_id = f"workflow-{workflow.id}"
|
||
self._jobs[job_id] = {
|
||
"func": WorkflowChain().process,
|
||
"name": workflow.name,
|
||
"provider_name": "工作流",
|
||
"running": False,
|
||
}
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
trigger=CronTrigger.from_crontab(workflow.timer),
|
||
id=job_id,
|
||
name=workflow.name,
|
||
kwargs={"job_id": job_id, "workflow_id": workflow.id},
|
||
replace_existing=True
|
||
)
|
||
logger.info(f"注册工作流服务:{workflow.name} - {workflow.timer}")
|
||
except Exception as e:
|
||
logger.error(f"注册工作流服务失败:{workflow.name} - {str(e)}")
|
||
SchedulerChain().messagehelper.put(title=f"工作流 {workflow.name} 服务注册失败",
|
||
message=str(e),
|
||
role="system")
|
||
|
||
def update_plugin_job(self, pid: str):
|
||
"""
|
||
更新插件定时服务
|
||
"""
|
||
if not self._scheduler or not pid:
|
||
return
|
||
# 移除该插件的全部服务
|
||
self.remove_plugin_job(pid)
|
||
# 获取插件服务列表
|
||
with self._lock:
|
||
plugin_manager = PluginManager()
|
||
try:
|
||
plugin_services = plugin_manager.get_plugin_services(pid=pid)
|
||
except Exception as e:
|
||
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
|
||
return
|
||
# 获取插件名称
|
||
plugin_name = plugin_manager.get_plugin_attr(pid, "plugin_name")
|
||
# 开始注册插件服务
|
||
for service in plugin_services:
|
||
try:
|
||
sid = f"{pid}_{service['id']}"
|
||
job_id = sid.split("|")[0]
|
||
self.remove_plugin_job(pid, job_id)
|
||
self._jobs[job_id] = {
|
||
"func": service["func"],
|
||
"name": service["name"],
|
||
"pid": pid,
|
||
"provider_name": plugin_name,
|
||
"kwargs": service.get("func_kwargs") or {},
|
||
"running": False,
|
||
}
|
||
self._scheduler.add_job(
|
||
self.start,
|
||
service["trigger"],
|
||
id=sid,
|
||
name=service["name"],
|
||
**(service.get("kwargs") or {}),
|
||
kwargs={"job_id": job_id},
|
||
replace_existing=True
|
||
)
|
||
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
|
||
except Exception as e:
|
||
logger.error(f"注册插件{plugin_name}服务失败:{str(e)} - {service}")
|
||
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务注册失败",
|
||
message=str(e),
|
||
role="system")
|
||
|
||
def list(self) -> List[schemas.ScheduleInfo]:
|
||
"""
|
||
当前所有任务
|
||
"""
|
||
if not self._scheduler:
|
||
return []
|
||
with self._lock:
|
||
# 返回计时任务
|
||
schedulers = []
|
||
# 去重
|
||
added = []
|
||
# 避免_scheduler.shutdown()处于阻塞状态导致的死锁
|
||
if not self._scheduler or not self._scheduler.running:
|
||
return []
|
||
jobs = self._scheduler.get_jobs()
|
||
# 按照下次运行时间排序
|
||
jobs.sort(key=lambda x: x.next_run_time)
|
||
# 将正在运行的任务提取出来 (保障一次性任务正常显示)
|
||
for job_id, service in self._jobs.items():
|
||
name = service.get("name")
|
||
provider_name = service.get("provider_name")
|
||
if service.get("running") and name and provider_name:
|
||
if job_id not in added:
|
||
added.append(job_id)
|
||
schedulers.append(schemas.ScheduleInfo(
|
||
id=job_id,
|
||
name=name,
|
||
provider=provider_name,
|
||
status="正在运行",
|
||
))
|
||
# 获取其他待执行任务
|
||
for job in jobs:
|
||
job_id = job.id.split("|")[0]
|
||
if job_id not in added:
|
||
added.append(job_id)
|
||
else:
|
||
continue
|
||
service = self._jobs.get(job_id)
|
||
if not service:
|
||
continue
|
||
# 任务状态
|
||
status = "正在运行" if service.get("running") else "等待"
|
||
# 下次运行时间
|
||
next_run = TimerUtils.time_difference(job.next_run_time)
|
||
schedulers.append(schemas.ScheduleInfo(
|
||
id=job_id,
|
||
name=job.name,
|
||
provider=service.get("provider_name", "[系统]"),
|
||
status=status,
|
||
next_run=next_run
|
||
))
|
||
return schedulers
|
||
|
||
def stop(self):
|
||
"""
|
||
关闭定时服务
|
||
"""
|
||
with lock:
|
||
try:
|
||
if self._scheduler:
|
||
logger.info("正在停止定时任务...")
|
||
self._event.set()
|
||
self._scheduler.remove_all_jobs()
|
||
if self._scheduler.running:
|
||
self._scheduler.shutdown()
|
||
self._scheduler = None
|
||
logger.info("定时任务停止完成")
|
||
except Exception as e:
|
||
logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}")
|
||
|
||
@staticmethod
|
||
def clear_cache():
|
||
"""
|
||
清理缓存
|
||
"""
|
||
SchedulerChain().clear_cache()
|
||
|
||
@staticmethod
|
||
def full_gc():
|
||
"""
|
||
主动内存回收
|
||
"""
|
||
memory_before = get_memory_usage()
|
||
collected = gc.collect()
|
||
memory_after = get_memory_usage()
|
||
memory_freed = memory_before - memory_after
|
||
logger.info(f"主动内存回收完成,回收对象数: {collected},释放内存: {memory_freed:.2f} MB")
|
||
|
||
def user_auth(self):
|
||
"""
|
||
用户认证检查
|
||
"""
|
||
if SitesHelper().auth_level >= 2:
|
||
return
|
||
# 最大重试次数
|
||
__max_try__ = 30
|
||
if self._auth_count > __max_try__:
|
||
if not self._auth_message:
|
||
SchedulerChain().messagehelper.put(title=f"用户认证失败",
|
||
message="用户认证失败次数过多,将不再尝试认证!",
|
||
role="system")
|
||
self._auth_message = True
|
||
return
|
||
logger.info("用户未认证,正在尝试认证...")
|
||
auth_conf = SystemConfigOper().get(SystemConfigKey.UserSiteAuthParams)
|
||
if auth_conf:
|
||
status, msg = SitesHelper().check_user(**auth_conf)
|
||
else:
|
||
status, msg = SitesHelper().check_user()
|
||
if status:
|
||
self._auth_count = 0
|
||
logger.info(f"{msg} 用户认证成功")
|
||
SchedulerChain().post_message(
|
||
Notification(
|
||
mtype=NotificationType.Manual,
|
||
title="MoviePilot用户认证成功",
|
||
text=f"使用站点:{msg},如有插件使用异常,请重启MoviePilot。",
|
||
link=settings.MP_DOMAIN('#/site')
|
||
)
|
||
)
|
||
# 认证通过后重新初始化插件
|
||
PluginManager().init_config()
|
||
self.init_plugin_jobs()
|
||
|
||
else:
|
||
self._auth_count += 1
|
||
logger.error(f"用户认证失败,{msg},共失败 {self._auth_count} 次")
|
||
if self._auth_count >= __max_try__:
|
||
logger.error("用户认证失败次数过多,将不再尝试认证!")
|