From 9eb71c744bfe13d6355e9105fd444efa627e9c99 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 29 Jun 2026 10:45:10 +0800 Subject: [PATCH] feat(mediaserver): add item count retrieval for various media servers --- app/chain/mediaserver.py | 209 +++++++++++-- app/modules/emby/__init__.py | 13 + app/modules/emby/emby.py | 27 ++ app/modules/jellyfin/__init__.py | 13 + app/modules/jellyfin/jellyfin.py | 27 ++ app/modules/plex/__init__.py | 13 + app/modules/plex/plex.py | 16 + app/modules/trimemedia/__init__.py | 15 + app/modules/trimemedia/api.py | 26 ++ app/modules/trimemedia/trimemedia.py | 14 + app/modules/ugreen/__init__.py | 15 + app/modules/ugreen/ugreen.py | 37 +++ app/modules/zspace/__init__.py | 13 + app/modules/zspace/zspace.py | 26 ++ app/scheduler.py | 28 +- tests/test_mediaserver_items_count.py | 125 ++++++++ tests/test_mediaserver_sync_incremental.py | 339 +++++++++++++-------- 17 files changed, 799 insertions(+), 157 deletions(-) create mode 100644 tests/test_mediaserver_items_count.py diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index ce937ea1..cb0d5f82 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -109,6 +109,35 @@ class MediaServerChain(ChainBase): yield from self.run_module("mediaserver_items", server=server, library_id=library_id, start_index=start_index, limit=limit) + def items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,无法获取时返回None + """ + return self.run_module( + "mediaserver_items_count", + server=server, + library_id=library_id, + ) + + def media_count(self, server: str) -> Optional[int]: + """ + 获取指定媒体服务器可同步的电影和电视剧总数 + + :param server: 媒体服务器名称 + :return: 电影和电视剧总数,无法获取时返回None + """ + statistics = self.run_module("media_statistic", server=server) + if not statistics: + return None + return sum( + (statistic.movie_count or 0) + (statistic.tv_count or 0) + for statistic in statistics + ) + def iteminfo(self, server: str, item_id: Union[str, int]) -> MediaServerItem: """ 获取媒体服务器项目信息 @@ -221,8 +250,75 @@ class MediaServerChain(ChainBase): if progress_callback: progress_callback(value=100, text="没有已启用的媒体服务器") return + + server_sync_contexts = {} + global_media_total = 0 + global_counts_available = True + for mediaserver in mediaservers: + if not mediaserver or not mediaserver.enabled: + continue + server_name = mediaserver.name + logger.info(f"正在统计媒体服务器 {server_name} 的待同步媒体数量") + libraries = self.librarys(server_name) + if not libraries: + server_sync_contexts[server_name] = None + continue + + sync_libraries = mediaserver.sync_libraries or [] + selected_libraries = [] + for library in libraries: + if sync_libraries \ + and "all" not in sync_libraries \ + and str(library.id) not in sync_libraries: + logger.info(f"{library.name} 未在 {server_name} 同步媒体库列表中,跳过") + continue + selected_libraries.append(library) + + library_media_counts = { + str(library.id): None for library in selected_libraries + } + sync_all_libraries = ( + not sync_libraries or "all" in sync_libraries + ) + server_media_count = ( + self.media_count(server_name) + if sync_all_libraries else None + ) + if server_media_count: + global_media_total += server_media_count + logger.info( + f"媒体服务器 {server_name} 共 {server_media_count} 个媒体待同步" + ) + else: + for library in selected_libraries: + media_count = self.items_count( + server=server_name, + library_id=library.id, + ) + library_media_counts[str(library.id)] = media_count + if media_count is None: + global_counts_available = False + logger.warning( + f"未获取到 {server_name} 媒体库 {library.name} 的媒体总数," + f"同步进度将按媒体库完成度计算" + ) + else: + global_media_total += media_count + logger.info( + f"{server_name} 媒体库 {library.name}" + f"共 {media_count} 个媒体待同步" + ) + server_sync_contexts[server_name] = ( + selected_libraries, + library_media_counts, + ) + + if not global_counts_available: + global_media_total = None + # 遍历媒体服务器 server_index = 0 + global_media_finished = 0 for mediaserver in mediaservers: if not mediaserver: continue @@ -233,8 +329,13 @@ class MediaServerChain(ChainBase): server_index += 1 server_name = mediaserver.name if progress_callback: + progress_value = ( + global_media_finished / global_media_total * 100 + if global_media_total else + (server_index - 1) / total_servers * 100 + ) progress_callback( - value=(server_index - 1) / total_servers * 100, + value=progress_value, text=( f"正在同步媒体服务器" f"({server_index}/{total_servers}){server_name} ..." @@ -243,29 +344,37 @@ class MediaServerChain(ChainBase): "total": total_servers, "finished": server_index - 1, "current": server_name, + "media_total": global_media_total, + "media_finished": global_media_finished, }, ) - sync_libraries = mediaserver.sync_libraries or [] logger.info(f"开始同步媒体服务器 {server_name} 的数据 ...") - libraries = self.librarys(server_name) - if not libraries: + sync_context = server_sync_contexts.get(server_name) + if sync_context is None: logger.info(f"没有获取到媒体服务器 {server_name} 的媒体库,跳过") if progress_callback: + progress_value = ( + global_media_finished / global_media_total * 100 + if global_media_total else + server_index / total_servers * 100 + ) progress_callback( - value=server_index / total_servers * 100, + value=progress_value, text=f"媒体服务器 {server_name} 无可同步媒体库", - data={"total": total_servers, "finished": server_index}, + data={ + "total": total_servers, + "finished": server_index, + "media_total": global_media_total, + "media_finished": global_media_finished, + }, ) continue sync_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") - total_libraries = len(libraries) - for library_index, library in enumerate(libraries, start=1): - if sync_libraries \ - and "all" not in sync_libraries \ - and str(library.id) not in sync_libraries: - logger.info(f"{library.name} 未在 {server_name} 同步媒体库列表中,跳过") - continue + selected_libraries, library_media_counts = sync_context + total_libraries = len(selected_libraries) + for library_index, library in enumerate(selected_libraries, start=1): logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...") + library_media_total = library_media_counts.get(str(library.id)) library_count = 0 for item in self.items(server=server_name, library_id=library.id): if global_vars.is_system_stopped: @@ -275,6 +384,7 @@ class MediaServerChain(ChainBase): logger.debug(f"正在同步 {item.title} ...") # 计数 library_count += 1 + global_media_finished += 1 seasoninfo = {} # 类型 item_type = "电视剧" if item.item_type in ["Series", "show"] else "电影" @@ -289,16 +399,60 @@ class MediaServerChain(ChainBase): item_dict["item_type"] = item_type item_dict["lst_mod_date"] = sync_time dboper.upsert(**item_dict) + if progress_callback: + if global_media_total: + progress_value = min( + global_media_finished / global_media_total, + 1, + ) * 100 + else: + library_progress = ( + min(library_count / library_media_total, 1) + if library_media_total else 0 + ) + server_progress = ( + library_index - 1 + library_progress + ) / total_libraries + progress_value = ( + server_index - 1 + server_progress + ) / total_servers * 100 + progress_callback( + value=progress_value, + text=( + f"正在同步 {server_name} 媒体库 {library.name}" + f"({library_count}/{library_media_total})" + if library_media_total is not None + else f"正在同步 {server_name} 媒体库 {library.name}" + ), + data={ + "total": total_servers, + "finished": server_index - 1, + "current": server_name, + "library_total": total_libraries, + "library_finished": library_index - 1, + "current_library": library.name, + "library_media_total": library_media_total, + "library_media_finished": library_count, + "media_total": global_media_total, + "media_finished": global_media_finished, + }, + ) logger.info(f"{server_name} 媒体库 {library.name} 同步完成,共同步数量:{library_count}") # 总数累加 total_count += library_count if progress_callback: + if global_media_total: + progress_value = min( + global_media_finished / global_media_total, + 1, + ) * 100 + else: + server_progress = library_index / total_libraries + progress_value = ( + server_index - 1 + server_progress + ) / total_servers * 100 progress_callback( - value=( - (server_index - 1 + library_index / total_libraries) - / total_servers - * 100 - ), + value=progress_value, text=( f"{server_name} 媒体库" f"({library_index}/{total_libraries}){library.name} 同步完成" @@ -309,19 +463,34 @@ class MediaServerChain(ChainBase): "current": server_name, "library_total": total_libraries, "library_finished": library_index, + "current_library": library.name, + "library_media_total": library_media_total, + "library_media_finished": library_count, + "media_total": global_media_total, + "media_finished": global_media_finished, }, ) stale_count = dboper.delete_stale(server=server_name, sync_time=sync_time) logger.info(f"媒体服务器 {server_name} 清理陈旧数据完成,删除数量:{stale_count}") logger.info(f"媒体服务器 {server_name} 数据同步完成,总同步数量:{total_count}") if progress_callback: + progress_value = ( + min(global_media_finished / global_media_total, 1) * 100 + if global_media_total else + server_index / total_servers * 100 + ) progress_callback( - value=server_index / total_servers * 100, + value=progress_value, text=( f"媒体服务器({server_index}/{total_servers})" f"{server_name} 同步完成" ), - data={"total": total_servers, "finished": server_index}, + data={ + "total": total_servers, + "finished": server_index, + "media_total": global_media_total, + "media_finished": global_media_finished, + }, ) if progress_callback: progress_callback(value=100, text="媒体服务器同步完成") diff --git a/app/modules/emby/__init__.py b/app/modules/emby/__init__.py index dd82656c..665a4d6e 100644 --- a/app/modules/emby/__init__.py +++ b/app/modules/emby/__init__.py @@ -243,6 +243,19 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]): return server_obj.get_items(library_id, start_index, limit) return None + def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + server_obj: Emby = self.get_instance(server) + if server_obj: + return server_obj.get_items_count(library_id) + return None + def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: """ 媒体库项目详情 diff --git a/app/modules/emby/emby.py b/app/modules/emby/emby.py index d86ba8f0..f9a5f0cb 100644 --- a/app/modules/emby/emby.py +++ b/app/modules/emby/emby.py @@ -693,6 +693,33 @@ class Emby: logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e)) return None + def get_items_count(self, parent: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的电影和剧集总数 + + :param parent: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + if not parent or not self._host or not self._apikey: + return None + url = f"{self._host}emby/Users/{self.user}/Items" + params = { + "ParentId": parent, + "Recursive": "true", + "IncludeItemTypes": "Movie,Series", + "Limit": 0, + "api_key": self._apikey, + } + try: + res = RequestUtils().get_res(url, params) + if not res or res.status_code != 200: + return None + total_count = res.json().get("TotalRecordCount") + return int(total_count) if total_count is not None else None + except Exception as e: + logger.error(f"查询媒体库 {parent} 的媒体总数出错:{str(e)}") + return None + def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]: """ diff --git a/app/modules/jellyfin/__init__.py b/app/modules/jellyfin/__init__.py index 7268f1c8..5b10f384 100644 --- a/app/modules/jellyfin/__init__.py +++ b/app/modules/jellyfin/__init__.py @@ -242,6 +242,19 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]): return server_obj.get_items(library_id, start_index, limit) return None + def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + server_obj: Jellyfin = self.get_instance(server) + if server_obj: + return server_obj.get_items_count(library_id) + return None + def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: """ 媒体库项目详情 diff --git a/app/modules/jellyfin/jellyfin.py b/app/modules/jellyfin/jellyfin.py index 5c56708b..dbe5ed7d 100644 --- a/app/modules/jellyfin/jellyfin.py +++ b/app/modules/jellyfin/jellyfin.py @@ -808,6 +808,33 @@ class Jellyfin: logger.error(f"连接Users/{self.user}/Items/{itemid}:" + str(e)) return None + def get_items_count(self, parent: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的电影和剧集总数 + + :param parent: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + if not parent or not self._host or not self._apikey or not self.user: + return None + url = f"{self._host}Users/{self.user}/Items" + params = { + "ParentId": parent, + "Recursive": "true", + "IncludeItemTypes": "Movie,Series", + "Limit": 0, + "api_key": self._apikey, + } + try: + res = RequestUtils().get_res(url, params) + if not res or res.status_code != 200: + return None + total_count = res.json().get("TotalRecordCount") + return int(total_count) if total_count is not None else None + except Exception as e: + logger.error(f"查询媒体库 {parent} 的媒体总数出错:{str(e)}") + return None + def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) \ -> Generator[MediaServerItem | None | Any, Any, None]: """ diff --git a/app/modules/plex/__init__.py b/app/modules/plex/__init__.py index 3eb636ed..ddb83e5f 100644 --- a/app/modules/plex/__init__.py +++ b/app/modules/plex/__init__.py @@ -253,6 +253,19 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]): return server_obj.get_items(library_id, start_index, limit) return None + def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + server_obj: Plex = self.get_instance(server) + if server_obj: + return server_obj.get_items_count(library_id) + return None + def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: """ 媒体库项目详情 diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index 943cac6b..2f214bd7 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -575,6 +575,22 @@ class Plex: user_state=user_state, ) + def get_items_count(self, parent: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param parent: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + if not parent or not self._plex: + return None + try: + section = self._plex.library.sectionByID(int(parent)) + return int(section.totalSize) if section else None + except Exception as err: + logger.error(f"查询媒体库 {parent} 的媒体总数出错:{str(err)}") + return None + def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) \ -> Generator[MediaServerItem | None, Any, None]: """ diff --git a/app/modules/trimemedia/__init__.py b/app/modules/trimemedia/__init__.py index 12cf51f7..aa2d25cb 100644 --- a/app/modules/trimemedia/__init__.py +++ b/app/modules/trimemedia/__init__.py @@ -281,6 +281,21 @@ class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]): return server_obj.get_items(library_id, start_index, limit) return None + def mediaserver_items_count( + self, server: str, library_id: Union[str, int] + ) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + server_obj: Optional[TrimeMedia] = self.get_instance(server) + if server_obj: + return server_obj.get_items_count(library_id) + return None + def mediaserver_iteminfo( self, server: str, item_id: str ) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/trimemedia/api.py b/app/modules/trimemedia/api.py index 61e7fb84..ac44c04d 100644 --- a/app/modules/trimemedia/api.py +++ b/app/modules/trimemedia/api.py @@ -347,6 +347,32 @@ class Api: return [self.__build_item(info) for info in res.data.get("list", [])] return None + def item_count(self, guid: str, types=None) -> Optional[int]: + """ + 获取指定媒体库的媒体条目总数 + + :param guid: 媒体库GUID + :param types: 需要统计的媒体类型 + :return: 媒体条目总数,查询失败时返回None + """ + if types is None: + types = [Type.MOVIE, Type.TV] + post = { + "ancestor_guid": guid, + "tags": {"type": types}, + "exclude_grouped_video": 1, + "page": 1, + "page_size": 1, + } + if (res := self.request("/item/list", data=post)) and res.success: + if not res.data: + return 0 + total_count = res.data.get("total") + if total_count is None: + total_count = res.data.get("total_count") + return int(total_count) if total_count is not None else None + return None + def search_list(self, keywords: str) -> Optional[list[Item]]: """ 搜索影片、演员 diff --git a/app/modules/trimemedia/trimemedia.py b/app/modules/trimemedia/trimemedia.py index ae683cd5..0b81ca75 100644 --- a/app/modules/trimemedia/trimemedia.py +++ b/app/modules/trimemedia/trimemedia.py @@ -510,6 +510,20 @@ class TrimeMedia: use_cookies=True, ) + def get_items_count(self, parent: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param parent: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + if not self.is_authenticated(): + return None + return self._api.item_count( + guid=str(parent), + types=[fnapi.Type.MOVIE, fnapi.Type.TV], + ) + def get_items( self, parent: Union[str, int], diff --git a/app/modules/ugreen/__init__.py b/app/modules/ugreen/__init__.py index 59d0c9de..705ece72 100644 --- a/app/modules/ugreen/__init__.py +++ b/app/modules/ugreen/__init__.py @@ -253,6 +253,21 @@ class UgreenModule(_ModuleBase, _MediaServerBase[Ugreen]): return server_obj.get_items(library_id, start_index, limit) return None + def mediaserver_items_count( + self, server: str, library_id: Union[str, int] + ) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if server_obj: + return server_obj.get_items_count(library_id) + return None + def mediaserver_iteminfo( self, server: str, item_id: str ) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/ugreen/ugreen.py b/app/modules/ugreen/ugreen.py index e736280f..2eea751a 100644 --- a/app/modules/ugreen/ugreen.py +++ b/app/modules/ugreen/ugreen.py @@ -66,15 +66,19 @@ class Ugreen: @property def api(self) -> Optional[Api]: + """获取当前绿联影视 API 客户端""" return self._api def close(self): + """关闭绿联影视连接""" self.disconnect() def is_configured(self) -> bool: + """检查绿联影视连接配置是否完整""" return bool(self._host and self._username and self._password) def is_authenticated(self) -> bool: + """检查绿联影视会话是否已认证""" return ( self.is_configured() and self._api is not None @@ -83,6 +87,7 @@ class Ugreen: ) def is_inactive(self) -> bool: + """检查绿联影视会话是否已失效""" if not self.is_authenticated(): return True self._userinfo = self._api.current_user() if self._api else None @@ -516,6 +521,12 @@ class Ugreen: return paths def get_librarys(self, hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]: + """ + 获取绿联影视媒体库列表 + + :param hidden: 是否过滤未启用同步的媒体库 + :return: 媒体库列表 + """ if not self.is_authenticated() or not self._api: return [] @@ -567,12 +578,14 @@ class Ugreen: return libraries def get_user_count(self) -> int: + """获取绿联影视媒体库用户数量""" if not self.is_authenticated() or not self._api: return 0 users = self._api.media_lib_users() return len(users) def get_medias_count(self) -> schemas.Statistic: + """获取绿联影视的电影和电视剧数量统计""" if not self.is_authenticated() or not self._api: return schemas.Statistic() @@ -856,12 +869,36 @@ class Ugreen: break page += 1 + def get_items_count(self, parent: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param parent: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + if not self.is_authenticated() or not self._api: + return None + if not self._libraries: + self.get_librarys() + library = self._libraries.get(str(parent)) + if not library: + return None + return int(library.get("video_count") or 0) + def get_items( self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1, ) -> Generator[schemas.MediaServerItem | None | Any, Any, None]: + """ + 获取指定绿联影视媒体库的可同步条目 + + :param parent: 媒体库ID + :param start_index: 起始条目索引 + :param limit: 最大返回条目数,-1表示不限制 + :return: 媒体条目生成器 + """ if not self.is_authenticated() or not self._api: return None diff --git a/app/modules/zspace/__init__.py b/app/modules/zspace/__init__.py index 9c6e5efd..c3edb1c6 100644 --- a/app/modules/zspace/__init__.py +++ b/app/modules/zspace/__init__.py @@ -237,6 +237,19 @@ class ZSpaceModule(_ModuleBase, _MediaServerBase[ZSpace]): return server_obj.get_items(library_id, start_index, limit) return None + def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库可同步的媒体条目总数 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + server_obj: ZSpace = self.get_instance(server) + if server_obj: + return server_obj.get_items_count(library_id) + return None + def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: """ 媒体库项目详情 diff --git a/app/modules/zspace/zspace.py b/app/modules/zspace/zspace.py index bf82c521..e91e03fd 100644 --- a/app/modules/zspace/zspace.py +++ b/app/modules/zspace/zspace.py @@ -815,6 +815,32 @@ class ZSpace: logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:{e}") return None + def get_items_count(self, parent: Union[str, int]) -> Optional[int]: + """ + 获取指定媒体库的媒体条目总数 + + 极影视当前兼容层会忽略条目类型过滤,因此以递归查询返回的 + TotalRecordCount 作为同步进度的总数。 + + :param parent: 媒体库ID + :return: 媒体条目总数,查询失败时返回None + """ + if not parent or not self._host or not self._apikey or not self.user: + return None + url = f"{self._host}emby/Users/{self.user}/Items" + try: + res = self.__request_utils().get_res( + url, + params={"ParentId": parent, "Recursive": "true", "Limit": 0}, + ) + if not res or res.status_code != 200: + return None + total_count = res.json().get("TotalRecordCount") + return int(total_count) if total_count is not None else None + except Exception as e: + logger.error(f"查询媒体库 {parent} 的媒体总数出错:{e}") + return None + def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]: """ diff --git a/app/scheduler.py b/app/scheduler.py index af1c20a0..6189cae2 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -36,8 +36,8 @@ from app.db.systemconfig_oper import SystemConfigOper from app.helper.image import WallpaperHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper -from app.helper.sites import SitesHelper # noqa from app.helper.server import MoviePilotServerHelper +from app.helper.sites import SitesHelper # noqa from app.log import logger from app.schemas import Notification, NotificationType, Workflow from app.schemas.types import EventType, SystemConfigKey @@ -177,16 +177,16 @@ class SchedulerChain(ChainBase): ) message_cutoff = ( - started_at - timedelta(days=message_days) + started_at - timedelta(days=message_days) ).strftime("%Y-%m-%d %H:%M:%S") download_history_cutoff = ( - started_at - timedelta(days=download_history_days) + started_at - timedelta(days=download_history_days) ).strftime("%Y-%m-%d %H:%M:%S") site_userdata_cutoff = ( - started_at - timedelta(days=site_userdata_days) + started_at - timedelta(days=site_userdata_days) ).strftime("%Y-%m-%d") transfer_history_cutoff = ( - started_at - timedelta(days=transfer_history_days) + started_at - timedelta(days=transfer_history_days) ).strftime("%Y-%m-%d %H:%M:%S") return [ @@ -472,8 +472,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="cookiecloud", name="同步CookieCloud站点", minutes=int(settings.COOKIECLOUD_INTERVAL), - next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(minutes=5), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5), kwargs={"job_id": "cookiecloud"}, ) @@ -488,8 +487,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="mediaserver_sync", name="同步媒体服务器", hours=int(settings.MEDIASERVER_SYNC_INTERVAL), - next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(minutes=10), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10), kwargs={"job_id": "mediaserver_sync"}, ) @@ -582,8 +580,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="random_wallpager", name="壁纸缓存", minutes=30, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(seconds=1), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1), kwargs={"job_id": "random_wallpager"}, ) @@ -647,8 +644,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="recommend_refresh", name="推荐缓存", hours=24, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(seconds=5), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5), kwargs={"job_id": "recommend_refresh"}, ) @@ -669,8 +665,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): id="subscribe_calendar_cache", name="订阅日历缓存", hours=6, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(minutes=2), + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2), kwargs={"job_id": "subscribe_calendar_cache"}, ) @@ -823,7 +818,8 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): data=data, ) - def __handle_job_error(self, job_id: str, job: dict, error: Exception) -> None: + @staticmethod + def __handle_job_error(job_id: str, job: dict, error: Exception) -> None: """ 记录定时任务执行异常并发送系统错误事件。 """ diff --git a/tests/test_mediaserver_items_count.py b/tests/test_mediaserver_items_count.py new file mode 100644 index 00000000..ac1c3806 --- /dev/null +++ b/tests/test_mediaserver_items_count.py @@ -0,0 +1,125 @@ +from types import SimpleNamespace +from unittest.mock import patch + +import pytest + +from app.modules.emby.emby import Emby +from app.modules.jellyfin.jellyfin import Jellyfin +from app.modules.plex.plex import Plex +from app.modules.trimemedia.api import Api as TrimeMediaApi +from app.modules.trimemedia.api import Type as TrimeMediaType +from app.modules.ugreen.ugreen import Ugreen +from app.modules.zspace.zspace import ZSpace + + +class _FakeResponse: + """模拟媒体服务器 HTTP 响应。""" + + def __init__(self, payload: dict, status_code: int = 200): + """保存响应数据和状态码。""" + self._payload = payload + self.status_code = status_code + + def json(self) -> dict: + """返回模拟的 JSON 数据。""" + return self._payload + + +@pytest.mark.parametrize( + ("client_class", "request_utils_path", "url_path"), + [ + (Emby, "app.modules.emby.emby.RequestUtils", "emby/Users/user-id/Items"), + (Jellyfin, "app.modules.jellyfin.jellyfin.RequestUtils", "Users/user-id/Items"), + ], +) +def test_emby_compatible_items_count_uses_recursive_type_filter( + client_class, request_utils_path, url_path +): + """Emby 兼容服务应通过递归类型过滤查询媒体库总数。""" + client = client_class.__new__(client_class) + client._host = "http://media.local/" + client._apikey = "token" + client.user = "user-id" + + with patch(request_utils_path) as request_utils_cls: + request_utils_cls.return_value.get_res.return_value = _FakeResponse( + {"TotalRecordCount": 23} + ) + result = client.get_items_count("library-id") + + assert result == 23 + args = request_utils_cls.return_value.get_res.call_args.args + assert args[0] == f"http://media.local/{url_path}" + assert args[1]["Recursive"] == "true" + assert args[1]["IncludeItemTypes"] == "Movie,Series" + assert args[1]["Limit"] == 0 + + +def test_zspace_items_count_uses_total_record_count(): + """极影视应使用兼容接口返回的 TotalRecordCount。""" + client = ZSpace.__new__(ZSpace) + client._host = "http://zspace.local/" + client._apikey = "token" + client.user = "user-id" + + with patch("app.modules.zspace.zspace.RequestUtils") as request_utils_cls: + request_utils_cls.return_value.get_res.return_value = _FakeResponse( + {"TotalRecordCount": 17} + ) + result = client.get_items_count("library-id") + + assert result == 17 + params = request_utils_cls.return_value.get_res.call_args.kwargs["params"] + assert params == { + "ParentId": "library-id", + "Recursive": "true", + "Limit": 0, + } + + +def test_plex_items_count_uses_section_total_size(): + """Plex 应直接读取媒体库分区的条目总数。""" + client = Plex.__new__(Plex) + section = SimpleNamespace(totalSize=31) + client._plex = SimpleNamespace( + library=SimpleNamespace(sectionByID=lambda _library_id: section) + ) + + assert client.get_items_count("9") == 31 + + +def test_ugreen_items_count_uses_library_video_count(): + """绿联影视应复用媒体库列表中的视频总数。""" + client = Ugreen.__new__(Ugreen) + client._host = "http://ugreen.local" + client._username = "tester" + client._password = "secret" + client._userinfo = {"name": "tester"} + client._api = SimpleNamespace(token="token") + client._libraries = {"library-id": {"video_count": 42}} + + assert client.get_items_count("library-id") == 42 + + +def test_trimemedia_item_count_reads_list_total(): + """飞牛影视应通过媒体列表接口的 total 字段获取总数。""" + api = TrimeMediaApi.__new__(TrimeMediaApi) + response = SimpleNamespace(success=True, data={"list": [], "total": 19}) + + with patch.object(TrimeMediaApi, "request", return_value=response) as request: + result = api.item_count( + guid="library-id", + types=[TrimeMediaType.MOVIE, TrimeMediaType.TV], + ) + + assert result == 19 + request.assert_called_once_with( + "/item/list", + data={ + "ancestor_guid": "library-id", + "tags": {"type": [TrimeMediaType.MOVIE, TrimeMediaType.TV]}, + "exclude_grouped_video": 1, + "page": 1, + "page_size": 1, + }, + ) diff --git a/tests/test_mediaserver_sync_incremental.py b/tests/test_mediaserver_sync_incremental.py index fc3fdf15..012e6b0c 100644 --- a/tests/test_mediaserver_sync_incremental.py +++ b/tests/test_mediaserver_sync_incremental.py @@ -1,9 +1,7 @@ -import tempfile -import unittest -from pathlib import Path from types import SimpleNamespace from unittest.mock import patch +import pytest from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @@ -15,139 +13,238 @@ from app.db.mediaserver_oper import MediaServerOper from app.db.models.mediaserver import MediaServerItem -class MediaServerIncrementalSyncTest(unittest.TestCase): - """验证媒体库同步改为按条目增量更新。""" +@pytest.fixture +def database(tmp_path): + """创建隔离的媒体服务器测试数据库。""" + engine = create_engine(f"sqlite:///{tmp_path / 'mediaserver.db'}") + session_factory = sessionmaker(bind=engine) + Base.metadata.create_all(bind=engine) + yield session_factory + engine.dispose() - def setUp(self): - self.temp_dir = tempfile.TemporaryDirectory() - db_path = Path(self.temp_dir.name) / "mediaserver.db" - self.engine = create_engine(f"sqlite:///{db_path}") - self.SessionFactory = sessionmaker(bind=self.engine) - Base.metadata.create_all(bind=self.engine) - def tearDown(self): - self.engine.dispose() - self.temp_dir.cleanup() +def test_add_allows_same_item_id_across_servers(database): + """不同媒体服务器允许复用相同 item_id。""" + with database() as db: + oper = MediaServerOper(db) - def test_add_allows_same_item_id_across_servers(self): - """不同媒体服务器允许复用相同 item_id。""" - with self.SessionFactory() as db: - oper = MediaServerOper(db) + assert oper.add( + server="plex", + library="movies", + item_id="same-item-id", + item_type="电影", + title="Movie A", + ) + assert oper.add( + server="jellyfin", + library="movies", + item_id="same-item-id", + item_type="电影", + title="Movie B", + ) - self.assertTrue( - oper.add( - server="plex", - library="movies", - item_id="same-item-id", - item_type="电影", - title="Movie A", - ) - ) - self.assertTrue( - oper.add( - server="jellyfin", - library="movies", - item_id="same-item-id", - item_type="电影", - title="Movie B", - ) - ) + items = ( + db.query(MediaServerItem) + .order_by(MediaServerItem.server.asc()) + .all() + ) - items = ( - db.query(MediaServerItem) - .order_by(MediaServerItem.server.asc()) - .all() - ) + assert len(items) == 2 + assert [item.server for item in items] == ["jellyfin", "plex"] - self.assertEqual(len(items), 2) - self.assertEqual([item.server for item in items], ["jellyfin", "plex"]) - def test_sync_updates_rows_and_removes_stale_entries(self): - """同步应更新已存在条目,并清理未再出现或已移除服务的数据。""" - old_sync_time = "2026-05-01 00:00:00" +def test_media_count_reuses_existing_server_statistics(): + """整服同步应复用现有媒体统计并排除剧集集数。""" + chain = object.__new__(MediaServerChain) + chain.run_module = lambda *_args, **_kwargs: [ + schemas.Statistic(movie_count=12, tv_count=8, episode_count=200) + ] - with self.SessionFactory() as db: - db.add_all( - [ - MediaServerItem( - server="plex", - library="movies", - item_id="/library/metadata/1", - item_type="电影", - title="Old Title", - year="2024", - path="/media/old.mkv", - lst_mod_date=old_sync_time, - ), - MediaServerItem( - server="plex", - library="movies", - item_id="/library/metadata/2", - item_type="电影", - title="Stale Title", - year="2020", - path="/media/stale.mkv", - lst_mod_date=old_sync_time, - ), - MediaServerItem( - server="jellyfin", - library="movies", - item_id="/library/metadata/1", - item_type="电影", - title="Removed Server Title", - year="2024", - path="/media/removed.mkv", - lst_mod_date=old_sync_time, - ), - ] - ) - db.commit() - existing_id = ( - db.query(MediaServerItem.id) - .filter( - MediaServerItem.server == "plex", - MediaServerItem.item_id == "/library/metadata/1", - ) - .scalar() - ) + assert chain.media_count("plex") == 20 - chain = object.__new__(MediaServerChain) - chain.librarys = lambda _server: [SimpleNamespace(id="movies", name="电影库")] - chain.items = lambda **_kwargs: iter( + +def test_sync_updates_rows_and_removes_stale_entries(database): + """同步应更新已存在条目,并清理未再出现或已移除服务的数据。""" + old_sync_time = "2026-05-01 00:00:00" + + with database() as db: + db.add_all( [ - schemas.MediaServerItem( + MediaServerItem( server="plex", library="movies", item_id="/library/metadata/1", - item_type="Movie", - title="New Title", + item_type="电影", + title="Old Title", year="2024", - tmdbid=100, - path="/media/new.mkv", - ) + path="/media/old.mkv", + lst_mod_date=old_sync_time, + ), + MediaServerItem( + server="plex", + library="movies", + item_id="/library/metadata/2", + item_type="电影", + title="Stale Title", + year="2020", + path="/media/stale.mkv", + lst_mod_date=old_sync_time, + ), + MediaServerItem( + server="jellyfin", + library="movies", + item_id="/library/metadata/1", + item_type="电影", + title="Removed Server Title", + year="2024", + path="/media/removed.mkv", + lst_mod_date=old_sync_time, + ), ] ) - chain.episodes = lambda *_args, **_kwargs: [] - - with patch("app.db.ScopedSession", self.SessionFactory), patch.object( - MEDIA_SERVER_CHAIN_MODULE.ServiceConfigHelper, - "get_mediaserver_configs", - return_value=[SimpleNamespace(name="plex", enabled=True, sync_libraries=["all"])], - ): - chain.sync() - - with self.SessionFactory() as db: - items = ( - db.query(MediaServerItem) - .order_by(MediaServerItem.server.asc(), MediaServerItem.item_id.asc()) - .all() + db.commit() + existing_id = ( + db.query(MediaServerItem.id) + .filter( + MediaServerItem.server == "plex", + MediaServerItem.item_id == "/library/metadata/1", ) + .scalar() + ) - self.assertEqual(len(items), 1) - self.assertEqual(items[0].id, existing_id) - self.assertEqual(items[0].server, "plex") - self.assertEqual(items[0].item_id, "/library/metadata/1") - self.assertEqual(items[0].item_type, "电影") - self.assertEqual(items[0].title, "New Title") - self.assertEqual(items[0].path, "/media/new.mkv") - self.assertNotEqual(items[0].lst_mod_date, old_sync_time) + chain = object.__new__(MediaServerChain) + chain.librarys = lambda _server: [ + SimpleNamespace(id="movies", name="电影库"), + SimpleNamespace(id="shows", name="剧集库"), + ] + chain.media_count = lambda _server: pytest.fail("部分媒体库同步不应使用整服统计") + chain.items_count = lambda **_kwargs: 1 + chain.items = lambda **_kwargs: iter( + [ + schemas.MediaServerItem( + server="plex", + library="movies", + item_id="/library/metadata/1", + item_type="Movie", + title="New Title", + year="2024", + tmdbid=100, + path="/media/new.mkv", + ) + ] + ) + chain.episodes = lambda *_args, **_kwargs: [] + + with patch("app.db.ScopedSession", database), patch.object( + MEDIA_SERVER_CHAIN_MODULE.ServiceConfigHelper, + "get_mediaserver_configs", + return_value=[SimpleNamespace(name="plex", enabled=True, sync_libraries=["movies"])], + ): + chain.sync() + + with database() as db: + items = ( + db.query(MediaServerItem) + .order_by(MediaServerItem.server.asc(), MediaServerItem.item_id.asc()) + .all() + ) + + assert len(items) == 1 + assert items[0].id == existing_id + assert items[0].server == "plex" + assert items[0].item_id == "/library/metadata/1" + assert items[0].item_type == "电影" + assert items[0].title == "New Title" + assert items[0].path == "/media/new.mkv" + assert items[0].lst_mod_date != old_sync_time + + +def test_sync_queries_counts_before_items_and_reports_media_progress(database): + """同步前应查询全部目标媒体库总数,并按媒体条目更新进度。""" + chain = object.__new__(MediaServerChain) + events = [] + progress_snapshots = [] + server_libraries = { + "plex-a": [SimpleNamespace(id="movies", name="电影库")], + "plex-b": [SimpleNamespace(id="shows", name="剧集库")], + } + library_items = { + ("plex-a", "movies"): [ + schemas.MediaServerItem( + server="plex-a", + library="movies", + item_id=f"movie-{index}", + item_type="Movie", + title=f"电影 {index}", + ) + for index in range(2) + ], + ("plex-b", "shows"): [ + schemas.MediaServerItem( + server="plex-b", + library="shows", + item_id="show-1", + item_type="Movie", + title="剧集 1", + ) + ], + } + + chain.librarys = lambda server: server_libraries[server] + + def media_count(server): + """记录整服统计顺序并返回待同步媒体总数。""" + events.append(f"count:{server}") + return sum( + len(items) + for (item_server, _library_id), items in library_items.items() + if item_server == server + ) + + def items(**kwargs): + """记录同步顺序并返回媒体库条目。""" + server = kwargs["server"] + library_id = kwargs["library_id"] + events.append(f"items:{server}:{library_id}") + return iter(library_items[(server, library_id)]) + + chain.media_count = media_count + chain.items_count = lambda **_kwargs: pytest.fail("整服同步不应逐库重复计数") + chain.items = items + chain.episodes = lambda *_args, **_kwargs: [] + + with patch("app.db.ScopedSession", database), patch.object( + MEDIA_SERVER_CHAIN_MODULE.ServiceConfigHelper, + "get_mediaserver_configs", + return_value=[ + SimpleNamespace(name="plex-a", enabled=True, sync_libraries=["all"]), + SimpleNamespace(name="plex-b", enabled=True, sync_libraries=["all"]), + ], + ): + chain.sync( + progress_callback=lambda **kwargs: progress_snapshots.append(kwargs) + ) + + assert events == [ + "count:plex-a", + "count:plex-b", + "items:plex-a:movies", + "items:plex-b:shows", + ] + media_progress = [ + snapshot + for snapshot in progress_snapshots + if snapshot["text"].startswith(("正在同步 plex-a 媒体库", "正在同步 plex-b 媒体库")) + ] + assert [round(snapshot["value"], 2) for snapshot in media_progress] == [ + 33.33, + 66.67, + 100.0, + ] + assert media_progress[0]["data"]["media_total"] == 3 + assert media_progress[1]["data"]["library_media_finished"] == 2 + assert media_progress[2]["data"]["current_library"] == "剧集库" + assert media_progress[2]["data"]["media_total"] == 3 + assert media_progress[2]["data"]["media_finished"] == 3 + progress_values = [snapshot["value"] for snapshot in progress_snapshots] + assert progress_values == sorted(progress_values)