class ScrapingOption:
    """A single scraping option: one (target, metadata) pair and its policy."""

    # Class-level defaults, kept only as a fallback when __init__ receives
    # arguments it cannot interpret.
    type: ScrapingTarget = ScrapingTarget.TV
    metadata: ScrapingMetadata = ScrapingMetadata.NFO
    policy: ScrapingPolicy = ScrapingPolicy.MISSINGONLY

    def __init__(
        self,
        type: Union[str, ScrapingTarget],
        metadata: Union[str, ScrapingMetadata],
        value: Union[ScrapingPolicy, bool, str],
    ):
        """
        :param type: scraping target (enum member or its string value)
        :param metadata: metadata kind (enum member or its string value)
        :param value: scraping policy; bool is the legacy format
                      (True -> MISSINGONLY, False -> SKIP)
        """
        if isinstance(type, ScrapingTarget):
            self.type = type
        elif isinstance(type, str):
            self.type = ScrapingTarget(type)
        else:
            # Previously an invalid type fell through silently; log it the same
            # way an invalid value is logged, keeping the class default.
            logger.error(f"无效的刮削选项:type={type}, metadata={metadata}, value={value}")
        if isinstance(metadata, ScrapingMetadata):
            self.metadata = metadata
        elif isinstance(metadata, str):
            self.metadata = ScrapingMetadata(metadata)
        else:
            logger.error(f"无效的刮削选项:type={type}, metadata={metadata}, value={value}")
        if isinstance(value, bool):
            # Backward compatibility with the old boolean switch format
            self.policy = ScrapingPolicy.MISSINGONLY if value else ScrapingPolicy.SKIP
        elif isinstance(value, ScrapingPolicy):
            self.policy = value
        elif isinstance(value, str):
            self.policy = ScrapingPolicy(value)
        else:
            logger.error(f"无效的刮削选项:type={type}, metadata={metadata}, value={value}")

    @property
    def is_skip(self) -> bool:
        """Whether this option disables scraping entirely."""
        return self.policy == ScrapingPolicy.SKIP

    @property
    def is_overwrite(self) -> bool:
        """Whether this option forces overwriting existing files."""
        return self.policy == ScrapingPolicy.OVERWRITE


class ScrapingConfig:
    """Media scraping configuration: maps (target, metadata) to a ScrapingOption."""

    def __init__(self, config_dict: Optional[dict[str, str]] = None):
        """
        Build the configuration object.

        :param config_dict: flat user configuration ("target_metadata" -> policy);
                            None means defaults only.
        """
        # BUGFIX: _policies used to be a class attribute, so every instance
        # shared one dict and reloads never cleared stale entries. It is now
        # a per-instance dict rebuilt from scratch on each construction.
        self._policies: dict[tuple[str, str], ScrapingOption] = {}
        # Start from the defaults, then let user settings override them.
        merged = self.get_default_config()
        merged.update(config_dict or {})
        for key, value in merged.items():
            if "_" in key:
                items = key.split('_', 1)
                self._policies[tuple(items)] = ScrapingOption(*items, value)

    def option(self, item: Union[str, ScrapingTarget], metadata: Union[str, ScrapingMetadata]) -> ScrapingOption:
        """
        Look up the option for a (target, metadata) pair; unknown pairs yield SKIP.
        """
        if isinstance(item, ScrapingTarget):
            item = item.name.lower()
        if isinstance(metadata, ScrapingMetadata):
            metadata = metadata.name.lower()
        return self._policies.get((item, metadata), ScrapingOption(item, metadata, ScrapingPolicy.SKIP))

    @classmethod
    def from_system_config(cls) -> 'ScrapingConfig':
        """
        Load the user configuration from system settings.

        :return: a ScrapingConfig instance
        """
        user_config = SystemConfigOper().get(SystemConfigKey.ScrapingSwitchs) or {}
        return cls(user_config)

    @staticmethod
    def get_default_config() -> dict[str, ScrapingPolicy]:
        """Return the default configuration (every known item -> MISSINGONLY)."""
        config_items = [
            f"{mt}_{md}"
            for mt, mds in [
                ('movie', ['nfo', 'poster', 'backdrop', 'logo', 'disc', 'banner', 'thumb']),
                ('tv', ['nfo', 'poster', 'backdrop', 'logo', 'banner', 'thumb']),
                ('season', ['nfo', 'poster', 'banner', 'thumb']),
                ('episode', ['nfo', 'thumb'])
            ]
            for md in mds
        ]
        return {item: ScrapingPolicy.MISSINGONLY for item in config_items}
class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
    """
    媒体信息处理链,单例运行
    """

    # System-config keys watched by ConfigReloadMixin; changes trigger on_config_changed.
    CONFIG_WATCH = {SystemConfigKey.ScrapingSwitchs.value}

    # Keywords found in image file names mapped to their metadata category.
    IMAGE_METADATA_MAP = {
        'poster': ScrapingMetadata.POSTER,
        'backdrop': ScrapingMetadata.BACKDROP,
        'fanart': ScrapingMetadata.BACKDROP,
        'background': ScrapingMetadata.BACKDROP,
        'logo': ScrapingMetadata.LOGO,
        'disc': ScrapingMetadata.DISC,
        'cdart': ScrapingMetadata.DISC,
        'banner': ScrapingMetadata.BANNER,
        'thumb': ScrapingMetadata.THUMB,
    }

    def __init__(self):
        super().__init__()
        # BUGFIX: these were class attributes evaluated at class-definition
        # (module import) time, which issued a database read on import.
        # Building them here defers that work until the singleton is first
        # instantiated. NOTE(review): assumes ConfigReloadMixin only invokes
        # on_config_changed on constructed instances — confirm.
        self.scraping_policies = ScrapingConfig.from_system_config()
        self.storagechain = StorageChain()

    def on_config_changed(self):
        """Reload scraping policies after the watched system config changed."""
        self.scraping_policies = ScrapingConfig.from_system_config()

    def _should_scrape(self, scraping_option: ScrapingOption, file_exists: bool,
                       global_overwrite: bool = False) -> bool:
        """
        Decide whether a scraping action should run.

        :param scraping_option: the resolved scraping option
        :param file_exists: whether the target file already exists
        :param global_overwrite: caller-supplied global overwrite flag
        :return: True when scraping should proceed
        """
        if scraping_option.is_skip:
            logger.info(f"{scraping_option.type.value} {scraping_option.metadata.value} 刮削策略 {scraping_option.policy.value}")
            return False

        if not file_exists:
            # Nothing on disk yet — always scrape.
            return True

        # The file exists: only proceed in an overwrite mode.
        if scraping_option.is_overwrite or global_overwrite:
            logger.info(
                f"{scraping_option.type.value} {scraping_option.metadata.value} 文件存在,"
                f"{'配置为覆盖' if scraping_option.is_overwrite else '配置为全局覆盖'}"
            )
            return True
        logger.info(f"{scraping_option.type.value} {scraping_option.metadata.value} 文件已存在,跳过")
        return False

    def _save_file(self, fileitem: schemas.FileItem, path: Path, content: Union[bytes, str]):
        """
        Save/upload a metadata file.

        :param fileitem: directory item the upload is attached to
        :param path: destination path of the metadata file (its name is reused)
        :param content: file content; str is encoded as UTF-8
        """
        if not fileitem or not content or not path:
            return
        # Stage the content in a temp file, then hand it to the storage chain.
        with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=path.suffix) as tmp_file:
            tmp_file_path = Path(tmp_file.name)
            if isinstance(content, bytes):
                tmp_file.write(content)
            else:
                tmp_file.write(content.encode('utf-8'))
            tmp_file.flush()
            tmp_file.close()  # close the handle so the uploader can read the file

            # Scraped files only need read/write permission
            tmp_file_path.chmod(0o666 & ~current_umask)

            # Upload next to the media item
            item = self.storagechain.upload_file(fileitem=fileitem, path=tmp_file_path, new_name=path.name)
            if item:
                logger.info(f"已保存文件:{item.path}")
            else:
                logger.warn(f"文件保存失败:{path}")

    def _download_and_save_image(self, fileitem: schemas.FileItem, path: Path, url: str):
        """
        Stream-download an image and save it (avoids holding it in memory).

        :param fileitem: directory item the upload is attached to
        :param path: destination image path (its name is reused)
        :param url: image download URL
        """
        if not fileitem or not url or not path:
            return
        try:
            logger.info(f"正在下载图片:{url} ...")
            request_utils = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT)
            with request_utils.get_stream(url=url) as r:
                if r and r.status_code == 200:
                    # Stream into a self-deleting temp file
                    with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=path.suffix) as tmp_file:
                        tmp_file_path = Path(tmp_file.name)
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:
                                tmp_file.write(chunk)
                        tmp_file.flush()
                        tmp_file.close()  # close the handle so the uploader can read the file

                        # Scraped images only need read/write permission
                        tmp_file_path.chmod(0o666 & ~current_umask)

                        # Upload next to the media item
                        item = self.storagechain.upload_file(fileitem=fileitem, path=tmp_file_path,
                                                             new_name=path.name)
                        if item:
                            logger.info(f"已保存图片:{item.path}")
                        else:
                            logger.warn(f"图片保存失败:{path}")
                else:
                    logger.info(f"{url} 图片下载失败")
        except Exception as err:
            logger.error(f"{url} 图片下载失败:{str(err)}!")
