feat(mediaserver): add item count retrieval for various media servers

This commit is contained in:
jxxghp
2026-06-29 10:45:10 +08:00
parent 8bf826faa0
commit 9eb71c744b
17 changed files with 799 additions and 157 deletions

View File

@@ -109,6 +109,35 @@ class MediaServerChain(ChainBase):
yield from self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)
def items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数无法获取时返回None
"""
return self.run_module(
"mediaserver_items_count",
server=server,
library_id=library_id,
)
def media_count(self, server: str) -> Optional[int]:
"""
获取指定媒体服务器可同步的电影和电视剧总数
:param server: 媒体服务器名称
:return: 电影和电视剧总数无法获取时返回None
"""
statistics = self.run_module("media_statistic", server=server)
if not statistics:
return None
return sum(
(statistic.movie_count or 0) + (statistic.tv_count or 0)
for statistic in statistics
)
def iteminfo(self, server: str, item_id: Union[str, int]) -> MediaServerItem:
"""
获取媒体服务器项目信息
@@ -221,8 +250,75 @@ class MediaServerChain(ChainBase):
if progress_callback:
progress_callback(value=100, text="没有已启用的媒体服务器")
return
server_sync_contexts = {}
global_media_total = 0
global_counts_available = True
for mediaserver in mediaservers:
if not mediaserver or not mediaserver.enabled:
continue
server_name = mediaserver.name
logger.info(f"正在统计媒体服务器 {server_name} 的待同步媒体数量")
libraries = self.librarys(server_name)
if not libraries:
server_sync_contexts[server_name] = None
continue
sync_libraries = mediaserver.sync_libraries or []
selected_libraries = []
for library in libraries:
if sync_libraries \
and "all" not in sync_libraries \
and str(library.id) not in sync_libraries:
logger.info(f"{library.name} 未在 {server_name} 同步媒体库列表中,跳过")
continue
selected_libraries.append(library)
library_media_counts = {
str(library.id): None for library in selected_libraries
}
sync_all_libraries = (
not sync_libraries or "all" in sync_libraries
)
server_media_count = (
self.media_count(server_name)
if sync_all_libraries else None
)
if server_media_count:
global_media_total += server_media_count
logger.info(
f"媒体服务器 {server_name}{server_media_count} 个媒体待同步"
)
else:
for library in selected_libraries:
media_count = self.items_count(
server=server_name,
library_id=library.id,
)
library_media_counts[str(library.id)] = media_count
if media_count is None:
global_counts_available = False
logger.warning(
f"未获取到 {server_name} 媒体库 {library.name} 的媒体总数,"
f"同步进度将按媒体库完成度计算"
)
else:
global_media_total += media_count
logger.info(
f"{server_name} 媒体库 {library.name}"
f"{media_count} 个媒体待同步"
)
server_sync_contexts[server_name] = (
selected_libraries,
library_media_counts,
)
if not global_counts_available:
global_media_total = None
# 遍历媒体服务器
server_index = 0
global_media_finished = 0
for mediaserver in mediaservers:
if not mediaserver:
continue
@@ -233,8 +329,13 @@ class MediaServerChain(ChainBase):
server_index += 1
server_name = mediaserver.name
if progress_callback:
progress_value = (
global_media_finished / global_media_total * 100
if global_media_total else
(server_index - 1) / total_servers * 100
)
progress_callback(
value=(server_index - 1) / total_servers * 100,
value=progress_value,
text=(
f"正在同步媒体服务器"
f"{server_index}/{total_servers}{server_name} ..."
@@ -243,29 +344,37 @@ class MediaServerChain(ChainBase):
"total": total_servers,
"finished": server_index - 1,
"current": server_name,
"media_total": global_media_total,
"media_finished": global_media_finished,
},
)
sync_libraries = mediaserver.sync_libraries or []
logger.info(f"开始同步媒体服务器 {server_name} 的数据 ...")
libraries = self.librarys(server_name)
if not libraries:
sync_context = server_sync_contexts.get(server_name)
if sync_context is None:
logger.info(f"没有获取到媒体服务器 {server_name} 的媒体库,跳过")
if progress_callback:
progress_value = (
global_media_finished / global_media_total * 100
if global_media_total else
server_index / total_servers * 100
)
progress_callback(
value=server_index / total_servers * 100,
value=progress_value,
text=f"媒体服务器 {server_name} 无可同步媒体库",
data={"total": total_servers, "finished": server_index},
data={
"total": total_servers,
"finished": server_index,
"media_total": global_media_total,
"media_finished": global_media_finished,
},
)
continue
sync_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
total_libraries = len(libraries)
for library_index, library in enumerate(libraries, start=1):
if sync_libraries \
and "all" not in sync_libraries \
and str(library.id) not in sync_libraries:
logger.info(f"{library.name} 未在 {server_name} 同步媒体库列表中,跳过")
continue
selected_libraries, library_media_counts = sync_context
total_libraries = len(selected_libraries)
for library_index, library in enumerate(selected_libraries, start=1):
logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...")
library_media_total = library_media_counts.get(str(library.id))
library_count = 0
for item in self.items(server=server_name, library_id=library.id):
if global_vars.is_system_stopped:
@@ -275,6 +384,7 @@ class MediaServerChain(ChainBase):
logger.debug(f"正在同步 {item.title} ...")
# 计数
library_count += 1
global_media_finished += 1
seasoninfo = {}
# 类型
item_type = "电视剧" if item.item_type in ["Series", "show"] else "电影"
@@ -289,16 +399,60 @@ class MediaServerChain(ChainBase):
item_dict["item_type"] = item_type
item_dict["lst_mod_date"] = sync_time
dboper.upsert(**item_dict)
if progress_callback:
if global_media_total:
progress_value = min(
global_media_finished / global_media_total,
1,
) * 100
else:
library_progress = (
min(library_count / library_media_total, 1)
if library_media_total else 0
)
server_progress = (
library_index - 1 + library_progress
) / total_libraries
progress_value = (
server_index - 1 + server_progress
) / total_servers * 100
progress_callback(
value=progress_value,
text=(
f"正在同步 {server_name} 媒体库 {library.name}"
f"{library_count}/{library_media_total}"
if library_media_total is not None
else f"正在同步 {server_name} 媒体库 {library.name}"
),
data={
"total": total_servers,
"finished": server_index - 1,
"current": server_name,
"library_total": total_libraries,
"library_finished": library_index - 1,
"current_library": library.name,
"library_media_total": library_media_total,
"library_media_finished": library_count,
"media_total": global_media_total,
"media_finished": global_media_finished,
},
)
logger.info(f"{server_name} 媒体库 {library.name} 同步完成,共同步数量:{library_count}")
# 总数累加
total_count += library_count
if progress_callback:
if global_media_total:
progress_value = min(
global_media_finished / global_media_total,
1,
) * 100
else:
server_progress = library_index / total_libraries
progress_value = (
server_index - 1 + server_progress
) / total_servers * 100
progress_callback(
value=(
(server_index - 1 + library_index / total_libraries)
/ total_servers
* 100
),
value=progress_value,
text=(
f"{server_name} 媒体库"
f"{library_index}/{total_libraries}{library.name} 同步完成"
@@ -309,19 +463,34 @@ class MediaServerChain(ChainBase):
"current": server_name,
"library_total": total_libraries,
"library_finished": library_index,
"current_library": library.name,
"library_media_total": library_media_total,
"library_media_finished": library_count,
"media_total": global_media_total,
"media_finished": global_media_finished,
},
)
stale_count = dboper.delete_stale(server=server_name, sync_time=sync_time)
logger.info(f"媒体服务器 {server_name} 清理陈旧数据完成,删除数量:{stale_count}")
logger.info(f"媒体服务器 {server_name} 数据同步完成,总同步数量:{total_count}")
if progress_callback:
progress_value = (
min(global_media_finished / global_media_total, 1) * 100
if global_media_total else
server_index / total_servers * 100
)
progress_callback(
value=server_index / total_servers * 100,
value=progress_value,
text=(
f"媒体服务器({server_index}/{total_servers}"
f"{server_name} 同步完成"
),
data={"total": total_servers, "finished": server_index},
data={
"total": total_servers,
"finished": server_index,
"media_total": global_media_total,
"media_finished": global_media_finished,
},
)
if progress_callback:
progress_callback(value=100, text="媒体服务器同步完成")

