From 494f809ef01906e039ce0b6076e841246d0a7ded Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 29 Jun 2026 07:07:33 +0800 Subject: [PATCH] Refine transfer history batch actions --- app/api/endpoints/dashboard.py | 48 ++++- app/chain/mediaserver.py | 73 ++++++- app/chain/recommend.py | 54 ++++- app/chain/site.py | 80 ++++++- app/chain/subscribe.py | 228 ++++++++++++++++++-- app/chain/torrents.py | 46 +++- app/chain/transfer.py | 46 +++- app/chain/workflow.py | 32 ++- app/core/plugin.py | 105 +++++++--- app/helper/progress.py | 48 +++-- app/scheduler.py | 314 ++++++++++++++++++++++++---- app/schemas/dashboard.py | 54 ++++- app/utils/system.py | 26 ++- docs/mcp-api.md | 3 + skills/moviepilot-api/SKILL.md | 4 +- tests/test_dashboard_system_info.py | 22 ++ tests/test_scheduler_progress.py | 154 ++++++++++++++ 17 files changed, 1194 insertions(+), 143 deletions(-) create mode 100644 tests/test_scheduler_progress.py diff --git a/app/api/endpoints/dashboard.py b/app/api/endpoints/dashboard.py index accef0b9..65da8194 100644 --- a/app/api/endpoints/dashboard.py +++ b/app/api/endpoints/dashboard.py @@ -177,6 +177,23 @@ async def schedule(_: Any = Depends(get_current_active_superuser)) -> Any: return Scheduler().list() +@router.get( + "/schedule/{job_id}/progress", + summary="后台服务进度", + response_model=schemas.Response, +) +async def schedule_progress( + job_id: str, _: Any = Depends(get_current_active_superuser) +) -> Any: + """ + 查询指定后台服务的执行进度。 + """ + progress = Scheduler().get_progress(job_id) + if not progress: + return schemas.Response(success=False, message="后台服务不存在") + return schemas.Response(success=True, data=progress.model_dump()) + + @router.get( "/schedule2", summary="后台服务(API_TOKEN)", @@ -189,6 +206,23 @@ async def schedule2(_: Annotated[str, Depends(verify_apitoken)]) -> Any: return Scheduler().list() +@router.get( + "/schedule2/{job_id}/progress", + summary="后台服务进度(API_TOKEN)", + response_model=schemas.Response, +) +async def schedule_progress2( + job_id: str, _: Annotated[str, Depends(verify_apitoken)] +) -> Any: + """ + 查询指定后台服务的执行进度 API_TOKEN认证(?token=xxx) + """ + progress = Scheduler().get_progress(job_id) + if not progress: + return schemas.Response(success=False, message="后台服务不存在") + return schemas.Response(success=True, data=progress.model_dump()) + + @router.get("/transfer", summary="文件整理统计", response_model=List[int]) async def transfer( days: Optional[int] = 7, @@ -218,22 +252,26 @@ def cpu2(_: Annotated[str, Depends(verify_apitoken)]) -> Any: return SystemUtils.cpu_usage() -@router.get("/memory", summary="获取当前内存使用量和使用率", response_model=List[int]) +@router.get( + "/memory", + summary="获取当前系统内存信息", + response_model=schemas.DashboardMemoryInfo, +) def memory(_: Any = Depends(get_current_active_superuser)) -> Any: """ - 获取当前内存使用率 + 获取当前系统内存信息 """ return SystemUtils.memory_usage() @router.get( "/memory2", - summary="获取当前内存使用量和使用率(API_TOKEN)", - response_model=List[int], + summary="获取当前系统内存信息(API_TOKEN)", + response_model=schemas.DashboardMemoryInfo, ) def memory2(_: Annotated[str, Depends(verify_apitoken)]) -> Any: """ - 获取当前内存使用率 API_TOKEN认证(?token=xxx) + 获取当前系统内存信息 API_TOKEN认证(?token=xxx) """ return SystemUtils.memory_usage() diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index 1718f044..ce937ea1 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -1,6 +1,6 @@ import threading from datetime import datetime -from typing import List, Union, Optional, Generator, Any +from typing import Callable, List, Union, Optional, Generator, Any from app.chain import ChainBase from app.core.config import global_vars @@ -191,13 +191,17 @@ class MediaServerChain(ChainBase): "mediaserver_image_cookies", server=server, image_url=image_url ) - def sync(self): + def sync(self, progress_callback: Optional[Callable[..., None]] = None) -> None: """ 同步媒体库所有数据到本地数据库 + + :param progress_callback: 定时服务进度更新回调 """ # 设置的媒体服务器 mediaservers = ServiceConfigHelper.get_mediaserver_configs() if not mediaservers: + if progress_callback: + progress_callback(value=100, text="未配置媒体服务器,跳过同步") return with lock: # 汇总统计 @@ -206,7 +210,19 @@ class MediaServerChain(ChainBase): enabled_servers = [mediaserver.name for mediaserver in mediaservers if mediaserver and mediaserver.enabled and mediaserver.name] dboper.delete_excluded_servers(enabled_servers) + total_servers = len(enabled_servers) + if progress_callback: + progress_callback( + value=0, + text=f"开始同步媒体服务器,共 {total_servers} 个 ...", + data={"total": total_servers, "finished": 0}, + ) + if not total_servers: + if progress_callback: + progress_callback(value=100, text="没有已启用的媒体服务器") + return # 遍历媒体服务器 + server_index = 0 for mediaserver in mediaservers: if not mediaserver: continue @@ -214,15 +230,36 @@ class MediaServerChain(ChainBase): if not mediaserver.enabled: logger.info(f"媒体服务器 {mediaserver.name} 未启用,跳过") continue + server_index += 1 server_name = mediaserver.name + if progress_callback: + progress_callback( + value=(server_index - 1) / total_servers * 100, + text=( + f"正在同步媒体服务器" + f"({server_index}/{total_servers}){server_name} ..." + ), + data={ + "total": total_servers, + "finished": server_index - 1, + "current": server_name, + }, + ) sync_libraries = mediaserver.sync_libraries or [] logger.info(f"开始同步媒体服务器 {server_name} 的数据 ...") libraries = self.librarys(server_name) if not libraries: logger.info(f"没有获取到媒体服务器 {server_name} 的媒体库,跳过") + if progress_callback: + progress_callback( + value=server_index / total_servers * 100, + text=f"媒体服务器 {server_name} 无可同步媒体库", + data={"total": total_servers, "finished": server_index}, + ) continue sync_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") - for library in libraries: + total_libraries = len(libraries) + for library_index, library in enumerate(libraries, start=1): if sync_libraries \ and "all" not in sync_libraries \ and str(library.id) not in sync_libraries: @@ -255,6 +292,36 @@ class MediaServerChain(ChainBase): logger.info(f"{server_name} 媒体库 {library.name} 同步完成,共同步数量:{library_count}") # 总数累加 total_count += library_count + if progress_callback: + progress_callback( + value=( + (server_index - 1 + library_index / total_libraries) + / total_servers + * 100 + ), + text=( + f"{server_name} 媒体库" + f"({library_index}/{total_libraries}){library.name} 同步完成" + ), + data={ + "total": total_servers, + "finished": server_index - 1, + "current": server_name, + "library_total": total_libraries, + "library_finished": library_index, + }, + ) stale_count = dboper.delete_stale(server=server_name, sync_time=sync_time) logger.info(f"媒体服务器 {server_name} 清理陈旧数据完成,删除数量:{stale_count}") logger.info(f"媒体服务器 {server_name} 数据同步完成,总同步数量:{total_count}") + if progress_callback: + progress_callback( + value=server_index / total_servers * 100, + text=( + f"媒体服务器({server_index}/{total_servers})" + f"{server_name} 同步完成" + ), + data={"total": total_servers, "finished": server_index}, + ) + if progress_callback: + progress_callback(value=100, text="媒体服务器同步完成") diff --git a/app/chain/recommend.py b/app/chain/recommend.py index ddec89cd..4b69ba2f 100644 --- a/app/chain/recommend.py +++ b/app/chain/recommend.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Callable, List, Optional import pillow_avif # noqa 用于自动注册AVIF支持 @@ -27,11 +27,16 @@ class RecommendChain(ChainBase, metaclass=Singleton): # 推荐缓存区域 recommend_cache_region = "recommend" - def refresh_recommend(self, manual: bool = False): + def refresh_recommend( + self, + manual: bool = False, + progress_callback: Optional[Callable[..., None]] = None, + ) -> None: """ 刷新推荐 :param manual: 手动触发 + :param progress_callback: 定时服务进度更新回调 """ logger.debug("Starting to refresh Recommend data.") @@ -56,6 +61,14 @@ class RecommendChain(ChainBase, metaclass=Singleton): recommends = [] # 记录哪些方法已完成 methods_finished = set() + total_requests = len(recommend_methods) * self.cache_max_pages + finished_requests = 0 + if progress_callback: + progress_callback( + value=0, + text=f"开始刷新推荐缓存,共 {total_requests} 个数据分页 ...", + data={"total": total_requests, "finished": 0}, + ) # 这里避免区间内连续调用相同来源,因此遍历方案为每页遍历所有推荐来源,再进行页数遍历 for page in range(1, self.cache_max_pages + 1): for method in recommend_methods: @@ -67,6 +80,21 @@ class RecommendChain(ChainBase, metaclass=Singleton): # 手动触发的刷新,总是需要获取最新数据 with fresh(manual): data = method(page=page) + finished_requests += 1 + if progress_callback: + progress_callback( + value=finished_requests / total_requests * 90, + text=( + f"正在刷新推荐缓存" + f"({finished_requests}/{total_requests})..." + ), + data={ + "total": total_requests, + "finished": finished_requests, + "current": method.__name__, + "page": page, + }, + ) if not data: logger.debug("All recommendation methods have finished fetching data. Ending pagination early.") methods_finished.add(method) @@ -77,24 +105,40 @@ class RecommendChain(ChainBase, metaclass=Singleton): break # 缓存收集到的海报 - self.__cache_posters(recommends) + if progress_callback: + progress_callback(value=90, text="推荐数据刷新完成,正在缓存海报 ...") + self.__cache_posters(recommends, progress_callback=progress_callback) logger.debug("Recommend data refresh completed.") + if progress_callback: + progress_callback(value=100, text="推荐缓存刷新完成") - def __cache_posters(self, datas: List[dict]): + def __cache_posters( + self, + datas: List[dict], + progress_callback: Optional[Callable[..., None]] = None, + ) -> None: """ 提取 poster_path 并缓存图片 :param datas: 数据列表 + :param progress_callback: 定时服务进度更新回调 """ if not settings.GLOBAL_IMAGE_CACHE: return - for data in datas: + total_num = len(datas) + for index, data in enumerate(datas, start=1): if global_vars.is_system_stopped: return poster_path = data.get("poster_path") if poster_path: poster_url = poster_path.replace("original", "w500") self.__fetch_and_save_image(poster_url) + if progress_callback: + progress_callback( + value=90 + (index / total_num * 10 if total_num else 10), + text=f"正在缓存推荐海报({index}/{total_num})...", + data={"poster_total": total_num, "poster_finished": index}, + ) @staticmethod def __fetch_and_save_image(url: str): diff --git a/app/chain/site.py b/app/chain/site.py index a0c19322..e2833aa8 100644 --- a/app/chain/site.py +++ b/app/chain/site.py @@ -1,7 +1,7 @@ import base64 import re from datetime import datetime -from typing import List, Optional, Tuple, Union, Dict +from typing import Callable, List, Optional, Tuple, Union, Dict from urllib.parse import urljoin from app.helper.sites import SitesHelper # noqa @@ -103,24 +103,54 @@ class SiteChain(ChainBase): )) return userdata - def refresh_userdatas(self) -> Optional[Dict[str, SiteUserData]]: + def refresh_userdatas( + self, + progress_callback: Optional[Callable[..., None]] = None, + ) -> Optional[Dict[str, SiteUserData]]: """ 刷新所有站点的用户数据 + + :param progress_callback: 定时服务进度更新回调 """ any_site_updated = False result = {} - for site in SitesHelper().get_indexers(): + sites = [site for site in SitesHelper().get_indexers() if site.get("is_active")] + total_num = len(sites) + if progress_callback: + progress_callback( + value=0, + text=f"开始刷新站点数据,共 {total_num} 个站点 ...", + data={"total": total_num, "finished": 0}, + ) + for index, site in enumerate(sites, start=1): if global_vars.is_system_stopped: return None - if site.get("is_active"): - userdata = self.refresh_userdata(site) - if userdata: - any_site_updated = True - result[site.get("name")] = userdata + if progress_callback: + progress_callback( + value=(index - 1) / total_num * 100 if total_num else 100, + text=f"正在刷新站点数据({index}/{total_num}){site.get('name')} ...", + data={ + "total": total_num, + "finished": index - 1, + "current": site.get("id"), + }, + ) + userdata = self.refresh_userdata(site) + if userdata: + any_site_updated = True + result[site.get("name")] = userdata + if progress_callback: + progress_callback( + value=index / total_num * 100 if total_num else 100, + text=f"站点数据({index}/{total_num})刷新完成", + data={"total": total_num, "finished": index}, + ) if any_site_updated: eventmanager.send_event(EventType.SiteRefreshed, { "site_id": "*" }) + if progress_callback: + progress_callback(value=100, text="站点数据刷新完成") return result @@ -323,9 +353,16 @@ class SiteChain(ChainBase): del html return favicon_url, None - def sync_cookies(self, manual=False) -> Tuple[bool, str]: + def sync_cookies( + self, + manual: bool = False, + progress_callback: Optional[Callable[..., None]] = None, + ) -> Tuple[bool, str]: """ 通过CookieCloud同步站点Cookie + + :param manual: 是否手动同步 + :param progress_callback: 定时服务进度更新回调 """ def __indexer_domain(inx: dict, sub_domain: str) -> str: @@ -340,9 +377,13 @@ class SiteChain(ChainBase): return sub_domain logger.info("开始同步CookieCloud站点 ...") + if progress_callback: + progress_callback(value=0, text="开始下载 CookieCloud 数据 ...") cookies, msg = CookieCloudHelper().download() if not cookies: logger.error(f"CookieCloud同步失败:{msg}") + if progress_callback: + progress_callback(value=100, text=f"CookieCloud同步失败:{msg}") if manual: self.messagehelper.put(msg, title="CookieCloud同步失败", role="system") return False, msg @@ -353,11 +394,22 @@ class SiteChain(ChainBase): siteshelper = SitesHelper() siteoper = SiteOper() rsshelper = RssHelper() - for domain, cookie in cookies.items(): + total_num = len(cookies) + for index, (domain, cookie) in enumerate(cookies.items(), start=1): # 检查系统是否停止 if global_vars.is_system_stopped: logger.info("系统正在停止,中断CookieCloud同步") return False, "系统正在停止,同步被中断" + if progress_callback: + progress_callback( + value=(index - 1) / total_num * 100 if total_num else 100, + text=f"正在同步 CookieCloud 站点({index}/{total_num}){domain} ...", + data={ + "total": total_num, + "finished": index - 1, + "current": domain, + }, + ) # 索引器信息 indexer = siteshelper.get_indexer(domain) @@ -465,6 +517,12 @@ class SiteChain(ChainBase): eventmanager.send_event(EventType.SiteUpdated, { "domain": domain, }) + if progress_callback: + progress_callback( + value=index / total_num * 100 if total_num else 100, + text=f"CookieCloud 站点({index}/{total_num})同步完成", + data={"total": total_num, "finished": index}, + ) # 处理完成 ret_msg = f"更新了{_update_count}个站点,新增了{_add_count}个站点" if _fail_count > 0: @@ -472,6 +530,8 @@ class SiteChain(ChainBase): if manual: self.messagehelper.put(ret_msg, title="CookieCloud同步成功", role="system") logger.info(f"CookieCloud同步成功:{ret_msg}") + if progress_callback: + progress_callback(value=100, text=f"CookieCloud同步成功:{ret_msg}") return True, ret_msg @eventmanager.register(EventType.SiteUpdated) diff --git a/app/chain/subscribe.py b/app/chain/subscribe.py index ffb222a8..9dba27b1 100644 --- a/app/chain/subscribe.py +++ b/app/chain/subscribe.py @@ -5,7 +5,7 @@ import re import threading import time from datetime import datetime -from typing import Any, Dict, List, Optional, Union, Tuple +from typing import Any, Callable, Dict, List, Optional, Union, Tuple from app import schemas from app.chain import ChainBase @@ -979,12 +979,19 @@ class SubscribeChain(ChainBase): return True return False - def search(self, sid: Optional[int] = None, state: Optional[str] = 'N', manual: Optional[bool] = False): + def search( + self, + sid: Optional[int] = None, + state: Optional[str] = 'N', + manual: Optional[bool] = False, + progress_callback: Optional[Callable[..., None]] = None, + ) -> None: """ 订阅搜索 :param sid: 订阅ID,有值时只处理该订阅 :param state: 订阅状态 N:新建, R:订阅中, P:待定, S:暂停 :param manual: 是否手动搜索 + :param progress_callback: 定时服务进度更新回调 :return: 更新订阅状态为R或删除订阅 """ lock_acquired = False @@ -1002,12 +1009,29 @@ class SubscribeChain(ChainBase): subscribes = [subscribe] if subscribe else [] else: subscribes = subscribeoper.list(self.get_states_for_search(state)) + total_num = len(subscribes) + if progress_callback: + progress_callback( + value=0, + text=f"开始订阅搜索,共 {total_num} 个订阅 ...", + data={"total": total_num, "finished": 0}, + ) try: # 遍历订阅 - for subscribe in subscribes: + for index, subscribe in enumerate(subscribes, start=1): if global_vars.is_system_stopped: break + if progress_callback: + progress_callback( + value=(index - 1) / total_num * 100 if total_num else 100, + text=f"正在搜索订阅({index}/{total_num}){subscribe.name} ...", + data={ + "total": total_num, + "finished": index - 1, + "current": subscribe.id, + }, + ) mediakey = subscribe.tmdbid or subscribe.doubanid custom_word_list = subscribe.custom_words.split("\n") if subscribe.custom_words else None # 校验当前时间减订阅创建时间是否大于1分钟,否则跳过先,留出编辑订阅的时间 @@ -1021,6 +1045,10 @@ class SubscribeChain(ChainBase): if not sid and state in ['R', 'P']: sleep_time = random.randint(60, 300) logger.info(f'订阅搜索随机休眠 {sleep_time} 秒 ...') + if progress_callback: + progress_callback( + text=f"订阅搜索随机休眠 {sleep_time} 秒后继续 ..." + ) time.sleep(sleep_time) try: logger.info(f'开始搜索订阅,标题:{subscribe.name} ...') @@ -1168,6 +1196,12 @@ class SubscribeChain(ChainBase): # 如果状态为N则更新为R if subscribe and subscribe.state == 'N': subscribeoper.update(subscribe.id, {'state': 'R'}) + if progress_callback: + progress_callback( + value=index / total_num * 100 if total_num else 100, + text=f"订阅搜索({index}/{total_num})处理完成", + data={"total": total_num, "finished": index}, + ) # 手动触发时发送系统消息 if manual: @@ -1178,6 +1212,8 @@ class SubscribeChain(ChainBase): self.messagehelper.put('所有订阅搜索完成!', title="订阅搜索", role="system") else: self.messagehelper.put('没有找到订阅!', title="订阅搜索", role="system") + if progress_callback: + progress_callback(value=100, text="订阅搜索完成") finally: subscribes.clear() @@ -1294,17 +1330,54 @@ class SubscribeChain(ChainBase): elif not downloads: logger.info(f'{mediainfo.title_year} 继续洗版 ...') - def refresh(self): + def refresh(self, progress_callback: Optional[Callable[..., None]] = None) -> None: """ 订阅刷新 + + :param progress_callback: 定时服务进度更新回调 """ # 触发刷新站点资源,从缓存中匹配订阅 sites = self.get_subscribed_sites() if sites is None: + if progress_callback: + progress_callback(value=100, text="没有订阅需要刷新") return - self.match( - TorrentsChain().refresh(sites=sites) + def _update_refresh_progress( + value: Optional[float] = None, + text: Optional[str] = None, + data: Optional[dict] = None, + ) -> None: + """将站点刷新进度映射到订阅刷新的前半阶段。""" + if progress_callback: + progress_callback( + value=(value or 0) * 0.6, + text=text, + data=data, + ) + + def _update_match_progress( + value: Optional[float] = None, + text: Optional[str] = None, + data: Optional[dict] = None, + ) -> None: + """将订阅匹配进度映射到订阅刷新的后半阶段。""" + if progress_callback: + progress_callback( + value=60 + (value or 0) * 0.4, + text=text, + data=data, + ) + + torrents = TorrentsChain().refresh( + sites=sites, + progress_callback=_update_refresh_progress if progress_callback else None, ) + self.match( + torrents, + progress_callback=_update_match_progress if progress_callback else None, + ) + if progress_callback: + progress_callback(value=100, text="订阅刷新完成") @staticmethod def get_sub_sites(subscribe: Subscribe) -> List[int]: @@ -1349,14 +1422,26 @@ class SubscribeChain(ChainBase): return ret_sites - def match(self, torrents: Dict[str, List[Context]]): + def match( + self, + torrents: Dict[str, List[Context]], + progress_callback: Optional[Callable[..., None]] = None, + ) -> None: """ 从缓存中匹配订阅,并自动下载 + + :param torrents: 按站点分组的资源上下文 + :param progress_callback: 订阅匹配进度更新回调 """ if not torrents: logger.warn('没有缓存资源,无法匹配订阅') + if progress_callback: + progress_callback(value=100, text="没有缓存资源,跳过订阅匹配") return + if progress_callback: + progress_callback(value=0, text="正在预处理订阅资源 ...") + lock_acquired = False try: if lock_acquired := self._rlock.acquire( @@ -1409,10 +1494,32 @@ class SubscribeChain(ChainBase): # 所有订阅 subscribes = SubscribeOper().list(self.get_states_for_search('R')) + total_num = len(subscribes) + if progress_callback: + progress_callback( + value=20, + text=f"资源预处理完成,开始匹配 {total_num} 个订阅 ...", + data={"total": total_num, "finished": 0}, + ) try: - for subscribe in subscribes: + for index, subscribe in enumerate(subscribes, start=1): if global_vars.is_system_stopped: break + if progress_callback: + progress_callback( + value=20 + ( + (index - 1) / total_num * 80 if total_num else 80 + ), + text=( + f"正在匹配订阅({index}/{total_num})" + f"{subscribe.name} ..." + ), + data={ + "total": total_num, + "finished": index - 1, + "current": subscribe.id, + }, + ) logger.info(f'开始匹配订阅,标题:{subscribe.name} ...') mediakey = subscribe.tmdbid or subscribe.doubanid try: @@ -1693,22 +1800,48 @@ class SubscribeChain(ChainBase): del processed_torrents subscribes.clear() del subscribes + if progress_callback: + progress_callback( + value=100, + text="订阅资源匹配完成", + data={"total": total_num, "finished": total_num}, + ) finally: if lock_acquired: self._rlock.release() logger.debug(f"match Lock released at {datetime.now()}") - def check(self): + def check(self, progress_callback: Optional[Callable[..., None]] = None) -> None: """ 定时检查订阅,更新订阅信息 + + :param progress_callback: 定时服务进度更新回调 """ # 查询所有订阅 subscribeoper = SubscribeOper() + subscribes = subscribeoper.list() + total_num = len(subscribes) + if progress_callback: + progress_callback( + value=0, + text=f"开始更新订阅元数据,共 {total_num} 个订阅 ...", + data={"total": total_num, "finished": 0}, + ) # 遍历订阅 - for subscribe in subscribeoper.list(): + for index, subscribe in enumerate(subscribes, start=1): if global_vars.is_system_stopped: break logger.info(f'开始更新订阅元数据:{subscribe.name} ...') + if progress_callback: + progress_callback( + value=(index - 1) / total_num * 100 if total_num else 100, + text=f"正在更新订阅元数据({index}/{total_num}){subscribe.name} ...", + data={ + "total": total_num, + "finished": index - 1, + "current": subscribe.id, + }, + ) try: meta = build_subscribe_meta(subscribe) except ValueError: @@ -1773,6 +1906,14 @@ class SubscribeChain(ChainBase): subscribe.lack_episode = lack_episode subscribeoper.update(subscribe.id, update_data) logger.info(f'{subscribe.name} 订阅元数据更新完成') + if progress_callback: + progress_callback( + value=index / total_num * 100 if total_num else 100, + text=f"订阅元数据({index}/{total_num})更新完成", + data={"total": total_num, "finished": index}, + ) + if progress_callback: + progress_callback(value=100, text="订阅元数据更新完成") def get_subscribe_by_source(self, source: str) -> Optional[Subscribe]: """ @@ -1788,19 +1929,37 @@ class SubscribeChain(ChainBase): return SubscribeOper().get_by(**valid_fields) @staticmethod - def follow(): + def follow(progress_callback: Optional[Callable[..., None]] = None) -> None: """ 刷新follow的用户分享,并自动添加订阅 + + :param progress_callback: 定时服务进度更新回调 """ follow_users: List[str] = SystemConfigOper().get(SystemConfigKey.FollowSubscribers) if not follow_users: + if progress_callback: + progress_callback(value=100, text="未配置 Follow 订阅用户,跳过刷新") return logger.info(f'开始刷新follow用户分享订阅 ...') success_count = 0 subscribeoper = SubscribeOper() - for share_sub in MoviePilotServerHelper.get_subscribe_shares(): + share_subscribes = MoviePilotServerHelper.get_subscribe_shares() or [] + total_num = len(share_subscribes) + if progress_callback: + progress_callback( + value=0, + text=f"开始刷新 Follow 订阅分享,共 {total_num} 条 ...", + data={"total": total_num, "finished": 0}, + ) + for index, share_sub in enumerate(share_subscribes, start=1): if global_vars.is_system_stopped: break + if progress_callback: + progress_callback( + value=(index - 1) / total_num * 100 if total_num else 100, + text=f"正在处理 Follow 订阅分享({index}/{total_num})...", + data={"total": total_num, "finished": index - 1}, + ) uid = share_sub.get("share_uid") if uid and uid in follow_users: # 订阅已存在则跳过 @@ -1852,16 +2011,49 @@ class SubscribeChain(ChainBase): else: logger.error(f'follow用户分享订阅 {title} 添加失败:{message}') logger.info(f'follow用户分享订阅刷新完成,共添加 {success_count} 个订阅') + if progress_callback: + progress_callback( + value=100, + text=f"Follow 订阅分享刷新完成,新增 {success_count} 个订阅", + data={ + "total": total_num, + "finished": total_num, + "added": success_count, + }, + ) - async def cache_calendar(self): + async def cache_calendar( + self, + progress_callback: Optional[Callable[..., None]] = None, + ) -> None: """ 预缓存订阅日历,实际上就是查询一遍所有订阅的媒体信息 前端请示是异常的,所以需要使用异步缓存方法 + + :param progress_callback: 定时服务进度更新回调 """ logger.info(f'开始预缓存订阅日历 ...') - for subscribe in await SubscribeOper().async_list(): + subscribes = await SubscribeOper().async_list() + total_num = len(subscribes) + if progress_callback: + progress_callback( + value=0, + text=f"开始预缓存订阅日历,共 {total_num} 个订阅 ...", + data={"total": total_num, "finished": 0}, + ) + for index, subscribe in enumerate(subscribes, start=1): if global_vars.is_system_stopped: break + if progress_callback: + progress_callback( + value=(index - 1) / total_num * 100 if total_num else 100, + text=f"正在预缓存订阅日历({index}/{total_num}){subscribe.name} ...", + data={ + "total": total_num, + "finished": index - 1, + "current": subscribe.id, + }, + ) try: mtype = MediaType(subscribe.type) except ValueError: @@ -1887,7 +2079,15 @@ class SubscribeChain(ChainBase): logger.warn( f'未识别到季集信息,标题:{subscribe.name},tmdbid:{subscribe.tmdbid},豆瓣ID:{subscribe.doubanid},季:{subscribe.season}') continue + if progress_callback: + progress_callback( + value=index / total_num * 100 if total_num else 100, + text=f"订阅日历({index}/{total_num})预缓存完成", + data={"total": total_num, "finished": index}, + ) logger.info(f'订阅日历预缓存完成') + if progress_callback: + progress_callback(value=100, text="订阅日历预缓存完成") @staticmethod def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]): diff --git a/app/chain/torrents.py b/app/chain/torrents.py index b5755d27..05065469 100644 --- a/app/chain/torrents.py +++ b/app/chain/torrents.py @@ -1,6 +1,6 @@ import re import traceback -from typing import Dict, List, Union, Optional +from typing import Callable, Dict, List, Union, Optional from app.helper.sites import SitesHelper # noqa @@ -192,11 +192,17 @@ class TorrentsChain(ChainBase): del rss_items return ret_torrents - def refresh(self, stype: Optional[str] = None, sites: List[int] = None) -> Dict[str, List[Context]]: + def refresh( + self, + stype: Optional[str] = None, + sites: List[int] = None, + progress_callback: Optional[Callable[..., None]] = None, + ) -> Dict[str, List[Context]]: """ 刷新站点最新资源,识别并缓存起来 :param stype: 强制指定缓存类型,spider:爬虫缓存,rss:rss缓存 :param sites: 强制指定站点ID列表,为空则读取设置的订阅站点 + :param progress_callback: 资源刷新进度更新回调 """ def __is_no_cache_site(_domain: str) -> bool: @@ -226,13 +232,34 @@ class TorrentsChain(ChainBase): # 需要刷新的站点domain domains = [] + indexers = [ + indexer for indexer in SitesHelper().get_indexers() + if not sites or indexer.get("id") in sites + ] + total_indexers = len(indexers) + if progress_callback: + progress_callback( + value=0, + text=f"开始刷新站点资源,共 {total_indexers} 个站点 ...", + data={"total": total_indexers, "finished": 0}, + ) # 遍历站点缓存资源 - for indexer in SitesHelper().get_indexers(): + for index, indexer in enumerate(indexers, start=1): if global_vars.is_system_stopped: break - # 未开启的站点不刷新 - if sites and indexer.get("id") not in sites: - continue + if progress_callback: + progress_callback( + value=(index - 1) / total_indexers * 100 if total_indexers else 100, + text=( + f"正在刷新站点资源({index}/{total_indexers})" + f"{indexer.get('name')} ..." + ), + data={ + "total": total_indexers, + "finished": index - 1, + "current": indexer.get("id"), + }, + ) domain = StringUtils.get_url_domain(indexer.get("domain")) domains.append(domain) if stype == "spider": @@ -335,6 +362,13 @@ class TorrentsChain(ChainBase): if sites and torrents_cache: torrents_cache = {k: v for k, v in torrents_cache.items() if k in domains} + if progress_callback: + progress_callback( + value=100, + text="站点资源刷新完成", + data={"total": total_indexers, "finished": total_indexers}, + ) + return torrents_cache @staticmethod diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 895f8903..ca755575 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1917,9 +1917,11 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): sample_files.append(item) return sample_files - def process(self) -> bool: + def process(self, progress_callback: Optional[Callable[..., None]] = None) -> bool: """ 获取下载器中的种子列表,并执行整理 + + :param progress_callback: 定时服务进度更新回调 """ # 全局锁,避免定时服务重复 with downloader_lock: @@ -1931,9 +1933,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): dir_info.monitor_type == "downloader" and dir_info.storage == "local" for dir_info in download_dirs ): + if progress_callback: + progress_callback(value=100, text="未配置下载器监控目录,跳过整理") return True logger.info("开始整理下载器中已经完成下载的文件 ...") + if progress_callback: + progress_callback(value=0, text="正在查询已完成下载任务 ...") # 从下载器获取种子列表 if torrents_list := self.list_torrents(status=TorrentStatus.TRANSFER): @@ -1951,14 +1957,38 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if not torrents: logger.info("没有已完成下载但未整理的任务") + if progress_callback: + progress_callback(value=100, text="没有已完成下载但未整理的任务") return False logger.info(f"获取到 {len(torrents)} 个已完成的下载任务") + if progress_callback: + progress_callback( + value=0, + text=f"获取到 {len(torrents)} 个已完成下载任务", + data={"total": len(torrents), "finished": 0}, + ) try: - for torrent in torrents: + total_num = len(torrents) + for index, torrent in enumerate(torrents, start=1): if global_vars.is_system_stopped: break + if progress_callback: + torrent_name = ( + getattr(torrent, "title", None) + or getattr(torrent, "name", None) + or torrent.hash + ) + progress_callback( + value=(index - 1) / total_num * 100, + text=f"正在整理下载任务({index}/{total_num}){torrent_name} ...", + data={ + "total": total_num, + "finished": index - 1, + "current": torrent.hash, + }, + ) # 文件路径 file_path = torrent.path @@ -2022,9 +2052,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): extension=file_path.suffix.lstrip("."), ), mediainfo=mediainfo, - downloader=torrent.downloader, - download_hash=torrent.hash, - ) + downloader=torrent.downloader, + download_hash=torrent.hash, + ) + if progress_callback: + progress_callback( + value=index / total_num * 100, + text=f"下载任务({index}/{total_num})整理处理完成", + data={"total": total_num, "finished": index}, + ) finally: torrents.clear() diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 65fad148..c47980eb 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -1155,11 +1155,16 @@ class WorkflowChain(ChainBase): self.process(workflow_id, from_begin=False) @staticmethod - def process(workflow_id: int, from_begin: Optional[bool] = True) -> Tuple[bool, str]: + def process( + workflow_id: int, + from_begin: Optional[bool] = True, + progress_callback: Optional[Callable[..., None]] = None, + ) -> Tuple[bool, str]: """ 处理工作流 :param workflow_id: 工作流ID :param from_begin: 是否从头开始,默认为True + :param progress_callback: 定时服务进度更新回调 """ workflowoper = WorkflowOper() @@ -1173,6 +1178,23 @@ class WorkflowChain(ChainBase): context=_serialize_workflow_context(context), execution_state=_serialize_workflow_value(execution_state) ) + if progress_callback: + runtime = execution_state.get("runtime") or {} + finished_actions = int(runtime.get("finished_actions") or 0) + total_actions = len(workflow.actions) + progress_callback( + value=finished_actions / total_actions * 100, + text=( + f"工作流动作({finished_actions}/{total_actions})" + f"{action.name or action.type or action.id} " + f"{'执行完成' if completed else '执行中'}" + ), + data={ + "total": total_actions, + "finished": finished_actions, + "current": action.id, + }, + ) # 重置工作流 if from_begin: @@ -1191,6 +1213,12 @@ class WorkflowChain(ChainBase): return False, "工作流无流程" logger.info(f"开始执行工作流 {workflow.name},共 {len(workflow.actions)} 个动作 ...") + if progress_callback: + progress_callback( + value=0, + text=f"开始执行工作流 {workflow.name} ...", + data={"total": len(workflow.actions), "finished": 0}, + ) workflowoper.start(workflow_id) # 执行工作流 @@ -1207,6 +1235,8 @@ class WorkflowChain(ChainBase): return False, executor.errmsg logger.info(f"工作流 {workflow.name} 执行完成") workflowoper.success(workflow_id) + if progress_callback: + progress_callback(value=100, text=f"工作流 {workflow.name} 执行完成") return True, "" @staticmethod diff --git a/app/core/plugin.py b/app/core/plugin.py index c6539593..fbbe151a 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -1624,60 +1624,99 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): return " ".join(normalized_labels) or None return None - async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]: + async def async_get_online_plugins( + self, + force: bool = False, + progress_callback: Optional[Callable[..., None]] = None, + ) -> List[schemas.Plugin]: """ 异步获取所有在线插件信息 :param force: 是否强制刷新(忽略缓存) + :param progress_callback: 定时服务进度更新回调 """ if not settings.PLUGIN_MARKET: + if progress_callback: + progress_callback(value=100, text="未配置插件市场,跳过刷新") return [] - # 用于存储高于 v1 版本的插件(如 v2, v3 等) + async def fetch_market( + market: str, + package_version: Optional[str], + result_version: str, + task_index: int, + ) -> Tuple[int, str, List[schemas.Plugin]]: + """ + 获取单个市场版本的插件列表并保留结果分组。 + """ + plugins = await self.async_get_plugins_from_market( + market, + package_version, + force, + ) + return task_index, result_version, plugins or [] + higher_version_plugins = [] - # 用于存储 v1 版本插件 base_version_plugins = [] - - # 使用异步并发获取线上插件 - import asyncio tasks = [] - task_to_version = {} - for m in settings.PLUGIN_MARKET.split(","): - if not m: + for market in settings.PLUGIN_MARKET.split(","): + if not market: continue - # 创建任务获取 v1 版本插件 - base_task = asyncio.create_task(self.async_get_plugins_from_market(m, None, force)) - tasks.append(base_task) - task_to_version[base_task] = "base_version" - - # 创建任务获取高版本插件(如 v2、v3) + tasks.append( + asyncio.create_task( + fetch_market(market, None, "base_version", len(tasks)) + ) + ) if settings.VERSION_FLAG: - higher_version_task = asyncio.create_task( - self.async_get_plugins_from_market(m, settings.VERSION_FLAG, force)) - tasks.append(higher_version_task) - task_to_version[higher_version_task] = "higher_version" + tasks.append( + asyncio.create_task( + fetch_market( + market, + settings.VERSION_FLAG, + "higher_version", + len(tasks), + ) + ) + ) - # 并发执行所有任务 if tasks: - completed_tasks = await asyncio.gather(*tasks, return_exceptions=True) - for i, result in enumerate(completed_tasks): - task = tasks[i] - version = task_to_version[task] - - # 检查是否有异常 - if isinstance(result, Exception): - logger.error(f"获取插件市场数据失败:{str(result)}") - continue - - plugins = result + total_tasks = len(tasks) + finished_tasks = 0 + task_results = {} + if progress_callback: + progress_callback( + value=0, + text=f"开始刷新插件市场,共 {total_tasks} 个请求 ...", + data={"total": total_tasks, "finished": 0}, + ) + for completed_task in asyncio.as_completed(tasks): + try: + task_index, version, plugins = await completed_task + task_results[task_index] = (version, plugins) + except Exception as err: + logger.error(f"获取插件市场数据失败:{str(err)}") + finished_tasks += 1 + if progress_callback: + progress_callback( + value=finished_tasks / total_tasks * 100, + text=( + f"插件市场请求" + f"({finished_tasks}/{total_tasks})处理完成" + ), + data={"total": total_tasks, "finished": finished_tasks}, + ) + for task_index in sorted(task_results): + version, plugins = task_results[task_index] if plugins: if version == "higher_version": - higher_version_plugins.extend(plugins) # 收集高版本插件 + higher_version_plugins.extend(plugins) else: - base_version_plugins.extend(plugins) # 收集 v1 版本插件 + base_version_plugins.extend(plugins) result = self.process_plugins_list(higher_version_plugins, base_version_plugins) logger.info(f"获取到 {len(result)} 个线上插件") + if progress_callback: + progress_callback(value=100, text="插件市场缓存刷新完成") return result async def async_get_plugins_from_market(self, market: str, diff --git a/app/helper/progress.py b/app/helper/progress.py index 04d06592..ef4ffef1 100644 --- a/app/helper/progress.py +++ b/app/helper/progress.py @@ -10,13 +10,13 @@ class ProgressHelper: 处理进度辅助类 """ - def __init__(self, key: Union[ProgressKey, str]): + def __init__(self, key: Union[ProgressKey, str]) -> None: if isinstance(key, Enum): key = key.value self._key = key self._progress = TTLCache(region="progress", maxsize=1024, ttl=24 * 60 * 60) - def __reset(self): + def __reset(self) -> None: """ 重置进度 """ @@ -27,7 +27,7 @@ class ProgressHelper: "data": {} } - def start(self): + def start(self) -> None: """ 开始进度 """ @@ -38,38 +38,52 @@ class ProgressHelper: current['enable'] = True self._progress[self._key] = current - def end(self): + def end( + self, + text: Optional[str] = "", + data: Optional[dict] = None, + value: Optional[Union[float, int]] = 100, + ) -> None: """ 结束进度 """ current = self._progress.get(self._key) if not current: return - current.update( - { - "enable": False, - "value": 100, - "text": "" - } - ) + if data is not None: + if not current.get('data'): + current['data'] = {} + current['data'].update(data) + current["enable"] = False + if value is not None: + current["value"] = max(min(float(value), 100), 0) + current["text"] = text or "" self._progress[self._key] = current - def update(self, value: Union[float, int] = None, text: Optional[str] = None, data: dict = None): + def update( + self, + value: Optional[Union[float, int]] = None, + text: Optional[str] = None, + data: Optional[dict] = None, + ) -> None: """ 更新进度 """ current = self._progress.get(self._key) if not current or not current.get('enable'): return - if value: - current['value'] = value - if text: + if value is not None: + current['value'] = max(min(float(value), 100), 0) + if text is not None: current['text'] = text - if data: + if data is not None: if not current.get('data'): current['data'] = {} current['data'].update(data) self._progress[self._key] = current - def get(self) -> dict: + def get(self) -> Optional[dict]: + """ + 获取当前进度 + """ return self._progress.get(self._key) diff --git a/app/scheduler.py b/app/scheduler.py index a6f89f09..af1c20a0 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -35,6 +35,7 @@ from app.db.models.transferhistory import TransferHistory from app.db.systemconfig_oper import SystemConfigOper from app.helper.image import WallpaperHelper from app.helper.message import MessageHelper +from app.helper.progress import ProgressHelper from app.helper.sites import SitesHelper # noqa from app.helper.server import MoviePilotServerHelper from app.log import logger @@ -46,6 +47,7 @@ from app.utils.singleton import SingletonClass from app.utils.timer import TimerUtils lock = threading.Lock() +SCHEDULER_PROGRESS_PREFIX = "scheduler" class SchedulerChain(ChainBase): @@ -55,7 +57,11 @@ class SchedulerChain(ChainBase): # 每批处理的记录数,避免一次性删除过多数据导致性能问题 DEFAULT_BATCH_SIZE = 500 - def cleanup(self, batch_size: Optional[int] = None) -> Dict[str, Any]: + def cleanup( + self, + batch_size: Optional[int] = None, + progress_callback: Optional[Callable[..., None]] = None, + ) -> Dict[str, Any]: """ 按配置保留期执行分批清理。 """ @@ -80,9 +86,12 @@ class SchedulerChain(ChainBase): errors = [] plans = self._build_cleanup_plans(started_at=started_at, batch_size=batch_size) + total_plans = len(plans) + if progress_callback: + progress_callback(value=0, text="开始清理数据表 ...") with SessionFactory() as db: - for plan in plans: + for plan_index, plan in enumerate(plans): name = plan["name"] retention_days = plan["retention_days"] if retention_days <= 0: @@ -94,9 +103,19 @@ class SchedulerChain(ChainBase): "skipped": True, "reason": "retention_days<=0", } + if progress_callback: + progress_callback( + value=(plan_index + 1) / total_plans * 100, + text=f"数据表 {name} 跳过清理", + ) continue try: + if progress_callback: + progress_callback( + value=plan_index / total_plans * 100, + text=f"正在清理数据表 {name} ...", + ) table_report = self._cleanup_in_batches( db=db, table_name=name, @@ -116,6 +135,12 @@ class SchedulerChain(ChainBase): "retention_days": retention_days, "error": str(err), } + finally: + if progress_callback: + progress_callback( + value=(plan_index + 1) / total_plans * 100, + text=f"数据表 {name} 清理处理完成", + ) if errors: report["errors"] = errors @@ -284,13 +309,33 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): # 初始化 self.init() - def on_config_changed(self): + def on_config_changed(self) -> None: + """ + 配置变更后重新初始化定时服务。 + """ self.init() - def get_reload_name(self): + def get_reload_name(self) -> str: + """ + 获取配置重载日志中的服务名称。 + """ return "定时服务" - def init(self): + @staticmethod + def _get_progress_key(job_id: str) -> str: + """ + 获取定时服务进度缓存键。 + """ + return f"{SCHEDULER_PROGRESS_PREFIX}:{job_id}" + + @staticmethod + def _format_time(value: Optional[datetime] = None) -> str: + """ + 格式化进度事件时间。 + """ + return (value or datetime.now()).strftime("%Y-%m-%d %H:%M:%S") + + def init(self) -> None: """ 初始化定时服务 """ @@ -675,6 +720,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): """ 准备定时任务 """ + started_at = self._format_time() with self._lock: job = self._jobs.get(job_id) if not job: @@ -683,45 +729,242 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): logger.warning(f"定时任务 {job_id} - {job.get('name')} 正在运行 ...") return None self._jobs[job_id]["running"] = True + self._jobs[job_id]["last_started_at"] = started_at + self._jobs[job_id]["last_finished_at"] = None + self._jobs[job_id]["last_error"] = None + progress = ProgressHelper(self._get_progress_key(job_id)) + progress.start() + progress.update( + value=0, + text=f"{job.get('name') or job_id} 开始执行 ...", + data={ + "id": job_id, + "name": job.get("name"), + "provider": job.get("provider_name", "[系统]"), + "status": "running", + "success": None, + "started_at": started_at, + "finished_at": None, + "error": None, + }, + ) return job - def __finish_job(self, job_id: str): + def __finish_job( + self, + job_id: str, + success: bool = True, + error: Optional[str] = None, + ) -> None: """ 完成定时任务 """ + finished_at = self._format_time() + job = None with self._lock: - try: - self._jobs[job_id]["running"] = False - except KeyError: - pass + job = self._jobs.get(job_id) + if job: + job["running"] = False + job["last_finished_at"] = finished_at + job["last_error"] = error + job_name = job.get("name") if job else job_id + progress = ProgressHelper(self._get_progress_key(job_id)) + current_progress = progress.get() or {} + progress_value = 100 if success else current_progress.get("value", 0) + progress.end( + text=f"{job_name} {'执行完成' if success else '执行失败'}", + data={ + "id": job_id, + "name": job_name, + "provider": job.get("provider_name", "[系统]") if job else None, + "status": "success" if success else "failed", + "success": success, + "finished_at": finished_at, + "error": error, + }, + value=progress_value, + ) - def start(self, job_id: str, *args, **kwargs): + def get_progress(self, job_id: str) -> Optional[schemas.ScheduleProgress]: + """ + 查询指定定时服务的执行进度。 + """ + if not job_id: + return None + with self._lock: + job = self._jobs.get(job_id) + job_name = job.get("name") if job else job_id + provider_name = job.get("provider_name", "[系统]") if job else None + running = bool(job.get("running")) if job else False + last_started_at = job.get("last_started_at") if job else None + last_finished_at = job.get("last_finished_at") if job else None + last_error = job.get("last_error") if job else None + detail = ProgressHelper(self._get_progress_key(job_id)).get() or {} + if not job and not detail: + return None + data = detail.get("data") or {} + value = detail.get("value", 0) + try: + value = float(value) + except (TypeError, ValueError): + value = 0.0 + return schemas.ScheduleProgress( + id=job_id, + name=data.get("name") or job_name, + provider=data.get("provider") or provider_name, + enable=bool(detail.get("enable", running)), + value=max(min(value, 100), 0), + text=detail.get("text"), + status=data.get("status") or ("running" if running else "waiting"), + success=data.get("success"), + started_at=data.get("started_at") or last_started_at, + finished_at=data.get("finished_at") or last_finished_at, + error=data.get("error") or last_error, + data=data, + ) + + def __handle_job_error(self, job_id: str, job: dict, error: Exception) -> None: + """ + 记录定时任务执行异常并发送系统错误事件。 + """ + logger.error( + f"定时任务 {job.get('name')} 执行失败:{str(error)} - {traceback.format_exc()}" + ) + MessageHelper().put( + title=f"{job.get('name')} 执行失败", message=str(error), role="system" + ) + eventmanager.send_event( + EventType.SystemError, + { + "type": "scheduler", + "scheduler_id": job_id, + "scheduler_name": job.get("name"), + "error": str(error), + "traceback": traceback.format_exc(), + }, + ) + + def __build_progress_callback(self, job_id: str, job: dict) -> Callable[..., None]: + """ + 构建传递给定时任务内部的进度更新回调。 + """ + + def update_progress( + value: Optional[float] = None, + text: Optional[str] = None, + data: Optional[dict] = None, + ) -> None: + """ + 更新当前定时任务进度。 + """ + progress_data = { + "id": job_id, + "name": job.get("name"), + "provider": job.get("provider_name", "[系统]"), + "status": "running", + "success": None, + } + if data: + progress_data.update(data) + ProgressHelper(self._get_progress_key(job_id)).update( + value=value, + text=text, + data=progress_data, + ) + + return update_progress + + @staticmethod + def __supports_progress_callback(func: Callable[..., Any]) -> bool: + """ + 判断定时任务函数是否显式支持进度回调参数。 + """ + try: + parameters = inspect.signature(func).parameters + except (TypeError, ValueError): + return False + return "progress_callback" in parameters + + @staticmethod + def __get_result_error(result: Any) -> Optional[str]: + """ + 从定时任务标准失败返回值中提取错误信息。 + """ + if ( + isinstance(result, tuple) + and result + and isinstance(result[0], bool) + and result[0] is False + ): + return str(result[1]) if len(result) > 1 and result[1] else "定时任务返回失败" + return None + + async def __run_coro_job(self, coro, job_id: str, job: dict) -> None: + """ + 在当前事件循环内执行协程定时任务并在真实完成后收敛状态。 + """ + success = True + error = None + try: + result = await coro + error = self.__get_result_error(result) + success = error is None + except Exception as err: + success = False + error = str(err) + self.__handle_job_error(job_id=job_id, job=job, error=err) + finally: + self.__finish_job(job_id=job_id, success=success, error=error) + + def start(self, job_id: str, *args, **kwargs) -> None: """ 启动定时服务 """ - def __start_coro(coro): + def __start_coro(coro) -> bool: """ - 启动协程 + 启动协程,返回是否由异步回调自行收敛任务状态。 """ - return asyncio.run_coroutine_threadsafe(coro, global_vars.loop) + try: + running_loop = asyncio.get_running_loop() + except RuntimeError: + running_loop = None + target_loop = global_vars.loop + if running_loop: + asyncio.create_task(self.__run_coro_job(coro=coro, job_id=job_id, job=job)) + return True + if target_loop and target_loop.is_running(): + asyncio.run_coroutine_threadsafe( + self.__run_coro_job(coro=coro, job_id=job_id, job=job), + target_loop, + ) + return True + asyncio.run(coro) + return False # 获取定时任务 job = self.__prepare_job(job_id) if not job: return + success = True + error = None + deferred_finish = False # 开始运行 try: if not kwargs: - kwargs = job.get("kwargs") or {} + kwargs = dict(job.get("kwargs") or {}) func = job.get("func") if not func: return + if self.__supports_progress_callback(func) and "progress_callback" not in kwargs: + kwargs["progress_callback"] = self.__build_progress_callback( + job_id=job_id, job=job + ) # 是否多进程运行 run_in_process = job.get("run_in_process", False) if inspect.iscoroutinefunction(func): # 协程函数 - __start_coro(func(*args, **kwargs)) + deferred_finish = __start_coro(func(*args, **kwargs)) elif run_in_process: # 多进程运行 p = multiprocessing.Process(target=func, args=args, kwargs=kwargs) @@ -729,26 +972,17 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): p.join() else: # 普通函数 - job["func"](*args, **kwargs) + result = func(*args, **kwargs) + error = self.__get_result_error(result) + success = error is None except Exception as e: - logger.error( - f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}" - ) - MessageHelper().put( - title=f"{job.get('name')} 执行失败", message=str(e), role="system" - ) - eventmanager.send_event( - EventType.SystemError, - { - "type": "scheduler", - "scheduler_id": job_id, - "scheduler_name": job.get("name"), - "error": str(e), - "traceback": traceback.format_exc(), - }, - ) - # 运行结束 - self.__finish_job(job_id) + success = False + error = str(e) + self.__handle_job_error(job_id=job_id, job=job, error=e) + finally: + if not deferred_finish: + # 运行结束 + self.__finish_job(job_id=job_id, success=success, error=error) def init_plugin_jobs(self): """ @@ -961,12 +1195,17 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): if service.get("running") and name and provider_name: if job_id not in added: added.append(job_id) + progress = self.get_progress(job_id) schedulers.append( schemas.ScheduleInfo( id=job_id, name=name, provider=provider_name, status="正在运行", + progress=progress.value if progress else 0, + progress_text=progress.text if progress else None, + progress_enable=progress.enable if progress else False, + progress_detail=progress, ) ) # 获取其他待执行任务 @@ -983,6 +1222,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): status = "正在运行" if service.get("running") else "等待" # 下次运行时间 next_run = TimerUtils.time_difference(job.next_run_time) + progress = self.get_progress(job_id) schedulers.append( schemas.ScheduleInfo( id=job_id, @@ -990,6 +1230,10 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): provider=service.get("provider_name", "[系统]"), status=status, next_run=next_run, + progress=progress.value if progress else 0, + progress_text=progress.text if progress else None, + progress_enable=progress.enable if progress else False, + progress_detail=progress, ) ) return schedulers diff --git a/app/schemas/dashboard.py b/app/schemas/dashboard.py index d3631656..33786e27 100644 --- a/app/schemas/dashboard.py +++ b/app/schemas/dashboard.py @@ -1,6 +1,6 @@ from typing import Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field class Statistic(BaseModel): @@ -65,6 +65,50 @@ class DownloaderInfo(BaseModel): free_space: Optional[float] = 0.0 +class DashboardMemoryInfo(BaseModel): + """仪表板系统内存统计。""" + + # 总内存字节数 + total: int = 0 + # 已使用内存字节数,不包含缓存 + used: int = 0 + # 缓存与缓冲区占用字节数 + cached: int = 0 + # 可用内存字节数 + available: int = 0 + # 已使用内存占总内存百分比,不包含缓存 + usage: float = 0.0 + + +class ScheduleProgress(BaseModel): + """后台服务执行进度信息。""" + + # ID + id: Optional[str] = None + # 名称 + name: Optional[str] = None + # 提供者 + provider: Optional[str] = None + # 是否正在执行 + enable: Optional[bool] = False + # 当前完成百分比 + value: Optional[float] = 0.0 + # 当前进度文本 + text: Optional[str] = None + # 执行状态 waiting/running/success/failed + status: Optional[str] = None + # 最近一次执行是否成功 + success: Optional[bool] = None + # 最近一次开始时间 + started_at: Optional[str] = None + # 最近一次结束时间 + finished_at: Optional[str] = None + # 最近一次错误信息 + error: Optional[str] = None + # 扩展数据 + data: Optional[dict] = Field(default_factory=dict) + + class ScheduleInfo(BaseModel): """仪表板后台服务信息。""" @@ -78,6 +122,14 @@ class ScheduleInfo(BaseModel): status: Optional[str] = None # 下次执行时间 next_run: Optional[str] = None + # 当前完成百分比 + progress: Optional[float] = 0.0 + # 进度文本 + progress_text: Optional[str] = None + # 是否正在更新进度 + progress_enable: Optional[bool] = False + # 进度详情 + progress_detail: Optional[ScheduleProgress] = None class DashboardSystemInfo(BaseModel): diff --git a/app/utils/system.py b/app/utils/system.py index b30cedfe..d0958628 100644 --- a/app/utils/system.py +++ b/app/utils/system.py @@ -687,15 +687,27 @@ class SystemUtils: return psutil.cpu_percent() @staticmethod - def memory_usage() -> List[int]: + def memory_usage() -> schemas.DashboardMemoryInfo: """ - 获取当前程序的内存使用量和使用率 + 获取系统已使用、缓存、可用和总内存信息。 """ - current_process = psutil.Process() - process_memory = current_process.memory_info().rss - system_memory = psutil.virtual_memory().total - process_memory_percent = (process_memory / system_memory) * 100 - return [process_memory, int(process_memory_percent)] + memory = psutil.virtual_memory() + total = max(0, int(memory.total)) + used = max(0, int(memory.used)) + cached = max( + 0, + int(getattr(memory, "cached", 0) or 0) + + int(getattr(memory, "buffers", 0) or 0), + ) + available = max(0, total - used - cached) + usage = used / total * 100 if total else 0.0 + return schemas.DashboardMemoryInfo( + total=total, + used=used, + cached=cached, + available=available, + usage=usage, + ) @staticmethod def network_usage() -> List[int]: diff --git a/docs/mcp-api.md b/docs/mcp-api.md index 4513d23b..9ce801c4 100644 --- a/docs/mcp-api.md +++ b/docs/mcp-api.md @@ -114,6 +114,9 @@ MoviePilot 也提供普通 REST API 给前端和自动化客户端使用。所 | :--- | :--- | :--- | | GET | `/api/v1/system/ping` | 登录用户服务存活检测,用于前端重启后轮询恢复状态 | | GET | `/api/v1/dashboard/system` | 查询仪表板系统摘要,包括主机名称、操作系统、MoviePilot 运行时间和后端版本 | +| GET | `/api/v1/dashboard/schedule` | 查询所有后台定时服务,包含当前完成百分比、进度文本和执行状态 | +| GET | `/api/v1/dashboard/schedule/{job_id}/progress` | 查询指定后台定时服务的实时进度详情 | +| GET | `/api/v1/dashboard/schedule2/{job_id}/progress` | 使用 API_TOKEN 查询指定后台定时服务的实时进度详情 | | GET | `/api/v1/system/setting/public/{key}` | 登录用户读取白名单内非敏感系统设置,仅支持目录、存储、站点范围、默认订阅规则、Follow 订阅者和插件市场地址等前端必需配置 | | POST | `/api/v1/system/setting/PLUGIN_MARKET/sync-wiki` | 管理员从 MoviePilot Wiki 的插件文档同步公开插件仓库清单,和本地 `PLUGIN_MARKET` 合并去重后写入配置 | diff --git a/skills/moviepilot-api/SKILL.md b/skills/moviepilot-api/SKILL.md index a4a1465d..7fcb9963 100644 --- a/skills/moviepilot-api/SKILL.md +++ b/skills/moviepilot-api/SKILL.md @@ -282,7 +282,7 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as ` | POST | `/api/v1/transfer/manual` | Manual transfer. Params: `background`. Body: ManualTransferItem JSON | | GET | `/api/v1/transfer/now` | Run immediate transfer | -### Dashboard (17 endpoints) +### Dashboard (19 endpoints) | Method | Path | Description | |--------|------|-------------| @@ -296,6 +296,8 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as ` | GET | `/api/v1/dashboard/downloader2` | Downloader info (API_TOKEN) | | GET | `/api/v1/dashboard/schedule` | Scheduled services | | GET | `/api/v1/dashboard/schedule2` | Scheduled services (API_TOKEN) | +| GET | `/api/v1/dashboard/schedule/{job_id}/progress` | Scheduled service real-time progress | +| GET | `/api/v1/dashboard/schedule2/{job_id}/progress` | Scheduled service real-time progress (API_TOKEN) | | GET | `/api/v1/dashboard/transfer` | Transfer statistics. Params: `days` | | GET | `/api/v1/dashboard/cpu` | CPU usage | | GET | `/api/v1/dashboard/cpu2` | CPU usage (API_TOKEN) | diff --git a/tests/test_dashboard_system_info.py b/tests/test_dashboard_system_info.py index 76701c86..268256ff 100644 --- a/tests/test_dashboard_system_info.py +++ b/tests/test_dashboard_system_info.py @@ -30,6 +30,28 @@ def test_dashboard_system_info_returns_runtime_environment(monkeypatch): assert result.version == "v2.13.16" +def test_memory_usage_returns_used_cached_and_available(monkeypatch): + """内存统计应拆分已使用、缓存和可用容量,并返回已使用百分比。""" + + class FakeMemory: + """提供固定系统内存值的桩。""" + + total = 16 * 1024**3 + used = 5 * 1024**3 + cached = 3 * 1024**3 + buffers = 512 * 1024**2 + + monkeypatch.setattr(system_module.psutil, "virtual_memory", FakeMemory) + + result = SystemUtils.memory_usage() + + assert result.total == 16 * 1024**3 + assert result.used == 5 * 1024**3 + assert result.cached == int(3.5 * 1024**3) + assert result.available == int(7.5 * 1024**3) + assert result.usage == 31.25 + + def test_monthly_media_statistics_counts_successful_unique_media(): """本月新增统计应只计算成功记录,并按媒体去重。""" month = system_module.time.strftime("%Y-%m-", system_module.time.localtime()) diff --git a/tests/test_scheduler_progress.py b/tests/test_scheduler_progress.py new file mode 100644 index 00000000..6c67198b --- /dev/null +++ b/tests/test_scheduler_progress.py @@ -0,0 +1,154 @@ +import asyncio +import threading +from uuid import uuid4 + +from app.core.config import global_vars +from app.scheduler import Scheduler + + +def _build_scheduler(job_id, func): + """构造不启动 APScheduler 的定时服务测试对象。""" + scheduler = object.__new__(Scheduler) + scheduler._lock = threading.RLock() + scheduler._jobs = { + job_id: { + "name": "测试定时服务", + "provider_name": "测试", + "func": func, + "running": False, + } + } + return scheduler + + +def test_scheduler_records_live_and_completed_progress(): + """定时服务应在执行中更新进度,并在成功后收敛为 100%。""" + job_id = f"test-success-{uuid4()}" + snapshots = [] + + def task(progress_callback): + """上报一次中间进度。""" + progress_callback(value=42, text="正在处理", data={"finished": 2}) + snapshots.append(scheduler.get_progress(job_id)) + + scheduler = _build_scheduler(job_id, task) + + scheduler.start(job_id) + + assert snapshots[0].enable is True + assert snapshots[0].value == 42 + assert snapshots[0].status == "running" + assert snapshots[0].data["finished"] == 2 + progress = scheduler.get_progress(job_id) + assert progress.enable is False + assert progress.value == 100 + assert progress.status == "success" + assert progress.success is True + assert progress.started_at + assert progress.finished_at + + +def test_scheduler_failure_preserves_last_progress(monkeypatch): + """定时服务异常时应保留失败位置,而不是伪装成 100% 完成。""" + job_id = f"test-failure-{uuid4()}" + + def task(progress_callback): + """上报进度后抛出异常。""" + progress_callback(value=37, text="处理失败") + raise RuntimeError("预期失败") + + scheduler = _build_scheduler(job_id, task) + monkeypatch.setattr( + scheduler, + "_Scheduler__handle_job_error", + lambda **kwargs: None, + ) + + scheduler.start(job_id) + + progress = scheduler.get_progress(job_id) + assert progress.enable is False + assert progress.value == 37 + assert progress.status == "failed" + assert progress.success is False + assert progress.error == "预期失败" + + +def test_scheduler_treats_standard_failure_result_as_failed(): + """返回 `(False, message)` 的定时服务应记录为业务失败。""" + job_id = f"test-result-failure-{uuid4()}" + + def task(progress_callback): + """返回标准失败结果。""" + progress_callback(value=55, text="业务校验失败") + return False, "业务失败" + + scheduler = _build_scheduler(job_id, task) + + scheduler.start(job_id) + + progress = scheduler.get_progress(job_id) + assert progress.value == 55 + assert progress.status == "failed" + assert progress.error == "业务失败" + + +def test_scheduler_runs_async_job_without_running_global_loop(monkeypatch): + """全局事件循环未运行时,异步定时服务仍应正常执行并收敛进度。""" + job_id = f"test-async-{uuid4()}" + + async def task(progress_callback): + """上报异步任务进度。""" + progress_callback(value=65, text="异步处理中") + + scheduler = _build_scheduler(job_id, task) + target_loop = asyncio.new_event_loop() + monkeypatch.setattr(global_vars, "CURRENT_EVENT_LOOP", target_loop) + + try: + scheduler.start(job_id) + finally: + target_loop.close() + + progress = scheduler.get_progress(job_id) + assert progress.enable is False + assert progress.value == 100 + assert progress.status == "success" + + +def test_scheduler_runs_async_job_from_current_event_loop(monkeypatch): + """在异步入口中手动触发定时服务时,不应嵌套调用 `asyncio.run`。""" + job_id = f"test-current-loop-{uuid4()}" + + async def task(progress_callback): + """在当前事件循环中上报进度。""" + progress_callback(value=75, text="当前循环处理中") + + async def run_task(): + """从已运行的事件循环启动定时服务。""" + scheduler.start(job_id) + await asyncio.sleep(0) + + scheduler = _build_scheduler(job_id, task) + target_loop = asyncio.new_event_loop() + monkeypatch.setattr(global_vars, "CURRENT_EVENT_LOOP", target_loop) + + try: + asyncio.run(run_task()) + finally: + target_loop.close() + + progress = scheduler.get_progress(job_id) + assert progress.enable is False + assert progress.value == 100 + assert progress.status == "success" + + +def test_scheduler_returns_none_for_unknown_job(): + """未注册且无历史进度的定时服务应返回空。""" + job_id = f"test-unknown-{uuid4()}" + scheduler = object.__new__(Scheduler) + scheduler._lock = threading.RLock() + scheduler._jobs = {} + + assert scheduler.get_progress(job_id) is None