path: + return + try: + logger.info(f"正在下载图片:{url} ...") + request_utils = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT) + with request_utils.get_stream(url=url) as r: + if r and r.status_code == 200: + # 使用tempfile创建临时文件,自动删除 + with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=path.suffix) as tmp_file: + tmp_file_path = Path(tmp_file.name) + # 流式写入文件 + for chunk in r.iter_content(chunk_size=8192): + if chunk: + tmp_file.write(chunk) + tmp_file.flush() + tmp_file.close() # 关闭文件句柄 + + # 刮削的图片只需要读写权限 + tmp_file_path.chmod(0o666 & ~current_umask) + + # 上传文件 + item = self.storagechain.upload_file(fileitem=fileitem, path=tmp_file_path, + new_name=path.name) + if item: + logger.info(f"已保存图片:{item.path}") + else: + logger.warn(f"图片保存失败:{path}") + else: + logger.info(f"{url} 图片下载失败") + except Exception as err: + logger.error(f"{url} 图片下载失败:{str(err)}!") + + def _get_target_fileitem_and_path(self, current_fileitem: schemas.FileItem, + item_type: ScrapingTarget, metadata_type: ScrapingMetadata, + filename_hint: Optional[str] = None, + parent_fileitem: Optional[schemas.FileItem] = None + ) -> Tuple[schemas.FileItem, Optional[Path]]: + """ + 根据当前上下文、刮削项类型和元数据类型生成目标 FileItem 和 Path + 处理 NFO 和图片文件的命名约定及存储位置 + """ + # 默认保存的目录是当前文件项的目录 + target_dir_item = current_fileitem + target_dir_path = Path(current_fileitem.path) + final_filename = filename_hint # 如果提供了 filename_hint,优先使用 + + # 针对 NFO 文件的特殊命名和存储逻辑 + if metadata_type == ScrapingMetadata.NFO: + if item_type == ScrapingTarget.MOVIE: + if current_fileitem.type == "file": + # 电影文件NFO: 放在电影文件同级目录,名称与电影文件主体一致,后缀.nfo + final_filename = f"{target_dir_path.stem}.nfo" + target_dir_item = parent_fileitem or self.storagechain.get_parent_item(current_fileitem) + if not target_dir_item: + logger.error(f"无法获取文件 {current_fileitem.path} 的父目录项。") + return current_fileitem, None # 返回一个表示失败的FileItem和None + target_dir_path = Path(target_dir_item.path) + else: # current_fileitem.type == "dir" + # 电影目录NFO 
(例如蓝光原盘): 放在电影目录内,名称与目录名主体一致,后缀.nfo + final_filename = f"{target_dir_path.name}.nfo" + # target_dir_item 保持为 current_fileitem + # target_dir_path 保持为 Path(current_fileitem.path) + elif item_type == ScrapingTarget.TV: + # 电视剧根目录NFO: 放在剧集根目录内,命名为 tvshow.nfo + final_filename = "tvshow.nfo" + elif item_type == ScrapingTarget.SEASON: + # 电视剧季目录NFO: 放在季目录内,命名为 season.nfo + final_filename = "season.nfo" + elif item_type == ScrapingTarget.EPISODE: + # 电视剧集文件NFO: 放在集文件同级目录,名称与集文件主体一致,后缀.nfo + final_filename = f"{target_dir_path.stem}.nfo" + target_dir_item = parent_fileitem or self.storagechain.get_parent_item(current_fileitem) + if not target_dir_item: + logger.error(f"无法获取文件 {current_fileitem.path} 的父目录项。") + return current_fileitem, None# 返回一个表示失败的FileItem和None + target_dir_path = Path(target_dir_item.path) + # 图片通常是放在当前目录 (current_fileitem) 下 + # 如果是 EPISODE 类型的图片(如thumb),通常也是放在文件同级目录,调整 target_dir_item 和 target_dir_path + elif metadata_type in [ScrapingMetadata.THUMB] and item_type == ScrapingTarget.EPISODE: + target_dir_item = parent_fileitem or self.storagechain.get_parent_item(current_fileitem) + if not target_dir_item: + logger.error(f"无法获取文件 {current_fileitem.path} 的父目录项。") + return current_fileitem, None # 返回一个表示失败的FileItem和None + target_dir_path = Path(target_dir_item.path) + # TODO: 考虑其他图片类型是否也需要保存到父目录 + + # 确保最终有文件名 + if not final_filename: + logger.error(f"无法为 {item_type.value} - {metadata_type.value} 确定文件名。filename_hint: {filename_hint}") + # 返回一个表示失败的FileItem和None + return current_fileitem, None + + target_full_path = target_dir_path / final_filename + return target_dir_item, target_full_path def metadata_nfo(self, meta: MetaBase, mediainfo: MediaInfo, season: Optional[int] = None, episode: Optional[int] = None) -> Optional[str]: """ 获取NFO文件内容文本 + :param meta: 元数据 :param mediainfo: 媒体信息 :param season: 季号 @@ -89,6 +332,7 @@ class MediaChain(ChainBase): native_fn, plugin_fn) -> Optional[MediaInfo]: """ 选择识别模式,插件优先或原生优先 + :param log_name: 用于日志“标题:...”处的名称(如 
file_path.name 或 title) :param log_context: 用于日志“未识别到...的媒体信息”处的上下文(如 path 或 title) :param native_fn: 原生识别函数 @@ -109,7 +353,7 @@ class MediaChain(ChainBase): mediainfo = native_fn() if not mediainfo and plugin_available: logger.info(f'原生识别未识别到 {log_context} 的媒体信息,尝试使用辅助识别') - mediainfo = plugin_fn() + mediainfo = plugin_fn() return mediainfo def recognize_by_meta(self, metainfo: MetaBase, episode_group: Optional[str] = None) -> Optional[MediaInfo]: @@ -137,6 +381,7 @@ class MediaChain(ChainBase): def recognize_help(self, title: str, org_meta: MetaBase) -> Optional[MediaInfo]: """ 请求辅助识别,返回媒体信息 + :param title: 标题 :param org_meta: 原始元数据 """ @@ -209,6 +454,7 @@ class MediaChain(ChainBase): def search(self, title: str) -> Tuple[Optional[MetaBase], List[MediaInfo]]: """ 搜索媒体/人物信息 + :param title: 搜索内容 :return: 识别元数据,媒体信息列表 """ @@ -367,8 +613,7 @@ class MediaChain(ChainBase): # 刮削锁 with scraping_lock: # 检查文件项是否存在 - storagechain = StorageChain() - if not storagechain.get_item(fileitem): + if not self.storagechain.get_item(fileitem): logger.warn(f"文件项不存在:{fileitem.path}") return # 检查是否为目录 @@ -377,12 +622,12 @@ class MediaChain(ChainBase): self.scrape_metadata(fileitem=fileitem, mediainfo=mediainfo, init_folder=False, - parent=storagechain.get_parent_item(fileitem), + parent=self.storagechain.get_parent_item(fileitem), overwrite=overwrite) else: if file_list: # 如果是BDMV原盘目录,只对根目录进行刮削,不处理子目录 - if storagechain.is_bluray_folder(fileitem): + if self.storagechain.is_bluray_folder(fileitem): logger.info(f"检测到BDMV原盘目录,只对根目录进行刮削:{fileitem.path}") self.scrape_metadata(fileitem=fileitem, mediainfo=mediainfo, @@ -411,7 +656,7 @@ class MediaChain(ChainBase): # 2. 
初始化一遍子目录,但不处理文件 for sub_dir in all_dirs: - sub_dir_item = storagechain.get_file_item(storage=fileitem.storage, path=sub_dir) + sub_dir_item = self.storagechain.get_file_item(storage=fileitem.storage, path=sub_dir) if sub_dir_item: logger.info(f"为目录生成海报和nfo:{sub_dir}") # 初始化目录元数据,但不处理文件 @@ -426,7 +671,7 @@ class MediaChain(ChainBase): # 3. 刮削每个文件 logger.info(f"开始刮削 {len(file_list)} 个文件") for sub_file_path in file_list: - sub_file_item = storagechain.get_file_item(storage=fileitem.storage, + sub_file_item = self.storagechain.get_file_item(storage=fileitem.storage, path=Path(sub_file_path)) if sub_file_item: self.scrape_metadata(fileitem=sub_file_item, @@ -441,12 +686,124 @@ class MediaChain(ChainBase): self.scrape_metadata(fileitem=fileitem, meta=meta, init_folder=True, mediainfo=mediainfo, overwrite=overwrite) + def _scrape_nfo_generic(self, current_fileitem: schemas.FileItem, + meta: MetaBase, mediainfo: MediaInfo, + item_type: ScrapingTarget, + parent_fileitem: Optional[schemas.FileItem] = None, + overwrite: bool = False, + season_number: Optional[int] = None, + episode_number: Optional[int] = None): + """ + NFO 刮削 + """ + # 获取刮削选项 + nfo_option = self.scraping_policies.option(item_type, ScrapingMetadata.NFO) + + # 检查刮削开关 + if nfo_option.is_skip: + logger.info(f"{item_type.value} {ScrapingMetadata.NFO.value} 刮削策略 {nfo_option.policy.value}") + return + + # 获取目标 FileItem (`base_item`) 和 Path (`nfo_path`) + base_item, nfo_path = self._get_target_fileitem_and_path( + current_fileitem=current_fileitem, + item_type=item_type, + metadata_type=ScrapingMetadata.NFO, + parent_fileitem=parent_fileitem + ) + + if not nfo_path: # _get_target_fileitem_and_path 内部错误处理返回None + return + + # 文件存在检查 + file_exists = self.storagechain.get_file_item(storage=base_item.storage, path=nfo_path) + + # 刮削决策 + if self._should_scrape(nfo_option, bool(file_exists), overwrite): + # 生成 NFO 内容 + nfo_content = self.metadata_nfo(meta=meta, mediainfo=mediainfo, + season=season_number, 
episode=episode_number) + if nfo_content: + self._save_file(fileitem=base_item, path=nfo_path, content=nfo_content) + else: + logger.warn(f"{nfo_path.name} NFO 文件生成失败!") + + def _scrape_images_generic(self, current_fileitem: schemas.FileItem, + mediainfo: MediaInfo, + item_type: ScrapingTarget, + parent_fileitem: Optional[schemas.FileItem] = None, + overwrite: bool = False, + season_number: Optional[int] = None): + """ + 图片刮削 + """ + # 获取图片 URL + if item_type == ScrapingTarget.SEASON and season_number is not None: + image_dict = self.metadata_img(mediainfo=mediainfo, season=season_number) + else: + image_dict = self.metadata_img(mediainfo=mediainfo) + + if not image_dict: + logger.info(f"未获取到 {item_type.value} 的图片信息,跳过图片刮削。") + return + + # 遍历图片 image_name 和 image_url + for image_name, image_url in image_dict.items(): + metadata_type = None + # 对每个 image_name 查找匹配的 ScrapingMetadata + for keyword, meta_type in self.IMAGE_METADATA_MAP.items(): + if keyword in image_name.lower(): + metadata_type = meta_type + break + + if metadata_type: + # 获取对应的 ScrapingOption + option = self.scraping_policies.option(item_type, metadata_type) + + if option.is_skip: + logger.info(f"{item_type.value} {option.metadata.value} 刮削策略 {option.policy.value}") + continue + + # 判断是否匹配当前刮削的季号 + if item_type == ScrapingTarget.TV and image_name.lower().startswith("season"): + logger.info(f"当前为电视剧根目录刮削,跳过季图片:{image_name}") + continue + if item_type == ScrapingTarget.SEASON and season_number is not None and image_name.lower().startswith("season"): + # 检查是否只下载当前刮削季的图片 + image_season_str = "00" if "specials" in image_name.lower() else image_name[6:8] + + if image_season_str is not None and image_season_str != str(season_number).rjust(2, '0'): + logger.info(f"当前刮削季为:{season_number},跳过非本季图片:{image_name}") + continue + + # 获取目标 FileItem (`base_item`) 和 Path (`image_path`) + base_item, image_path = self._get_target_fileitem_and_path( + current_fileitem=current_fileitem, + item_type=item_type, + 
metadata_type=metadata_type, + filename_hint=image_name, + parent_fileitem=parent_fileitem + ) + + if not image_path: + continue + + # 文件存在检查 + file_exists = self.storagechain.get_file_item(storage=base_item.storage, path=image_path) + + # 刮削决策 + if self._should_scrape(option, bool(file_exists), overwrite): + self._download_and_save_image(fileitem=base_item, path=image_path, url=image_url) + else: + logger.debug(f"未找到图片类型 {image_name} 对应的 ScrapingMetadata,跳过。") + def scrape_metadata(self, fileitem: schemas.FileItem, meta: MetaBase = None, mediainfo: MediaInfo = None, init_folder: bool = True, parent: schemas.FileItem = None, overwrite: bool = False, recursive: bool = True): """ 手动刮削媒体信息 + :param fileitem: 刮削目录或文件 :param meta: 元数据 :param mediainfo: 媒体信息 @@ -455,84 +812,6 @@ class MediaChain(ChainBase): :param overwrite: 是否覆盖已有文件 :param recursive: 是否递归处理目录内文件 """ - - storagechain = StorageChain() - - def __list_files(_fileitem: schemas.FileItem): - """ - 列出下级文件 - """ - return storagechain.list_files(fileitem=_fileitem) or [] - - def __save_file(_fileitem: schemas.FileItem, _path: Path, _content: Union[bytes, str]): - """ - 保存或上传文件 - :param _fileitem: 关联的媒体文件项 - :param _path: 元数据文件路径 - :param _content: 文件内容 - """ - if not _fileitem or not _content or not _path: - return - # 使用tempfile创建临时文件,自动删除 - with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=_path.suffix) as tmp_file: - tmp_file_path = Path(tmp_file.name) - # 写入内容 - if isinstance(_content, bytes): - tmp_file.write(_content) - else: - tmp_file.write(_content.encode('utf-8')) - tmp_file.flush() - tmp_file.close() # 关闭文件句柄 - - # 刮削文件只需要读写权限 - tmp_file_path.chmod(0o666 & ~current_umask) - - # 上传文件 - item = storagechain.upload_file(fileitem=_fileitem, path=tmp_file_path, new_name=_path.name) - if item: - logger.info(f"已保存文件:{item.path}") - else: - logger.warn(f"文件保存失败:{_path}") - - def __download_and_save_image(_fileitem: schemas.FileItem, _path: Path, _url: str): - """ - 流式下载图片并直接保存到文件(减少内存占用) - 
:param _fileitem: 关联的媒体文件项 - :param _path: 图片文件路径 - :param _url: 图片下载URL - """ - if not _fileitem or not _url or not _path: - return - try: - logger.info(f"正在下载图片:{_url} ...") - request_utils = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT) - with request_utils.get_stream(url=_url) as r: - if r and r.status_code == 200: - # 使用tempfile创建临时文件,自动删除 - with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=_path.suffix) as tmp_file: - tmp_file_path = Path(tmp_file.name) - # 流式写入文件 - for chunk in r.iter_content(chunk_size=8192): - if chunk: - tmp_file.write(chunk) - tmp_file.flush() - tmp_file.close() # 关闭文件句柄 - - # 刮削的图片只需要读写权限 - tmp_file_path.chmod(0o666 & ~current_umask) - - # 上传文件 - item = storagechain.upload_file(fileitem=_fileitem, path=tmp_file_path, - new_name=_path.name) - if item: - logger.info(f"已保存图片:{item.path}") - else: - logger.warn(f"图片保存失败:{_path}") - else: - logger.info(f"{_url} 图片下载失败") - except Exception as err: - logger.error(f"{_url} 图片下载失败:{str(err)}!") - if not fileitem: return @@ -541,6 +820,8 @@ class MediaChain(ChainBase): if fileitem.type == "file" \ and (not filepath.suffix or filepath.suffix.lower() not in settings.RMT_MEDIAEXT): return + + # 准备元数据和媒体信息 if not meta: meta = MetaInfoPath(filepath) if not mediainfo: @@ -549,308 +830,273 @@ class MediaChain(ChainBase): logger.warn(f"{filepath} 无法识别文件媒体信息!") return - # 获取刮削开关配置 - scraping_switchs = self._get_scraping_switchs() logger.info(f"开始刮削:{filepath} ...") + + # 根据媒体类型分发处理逻辑 if mediainfo.type == MediaType.MOVIE: - # 电影 - if fileitem.type == "file": - # 检查电影NFO开关 - if scraping_switchs.get('movie_nfo', True): - # 是否已存在 - nfo_path = filepath.with_suffix(".nfo") - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): - # 电影文件 - movie_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo) - if movie_nfo: - # 保存或上传nfo文件到上级目录 - if not parent: - parent = storagechain.get_parent_item(fileitem) - __save_file(_fileitem=parent, 
_path=nfo_path, _content=movie_nfo) - else: - logger.warn(f"{filepath.name} nfo文件生成失败!") - else: - logger.info(f"已存在nfo文件:{nfo_path}") - else: - logger.info("电影NFO刮削已关闭,跳过") - else: - # 电影目录 - files = __list_files(_fileitem=fileitem) - is_bluray_folder = storagechain.contains_bluray_subdirectories(files) - if recursive and not is_bluray_folder: - # 处理非原盘目录内的文件 - for file in files: - if file.type == "dir": - # 电影不处理子目录 - continue - self.scrape_metadata(fileitem=file, - mediainfo=mediainfo, - init_folder=False, - parent=fileitem, - overwrite=overwrite) - # 生成目录内图片文件 - if init_folder: - if is_bluray_folder: - # 检查电影NFO开关 - if scraping_switchs.get('movie_nfo', True): - nfo_path = filepath / (filepath.name + ".nfo") - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): - # 生成原盘nfo - movie_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo) - if movie_nfo: - # 保存或上传nfo文件到当前目录 - __save_file(_fileitem=fileitem, _path=nfo_path, _content=movie_nfo) - else: - logger.warn(f"{filepath.name} nfo文件生成失败!") - else: - logger.info(f"已存在nfo文件:{nfo_path}") - else: - logger.info("电影NFO刮削已关闭,跳过") - # 图片 - image_dict = self.metadata_img(mediainfo=mediainfo) - if image_dict: - for image_name, image_url in image_dict.items(): - # 根据图片类型检查开关 - if 'poster' in image_name.lower(): - should_scrape = scraping_switchs.get('movie_poster', True) - elif ('backdrop' in image_name.lower() - or 'fanart' in image_name.lower() - or 'background' in image_name.lower()): - should_scrape = scraping_switchs.get('movie_backdrop', True) - elif 'logo' in image_name.lower(): - should_scrape = scraping_switchs.get('movie_logo', True) - elif 'disc' in image_name.lower() or 'cdart' in image_name.lower(): - should_scrape = scraping_switchs.get('movie_disc', True) - elif 'banner' in image_name.lower(): - should_scrape = scraping_switchs.get('movie_banner', True) - elif 'thumb' in image_name.lower(): - should_scrape = scraping_switchs.get('movie_thumb', True) - else: - 
should_scrape = True # 未知类型默认刮削 - - if should_scrape: - image_path = filepath / image_name - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, - path=image_path): - # 流式下载图片并直接保存 - __download_and_save_image(_fileitem=fileitem, _path=image_path, _url=image_url) - else: - logger.info(f"已存在图片文件:{image_path}") - else: - logger.info(f"电影图片刮削已关闭,跳过:{image_name}") + self._handle_movie_scraping( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=init_folder, + parent=parent, + overwrite=overwrite, + recursive=recursive + ) else: - # 电视剧 - if fileitem.type == "file": - # 重新识别季集 - file_meta = MetaInfoPath(filepath) - if not file_meta.begin_episode: - logger.warn(f"{filepath.name} 无法识别文件集数!") - return - file_mediainfo = self.recognize_media(meta=file_meta, tmdbid=mediainfo.tmdb_id, - episode_group=mediainfo.episode_group) - if not file_mediainfo: - logger.warn(f"{filepath.name} 无法识别文件媒体信息!") - return - # 检查集NFO开关 - if scraping_switchs.get('episode_nfo', True): - # 是否已存在 - nfo_path = filepath.with_suffix(".nfo") - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): - # 获取集的nfo文件 - episode_nfo = self.metadata_nfo(meta=file_meta, mediainfo=file_mediainfo, - season=file_meta.begin_season, - episode=file_meta.begin_episode) - if episode_nfo: - # 保存或上传nfo文件到上级目录 - if not parent: - parent = storagechain.get_parent_item(fileitem) - __save_file(_fileitem=parent, _path=nfo_path, _content=episode_nfo) - else: - logger.warn(f"{filepath.name} nfo文件生成失败!") - else: - logger.info(f"已存在nfo文件:{nfo_path}") - else: - logger.info("集NFO刮削已关闭,跳过") - # 获取集的图片 - if scraping_switchs.get('episode_thumb', True): - image_dict = self.metadata_img(mediainfo=file_mediainfo, - season=file_meta.begin_season, episode=file_meta.begin_episode) - if image_dict: - for episode, image_url in image_dict.items(): - image_path = filepath.with_suffix(Path(image_url).suffix) - if overwrite or not 
storagechain.get_file_item(storage=fileitem.storage, path=image_path): - # 流式下载图片并直接保存 - if not parent: - parent = storagechain.get_parent_item(fileitem) - __download_and_save_image(_fileitem=parent, _path=image_path, _url=image_url) - else: - logger.info(f"已存在图片文件:{image_path}") - else: - logger.info("集缩略图刮削已关闭,跳过") - else: - # 当前为电视剧目录,处理目录内的文件 - if recursive: - files = __list_files(_fileitem=fileitem) - for file in files: - if ( - file.type == "dir" - and file.name not in settings.RENAME_FORMAT_S0_NAMES - and MetaInfo(file.name).begin_season is None - ): - # 电视剧不处理非季子目录 - continue - self.scrape_metadata(fileitem=file, - mediainfo=mediainfo, - parent=fileitem if file.type == "file" else None, - init_folder=True if file.type == "dir" else False, - overwrite=overwrite) - # 生成目录的nfo和图片 - if init_folder: - # TODO 目前的刮削是假定电视剧目录结构符合:/剧集根目录/季目录/剧集文件 - # 其中季目录应符合`Season 数字`等明确的季命名,不能用季标题 - # 例如:/Torchwood (2006)/Miracle Day/Torchwood (2006) S04E01.mkv - # 当刮削到`Miracle Day`目录时,会误判其为剧集根目录 - # 识别文件夹名称 - season_meta = MetaInfo(filepath.name) - # 当前文件夹为Specials或者SPs时,设置为S0 - if filepath.name in settings.RENAME_FORMAT_S0_NAMES: - season_meta.begin_season = 0 - elif season_meta.name and season_meta.begin_season is not None: - # 目录含剧名且包含季号,需排除辅助词重新识别元数据,避免误判根目录 (issue 5501) - season_meta_no_custom = MetaInfo( - filepath.name, custom_words=["#"] - ) - if season_meta_no_custom.begin_season is None: - # 季号是由辅助词指定的,按剧集根目录处理,避免`season_poster`路径错误 (issue 5373) - season_meta.begin_season = None - if season_meta.begin_season is not None: - # 检查季NFO开关 - if scraping_switchs.get('season_nfo', True): - # 是否已存在 - nfo_path = filepath / "season.nfo" - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): - # 当前目录有季号,生成季nfo - season_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo, - season=season_meta.begin_season) - if season_nfo: - # 写入nfo到根目录 - __save_file(_fileitem=fileitem, _path=nfo_path, _content=season_nfo) - else: - 
logger.warn(f"无法生成电视剧季nfo文件:{meta.name}") - else: - logger.info(f"已存在nfo文件:{nfo_path}") - else: - logger.info("季NFO刮削已关闭,跳过") - # TMDB季poster图片 - if scraping_switchs.get('season_poster', True): - image_dict = self.metadata_img(mediainfo=mediainfo, season=season_meta.begin_season) - if image_dict: - for image_name, image_url in image_dict.items(): - image_path = filepath.with_name(image_name) - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, - path=image_path): - # 流式下载图片并直接保存 - if not parent: - parent = storagechain.get_parent_item(fileitem) - __download_and_save_image(_fileitem=parent, _path=image_path, _url=image_url) - else: - logger.info(f"已存在图片文件:{image_path}") - else: - logger.info("季海报刮削已关闭,跳过") - # 额外fanart季图片:poster thumb banner - image_dict = self.metadata_img(mediainfo=mediainfo) - if image_dict: - for image_name, image_url in image_dict.items(): - if image_name.startswith("season"): - # 根据季图片类型检查开关 - if 'poster' in image_name.lower(): - should_scrape = scraping_switchs.get('season_poster', True) - elif 'banner' in image_name.lower(): - should_scrape = scraping_switchs.get('season_banner', True) - elif 'thumb' in image_name.lower(): - should_scrape = scraping_switchs.get('season_thumb', True) - else: - should_scrape = True # 未知类型默认刮削 + self._handle_tv_scraping( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=init_folder, + parent=parent, + overwrite=overwrite, + recursive=recursive + ) - if should_scrape: - image_path = filepath.with_name(image_name) - # 只下载当前刮削季的图片 - image_season = "00" if "specials" in image_name else image_name[6:8] - if image_season != str(season_meta.begin_season).rjust(2, '0'): - logger.info( - f"当前刮削季为:{season_meta.begin_season},跳过文件:{image_path}") - continue - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, - path=image_path): - # 流式下载图片并直接保存 - if not parent: - parent = storagechain.get_parent_item(fileitem) - __download_and_save_image(_fileitem=parent, 
_path=image_path, - _url=image_url) - else: - logger.info(f"已存在图片文件:{image_path}") - else: - logger.info(f"季图片刮削已关闭,跳过:{image_name}") - # 判断当前目录是不是剧集根目录 - elif season_meta.name: - # 不含季信息(包括特别季)但含有名称的,可以认为是剧集根目录 - # 检查电视剧NFO开关 - if scraping_switchs.get('tv_nfo', True): - # 是否已存在 - nfo_path = filepath / "tvshow.nfo" - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): - # 当前目录有名称,生成tvshow nfo 和 tv图片 - tv_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo) - if tv_nfo: - # 写入tvshow nfo到根目录 - __save_file(_fileitem=fileitem, _path=nfo_path, _content=tv_nfo) - else: - logger.warn(f"无法生成电视剧nfo文件:{meta.name}") - else: - logger.info(f"已存在nfo文件:{nfo_path}") - else: - logger.info("电视剧NFO刮削已关闭,跳过") - # 生成目录图片 - image_dict = self.metadata_img(mediainfo=mediainfo) - if image_dict: - for image_name, image_url in image_dict.items(): - # 不下载季图片 - if image_name.startswith("season"): - continue - # 根据电视剧图片类型检查开关 - if 'poster' in image_name.lower(): - should_scrape = scraping_switchs.get('tv_poster', True) - elif ('backdrop' in image_name.lower() - or 'fanart' in image_name.lower() - or 'background' in image_name.lower()): - should_scrape = scraping_switchs.get('tv_backdrop', True) - elif 'banner' in image_name.lower(): - should_scrape = scraping_switchs.get('tv_banner', True) - elif 'logo' in image_name.lower(): - should_scrape = scraping_switchs.get('tv_logo', True) - elif 'thumb' in image_name.lower(): - should_scrape = scraping_switchs.get('tv_thumb', True) - else: - should_scrape = True # 未知类型默认刮削 - - if should_scrape: - image_path = filepath / image_name - if overwrite or not storagechain.get_file_item(storage=fileitem.storage, - path=image_path): - # 流式下载图片并直接保存 - __download_and_save_image(_fileitem=fileitem, _path=image_path, _url=image_url) - else: - logger.info(f"已存在图片文件:{image_path}") - else: - logger.info(f"电视剧图片刮削已关闭,跳过:{image_name}") - else: - logger.warn("无法识别元数据,跳过") logger.info(f"{filepath.name} 刮削完成") + def 
_handle_movie_scraping(self, fileitem: schemas.FileItem, + meta: MetaBase, mediainfo: MediaInfo, + init_folder: bool, parent: schemas.FileItem, + overwrite: bool, recursive: bool): + """ + 处理电影刮削 + """ + if fileitem.type == "file": + # 电影文件:仅处理 NFO + self._scrape_nfo_generic( + current_fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + item_type=ScrapingTarget.MOVIE, + parent_fileitem=parent, + overwrite=overwrite + ) + else: + # 电影目录:递归处理文件并初始化目录 + self._handle_movie_directory( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=init_folder, + parent=parent, + overwrite=overwrite, + recursive=recursive + ) + + def _handle_movie_directory(self, fileitem: schemas.FileItem, + meta: MetaBase, mediainfo: MediaInfo, + init_folder: bool, parent: schemas.FileItem, + overwrite: bool, recursive: bool): + """ + 处理电影目录刮削 + """ + files = self.storagechain.list_files(fileitem=fileitem) or [] + is_bluray_folder = self.storagechain.contains_bluray_subdirectories(files) + + # 递归处理文件(非蓝光原盘) + if recursive and not is_bluray_folder: + for file in files: + if file.type == "dir": + continue + self.scrape_metadata(fileitem=file, + mediainfo=mediainfo, + init_folder=False, + parent=fileitem, + overwrite=overwrite) + + # 初始化目录元数据 + if init_folder: + if is_bluray_folder: + # 蓝光原盘目录:仅处理 NFO + self._scrape_nfo_generic( + current_fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + item_type=ScrapingTarget.MOVIE, + overwrite=overwrite + ) + # 电影目录:处理图片 + self._scrape_images_generic( + current_fileitem=fileitem, + mediainfo=mediainfo, + item_type=ScrapingTarget.MOVIE, + overwrite=overwrite + ) + + def _handle_tv_scraping(self, fileitem: schemas.FileItem, + meta: MetaBase, mediainfo: MediaInfo, + init_folder: bool, parent: schemas.FileItem, + overwrite: bool, recursive: bool): + """ + 处理电视剧刮削 + """ + filepath = Path(fileitem.path) + + if fileitem.type == "file": + # 电视剧集文件:重新识别季集信息并刮削 + self._handle_tv_episode_file( + fileitem=fileitem, + filepath=filepath, + 
mediainfo=mediainfo, + parent=parent, + overwrite=overwrite + ) + else: + # 电视剧目录:递归处理并初始化目录 + self._handle_tv_directory( + fileitem=fileitem, + filepath=filepath, + meta=meta, + mediainfo=mediainfo, + init_folder=init_folder, + parent=parent, + overwrite=overwrite, + recursive=recursive + ) + + def _handle_tv_episode_file(self, fileitem: schemas.FileItem, + filepath: Path, + mediainfo: MediaInfo, + parent: schemas.FileItem, + overwrite: bool): + """ + 处理电视剧集文件刮削 + """ + # 重新识别季集信息 + file_meta = MetaInfoPath(filepath) + if not file_meta.begin_episode: + logger.warn(f"{filepath.name} 无法识别文件集数!") + return + + file_mediainfo = self.recognize_media(meta=file_meta, tmdbid=mediainfo.tmdb_id, + episode_group=mediainfo.episode_group) + if not file_mediainfo: + logger.warn(f"{filepath.name} 无法识别文件媒体信息!") + return + + # 处理 NFO + self._scrape_nfo_generic( + current_fileitem=fileitem, + meta=file_meta, + mediainfo=file_mediainfo, + item_type=ScrapingTarget.EPISODE, + parent_fileitem=parent, + overwrite=overwrite, + season_number=file_meta.begin_season, + episode_number=file_meta.begin_episode + ) + + # 处理图片 + self._scrape_images_generic( + current_fileitem=fileitem, + mediainfo=file_mediainfo, + item_type=ScrapingTarget.EPISODE, + parent_fileitem=parent, + overwrite=overwrite, + season_number=file_meta.begin_season + ) + + def _handle_tv_directory(self, fileitem: schemas.FileItem, + filepath: Path, + meta: MetaBase, mediainfo: MediaInfo, + init_folder: bool, parent: schemas.FileItem, + overwrite: bool, recursive: bool): + """ + 处理电视剧目录刮削 + """ + # 递归处理子目录和文件 + if recursive: + files = self.storagechain.list_files(fileitem=fileitem) or [] + for file in files: + if ( + file.type == "dir" + and file.name not in settings.RENAME_FORMAT_S0_NAMES + and MetaInfo(file.name).begin_season is None + ): + # 电视剧不处理非季子目录 + continue + self.scrape_metadata(fileitem=file, + mediainfo=mediainfo, + parent=fileitem if file.type == "file" else None, + init_folder=True if file.type == "dir" else 
False, + overwrite=overwrite) + + # 初始化目录元数据 + if init_folder: + self._initialize_tv_directory_metadata( + fileitem=fileitem, + filepath=filepath, + meta=meta, + mediainfo=mediainfo, + parent=parent, + overwrite=overwrite + ) + + def _initialize_tv_directory_metadata(self, fileitem: schemas.FileItem, + filepath: Path, + meta: MetaBase, mediainfo: MediaInfo, + parent: schemas.FileItem, + overwrite: bool): + """ + 初始化电视剧目录元数据(识别季号并刮削) + """ + # 识别文件夹名称 + season_meta = MetaInfo(filepath.name) + + # 特殊季目录处理(Specials/SPs) + if filepath.name in settings.RENAME_FORMAT_S0_NAMES: + season_meta.begin_season = 0 + elif season_meta.name and season_meta.begin_season is not None: + # 排除辅助词重新识别,避免误判根目录 (issue https://github.com/jxxghp/MoviePilot/issues/5501) + season_meta_no_custom = MetaInfo(filepath.name, custom_words=["#"]) + if season_meta_no_custom.begin_season is None: + # 季号由辅助词指定,按剧集根目录处理 (issue https://github.com/jxxghp/MoviePilot/issues/5373) + season_meta.begin_season = None + + # 根据季号判断目录类型并刮削 + if season_meta.begin_season is not None: + # 季目录:处理季 NFO 和图片 + self._scrape_nfo_generic( + current_fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + item_type=ScrapingTarget.SEASON, + overwrite=overwrite, + season_number=season_meta.begin_season + ) + self._scrape_images_generic( + current_fileitem=fileitem, + mediainfo=mediainfo, + item_type=ScrapingTarget.SEASON, + parent_fileitem=parent, + overwrite=overwrite, + season_number=season_meta.begin_season + ) + elif season_meta.name: + # 剧集根目录:处理电视剧 NFO 和图片 + self._scrape_nfo_generic( + current_fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + item_type=ScrapingTarget.TV, + overwrite=overwrite + ) + self._scrape_images_generic( + current_fileitem=fileitem, + mediainfo=mediainfo, + item_type=ScrapingTarget.TV, + overwrite=overwrite + ) + else: + logger.warn("无法识别元数据,跳过") + async def async_select_recognize_source(self, log_name: str, log_context: str, native_fn, plugin_fn) -> Optional[MediaInfo]: """ 
选择识别模式,插件优先或原生优先(异步版本) + :param log_name: 用于日志“标题:...”处的名称(如 file_path.name 或 title) :param log_context: 用于日志“未识别到...的媒体信息”处的上下文(如 path 或 title) :param native_fn: 原生识别函数 @@ -871,9 +1117,9 @@ class MediaChain(ChainBase): mediainfo = await native_fn() if not mediainfo and plugin_available: logger.info(f'原生识别未识别到 {log_context} 的媒体信息,尝试使用辅助识别') - mediainfo = await plugin_fn() + mediainfo = await plugin_fn() return mediainfo - + async def async_recognize_by_meta(self, metainfo: MetaBase, episode_group: Optional[str] = None) -> Optional[MediaInfo]: """ @@ -905,6 +1151,7 @@ class MediaChain(ChainBase): async def async_recognize_help(self, title: str, org_meta: MetaBase) -> Optional[MediaInfo]: """ 请求辅助识别,返回媒体信息(异步版本) + :param title: 标题 :param org_meta: 原始元数据 """ @@ -982,6 +1229,7 @@ class MediaChain(ChainBase): async def async_search(self, title: str) -> Tuple[Optional[MetaBase], List[MediaInfo]]: """ 搜索媒体/人物信息(异步版本) + :param title: 搜索内容 :return: 识别元数据,媒体信息列表 """ diff --git a/app/schemas/types.py b/app/schemas/types.py index e8e4ddc3..c289bd6f 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -398,3 +398,41 @@ class OtherModulesType(Enum): PostgreSQL = "PostgreSQL" # Redis Redis = "Redis" + + +class NameValueEnum(Enum): + """支持通过 name 或 value 实例化的枚举基类""" + + @classmethod + def _missing_(cls, value): + if isinstance(value, str): + for member in cls: + if member.name.lower() == value.lower() or member.value == value: + return member + return None + + +# 刮削策略 +class ScrapingPolicy(NameValueEnum): + MISSINGONLY = "仅缺失" + SKIP = "跳过" + OVERWRITE = "覆盖" + + +# 刮削目标类型 +class ScrapingTarget(NameValueEnum): + MOVIE = "电影" + TV = "电视剧" + SEASON = "季" + EPISODE = "集" + + +# 刮削元数据类型 +class ScrapingMetadata(NameValueEnum): + NFO = "NFO" + POSTER = "海报" + BACKDROP = "背景图" + LOGO = "Logo" + BANNER = "横幅图" + THUMB = "缩略图" + DISC = "光盘图" diff --git a/tests/run.py b/tests/run.py index 4872a30b..123d9851 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,6 +1,13 @@ import 
unittest from tests.test_bluray import BluRayTest +from tests.test_mediascrape import ( + TestMediaScrapingPaths, + TestMediaScrapingNFO, + TestMediaScrapingImages, + TestMediaScrapingTVDirectory, + TestMediaScrapeEvents +) from tests.test_metainfo import MetaInfoTest from tests.test_object import ObjectUtilsTest @@ -22,6 +29,13 @@ if __name__ == '__main__': # 测试蓝光目录识别 suite.addTest(BluRayTest()) + # 测试媒体刮削 + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestMediaScrapingPaths)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestMediaScrapingNFO)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestMediaScrapingImages)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestMediaScrapingTVDirectory)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestMediaScrapeEvents)) + # 运行测试 runner = unittest.TextTestRunner() runner.run(suite) diff --git a/tests/test_mediascrape.py b/tests/test_mediascrape.py new file mode 100644 index 00000000..8410097f --- /dev/null +++ b/tests/test_mediascrape.py @@ -0,0 +1,614 @@ +import sys +import unittest +from pathlib import Path +from unittest.mock import patch, MagicMock + +sys.modules['app.helper.sites'] = MagicMock() +sys.modules['app.db.systemconfig_oper'] = MagicMock() +sys.modules['app.db.systemconfig_oper'].SystemConfigOper.return_value.get.return_value = None + +from app import schemas +from app.chain.media import MediaChain, ScrapingOption +from app.core.context import MediaInfo +from app.core.event import Event +from app.core.metainfo import MetaInfo +from app.schemas.types import EventType, MediaType, ScrapingTarget, ScrapingMetadata, ScrapingPolicy + + +class TestMediaScrapingPaths(unittest.TestCase): + def setUp(self): + self.media_chain = MediaChain() + self.media_chain.storagechain = MagicMock() + + def test_movie_file_nfo_path(self): + fileitem = schemas.FileItem(path="/movies/avatar.mkv", name="avatar.mkv", type="file", storage="local") + parent_item = 
schemas.FileItem(path="/movies", name="movies", type="dir", storage="local") + self.media_chain.storagechain.get_parent_item.return_value = parent_item + + target_item, target_path = self.media_chain._get_target_fileitem_and_path( + current_fileitem=fileitem, + item_type=ScrapingTarget.MOVIE, + metadata_type=ScrapingMetadata.NFO + ) + self.assertEqual(target_item, parent_item) + self.assertEqual(target_path, Path("/movies/avatar.nfo")) + + def test_movie_dir_nfo_path(self): + fileitem = schemas.FileItem(path="/movies/Avatar (2009)", name="Avatar (2009)", type="dir", storage="local") + + target_item, target_path = self.media_chain._get_target_fileitem_and_path( + current_fileitem=fileitem, + item_type=ScrapingTarget.MOVIE, + metadata_type=ScrapingMetadata.NFO + ) + self.assertEqual(target_item, fileitem) + self.assertEqual(target_path, Path("/movies/Avatar (2009)/Avatar (2009).nfo")) + + def test_tv_dir_nfo_path(self): + fileitem = schemas.FileItem(path="/tv/Show", name="Show", type="dir", storage="local") + target_item, target_path = self.media_chain._get_target_fileitem_and_path( + current_fileitem=fileitem, + item_type=ScrapingTarget.TV, + metadata_type=ScrapingMetadata.NFO + ) + self.assertEqual(target_item, fileitem) + self.assertEqual(target_path, Path("/tv/Show/tvshow.nfo")) + + def test_season_dir_nfo_path(self): + fileitem = schemas.FileItem(path="/tv/Show/Season 1", name="Season 1", type="dir", storage="local") + target_item, target_path = self.media_chain._get_target_fileitem_and_path( + current_fileitem=fileitem, + item_type=ScrapingTarget.SEASON, + metadata_type=ScrapingMetadata.NFO + ) + self.assertEqual(target_item, fileitem) + self.assertEqual(target_path, Path("/tv/Show/Season 1/season.nfo")) + + def test_episode_file_nfo_path(self): + fileitem = schemas.FileItem(path="/tv/Show/Season 1/S01E01.mp4", name="S01E01.mp4", type="file", storage="local") + parent_item = schemas.FileItem(path="/tv/Show/Season 1", name="Season 1", type="dir", 
storage="local") + self.media_chain.storagechain.get_parent_item.return_value = parent_item + target_item, target_path = self.media_chain._get_target_fileitem_and_path( + current_fileitem=fileitem, + item_type=ScrapingTarget.EPISODE, + metadata_type=ScrapingMetadata.NFO + ) + self.assertEqual(target_item, parent_item) + self.assertEqual(target_path, Path("/tv/Show/Season 1/S01E01.nfo")) + + +class TestMediaScrapingNFO(unittest.TestCase): + def setUp(self): + self.media_chain = MediaChain() + self.media_chain.storagechain = MagicMock() + self.media_chain.metadata_nfo = MagicMock(return_value="") + self.media_chain._save_file = MagicMock() + self.media_chain.scraping_policies = MagicMock() + + self.fileitem = schemas.FileItem(path="/movies/Avatar (2009)", name="Avatar (2009)", type="dir", storage="local") + self.meta = MetaInfo("Avatar (2009)") + self.mediainfo = MediaInfo() + + def test_scrape_nfo_off(self): + self.media_chain.scraping_policies.option.return_value = ScrapingOption("movie", "nfo", ScrapingPolicy.SKIP) + self.media_chain._scrape_nfo_generic(self.fileitem, self.meta, self.mediainfo, ScrapingTarget.MOVIE) + self.media_chain.metadata_nfo.assert_not_called() + self.media_chain._save_file.assert_not_called() + + def test_scrape_nfo_on_exists_skip(self): + self.media_chain.scraping_policies.option.return_value = ScrapingOption("movie", "nfo", ScrapingPolicy.MISSINGONLY) + # mock file exists + self.media_chain.storagechain.get_file_item.return_value = schemas.FileItem(path="/movies/Avatar (2009)/Avatar (2009).nfo", name="Avatar (2009).nfo", type="file", storage="local") + + self.media_chain._scrape_nfo_generic(self.fileitem, self.meta, self.mediainfo, ScrapingTarget.MOVIE) + self.media_chain.metadata_nfo.assert_not_called() + self.media_chain._save_file.assert_not_called() + + def test_scrape_nfo_on_not_exists_scrape(self): + self.media_chain.scraping_policies.option.return_value = ScrapingOption("movie", "nfo", ScrapingPolicy.MISSINGONLY) + # mock file not 
exists + self.media_chain.storagechain.get_file_item.return_value = None + + self.media_chain._scrape_nfo_generic(self.fileitem, self.meta, self.mediainfo, ScrapingTarget.MOVIE) + self.media_chain.metadata_nfo.assert_called_once() + self.media_chain._save_file.assert_called_once() + + def test_scrape_nfo_overwrite_exists_scrape(self): + self.media_chain.scraping_policies.option.return_value = ScrapingOption("movie", "nfo", ScrapingPolicy.OVERWRITE) + # mock file exists + self.media_chain.storagechain.get_file_item.return_value = schemas.FileItem(path="/movies/Avatar (2009)/Avatar (2009).nfo", name="Avatar (2009).nfo", type="file", storage="local") + + self.media_chain._scrape_nfo_generic(self.fileitem, self.meta, self.mediainfo, ScrapingTarget.MOVIE) + self.media_chain.metadata_nfo.assert_called_once() + self.media_chain._save_file.assert_called_once() + + +class TestMediaScrapingImages(unittest.TestCase): + def setUp(self): + self.media_chain = MediaChain() + self.original_download = self.media_chain._download_and_save_image + self.media_chain.storagechain = MagicMock() + self.media_chain.metadata_img = MagicMock() + self.media_chain._download_and_save_image = MagicMock() + self.media_chain.scraping_policies = MagicMock() + + def tearDown(self): + self.media_chain._download_and_save_image = self.original_download + + def test_scrape_images_mapping(self): + fileitem = schemas.FileItem(path="/movies/Avatar", name="Avatar", type="dir", storage="local") + mediainfo = MediaInfo() + self.media_chain.metadata_img.return_value = { + "poster.jpg": "http://poster", + "fanart.jpg": "http://fanart", + "logo.png": "http://logo" + } + self.media_chain.scraping_policies.option.return_value = ScrapingOption("movie", "poster", ScrapingPolicy.OVERWRITE) + self.media_chain.storagechain.get_file_item.return_value = None + + self.media_chain._scrape_images_generic(fileitem, mediainfo, ScrapingTarget.MOVIE) + + # Check download called for mapped metadata + calls = 
self.media_chain._download_and_save_image.call_args_list + self.assertEqual(len(calls), 3) + urls = [call.kwargs["url"] for call in calls] + self.assertIn("http://poster", urls) + self.assertIn("http://fanart", urls) + self.assertIn("http://logo", urls) + + def test_scrape_images_season_filter(self): + fileitem = schemas.FileItem(path="/tv/Show/Season 1", name="Season 1", type="dir", storage="local") + mediainfo = MediaInfo() + self.media_chain.metadata_img.return_value = { + "season01-poster.jpg": "http://season01", + "season02-poster.jpg": "http://season02" + } + self.media_chain.scraping_policies.option.return_value = ScrapingOption("season", "poster", ScrapingPolicy.OVERWRITE) + self.media_chain.storagechain.get_file_item.return_value = None + + self.media_chain._scrape_images_generic(fileitem, mediainfo, ScrapingTarget.SEASON, season_number=1) + + calls = self.media_chain._download_and_save_image.call_args_list + self.assertEqual(len(calls), 1) + self.assertEqual(calls[0].kwargs["url"], "http://season01") + + @patch("app.chain.media.RequestUtils") + @patch("app.chain.media.NamedTemporaryFile") + @patch("app.chain.media.Path.chmod") + @patch("app.chain.media.settings") + def test_download_and_save_image(self, mock_settings, mock_chmod, mock_temp_file, mock_request_utils): + # We need to test _download_and_save_image directly so we remove mock + self.media_chain = MediaChain() + self.media_chain._download_and_save_image = self.original_download + self.media_chain.storagechain = MagicMock() + + fileitem = schemas.FileItem(path="/movies/Avatar", name="Avatar", type="dir", storage="local") + target_path = Path("/movies/Avatar/poster.jpg") + url = "http://poster" + + # mock temp file + tmp_mock = MagicMock() + tmp_mock.name = "/tmp/mockfile" + mock_temp_file.return_value.__enter__.return_value = tmp_mock + + # mock stream + mock_stream = MagicMock() + mock_stream.status_code = 200 + mock_stream.iter_content.return_value = [b"data1", b"data2"] + + mock_instance = 
mock_request_utils.return_value + mock_instance.get_stream.return_value.__enter__.return_value = mock_stream + + self.media_chain.storagechain.upload_file.return_value = fileitem + + self.media_chain._download_and_save_image(fileitem, target_path, url) + + mock_request_utils.assert_called_with(proxies=mock_settings.PROXY, ua=mock_settings.NORMAL_USER_AGENT) + mock_instance.get_stream.assert_called_with(url=url) + tmp_mock.write.assert_any_call(b"data1") + tmp_mock.write.assert_any_call(b"data2") + mock_chmod.assert_called() + self.media_chain.storagechain.upload_file.assert_called_once() + call_args = self.media_chain.storagechain.upload_file.call_args.kwargs + self.assertEqual(call_args["fileitem"], fileitem) + self.assertEqual(call_args["new_name"], "poster.jpg") + + +class TestMediaScrapingTVDirectory(unittest.TestCase): + def setUp(self): + self.media_chain = MediaChain() + self.media_chain.storagechain = MagicMock() + self.media_chain._scrape_nfo_generic = MagicMock() + self.media_chain._scrape_images_generic = MagicMock() + + @patch("app.chain.media.settings") + def test_initialize_tv_directory_specials(self, mock_settings): + # mock specials directory recognition + mock_settings.RENAME_FORMAT_S0_NAMES = ["Specials", "SPs"] + mock_settings.RMT_MEDIAEXT = [".mp4", ".mkv"] + + fileitem = schemas.FileItem(path="/tv/Show/Specials", name="Specials", type="dir", storage="local") + meta = MetaInfo("Show") + mediainfo = MediaInfo(type=MediaType.TV) + self.media_chain.storagechain.list_files.return_value = [] + + self.media_chain._handle_tv_scraping(fileitem, meta, mediainfo, init_folder=True, parent=None, overwrite=False, recursive=True) + + self.media_chain._scrape_nfo_generic.assert_called_with( + current_fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + item_type=ScrapingTarget.SEASON, + overwrite=False, + season_number=0 + ) + self.media_chain._scrape_images_generic.assert_called_with( + current_fileitem=fileitem, + mediainfo=mediainfo, + 
item_type=ScrapingTarget.SEASON, + parent_fileitem=None, + overwrite=False, + season_number=0 + ) + + def test_initialize_tv_directory_season(self): + fileitem = schemas.FileItem(path="/tv/Show/Season 1", name="Season 1", type="dir", storage="local") + meta = MetaInfo("Show") + mediainfo = MediaInfo(type=MediaType.TV) + self.media_chain.storagechain.list_files.return_value = [] + + self.media_chain._handle_tv_scraping(fileitem, meta, mediainfo, init_folder=True, parent=None, overwrite=False, recursive=True) + + self.media_chain._scrape_nfo_generic.assert_called_with( + current_fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + item_type=ScrapingTarget.SEASON, + overwrite=False, + season_number=1 + ) + + +class TestMediaScrapeEvents(unittest.TestCase): + def setUp(self): + self.media_chain = MediaChain() + + @patch("app.chain.media.MediaChain.scrape_metadata") + @patch("app.chain.media.StorageChain.get_item") + @patch("app.chain.media.StorageChain.get_parent_item") + def test_scrape_metadata_event_file( + self, mock_get_parent, mock_get_item, mock_scrape_metadata + ): + fileitem = schemas.FileItem(path="/movies/movie.mkv", name="movie.mkv", type="file", storage="local") + parent_item = schemas.FileItem(path="/movies", name="movies", type="dir", storage="local") + + mock_get_item.return_value = fileitem + mock_get_parent.return_value = parent_item + + mediainfo = MediaInfo() + event = Event( + event_type=EventType.MetadataScrape, + event_data={ + "fileitem": fileitem, + "mediainfo": mediainfo, + "overwrite": True + } + ) + + self.media_chain.scrape_metadata_event(event) + + mock_scrape_metadata.assert_called_once_with( + fileitem=fileitem, + mediainfo=mediainfo, + init_folder=False, + parent=parent_item, + overwrite=True + ) + + @patch("app.chain.media.MediaChain.scrape_metadata") + @patch("app.chain.media.StorageChain.get_item") + @patch("app.chain.media.StorageChain.is_bluray_folder") + def test_scrape_metadata_event_dir_bluray( + self, mock_is_bluray, 
mock_get_item, mock_scrape_metadata + ): + fileitem = schemas.FileItem(path="/movies/bluray_movie", name="bluray_movie", type="dir", storage="local") + + mock_get_item.return_value = fileitem + mock_is_bluray.return_value = True + + mediainfo = MediaInfo() + event = Event( + event_type=EventType.MetadataScrape, + event_data={ + "fileitem": fileitem, + "file_list": ["/movies/bluray_movie/BDMV/index.bdmv"], + "mediainfo": mediainfo, + "overwrite": False + } + ) + + self.media_chain.scrape_metadata_event(event) + + mock_scrape_metadata.assert_called_once_with( + fileitem=fileitem, + mediainfo=mediainfo, + init_folder=True, + recursive=False, + overwrite=False + ) + + @patch("app.chain.media.MediaChain.scrape_metadata") + @patch("app.chain.media.StorageChain.get_item") + @patch("app.chain.media.StorageChain.is_bluray_folder") + @patch("app.chain.media.StorageChain.get_file_item") + def test_scrape_metadata_event_dir_with_filelist( + self, mock_get_file_item, mock_is_bluray, mock_get_item, mock_scrape_metadata + ): + fileitem = schemas.FileItem(path="/tv/show", name="show", type="dir", storage="local") + + mock_get_item.return_value = fileitem + mock_is_bluray.return_value = False + + def side_effect_get_file_item(storage, path): + path_str = str(path) + return schemas.FileItem(path=path_str, name=Path(path_str).name, type="dir" if "." 
not in path_str else "file", storage="local") + + mock_get_file_item.side_effect = side_effect_get_file_item + + mediainfo = MediaInfo() + event = Event( + event_type=EventType.MetadataScrape, + event_data={ + "fileitem": fileitem, + "file_list": ["/tv/show/Season 1/S01E01.mp4"], + "mediainfo": mediainfo, + "overwrite": True + } + ) + + self.media_chain.scrape_metadata_event(event) + + calls = mock_scrape_metadata.call_args_list + self.assertEqual(len(calls), 3) + + paths = [call.kwargs['fileitem'].path for call in calls] + self.assertIn("/tv/show", paths) + self.assertIn("/tv/show/Season 1", paths) + self.assertIn("/tv/show/Season 1/S01E01.mp4", paths) + + @patch("app.chain.media.MediaChain.scrape_metadata") + @patch("app.chain.media.StorageChain.get_item") + def test_scrape_metadata_event_dir_full( + self, mock_get_item, mock_scrape_metadata + ): + fileitem = schemas.FileItem(path="/movies/movie", name="movie", type="dir", storage="local") + + mock_get_item.return_value = fileitem + + mediainfo = MediaInfo() + meta = MetaInfo("movie") + event = Event( + event_type=EventType.MetadataScrape, + event_data={ + "fileitem": fileitem, + "meta": meta, + "mediainfo": mediainfo, + "overwrite": True + } + ) + + self.media_chain.scrape_metadata_event(event) + + mock_scrape_metadata.assert_called_once_with( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=True, + overwrite=True + ) + + @patch("app.chain.media.MediaChain._handle_movie_scraping") + @patch("app.chain.media.MediaChain.recognize_by_meta") + def test_scrape_metadata_movie( + self, mock_recognize, mock_handle_movie + ): + fileitem = schemas.FileItem(path="/movies/movie.mkv", name="movie.mkv", type="file", storage="local") + meta = MetaInfo("Movie") + mediainfo = MediaInfo(type=MediaType.MOVIE) + + self.media_chain.scrape_metadata( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=True, + overwrite=False, + recursive=True + ) + + mock_recognize.assert_not_called() + 
mock_handle_movie.assert_called_once_with( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=True, + parent=None, + overwrite=False, + recursive=True + ) + + @patch("app.chain.media.MediaChain._handle_tv_scraping") + @patch("app.chain.media.MediaChain.recognize_by_meta") + def test_scrape_metadata_tv( + self, mock_recognize, mock_handle_tv + ): + fileitem = schemas.FileItem(path="/tv/show", name="show", type="dir", storage="local") + meta = MetaInfo("Show") + mediainfo = MediaInfo(type=MediaType.TV) + + self.media_chain.scrape_metadata( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=True, + overwrite=False, + recursive=True + ) + + mock_handle_tv.assert_called_once_with( + fileitem=fileitem, + meta=meta, + mediainfo=mediainfo, + init_folder=True, + parent=None, + overwrite=False, + recursive=True + ) + + @patch("app.chain.media.MediaChain._handle_movie_scraping") + @patch("app.chain.media.MediaChain.recognize_by_meta") + def test_scrape_metadata_recognize_fallback( + self, mock_recognize, mock_handle_movie + ): + fileitem = schemas.FileItem(path="/movies/movie.mkv", name="movie.mkv", type="file", storage="local") + mediainfo = MediaInfo(type=MediaType.MOVIE) + mock_recognize.return_value = mediainfo + + self.media_chain.scrape_metadata( + fileitem=fileitem, + init_folder=True, + overwrite=False, + recursive=True + ) + + mock_recognize.assert_called_once() + mock_handle_movie.assert_called_once() + args, kwargs = mock_handle_movie.call_args + self.assertEqual(kwargs['mediainfo'], mediainfo) + self.assertEqual(kwargs['meta'].name, "Movie") + + @patch("app.chain.media.MediaChain._handle_movie_scraping") + @patch("app.chain.media.MediaChain._handle_tv_scraping") + def test_scrape_metadata_invalid_extension( + self, mock_handle_tv, mock_handle_movie + ): + fileitem = schemas.FileItem(path="/movies/movie.txt", name="movie.txt", type="file", storage="local") + + self.media_chain.scrape_metadata( + fileitem=fileitem + ) + + 
mock_handle_movie.assert_not_called() + mock_handle_tv.assert_not_called() + + @patch("app.chain.media.MediaChain.scrape_metadata") + @patch("app.chain.media.StorageChain.get_item") + @patch("app.chain.media.StorageChain.is_bluray_folder") + @patch("app.chain.media.StorageChain.get_file_item") + def test_scrape_metadata_event_dir_with_multiple_files( + self, mock_get_file_item, mock_is_bluray, mock_get_item, mock_scrape_metadata + ): + fileitem = schemas.FileItem(path="/movies/collection", name="collection", type="dir", storage="local") + + mock_get_item.return_value = fileitem + mock_is_bluray.return_value = False + + def side_effect_get_file_item(storage, path): + path_str = str(path) + return schemas.FileItem(path=path_str, name=Path(path_str).name, type="dir" if "." not in path_str else "file", storage="local") + + mock_get_file_item.side_effect = side_effect_get_file_item + + mediainfo = MediaInfo() + event = Event( + event_type=EventType.MetadataScrape, + event_data={ + "fileitem": fileitem, + "file_list": [ + "/movies/collection/movie1.mp4", + "/movies/collection/movie2.mkv", + "/movies/collection/movie3.avi" + ], + "mediainfo": mediainfo, + "overwrite": True + } + ) + + self.media_chain.scrape_metadata_event(event) + + calls = mock_scrape_metadata.call_args_list + # Should scrape directory and then each file item + self.assertEqual(len(calls), 4) + + paths = [call.kwargs['fileitem'].path for call in calls] + self.assertIn("/movies/collection", paths) + self.assertIn("/movies/collection/movie1.mp4", paths) + self.assertIn("/movies/collection/movie2.mkv", paths) + self.assertIn("/movies/collection/movie3.avi", paths) + + @patch("app.chain.media.MediaChain.scrape_metadata") + @patch("app.chain.media.StorageChain.get_item") + @patch("app.chain.media.StorageChain.is_bluray_folder") + @patch("app.chain.media.StorageChain.get_file_item") + def test_scrape_metadata_event_dir_with_tv_multi_seasons_episodes( + self, mock_get_file_item, mock_is_bluray, mock_get_item, 
mock_scrape_metadata + ): + fileitem = schemas.FileItem(path="/tv/MultiSeasonShow", name="MultiSeasonShow", type="dir", storage="local") + + mock_get_item.return_value = fileitem + mock_is_bluray.return_value = False + + def side_effect_get_file_item(storage, path): + path_str = str(path) + return schemas.FileItem(path=path_str, name=Path(path_str).name, type="dir" if "." not in path_str else "file", storage="local") + + mock_get_file_item.side_effect = side_effect_get_file_item + + mediainfo = MediaInfo() + event = Event( + event_type=EventType.MetadataScrape, + event_data={ + "fileitem": fileitem, + "file_list": [ + "/tv/MultiSeasonShow/Season 1/S01E01.mp4", + "/tv/MultiSeasonShow/Season 1/S01E02.mp4", + "/tv/MultiSeasonShow/Season 2/S02E01.mkv", + "/tv/MultiSeasonShow/Season 2/S02E02.mkv", + "/tv/MultiSeasonShow/Specials/S00E01.mp4" + ], + "mediainfo": mediainfo, + "overwrite": False + } + ) + + self.media_chain.scrape_metadata_event(event) + + calls = mock_scrape_metadata.call_args_list + # main dir + 3 season dirs + 5 episode files + self.assertEqual(len(calls), 9) + + paths = [call.kwargs['fileitem'].path for call in calls] + self.assertIn("/tv/MultiSeasonShow", paths) + self.assertIn("/tv/MultiSeasonShow/Season 1", paths) + self.assertIn("/tv/MultiSeasonShow/Season 2", paths) + self.assertIn("/tv/MultiSeasonShow/Specials", paths) + self.assertIn("/tv/MultiSeasonShow/Season 1/S01E01.mp4", paths) + self.assertIn("/tv/MultiSeasonShow/Season 1/S01E02.mp4", paths) + self.assertIn("/tv/MultiSeasonShow/Season 2/S02E01.mkv", paths) + self.assertIn("/tv/MultiSeasonShow/Season 2/S02E02.mkv", paths) + self.assertIn("/tv/MultiSeasonShow/Specials/S00E01.mp4", paths) + + @patch("app.chain.media.MediaChain.recognize_by_meta") + def test_scrape_metadata_recognize_fail( + self, mock_recognize + ): + fileitem = schemas.FileItem(path="/movies/movie.mkv", name="movie.mkv", type="file", storage="local") + mock_recognize.return_value = None + + with 
patch('app.chain.media.logger.warn') as mock_logger: + self.media_chain.scrape_metadata( + fileitem=fileitem + ) + mock_logger.assert_called_with(f"{Path(fileitem.path)} 无法识别文件媒体信息!") + +if __name__ == "__main__": + unittest.main()