View File

@@ -243,6 +243,19 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
server_obj: Emby = self.get_instance(server)
if server_obj:
return server_obj.get_items_count(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情

View File

@@ -693,6 +693,33 @@ class Emby:
logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e))
return None
def get_items_count(self, parent: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的电影和剧集总数
:param parent: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
if not parent or not self._host or not self._apikey:
return None
url = f"{self._host}emby/Users/{self.user}/Items"
params = {
"ParentId": parent,
"Recursive": "true",
"IncludeItemTypes": "Movie,Series",
"Limit": 0,
"api_key": self._apikey,
}
try:
res = RequestUtils().get_res(url, params)
if not res or res.status_code != 200:
return None
total_count = res.json().get("TotalRecordCount")
return int(total_count) if total_count is not None else None
except Exception as e:
logger.error(f"查询媒体库 {parent} 的媒体总数出错:{str(e)}")
return None
def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0,
limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]:
"""

View File

@@ -242,6 +242,19 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
server_obj: Jellyfin = self.get_instance(server)
if server_obj:
return server_obj.get_items_count(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情

View File

@@ -808,6 +808,33 @@ class Jellyfin:
logger.error(f"连接Users/{self.user}/Items/{itemid}" + str(e))
return None
def get_items_count(self, parent: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的电影和剧集总数
:param parent: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
if not parent or not self._host or not self._apikey or not self.user:
return None
url = f"{self._host}Users/{self.user}/Items"
params = {
"ParentId": parent,
"Recursive": "true",
"IncludeItemTypes": "Movie,Series",
"Limit": 0,
"api_key": self._apikey,
}
try:
res = RequestUtils().get_res(url, params)
if not res or res.status_code != 200:
return None
total_count = res.json().get("TotalRecordCount")
return int(total_count) if total_count is not None else None
except Exception as e:
logger.error(f"查询媒体库 {parent} 的媒体总数出错:{str(e)}")
return None
def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) \
-> Generator[MediaServerItem | None | Any, Any, None]:
"""

View File

@@ -253,6 +253,19 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
server_obj: Plex = self.get_instance(server)
if server_obj:
return server_obj.get_items_count(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情

View File

@@ -575,6 +575,22 @@ class Plex:
user_state=user_state,
)
def get_items_count(self, parent: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param parent: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
if not parent or not self._plex:
return None
try:
section = self._plex.library.sectionByID(int(parent))
return int(section.totalSize) if section else None
except Exception as err:
logger.error(f"查询媒体库 {parent} 的媒体总数出错:{str(err)}")
return None
def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) \
-> Generator[MediaServerItem | None, Any, None]:
"""

View File

@@ -281,6 +281,21 @@ class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]):
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_items_count(
self, server: str, library_id: Union[str, int]
) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
server_obj: Optional[TrimeMedia] = self.get_instance(server)
if server_obj:
return server_obj.get_items_count(library_id)
return None
def mediaserver_iteminfo(
self, server: str, item_id: str
) -> Optional[schemas.MediaServerItem]:

View File

@@ -347,6 +347,32 @@ class Api:
return [self.__build_item(info) for info in res.data.get("list", [])]
return None
def item_count(self, guid: str, types=None) -> Optional[int]:
"""
获取指定媒体库的媒体条目总数
:param guid: 媒体库GUID
:param types: 需要统计的媒体类型
:return: 媒体条目总数查询失败时返回None
"""
if types is None:
types = [Type.MOVIE, Type.TV]
post = {
"ancestor_guid": guid,
"tags": {"type": types},
"exclude_grouped_video": 1,
"page": 1,
"page_size": 1,
}
if (res := self.request("/item/list", data=post)) and res.success:
if not res.data:
return 0
total_count = res.data.get("total")
if total_count is None:
total_count = res.data.get("total_count")
return int(total_count) if total_count is not None else None
return None
def search_list(self, keywords: str) -> Optional[list[Item]]:
"""
搜索影片、演员

View File

@@ -510,6 +510,20 @@ class TrimeMedia:
use_cookies=True,
)
def get_items_count(self, parent: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param parent: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
if not self.is_authenticated():
return None
return self._api.item_count(
guid=str(parent),
types=[fnapi.Type.MOVIE, fnapi.Type.TV],
)
def get_items(
self,
parent: Union[str, int],

View File

@@ -253,6 +253,21 @@ class UgreenModule(_ModuleBase, _MediaServerBase[Ugreen]):
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_items_count(
self, server: str, library_id: Union[str, int]
) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
server_obj: Optional[Ugreen] = self.get_instance(server)
if server_obj:
return server_obj.get_items_count(library_id)
return None
def mediaserver_iteminfo(
self, server: str, item_id: str
) -> Optional[schemas.MediaServerItem]:

View File

@@ -66,15 +66,19 @@ class Ugreen:
@property
def api(self) -> Optional[Api]:
"""获取当前绿联影视 API 客户端"""
return self._api
def close(self):
"""关闭绿联影视连接"""
self.disconnect()
def is_configured(self) -> bool:
"""检查绿联影视连接配置是否完整"""
return bool(self._host and self._username and self._password)
def is_authenticated(self) -> bool:
"""检查绿联影视会话是否已认证"""
return (
self.is_configured()
and self._api is not None
@@ -83,6 +87,7 @@ class Ugreen:
)
def is_inactive(self) -> bool:
"""检查绿联影视会话是否已失效"""
if not self.is_authenticated():
return True
self._userinfo = self._api.current_user() if self._api else None
@@ -516,6 +521,12 @@ class Ugreen:
return paths
def get_librarys(self, hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]:
"""
获取绿联影视媒体库列表
:param hidden: 是否过滤未启用同步的媒体库
:return: 媒体库列表
"""
if not self.is_authenticated() or not self._api:
return []
@@ -567,12 +578,14 @@ class Ugreen:
return libraries
def get_user_count(self) -> int:
"""获取绿联影视媒体库用户数量"""
if not self.is_authenticated() or not self._api:
return 0
users = self._api.media_lib_users()
return len(users)
def get_medias_count(self) -> schemas.Statistic:
"""获取绿联影视的电影和电视剧数量统计"""
if not self.is_authenticated() or not self._api:
return schemas.Statistic()
@@ -856,12 +869,36 @@ class Ugreen:
break
page += 1
def get_items_count(self, parent: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param parent: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
if not self.is_authenticated() or not self._api:
return None
if not self._libraries:
self.get_librarys()
library = self._libraries.get(str(parent))
if not library:
return None
return int(library.get("video_count") or 0)
def get_items(
self,
parent: Union[str, int],
start_index: Optional[int] = 0,
limit: Optional[int] = -1,
) -> Generator[schemas.MediaServerItem | None | Any, Any, None]:
"""
获取指定绿联影视媒体库的可同步条目
:param parent: 媒体库ID
:param start_index: 起始条目索引
:param limit: 最大返回条目数,-1表示不限制
:return: 媒体条目生成器
"""
if not self.is_authenticated() or not self._api:
return None

View File

@@ -237,6 +237,19 @@ class ZSpaceModule(_ModuleBase, _MediaServerBase[ZSpace]):
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_items_count(self, server: str, library_id: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库可同步的媒体条目总数
:param server: 媒体服务器名称
:param library_id: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
server_obj: ZSpace = self.get_instance(server)
if server_obj:
return server_obj.get_items_count(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情

View File

@@ -815,6 +815,32 @@ class ZSpace:
logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:{e}")
return None
def get_items_count(self, parent: Union[str, int]) -> Optional[int]:
"""
获取指定媒体库的媒体条目总数
极影视当前兼容层会忽略条目类型过滤,因此以递归查询返回的
TotalRecordCount 作为同步进度的总数。
:param parent: 媒体库ID
:return: 媒体条目总数查询失败时返回None
"""
if not parent or not self._host or not self._apikey or not self.user:
return None
url = f"{self._host}emby/Users/{self.user}/Items"
try:
res = self.__request_utils().get_res(
url,
params={"ParentId": parent, "Recursive": "true", "Limit": 0},
)
if not res or res.status_code != 200:
return None
total_count = res.json().get("TotalRecordCount")
return int(total_count) if total_count is not None else None
except Exception as e:
logger.error(f"查询媒体库 {parent} 的媒体总数出错:{e}")
return None
def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0,
limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]:
"""

View File

@@ -36,8 +36,8 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.helper.image import WallpaperHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.sites import SitesHelper # noqa
from app.helper.server import MoviePilotServerHelper
from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas import Notification, NotificationType, Workflow
from app.schemas.types import EventType, SystemConfigKey
@@ -177,16 +177,16 @@ class SchedulerChain(ChainBase):
)
message_cutoff = (
started_at - timedelta(days=message_days)
started_at - timedelta(days=message_days)
).strftime("%Y-%m-%d %H:%M:%S")
download_history_cutoff = (
started_at - timedelta(days=download_history_days)
started_at - timedelta(days=download_history_days)
).strftime("%Y-%m-%d %H:%M:%S")
site_userdata_cutoff = (
started_at - timedelta(days=site_userdata_days)
started_at - timedelta(days=site_userdata_days)
).strftime("%Y-%m-%d")
transfer_history_cutoff = (
started_at - timedelta(days=transfer_history_days)
started_at - timedelta(days=transfer_history_days)
).strftime("%Y-%m-%d %H:%M:%S")
return [
@@ -472,8 +472,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(minutes=5),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={"job_id": "cookiecloud"},
)
@@ -488,8 +487,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(minutes=10),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10),
kwargs={"job_id": "mediaserver_sync"},
)
@@ -582,8 +580,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(seconds=1),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1),
kwargs={"job_id": "random_wallpager"},
)
@@ -647,8 +644,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(seconds=5),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5),
kwargs={"job_id": "recommend_refresh"},
)
@@ -669,8 +665,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
id="subscribe_calendar_cache",
name="订阅日历缓存",
hours=6,
next_run_time=datetime.now(pytz.timezone(settings.TZ))
+ timedelta(minutes=2),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2),
kwargs={"job_id": "subscribe_calendar_cache"},
)
@@ -823,7 +818,8 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
data=data,
)
def __handle_job_error(self, job_id: str, job: dict, error: Exception) -> None:
@staticmethod
def __handle_job_error(job_id: str, job: dict, error: Exception) -> None:
"""
记录定时任务执行异常并发送系统错误事件。
"""

View File

@@ -0,0 +1,125 @@
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from app.modules.emby.emby import Emby
from app.modules.jellyfin.jellyfin import Jellyfin
from app.modules.plex.plex import Plex
from app.modules.trimemedia.api import Api as TrimeMediaApi
from app.modules.trimemedia.api import Type as TrimeMediaType
from app.modules.ugreen.ugreen import Ugreen
from app.modules.zspace.zspace import ZSpace
class _FakeResponse:
"""模拟媒体服务器 HTTP 响应。"""
def __init__(self, payload: dict, status_code: int = 200):
"""保存响应数据和状态码。"""
self._payload = payload
self.status_code = status_code
def json(self) -> dict:
"""返回模拟的 JSON 数据。"""
return self._payload
@pytest.mark.parametrize(
("client_class", "request_utils_path", "url_path"),
[
(Emby, "app.modules.emby.emby.RequestUtils", "emby/Users/user-id/Items"),
(Jellyfin, "app.modules.jellyfin.jellyfin.RequestUtils", "Users/user-id/Items"),
],
)
def test_emby_compatible_items_count_uses_recursive_type_filter(
client_class, request_utils_path, url_path
):
"""Emby 兼容服务应通过递归类型过滤查询媒体库总数。"""
client = client_class.__new__(client_class)
client._host = "http://media.local/"
client._apikey = "token"
client.user = "user-id"
with patch(request_utils_path) as request_utils_cls:
request_utils_cls.return_value.get_res.return_value = _FakeResponse(
{"TotalRecordCount": 23}
)
result = client.get_items_count("library-id")
assert result == 23
args = request_utils_cls.return_value.get_res.call_args.args
assert args[0] == f"http://media.local/{url_path}"
assert args[1]["Recursive"] == "true"
assert args[1]["IncludeItemTypes"] == "Movie,Series"
assert args[1]["Limit"] == 0
def test_zspace_items_count_uses_total_record_count():
"""极影视应使用兼容接口返回的 TotalRecordCount。"""
client = ZSpace.__new__(ZSpace)
client._host = "http://zspace.local/"
client._apikey = "token"
client.user = "user-id"
with patch("app.modules.zspace.zspace.RequestUtils") as request_utils_cls:
request_utils_cls.return_value.get_res.return_value = _FakeResponse(
{"TotalRecordCount": 17}
)
result = client.get_items_count("library-id")
assert result == 17
params = request_utils_cls.return_value.get_res.call_args.kwargs["params"]
assert params == {
"ParentId": "library-id",
"Recursive": "true",
"Limit": 0,
}
def test_plex_items_count_uses_section_total_size():
"""Plex 应直接读取媒体库分区的条目总数。"""
client = Plex.__new__(Plex)
section = SimpleNamespace(totalSize=31)
client._plex = SimpleNamespace(
library=SimpleNamespace(sectionByID=lambda _library_id: section)
)
assert client.get_items_count("9") == 31
def test_ugreen_items_count_uses_library_video_count():
"""绿联影视应复用媒体库列表中的视频总数。"""
client = Ugreen.__new__(Ugreen)
client._host = "http://ugreen.local"
client._username = "tester"
client._password = "secret"
client._userinfo = {"name": "tester"}
client._api = SimpleNamespace(token="token")
client._libraries = {"library-id": {"video_count": 42}}
assert client.get_items_count("library-id") == 42
def test_trimemedia_item_count_reads_list_total():
"""飞牛影视应通过媒体列表接口的 total 字段获取总数。"""
api = TrimeMediaApi.__new__(TrimeMediaApi)
response = SimpleNamespace(success=True, data={"list": [], "total": 19})
with patch.object(TrimeMediaApi, "request", return_value=response) as request:
result = api.item_count(
guid="library-id",
types=[TrimeMediaType.MOVIE, TrimeMediaType.TV],
)
assert result == 19
request.assert_called_once_with(
"/item/list",
data={
"ancestor_guid": "library-id",
"tags": {"type": [TrimeMediaType.MOVIE, TrimeMediaType.TV]},
"exclude_grouped_video": 1,
"page": 1,
"page_size": 1,
},
)

View File

@@ -1,9 +1,7 @@
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
@@ -15,139 +13,238 @@ from app.db.mediaserver_oper import MediaServerOper
from app.db.models.mediaserver import MediaServerItem
class MediaServerIncrementalSyncTest(unittest.TestCase):
"""验证媒体库同步改为按条目增量更新。"""
@pytest.fixture
def database(tmp_path):
"""创建隔离的媒体服务器测试数据库。"""
engine = create_engine(f"sqlite:///{tmp_path / 'mediaserver.db'}")
session_factory = sessionmaker(bind=engine)
Base.metadata.create_all(bind=engine)
yield session_factory
engine.dispose()
def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
db_path = Path(self.temp_dir.name) / "mediaserver.db"
self.engine = create_engine(f"sqlite:///{db_path}")
self.SessionFactory = sessionmaker(bind=self.engine)
Base.metadata.create_all(bind=self.engine)
def tearDown(self):
self.engine.dispose()
self.temp_dir.cleanup()
def test_add_allows_same_item_id_across_servers(database):
"""不同媒体服务器允许复用相同 item_id。"""
with database() as db:
oper = MediaServerOper(db)
def test_add_allows_same_item_id_across_servers(self):
"""不同媒体服务器允许复用相同 item_id。"""
with self.SessionFactory() as db:
oper = MediaServerOper(db)
assert oper.add(
server="plex",
library="movies",
item_id="same-item-id",
item_type="电影",
title="Movie A",
)
assert oper.add(
server="jellyfin",
library="movies",
item_id="same-item-id",
item_type="电影",
title="Movie B",
)
self.assertTrue(
oper.add(
server="plex",
library="movies",
item_id="same-item-id",
item_type="电影",
title="Movie A",
)
)
self.assertTrue(
oper.add(
server="jellyfin",
library="movies",
item_id="same-item-id",
item_type="电影",
title="Movie B",
)
)
items = (
db.query(MediaServerItem)
.order_by(MediaServerItem.server.asc())
.all()
)
items = (
db.query(MediaServerItem)
.order_by(MediaServerItem.server.asc())
.all()
)
assert len(items) == 2
assert [item.server for item in items] == ["jellyfin", "plex"]
self.assertEqual(len(items), 2)
self.assertEqual([item.server for item in items], ["jellyfin", "plex"])
def test_sync_updates_rows_and_removes_stale_entries(self):
"""同步应更新已存在条目,并清理未再出现或已移除服务的数据"""
old_sync_time = "2026-05-01 00:00:00"
def test_media_count_reuses_existing_server_statistics():
"""整服同步应复用现有媒体统计并排除剧集集数"""
chain = object.__new__(MediaServerChain)
chain.run_module = lambda *_args, **_kwargs: [
schemas.Statistic(movie_count=12, tv_count=8, episode_count=200)
]
with self.SessionFactory() as db:
db.add_all(
[
MediaServerItem(
server="plex",
library="movies",
item_id="/library/metadata/1",
item_type="电影",
title="Old Title",
year="2024",
path="/media/old.mkv",
lst_mod_date=old_sync_time,
),
MediaServerItem(
server="plex",
library="movies",
item_id="/library/metadata/2",
item_type="电影",
title="Stale Title",
year="2020",
path="/media/stale.mkv",
lst_mod_date=old_sync_time,
),
MediaServerItem(
server="jellyfin",
library="movies",
item_id="/library/metadata/1",
item_type="电影",
title="Removed Server Title",
year="2024",
path="/media/removed.mkv",
lst_mod_date=old_sync_time,
),
]
)
db.commit()
existing_id = (
db.query(MediaServerItem.id)
.filter(
MediaServerItem.server == "plex",
MediaServerItem.item_id == "/library/metadata/1",
)
.scalar()
)
assert chain.media_count("plex") == 20
chain = object.__new__(MediaServerChain)
chain.librarys = lambda _server: [SimpleNamespace(id="movies", name="电影库")]
chain.items = lambda **_kwargs: iter(
def test_sync_updates_rows_and_removes_stale_entries(database):
"""同步应更新已存在条目,并清理未再出现或已移除服务的数据。"""
old_sync_time = "2026-05-01 00:00:00"
with database() as db:
db.add_all(
[
schemas.MediaServerItem(
MediaServerItem(
server="plex",
library="movies",
item_id="/library/metadata/1",
item_type="Movie",
title="New Title",
item_type="电影",
title="Old Title",
year="2024",
tmdbid=100,
path="/media/new.mkv",
)
path="/media/old.mkv",
lst_mod_date=old_sync_time,
),
MediaServerItem(
server="plex",
library="movies",
item_id="/library/metadata/2",
item_type="电影",
title="Stale Title",
year="2020",
path="/media/stale.mkv",
lst_mod_date=old_sync_time,
),
MediaServerItem(
server="jellyfin",
library="movies",
item_id="/library/metadata/1",
item_type="电影",
title="Removed Server Title",
year="2024",
path="/media/removed.mkv",
lst_mod_date=old_sync_time,
),
]
)
chain.episodes = lambda *_args, **_kwargs: []
with patch("app.db.ScopedSession", self.SessionFactory), patch.object(
MEDIA_SERVER_CHAIN_MODULE.ServiceConfigHelper,
"get_mediaserver_configs",
return_value=[SimpleNamespace(name="plex", enabled=True, sync_libraries=["all"])],
):
chain.sync()
with self.SessionFactory() as db:
items = (
db.query(MediaServerItem)
.order_by(MediaServerItem.server.asc(), MediaServerItem.item_id.asc())
.all()
db.commit()
existing_id = (
db.query(MediaServerItem.id)
.filter(
MediaServerItem.server == "plex",
MediaServerItem.item_id == "/library/metadata/1",
)
.scalar()
)
self.assertEqual(len(items), 1)
self.assertEqual(items[0].id, existing_id)
self.assertEqual(items[0].server, "plex")
self.assertEqual(items[0].item_id, "/library/metadata/1")
self.assertEqual(items[0].item_type, "电影")
self.assertEqual(items[0].title, "New Title")
self.assertEqual(items[0].path, "/media/new.mkv")
self.assertNotEqual(items[0].lst_mod_date, old_sync_time)
chain = object.__new__(MediaServerChain)
chain.librarys = lambda _server: [
SimpleNamespace(id="movies", name="电影库"),
SimpleNamespace(id="shows", name="剧集库"),
]
chain.media_count = lambda _server: pytest.fail("部分媒体库同步不应使用整服统计")
chain.items_count = lambda **_kwargs: 1
chain.items = lambda **_kwargs: iter(
[
schemas.MediaServerItem(
server="plex",
library="movies",
item_id="/library/metadata/1",
item_type="Movie",
title="New Title",
year="2024",
tmdbid=100,
path="/media/new.mkv",
)
]
)
chain.episodes = lambda *_args, **_kwargs: []
with patch("app.db.ScopedSession", database), patch.object(
MEDIA_SERVER_CHAIN_MODULE.ServiceConfigHelper,
"get_mediaserver_configs",
return_value=[SimpleNamespace(name="plex", enabled=True, sync_libraries=["movies"])],
):
chain.sync()
with database() as db:
items = (
db.query(MediaServerItem)
.order_by(MediaServerItem.server.asc(), MediaServerItem.item_id.asc())
.all()
)
assert len(items) == 1
assert items[0].id == existing_id
assert items[0].server == "plex"
assert items[0].item_id == "/library/metadata/1"
assert items[0].item_type == "电影"
assert items[0].title == "New Title"
assert items[0].path == "/media/new.mkv"
assert items[0].lst_mod_date != old_sync_time
def test_sync_queries_counts_before_items_and_reports_media_progress(database):
"""同步前应查询全部目标媒体库总数,并按媒体条目更新进度。"""
chain = object.__new__(MediaServerChain)
events = []
progress_snapshots = []
server_libraries = {
"plex-a": [SimpleNamespace(id="movies", name="电影库")],
"plex-b": [SimpleNamespace(id="shows", name="剧集库")],
}
library_items = {
("plex-a", "movies"): [
schemas.MediaServerItem(
server="plex-a",
library="movies",
item_id=f"movie-{index}",
item_type="Movie",
title=f"电影 {index}",
)
for index in range(2)
],
("plex-b", "shows"): [
schemas.MediaServerItem(
server="plex-b",
library="shows",
item_id="show-1",
item_type="Movie",
title="剧集 1",
)
],
}
chain.librarys = lambda server: server_libraries[server]
def media_count(server):
"""记录整服统计顺序并返回待同步媒体总数。"""
events.append(f"count:{server}")
return sum(
len(items)
for (item_server, _library_id), items in library_items.items()
if item_server == server
)
def items(**kwargs):
"""记录同步顺序并返回媒体库条目。"""
server = kwargs["server"]
library_id = kwargs["library_id"]
events.append(f"items:{server}:{library_id}")
return iter(library_items[(server, library_id)])
chain.media_count = media_count
chain.items_count = lambda **_kwargs: pytest.fail("整服同步不应逐库重复计数")
chain.items = items
chain.episodes = lambda *_args, **_kwargs: []
with patch("app.db.ScopedSession", database), patch.object(
MEDIA_SERVER_CHAIN_MODULE.ServiceConfigHelper,
"get_mediaserver_configs",
return_value=[
SimpleNamespace(name="plex-a", enabled=True, sync_libraries=["all"]),
SimpleNamespace(name="plex-b", enabled=True, sync_libraries=["all"]),
],
):
chain.sync(
progress_callback=lambda **kwargs: progress_snapshots.append(kwargs)
)
assert events == [
"count:plex-a",
"count:plex-b",
"items:plex-a:movies",
"items:plex-b:shows",
]
media_progress = [
snapshot
for snapshot in progress_snapshots
if snapshot["text"].startswith(("正在同步 plex-a 媒体库", "正在同步 plex-b 媒体库"))
]
assert [round(snapshot["value"], 2) for snapshot in media_progress] == [
33.33,
66.67,
100.0,
]
assert media_progress[0]["data"]["media_total"] == 3
assert media_progress[1]["data"]["library_media_finished"] == 2
assert media_progress[2]["data"]["current_library"] == "剧集库"
assert media_progress[2]["data"]["media_total"] == 3
assert media_progress[2]["data"]["media_finished"] == 3
progress_values = [snapshot["value"] for snapshot in progress_snapshots]
assert progress_values == sorted(progress_values)