From 630d13ac5278a7a7ad5aa551fc751058a9ec413b Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 23 Mar 2026 23:05:25 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=9B=86=E7=BC=A9?= =?UTF-8?q?=E7=95=A5=E5=9B=BE=E6=96=87=E4=BB=B6=E5=90=8D=E9=94=99=E8=AF=AF?= =?UTF-8?q?=EF=BC=8Cepisode-thumb-xx=20=E6=94=B9=E4=B8=BA=20=E8=A7=86?= =?UTF-8?q?=E9=A2=91=E6=96=87=E4=BB=B6=E5=90=8D-thumb.xx?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/media.py | 822 +++++++++++++++++++----------- app/modules/themoviedb/scraper.py | 173 +++++-- 2 files changed, 649 insertions(+), 346 deletions(-) diff --git a/app/chain/media.py b/app/chain/media.py index 003e7770..469e3493 100644 --- a/app/chain/media.py +++ b/app/chain/media.py @@ -15,8 +15,15 @@ from app.core.metainfo import MetaInfo, MetaInfoPath from app.db.systemconfig_oper import SystemConfigOper from app.log import logger from app.schemas import FileItem -from app.schemas.types import ChainEventType, EventType, MediaType, \ - ScrapingTarget, ScrapingMetadata, ScrapingPolicy, SystemConfigKey +from app.schemas.types import ( + ChainEventType, + EventType, + MediaType, + ScrapingTarget, + ScrapingMetadata, + ScrapingPolicy, + SystemConfigKey, +) from app.utils.mixins import ConfigReloadMixin from app.utils.singleton import Singleton from app.utils.http import RequestUtils @@ -31,6 +38,7 @@ os.umask(current_umask) class ScrapingOption: """刮削选项""" + type: ScrapingTarget = ScrapingTarget.TV metadata: ScrapingMetadata = ScrapingMetadata.NFO policy: ScrapingPolicy = ScrapingPolicy.MISSINGONLY @@ -57,7 +65,9 @@ class ScrapingOption: elif isinstance(value, str): self.policy = ScrapingPolicy(value) else: - logger.error(f"无效的刮削选项:type={type}, metadata={metadata}, value={value}") + logger.error( + f"无效的刮削选项:type={type}, metadata={metadata}, value={value}" + ) @property def is_skip(self) -> bool: @@ -91,20 +101,24 @@ class ScrapingConfig: for key, value in _config.items(): if "_" in key: - items = key.split('_', 1) + items = key.split("_", 1) self._policies[tuple(items)] = ScrapingOption(*items, value) - def option(self, item: Union[str, ScrapingTarget], metadata: Union[str, ScrapingMetadata]) -> ScrapingOption: + def option( + self, item: Union[str, ScrapingTarget], metadata: Union[str, ScrapingMetadata] + ) -> ScrapingOption: if isinstance(item, ScrapingTarget): item = item.name.lower() if isinstance(metadata, ScrapingMetadata): metadata = metadata.name.lower() - return self._policies.get((item, metadata), ScrapingOption(item, metadata, ScrapingPolicy.SKIP)) + return self._policies.get( + (item, metadata), ScrapingOption(item, metadata, ScrapingPolicy.SKIP) + ) @classmethod - def from_system_config(cls) -> 'ScrapingConfig': + def from_system_config(cls) -> "ScrapingConfig": """ 从系统配置加载 @@ -119,10 +133,13 @@ class ScrapingConfig: config_items = [ f"{mt}_{md}" for mt, mds in [ - ('movie', ['nfo', 'poster', 'backdrop', 'logo', 'disc', 'banner', 'thumb']), - ('tv', ['nfo', 'poster', 'backdrop', 'logo', 'banner', 'thumb']), - ('season', ['nfo', 'poster', 'banner', 'thumb']), - ('episode', ['nfo', 'thumb']) + ( + "movie", + ["nfo", "poster", "backdrop", "logo", "disc", "banner", "thumb"], + ), + ("tv", ["nfo", "poster", "backdrop", "logo", "banner", "thumb"]), + ("season", ["nfo", "poster", "banner", "thumb"]), + ("episode", ["nfo", "thumb"]), ] for md in mds ] @@ -133,18 +150,19 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ 媒体信息处理链,单例运行 """ + CONFIG_WATCH = {SystemConfigKey.ScrapingSwitchs.value} IMAGE_METADATA_MAP = { - 'poster': ScrapingMetadata.POSTER, - 'backdrop': ScrapingMetadata.BACKDROP, - 'fanart': ScrapingMetadata.BACKDROP, - 'background': ScrapingMetadata.BACKDROP, - 'logo': ScrapingMetadata.LOGO, - 'disc': ScrapingMetadata.DISC, - 'cdart': ScrapingMetadata.DISC, - 'banner': ScrapingMetadata.BANNER, - 'thumb': ScrapingMetadata.THUMB, + "poster": ScrapingMetadata.POSTER, + "backdrop": ScrapingMetadata.BACKDROP, + "fanart": ScrapingMetadata.BACKDROP, + "background": ScrapingMetadata.BACKDROP, + "logo": ScrapingMetadata.LOGO, + "disc": ScrapingMetadata.DISC, + "cdart": ScrapingMetadata.DISC, + "banner": ScrapingMetadata.BANNER, + "thumb": ScrapingMetadata.THUMB, } def __init__(self): @@ -155,7 +173,12 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): def on_config_changed(self): self.scraping_policies = ScrapingConfig.from_system_config() - def _should_scrape(self, scraping_option: ScrapingOption, file_exists: bool, global_overwrite: bool = False) -> bool: + def _should_scrape( + self, + scraping_option: ScrapingOption, + file_exists: bool, + global_overwrite: bool = False, + ) -> bool: """ 判断是否应该执行刮削操作 @@ -165,7 +188,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): :return bool: 是否应该刮削 """ if scraping_option.is_skip: - logger.info(f"{scraping_option.type.value} {scraping_option.metadata.value} 刮削策略 {scraping_option.policy.value}") + logger.info( + f"{scraping_option.type.value} {scraping_option.metadata.value} 刮削策略 {scraping_option.policy.value}" + ) return False if not file_exists: @@ -177,13 +202,17 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.info( f"{scraping_option.type.value} {scraping_option.metadata.value} 文件存在," f"{'配置为覆盖' if scraping_option.is_overwrite else '配置为全局覆盖'}" - ) + ) return True else: - logger.info(f"{scraping_option.type.value} {scraping_option.metadata.value} 文件已存在,跳过") + logger.info( + f"{scraping_option.type.value} {scraping_option.metadata.value} 文件已存在,跳过" + ) return False - def _save_file(self, fileitem: schemas.FileItem, path: Path, content: Union[bytes, str]): + def _save_file( + self, fileitem: schemas.FileItem, path: Path, content: Union[bytes, str] + ): """ 保存或上传文件 @@ -194,13 +223,15 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if not fileitem or not content or not path: return # 使用tempfile创建临时文件 - with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=path.suffix) as tmp_file: + with NamedTemporaryFile( + delete=True, delete_on_close=False, suffix=path.suffix + ) as tmp_file: tmp_file_path = Path(tmp_file.name) # 写入内容 if isinstance(content, bytes): tmp_file.write(content) else: - tmp_file.write(content.encode('utf-8')) + tmp_file.write(content.encode("utf-8")) tmp_file.flush() tmp_file.close() # 关闭文件句柄 @@ -208,13 +239,17 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): tmp_file_path.chmod(0o666 & ~current_umask) # 上传文件 - item = self.storagechain.upload_file(fileitem=fileitem, path=tmp_file_path, new_name=path.name) + item = self.storagechain.upload_file( + fileitem=fileitem, path=tmp_file_path, new_name=path.name + ) if item: logger.info(f"已保存文件:{item.path}") else: logger.warn(f"文件保存失败:{path}") - def _download_and_save_image(self, fileitem: schemas.FileItem, path: Path, url: str): + def _download_and_save_image( + self, fileitem: schemas.FileItem, path: Path, url: str + ): """ 流式下载图片并保存到文件 @@ -227,11 +262,15 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return try: logger.info(f"正在下载图片:{url} ...") - request_utils = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT) + request_utils = RequestUtils( + proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT + ) with request_utils.get_stream(url=url) as r: if r and r.status_code == 200: # 使用tempfile创建临时文件,自动删除 - with NamedTemporaryFile(delete=True, delete_on_close=False, suffix=path.suffix) as tmp_file: + with NamedTemporaryFile( + delete=True, delete_on_close=False, suffix=path.suffix + ) as tmp_file: tmp_file_path = Path(tmp_file.name) # 流式写入文件 for chunk in r.iter_content(chunk_size=8192): @@ -244,8 +283,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): tmp_file_path.chmod(0o666 & ~current_umask) # 上传文件 - item = self.storagechain.upload_file(fileitem=fileitem, path=tmp_file_path, - new_name=path.name) + item = self.storagechain.upload_file( + fileitem=fileitem, path=tmp_file_path, new_name=path.name + ) if item: logger.info(f"已保存图片:{item.path}") else: @@ -255,11 +295,14 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): except Exception as err: logger.error(f"{url} 图片下载失败:{str(err)}!") - def _get_target_fileitem_and_path(self, current_fileitem: schemas.FileItem, - item_type: ScrapingTarget, metadata_type: ScrapingMetadata, - filename_hint: Optional[str] = None, - parent_fileitem: Optional[schemas.FileItem] = None - ) -> Tuple[schemas.FileItem, Optional[Path]]: + def _get_target_fileitem_and_path( + self, + current_fileitem: schemas.FileItem, + item_type: ScrapingTarget, + metadata_type: ScrapingMetadata, + filename_hint: Optional[str] = None, + parent_fileitem: Optional[schemas.FileItem] = None, + ) -> Tuple[schemas.FileItem, Optional[Path]]: """ 根据当前上下文、刮削项类型和元数据类型生成目标 FileItem 和 Path 处理 NFO 和图片文件的命名约定及存储位置 @@ -267,7 +310,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 默认保存的目录是当前文件项的目录 target_dir_item = current_fileitem target_dir_path = Path(current_fileitem.path) - final_filename = filename_hint # 如果提供了 filename_hint,优先使用 + final_filename = filename_hint # 如果提供了 filename_hint,优先使用 # 针对 NFO 文件的特殊命名和存储逻辑 if metadata_type == ScrapingMetadata.NFO: @@ -275,12 +318,20 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if current_fileitem.type == "file": # 电影文件NFO: 放在电影文件同级目录,名称与电影文件主体一致,后缀.nfo final_filename = f"{target_dir_path.stem}.nfo" - target_dir_item = parent_fileitem or self.storagechain.get_parent_item(current_fileitem) + target_dir_item = ( + parent_fileitem + or self.storagechain.get_parent_item(current_fileitem) + ) if not target_dir_item: - logger.error(f"无法获取文件 {current_fileitem.path} 的父目录项。") - return current_fileitem, None # 返回一个表示失败的FileItem和None + logger.error( + f"无法获取文件 {current_fileitem.path} 的父目录项。" + ) + return ( + current_fileitem, + None, + ) # 返回一个表示失败的FileItem和None target_dir_path = Path(target_dir_item.path) - else: # current_fileitem.type == "dir" + else: # current_fileitem.type == "dir" # 电影目录NFO (例如蓝光原盘): 放在电影目录内,名称与目录名主体一致,后缀.nfo final_filename = f"{target_dir_path.name}.nfo" # target_dir_item 保持为 current_fileitem @@ -294,32 +345,49 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): elif item_type == ScrapingTarget.EPISODE: # 电视剧集文件NFO: 放在集文件同级目录,名称与集文件主体一致,后缀.nfo final_filename = f"{target_dir_path.stem}.nfo" - target_dir_item = parent_fileitem or self.storagechain.get_parent_item(current_fileitem) + target_dir_item = parent_fileitem or self.storagechain.get_parent_item( + current_fileitem + ) if not target_dir_item: logger.error(f"无法获取文件 {current_fileitem.path} 的父目录项。") - return current_fileitem, None# 返回一个表示失败的FileItem和None + return current_fileitem, None # 返回一个表示失败的FileItem和None target_dir_path = Path(target_dir_item.path) # 图片通常是放在当前目录 (current_fileitem) 下 - # 如果是 EPISODE 类型的图片(如thumb),通常也是放在文件同级目录,调整 target_dir_item 和 target_dir_path - elif metadata_type in [ScrapingMetadata.THUMB] and item_type == ScrapingTarget.EPISODE: - target_dir_item = parent_fileitem or self.storagechain.get_parent_item(current_fileitem) + # 如果是 EPISODE 类型的图片(如thumb),通常也是放在文件同级目录,文件名与视频文件一致 + elif ( + metadata_type in [ScrapingMetadata.THUMB] + and item_type == ScrapingTarget.EPISODE + ): + # 集缩略图命名: {视频文件名}-thumb.{ext},如 Show.S01E03-thumb.jpg + hint_ext = Path(filename_hint).suffix if filename_hint else ".jpg" + final_filename = f"{target_dir_path.stem}-thumb{hint_ext}" + target_dir_item = parent_fileitem or self.storagechain.get_parent_item( + current_fileitem + ) if not target_dir_item: logger.error(f"无法获取文件 {current_fileitem.path} 的父目录项。") - return current_fileitem, None # 返回一个表示失败的FileItem和None + return current_fileitem, None # 返回一个表示失败的FileItem和None target_dir_path = Path(target_dir_item.path) # TODO: 考虑其他图片类型是否也需要保存到父目录 # 确保最终有文件名 if not final_filename: - logger.error(f"无法为 {item_type.value} - {metadata_type.value} 确定文件名。filename_hint: {filename_hint}") + logger.error( + f"无法为 {item_type.value} - {metadata_type.value} 确定文件名。filename_hint: {filename_hint}" + ) # 返回一个表示失败的FileItem和None return current_fileitem, None target_full_path = target_dir_path / final_filename return target_dir_item, target_full_path - def metadata_nfo(self, meta: MetaBase, mediainfo: MediaInfo, - season: Optional[int] = None, episode: Optional[int] = None) -> Optional[str]: + def metadata_nfo( + self, + meta: MetaBase, + mediainfo: MediaInfo, + season: Optional[int] = None, + episode: Optional[int] = None, + ) -> Optional[str]: """ 获取NFO文件内容文本 @@ -328,10 +396,17 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): :param season: 季号 :param episode: 集号 """ - return self.run_module("metadata_nfo", meta=meta, mediainfo=mediainfo, season=season, episode=episode) + return self.run_module( + "metadata_nfo", + meta=meta, + mediainfo=mediainfo, + season=season, + episode=episode, + ) - def select_recognize_source(self, log_name: str, log_context: str, - native_fn, plugin_fn) -> Optional[MediaInfo]: + def select_recognize_source( + self, log_name: str, log_context: str, native_fn, plugin_fn + ) -> Optional[MediaInfo]: """ 选择识别模式,插件优先或原生优先 @@ -347,34 +422,44 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.info(f"插件优先模式已开启。请求辅助识别,标题:{log_name} ...") mediainfo = plugin_fn() if not mediainfo: - logger.info(f'辅助识别未识别到 {log_context} 的媒体信息,尝试使用原生识别') + logger.info( + f"辅助识别未识别到 {log_context} 的媒体信息,尝试使用原生识别" + ) mediainfo = native_fn() else: # 原生优先 logger.info(f"插件优先模式未开启。尝试原生识别,标题:{log_name} ...") mediainfo = native_fn() if not mediainfo and plugin_available: - logger.info(f'原生识别未识别到 {log_context} 的媒体信息,尝试使用辅助识别') + logger.info( + f"原生识别未识别到 {log_context} 的媒体信息,尝试使用辅助识别" + ) mediainfo = plugin_fn() return mediainfo - def recognize_by_meta(self, metainfo: MetaBase, episode_group: Optional[str] = None) -> Optional[MediaInfo]: + def recognize_by_meta( + self, metainfo: MetaBase, episode_group: Optional[str] = None + ) -> Optional[MediaInfo]: """ 根据主副标题识别媒体信息 """ title = metainfo.title - # 按 config 中设置的识别顺序识别 + # 按 config 中设置的识别顺序识别 mediainfo = self.select_recognize_source( - log_name=title, - log_context=title, - native_fn=lambda: self.recognize_media(meta=metainfo, episode_group=episode_group), - plugin_fn=lambda: self.recognize_help(title=title, org_meta=metainfo) - ) + log_name=title, + log_context=title, + native_fn=lambda: self.recognize_media( + meta=metainfo, episode_group=episode_group + ), + plugin_fn=lambda: self.recognize_help(title=title, org_meta=metainfo), + ) if not mediainfo: - logger.warn(f'{title} 未识别到媒体信息') + logger.warn(f"{title} 未识别到媒体信息") return None # 识别成功 - logger.info(f'{title} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}') + logger.info( + f"{title} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}" + ) # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) # 返回上下文 @@ -391,14 +476,14 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): result: Event = eventmanager.send_event( ChainEventType.NameRecognize, { - 'title': title, - } + "title": title, + }, ) if not result: return None # 获取返回事件数据 event_data = result.event_data or {} - logger.info(f'获取到辅助识别结果:{event_data}') + logger.info(f"获取到辅助识别结果:{event_data}") # 处理数据格式 title, year, season_number, episode_number = None, None, None, None if event_data.get("name"): @@ -411,15 +496,15 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): episode_number = int(event_data["episode"]) if not title: return None - if title == 'Unknown': + if title == "Unknown": return None if not str(year).isdigit(): year = None # 结果赋值 if title == org_meta.name and year == org_meta.year: - logger.info(f'辅助识别与原始识别结果一致,无需重新识别媒体信息') + logger.info(f"辅助识别与原始识别结果一致,无需重新识别媒体信息") return None - logger.info(f'辅助识别结果与原始识别结果不一致,重新匹配媒体信息 ...') + logger.info(f"辅助识别结果与原始识别结果不一致,重新匹配媒体信息 ...") org_meta.name = title org_meta.year = year org_meta.begin_season = season_number @@ -429,25 +514,31 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 重新识别 return self.recognize_media(meta=org_meta) - def recognize_by_path(self, path: str, episode_group: Optional[str] = None) -> Optional[Context]: + def recognize_by_path( + self, path: str, episode_group: Optional[str] = None + ) -> Optional[Context]: """ 根据文件路径识别媒体信息 """ - logger.info(f'开始识别媒体信息,文件:{path} ...') + logger.info(f"开始识别媒体信息,文件:{path} ...") file_path = Path(path) # 元数据 file_meta = MetaInfoPath(file_path) - # 按 config 中设置的识别顺序识别 + # 按 config 中设置的识别顺序识别 mediainfo = self.select_recognize_source( - log_name=file_path.name, - log_context=path, - native_fn=lambda: self.recognize_media(meta=file_meta, episode_group=episode_group), - plugin_fn=lambda: self.recognize_help(title=path, org_meta=file_meta) - ) + log_name=file_path.name, + log_context=path, + native_fn=lambda: self.recognize_media( + meta=file_meta, episode_group=episode_group + ), + plugin_fn=lambda: self.recognize_help(title=path, org_meta=file_meta), + ) if not mediainfo: - logger.warn(f'{path} 未识别到媒体信息') + logger.warn(f"{path} 未识别到媒体信息") return Context(meta_info=file_meta) - logger.info(f'{path} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}') + logger.info( + f"{path} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}" + ) # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) # 返回上下文 @@ -461,7 +552,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): :return: 识别元数据,媒体信息列表 """ # 提取要素 - mtype, key_word, season_num, episode_num, year, content = StringUtils.get_keyword(title) + mtype, key_word, season_num, episode_num, year, content = ( + StringUtils.get_keyword(title) + ) # 识别 meta = MetaInfo(content) if not meta.name: @@ -485,7 +578,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 识别的元数据,媒体信息列表 return meta, medias - def get_tmdbinfo_by_doubanid(self, doubanid: str, mtype: MediaType = None) -> Optional[dict]: + def get_tmdbinfo_by_doubanid( + self, doubanid: str, mtype: MediaType = None + ) -> Optional[dict]: """ 根据豆瓣ID获取TMDB信息 """ @@ -502,23 +597,29 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if doubaninfo.get("year"): meta.year = doubaninfo.get("year") # 处理类型 - if isinstance(doubaninfo.get('media_type'), MediaType): - meta.type = doubaninfo.get('media_type') + if isinstance(doubaninfo.get("media_type"), MediaType): + meta.type = doubaninfo.get("media_type") else: - meta.type = MediaType.MOVIE if doubaninfo.get("type") == "movie" else MediaType.TV + meta.type = ( + MediaType.MOVIE + if doubaninfo.get("type") == "movie" + else MediaType.TV + ) # 匹配TMDB信息 - meta_names = list(dict.fromkeys([k for k in [meta_org.name, - meta.cn_name, - meta.en_name] if k])) + meta_names = list( + dict.fromkeys( + [k for k in [meta_org.name, meta.cn_name, meta.en_name] if k] + ) + ) tmdbinfo = self._match_tmdb_with_names( meta_names=meta_names, year=meta.year, mtype=mtype or meta.type, - season=meta.begin_season + season=meta.begin_season, ) if tmdbinfo: # 合季季后返回 - tmdbinfo['season'] = meta.begin_season + tmdbinfo["season"] = meta.begin_season return tmdbinfo def get_tmdbinfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]: @@ -536,19 +637,21 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 年份 year = self._extract_year_from_bangumi(bangumiinfo) # 识别TMDB媒体信息 - meta_names = list(dict.fromkeys([k for k in [meta_cn.name, - meta.name] if k])) + meta_names = list( + dict.fromkeys([k for k in [meta_cn.name, meta.name] if k]) + ) tmdbinfo = self._match_tmdb_with_names( meta_names=meta_names, year=year, mtype=MediaType.TV, - season=meta.begin_season + season=meta.begin_season, ) return tmdbinfo return None - def get_doubaninfo_by_tmdbid(self, tmdbid: int, - mtype: MediaType = None, season: Optional[int] = None) -> Optional[dict]: + def get_doubaninfo_by_tmdbid( + self, tmdbid: int, mtype: MediaType = None, season: Optional[int] = None + ) -> Optional[dict]: """ 根据TMDBID获取豆瓣信息 """ @@ -561,10 +664,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # IMDBID imdbid = tmdbinfo.get("external_ids", {}).get("imdb_id") return self.match_doubaninfo( - name=name, - year=year, - mtype=mtype, - imdbid=imdbid + name=name, year=year, mtype=mtype, imdbid=imdbid ) return None @@ -583,10 +683,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): year = self._extract_year_from_bangumi(bangumiinfo) # 使用名称识别豆瓣媒体信息 return self.match_doubaninfo( - name=meta.name, - year=year, - mtype=MediaType.TV, - season=meta.begin_season + name=meta.name, year=year, mtype=MediaType.TV, season=meta.begin_season ) return None @@ -621,21 +718,27 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 检查是否为目录 if fileitem.type == "file": # 单个文件刮削 - self.scrape_metadata(fileitem=fileitem, - mediainfo=mediainfo, - init_folder=False, - parent=self.storagechain.get_parent_item(fileitem), - overwrite=overwrite) + self.scrape_metadata( + fileitem=fileitem, + mediainfo=mediainfo, + init_folder=False, + parent=self.storagechain.get_parent_item(fileitem), + overwrite=overwrite, + ) else: if file_list: # 如果是BDMV原盘目录,只对根目录进行刮削,不处理子目录 if self.storagechain.is_bluray_folder(fileitem): - logger.info(f"检测到BDMV原盘目录,只对根目录进行刮削:{fileitem.path}") - self.scrape_metadata(fileitem=fileitem, - mediainfo=mediainfo, - init_folder=True, - recursive=False, - overwrite=overwrite) + logger.info( + f"检测到BDMV原盘目录,只对根目录进行刮削:{fileitem.path}" + ) + self.scrape_metadata( + fileitem=fileitem, + mediainfo=mediainfo, + init_folder=True, + recursive=False, + overwrite=overwrite, + ) else: # 1. 收集fileitem和file_list中每个文件之间所有子目录 all_dirs = set() @@ -650,7 +753,10 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): sub_path = Path(sub_file) # 收集从根目录到文件的所有父目录 current_path = sub_path.parent - while current_path != root_path and current_path.is_relative_to(root_path): + while ( + current_path != root_path + and current_path.is_relative_to(root_path) + ): all_dirs.add(current_path) current_path = current_path.parent @@ -658,43 +764,59 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 2. 初始化一遍子目录,但不处理文件 for sub_dir in all_dirs: - sub_dir_item = self.storagechain.get_file_item(storage=fileitem.storage, path=sub_dir) + sub_dir_item = self.storagechain.get_file_item( + storage=fileitem.storage, path=sub_dir + ) if sub_dir_item: logger.info(f"为目录生成海报和nfo:{sub_dir}") # 初始化目录元数据,但不处理文件 - self.scrape_metadata(fileitem=sub_dir_item, - mediainfo=mediainfo, - init_folder=True, - recursive=False, - overwrite=overwrite) + self.scrape_metadata( + fileitem=sub_dir_item, + mediainfo=mediainfo, + init_folder=True, + recursive=False, + overwrite=overwrite, + ) else: logger.warn(f"无法获取目录项:{sub_dir}") # 3. 刮削每个文件 logger.info(f"开始刮削 {len(file_list)} 个文件") for sub_file_path in file_list: - sub_file_item = self.storagechain.get_file_item(storage=fileitem.storage, - path=Path(sub_file_path)) + sub_file_item = self.storagechain.get_file_item( + storage=fileitem.storage, path=Path(sub_file_path) + ) if sub_file_item: - self.scrape_metadata(fileitem=sub_file_item, - mediainfo=mediainfo, - init_folder=False, - overwrite=overwrite) + self.scrape_metadata( + fileitem=sub_file_item, + mediainfo=mediainfo, + init_folder=False, + overwrite=overwrite, + ) else: logger.warn(f"无法获取文件项:{sub_file_path}") else: # 执行全量刮削 logger.info(f"开始刮削目录 {fileitem.path} ...") - self.scrape_metadata(fileitem=fileitem, meta=meta, init_folder=True, - mediainfo=mediainfo, overwrite=overwrite) + self.scrape_metadata( + fileitem=fileitem, + meta=meta, + init_folder=True, + mediainfo=mediainfo, + overwrite=overwrite, + ) - def _scrape_nfo_generic(self, current_fileitem: schemas.FileItem, - meta: MetaBase, mediainfo: MediaInfo, - item_type: ScrapingTarget, - parent_fileitem: Optional[schemas.FileItem] = None, - overwrite: bool = False, - season_number: Optional[int] = None, - episode_number: Optional[int] = None): + def _scrape_nfo_generic( + self, + current_fileitem: schemas.FileItem, + meta: MetaBase, + mediainfo: MediaInfo, + item_type: ScrapingTarget, + parent_fileitem: Optional[schemas.FileItem] = None, + overwrite: bool = False, + season_number: Optional[int] = None, + episode_number: Optional[int] = None, + ): """ NFO 刮削 """ @@ -703,7 +825,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 检查刮削开关 if nfo_option.is_skip: - logger.info(f"{item_type.value} {ScrapingMetadata.NFO.value} 刮削策略 {nfo_option.policy.value}") + logger.info( + f"{item_type.value} {ScrapingMetadata.NFO.value} 刮削策略 {nfo_option.policy.value}" + ) return # 获取目标 FileItem (`base_item`) 和 Path (`nfo_path`) @@ -711,32 +835,41 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): current_fileitem=current_fileitem, item_type=item_type, metadata_type=ScrapingMetadata.NFO, - parent_fileitem=parent_fileitem + parent_fileitem=parent_fileitem, ) - if not nfo_path: # _get_target_fileitem_and_path 内部错误处理返回None + if not nfo_path: # _get_target_fileitem_and_path 内部错误处理返回None return # 文件存在检查 - file_exists = self.storagechain.get_file_item(storage=base_item.storage, path=nfo_path) + file_exists = self.storagechain.get_file_item( + storage=base_item.storage, path=nfo_path + ) # 刮削决策 if self._should_scrape(nfo_option, bool(file_exists), overwrite): # 生成 NFO 内容 - nfo_content = self.metadata_nfo(meta=meta, mediainfo=mediainfo, - season=season_number, episode=episode_number) + nfo_content = self.metadata_nfo( + meta=meta, + mediainfo=mediainfo, + season=season_number, + episode=episode_number, + ) if nfo_content: self._save_file(fileitem=base_item, path=nfo_path, content=nfo_content) else: logger.warn(f"{nfo_path.name} NFO 文件生成失败!") - def _scrape_images_generic(self, current_fileitem: schemas.FileItem, - mediainfo: MediaInfo, - item_type: ScrapingTarget, - parent_fileitem: Optional[schemas.FileItem] = None, - overwrite: bool = False, - season_number: Optional[int] = None, - episode_number: Optional[int] = None): + def _scrape_images_generic( + self, + current_fileitem: schemas.FileItem, + mediainfo: MediaInfo, + item_type: ScrapingTarget, + parent_fileitem: Optional[schemas.FileItem] = None, + overwrite: bool = False, + season_number: Optional[int] = None, + episode_number: Optional[int] = None, + ): """ 图片刮削 """ @@ -768,19 +901,33 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): option = self.scraping_policies.option(item_type, metadata_type) if option.is_skip: - logger.info(f"{item_type.value} {option.metadata.value} 刮削策略 {option.policy.value}") + logger.info( + f"{item_type.value} {option.metadata.value} 刮削策略 {option.policy.value}" + ) continue # 判断是否匹配当前刮削的季号 - if item_type == ScrapingTarget.TV and image_name.lower().startswith("season"): + if item_type == ScrapingTarget.TV and image_name.lower().startswith( + "season" + ): logger.info(f"当前为电视剧根目录刮削,跳过季图片:{image_name}") continue - if item_type == ScrapingTarget.SEASON and season_number is not None and image_name.lower().startswith("season"): + if ( + item_type == ScrapingTarget.SEASON + and season_number is not None + and image_name.lower().startswith("season") + ): # 检查是否只下载当前刮削季的图片 - image_season_str = "00" if "specials" in image_name.lower() else image_name[6:8] + image_season_str = ( + "00" if "specials" in image_name.lower() else image_name[6:8] + ) - if image_season_str is not None and image_season_str != str(season_number).rjust(2, '0'): - logger.info(f"当前刮削季为:{season_number},跳过非本季图片:{image_name}") + if image_season_str is not None and image_season_str != str( + season_number + ).rjust(2, "0"): + logger.info( + f"当前刮削季为:{season_number},跳过非本季图片:{image_name}" + ) continue # 获取目标 FileItem (`base_item`) 和 Path (`image_path`) @@ -789,25 +936,37 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): item_type=item_type, metadata_type=metadata_type, filename_hint=image_name, - parent_fileitem=parent_fileitem + parent_fileitem=parent_fileitem, ) if not image_path: continue # 文件存在检查 - file_exists = self.storagechain.get_file_item(storage=base_item.storage, path=image_path) + file_exists = self.storagechain.get_file_item( + storage=base_item.storage, path=image_path + ) # 刮削决策 if self._should_scrape(option, bool(file_exists), overwrite): - self._download_and_save_image(fileitem=base_item, path=image_path, url=image_url) + self._download_and_save_image( + fileitem=base_item, path=image_path, url=image_url + ) else: - logger.debug(f"未找到图片类型 {image_name} 对应的 ScrapingMetadata,跳过。") + logger.debug( + f"未找到图片类型 {image_name} 对应的 ScrapingMetadata,跳过。" + ) - def scrape_metadata(self, fileitem: schemas.FileItem, - meta: MetaBase = None, mediainfo: MediaInfo = None, - init_folder: bool = True, parent: schemas.FileItem = None, - overwrite: bool = False, recursive: bool = True): + def scrape_metadata( + self, + fileitem: schemas.FileItem, + meta: MetaBase = None, + mediainfo: MediaInfo = None, + init_folder: bool = True, + parent: schemas.FileItem = None, + overwrite: bool = False, + recursive: bool = True, + ): """ 手动刮削媒体信息 @@ -824,8 +983,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 当前文件路径 filepath = Path(fileitem.path) - if fileitem.type == "file" \ - and (not filepath.suffix or filepath.suffix.lower() not in settings.RMT_MEDIAEXT): + if fileitem.type == "file" and ( + not filepath.suffix or filepath.suffix.lower() not in settings.RMT_MEDIAEXT + ): return # 准备元数据和媒体信息 @@ -848,7 +1008,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): init_folder=init_folder, parent=parent, overwrite=overwrite, - recursive=recursive + recursive=recursive, ) else: self._handle_tv_scraping( @@ -858,15 +1018,21 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): init_folder=init_folder, parent=parent, overwrite=overwrite, - recursive=recursive + recursive=recursive, ) logger.info(f"{filepath.name} 刮削完成") - def _handle_movie_scraping(self, fileitem: schemas.FileItem, - meta: MetaBase, mediainfo: MediaInfo, - init_folder: bool, parent: schemas.FileItem, - overwrite: bool, recursive: bool): + def _handle_movie_scraping( + self, + fileitem: schemas.FileItem, + meta: MetaBase, + mediainfo: MediaInfo, + init_folder: bool, + parent: schemas.FileItem, + overwrite: bool, + recursive: bool, + ): """ 处理电影刮削 """ @@ -878,7 +1044,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): mediainfo=mediainfo, item_type=ScrapingTarget.MOVIE, parent_fileitem=parent, - overwrite=overwrite + overwrite=overwrite, ) else: # 电影目录:递归处理文件并初始化目录 @@ -889,13 +1055,19 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): init_folder=init_folder, parent=parent, overwrite=overwrite, - recursive=recursive + recursive=recursive, ) - def _handle_movie_directory(self, fileitem: schemas.FileItem, - meta: MetaBase, mediainfo: MediaInfo, - init_folder: bool, parent: schemas.FileItem, - overwrite: bool, recursive: bool): + def _handle_movie_directory( + self, + fileitem: schemas.FileItem, + meta: MetaBase, + mediainfo: MediaInfo, + init_folder: bool, + parent: schemas.FileItem, + overwrite: bool, + recursive: bool, + ): """ 处理电影目录刮削 """ @@ -907,11 +1079,13 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): for file in files: if file.type == "dir": continue - self.scrape_metadata(fileitem=file, - mediainfo=mediainfo, - init_folder=False, - parent=fileitem, - overwrite=overwrite) + self.scrape_metadata( + fileitem=file, + mediainfo=mediainfo, + init_folder=False, + parent=fileitem, + overwrite=overwrite, + ) # 初始化目录元数据 if init_folder: @@ -922,20 +1096,26 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): meta=meta, mediainfo=mediainfo, item_type=ScrapingTarget.MOVIE, - overwrite=overwrite + overwrite=overwrite, ) # 电影目录:处理图片 self._scrape_images_generic( current_fileitem=fileitem, mediainfo=mediainfo, item_type=ScrapingTarget.MOVIE, - overwrite=overwrite + overwrite=overwrite, ) - def _handle_tv_scraping(self, fileitem: schemas.FileItem, - meta: MetaBase, mediainfo: MediaInfo, - init_folder: bool, parent: schemas.FileItem, - overwrite: bool, recursive: bool): + def _handle_tv_scraping( + self, + fileitem: schemas.FileItem, + meta: MetaBase, + mediainfo: MediaInfo, + init_folder: bool, + parent: schemas.FileItem, + overwrite: bool, + recursive: bool, + ): """ 处理电视剧刮削 """ @@ -948,7 +1128,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): filepath=filepath, mediainfo=mediainfo, parent=parent, - overwrite=overwrite + overwrite=overwrite, ) else: # 电视剧目录:递归处理并初始化目录 @@ -960,14 +1140,17 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): init_folder=init_folder, parent=parent, overwrite=overwrite, - recursive=recursive + recursive=recursive, ) - def _handle_tv_episode_file(self, fileitem: schemas.FileItem, - filepath: Path, - mediainfo: MediaInfo, - parent: schemas.FileItem, - overwrite: bool): + def _handle_tv_episode_file( + self, + fileitem: schemas.FileItem, + filepath: Path, + mediainfo: MediaInfo, + parent: schemas.FileItem, + overwrite: bool, + ): """ 处理电视剧集文件刮削 """ @@ -977,8 +1160,11 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.warn(f"{filepath.name} 无法识别文件集数!") return - file_mediainfo = self.recognize_media(meta=file_meta, tmdbid=mediainfo.tmdb_id, - episode_group=mediainfo.episode_group) + file_mediainfo = self.recognize_media( + meta=file_meta, + tmdbid=mediainfo.tmdb_id, + episode_group=mediainfo.episode_group, + ) if not file_mediainfo: logger.warn(f"{filepath.name} 无法识别文件媒体信息!") return @@ -992,7 +1178,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): parent_fileitem=parent, overwrite=overwrite, season_number=file_meta.begin_season, - episode_number=file_meta.begin_episode + episode_number=file_meta.begin_episode, ) # 处理图片 @@ -1003,14 +1189,20 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): parent_fileitem=parent, overwrite=overwrite, season_number=file_meta.begin_season, - episode_number=file_meta.begin_episode + episode_number=file_meta.begin_episode, ) - def _handle_tv_directory(self, fileitem: schemas.FileItem, - filepath: Path, - meta: MetaBase, mediainfo: MediaInfo, - init_folder: bool, parent: schemas.FileItem, - overwrite: bool, recursive: bool): + def _handle_tv_directory( + self, + fileitem: schemas.FileItem, + filepath: Path, + meta: MetaBase, + mediainfo: MediaInfo, + init_folder: bool, + parent: schemas.FileItem, + overwrite: bool, + recursive: bool, + ): """ 处理电视剧目录刮削 """ @@ -1025,11 +1217,13 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ): # 电视剧不处理非季子目录 continue - self.scrape_metadata(fileitem=file, - mediainfo=mediainfo, - parent=fileitem if file.type == "file" else None, - init_folder=True if file.type == "dir" else False, - overwrite=overwrite) + self.scrape_metadata( + fileitem=file, + mediainfo=mediainfo, + parent=fileitem if file.type == "file" else None, + init_folder=True if file.type == "dir" else False, + overwrite=overwrite, + ) # 初始化目录元数据 if init_folder: @@ -1039,14 +1233,18 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): meta=meta, mediainfo=mediainfo, parent=parent, - overwrite=overwrite + overwrite=overwrite, ) - def _initialize_tv_directory_metadata(self, fileitem: schemas.FileItem, - filepath: Path, - meta: MetaBase, mediainfo: MediaInfo, - parent: schemas.FileItem, - overwrite: bool): + def _initialize_tv_directory_metadata( + self, + fileitem: schemas.FileItem, + filepath: Path, + meta: MetaBase, + mediainfo: MediaInfo, + parent: schemas.FileItem, + overwrite: bool, + ): """ 初始化电视剧目录元数据(识别季号并刮削) """ @@ -1072,7 +1270,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): mediainfo=mediainfo, item_type=ScrapingTarget.SEASON, overwrite=overwrite, - season_number=season_meta.begin_season + season_number=season_meta.begin_season, ) self._scrape_images_generic( current_fileitem=fileitem, @@ -1080,7 +1278,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): item_type=ScrapingTarget.SEASON, parent_fileitem=parent, overwrite=overwrite, - season_number=season_meta.begin_season + season_number=season_meta.begin_season, ) elif season_meta.name: # 剧集根目录:处理电视剧 NFO 和图片 @@ -1089,19 +1287,20 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): meta=meta, mediainfo=mediainfo, item_type=ScrapingTarget.TV, - overwrite=overwrite + overwrite=overwrite, ) self._scrape_images_generic( current_fileitem=fileitem, mediainfo=mediainfo, item_type=ScrapingTarget.TV, - overwrite=overwrite + overwrite=overwrite, ) else: logger.warn("无法识别元数据,跳过") - async def async_select_recognize_source(self, log_name: str, log_context: str, - native_fn, plugin_fn) -> Optional[MediaInfo]: + async def async_select_recognize_source( + self, log_name: str, log_context: str, native_fn, plugin_fn + ) -> Optional[MediaInfo]: """ 选择识别模式,插件优先或原生优先(异步版本) @@ -1117,46 +1316,60 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.info(f"插件优先模式已开启。请求辅助识别,标题:{log_name} ...") mediainfo = await plugin_fn() if not mediainfo: - logger.info(f'辅助识别未识别到 {log_context} 的媒体信息,尝试使用原生识别') + logger.info( + f"辅助识别未识别到 {log_context} 的媒体信息,尝试使用原生识别" + ) mediainfo = await native_fn() else: # 原生优先 logger.info(f"插件优先模式未开启。尝试原生识别,标题:{log_name} ...") mediainfo = await native_fn() if not mediainfo and plugin_available: - logger.info(f'原生识别未识别到 {log_context} 的媒体信息,尝试使用辅助识别') + logger.info( + f"原生识别未识别到 {log_context} 的媒体信息,尝试使用辅助识别" + ) mediainfo = await plugin_fn() return mediainfo - async def async_recognize_by_meta(self, metainfo: MetaBase, - episode_group: Optional[str] = None) -> Optional[MediaInfo]: + async def async_recognize_by_meta( + self, metainfo: MetaBase, episode_group: Optional[str] = None + ) -> Optional[MediaInfo]: """ 根据主副标题识别媒体信息(异步版本) """ title = metainfo.title + # 定义识别函数 async def native_recognize(): - return await self.async_recognize_media(meta=metainfo, episode_group=episode_group) + return await self.async_recognize_media( + meta=metainfo, episode_group=episode_group + ) + async def plugin_recognize(): return await self.async_recognize_help(title=title, org_meta=metainfo) + # 按 config 中设置的识别顺序识别 mediainfo = await self.async_select_recognize_source( - log_name=title, - log_context=title, - native_fn=native_recognize, - plugin_fn=plugin_recognize - ) + log_name=title, + log_context=title, + native_fn=native_recognize, + plugin_fn=plugin_recognize, + ) if not mediainfo: - logger.warn(f'{title} 未识别到媒体信息') + logger.warn(f"{title} 未识别到媒体信息") return None # 识别成功 - logger.info(f'{title} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}') + logger.info( + f"{title} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}" + ) # 更新媒体图片 await self.async_obtain_images(mediainfo=mediainfo) # 返回上下文 return mediainfo - async def async_recognize_help(self, title: str, org_meta: MetaBase) -> Optional[MediaInfo]: + async def async_recognize_help( + self, title: str, org_meta: MetaBase + ) -> Optional[MediaInfo]: """ 请求辅助识别,返回媒体信息(异步版本) @@ -1167,14 +1380,14 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): result: Event = await eventmanager.async_send_event( ChainEventType.NameRecognize, { - 'title': title, - } + "title": title, + }, ) if not result: return None # 获取返回事件数据 event_data = result.event_data or {} - logger.info(f'获取到辅助识别结果:{event_data}') + logger.info(f"获取到辅助识别结果:{event_data}") # 处理数据格式 title, year, season_number, episode_number = None, None, None, None if event_data.get("name"): @@ -1187,15 +1400,15 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): episode_number = int(event_data["episode"]) if not title: return None - if title == 'Unknown': + if title == "Unknown": return None if not str(year).isdigit(): year = None # 结果赋值 if title == org_meta.name and year == org_meta.year: - logger.info(f'辅助识别与原始识别结果一致,无需重新识别媒体信息') + logger.info(f"辅助识别与原始识别结果一致,无需重新识别媒体信息") return None - logger.info(f'辅助识别结果与原始识别结果不一致,重新匹配媒体信息 ...') + logger.info(f"辅助识别结果与原始识别结果不一致,重新匹配媒体信息 ...") org_meta.name = title org_meta.year = year org_meta.begin_season = season_number @@ -1205,36 +1418,47 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 重新识别 return await self.async_recognize_media(meta=org_meta) - async def async_recognize_by_path(self, path: str, episode_group: Optional[str] = None) -> Optional[Context]: + async def async_recognize_by_path( + self, path: str, episode_group: Optional[str] = None + ) -> Optional[Context]: """ 根据文件路径识别媒体信息(异步版本) """ - logger.info(f'开始识别媒体信息,文件:{path} ...') + logger.info(f"开始识别媒体信息,文件:{path} ...") file_path = Path(path) # 元数据 file_meta = MetaInfoPath(file_path) + # 定义识别函数 async def native_recognize(): - return await self.async_recognize_media(meta=file_meta, episode_group=episode_group) + return await self.async_recognize_media( + meta=file_meta, episode_group=episode_group + ) + async def plugin_recognize(): return await self.async_recognize_help(title=path, org_meta=file_meta) + # 按 config 中设置的识别顺序识别 mediainfo = await self.async_select_recognize_source( - log_name=file_path.name, - log_context=path, - native_fn=native_recognize, - plugin_fn=plugin_recognize - ) + log_name=file_path.name, + log_context=path, + native_fn=native_recognize, + plugin_fn=plugin_recognize, + ) if not mediainfo: - logger.warn(f'{path} 未识别到媒体信息') + logger.warn(f"{path} 未识别到媒体信息") return Context(meta_info=file_meta) - logger.info(f'{path} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}') + logger.info( + f"{path} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}" + ) # 更新媒体图片 await self.async_obtain_images(mediainfo=mediainfo) # 返回上下文 return Context(meta_info=file_meta, media_info=mediainfo) - async def async_search(self, title: str) -> Tuple[Optional[MetaBase], List[MediaInfo]]: + async def async_search( + self, title: str + ) -> Tuple[Optional[MetaBase], List[MediaInfo]]: """ 搜索媒体/人物信息(异步版本) @@ -1242,7 +1466,9 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): :return: 识别元数据,媒体信息列表 """ # 提取要素 - mtype, key_word, season_num, episode_num, year, content = StringUtils.get_keyword(title) + mtype, key_word, season_num, episode_num, year, content = ( + StringUtils.get_keyword(title) + ) # 识别 meta = MetaInfo(content) if not meta.name: @@ -1277,15 +1503,17 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return None @staticmethod - def _extract_year_from_tmdb(tmdbinfo: dict, season: Optional[int] = None) -> Optional[str]: + def _extract_year_from_tmdb( + tmdbinfo: dict, season: Optional[int] = None + ) -> Optional[str]: """ 从TMDB信息中提取年份 """ year = None - if tmdbinfo.get('release_date'): - year = tmdbinfo['release_date'][:4] - elif tmdbinfo.get('seasons') and season is not None: - for seainfo in tmdbinfo['seasons']: + if tmdbinfo.get("release_date"): + year = tmdbinfo["release_date"][:4] + elif tmdbinfo.get("seasons") and season is not None: + for seainfo in tmdbinfo["seasons"]: season_number = seainfo.get("season_number") if season_number is None: continue @@ -1295,39 +1523,45 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): break return year - def _match_tmdb_with_names(self, meta_names: list, year: Optional[str], - mtype: MediaType, season: Optional[int] = None) -> Optional[dict]: + def _match_tmdb_with_names( + self, + meta_names: list, + year: Optional[str], + mtype: MediaType, + season: Optional[int] = None, + ) -> Optional[dict]: """ 使用名称列表匹配TMDB信息 """ for name in meta_names: tmdbinfo = self.match_tmdbinfo( - name=name, - year=year, - mtype=mtype, - season=season + name=name, year=year, mtype=mtype, season=season ) if tmdbinfo: return tmdbinfo return None - async def _async_match_tmdb_with_names(self, meta_names: list, year: Optional[str], - mtype: MediaType, season: Optional[int] = None) -> Optional[dict]: + async def _async_match_tmdb_with_names( + self, + meta_names: list, + year: Optional[str], + mtype: MediaType, + season: Optional[int] = None, + ) -> Optional[dict]: """ 使用名称列表匹配TMDB信息(异步版本) """ for name in meta_names: tmdbinfo = await self.async_match_tmdbinfo( - name=name, - year=year, - mtype=mtype, - season=season + name=name, year=year, mtype=mtype, season=season ) if tmdbinfo: return tmdbinfo return None - async def async_get_tmdbinfo_by_doubanid(self, doubanid: str, mtype: MediaType = None) -> Optional[dict]: + async def async_get_tmdbinfo_by_doubanid( + self, doubanid: str, mtype: MediaType = None + ) -> Optional[dict]: """ 根据豆瓣ID获取TMDB信息(异步版本) """ @@ -1344,23 +1578,29 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if doubaninfo.get("year"): meta.year = doubaninfo.get("year") # 处理类型 - if isinstance(doubaninfo.get('media_type'), MediaType): - meta.type = doubaninfo.get('media_type') + if isinstance(doubaninfo.get("media_type"), MediaType): + meta.type = doubaninfo.get("media_type") else: - meta.type = MediaType.MOVIE if doubaninfo.get("type") == "movie" else MediaType.TV + meta.type = ( + MediaType.MOVIE + if doubaninfo.get("type") == "movie" + else MediaType.TV + ) # 匹配TMDB信息 - meta_names = list(dict.fromkeys([k for k in [meta_org.name, - meta.cn_name, - meta.en_name] if k])) + meta_names = list( + dict.fromkeys( + [k for k in [meta_org.name, meta.cn_name, meta.en_name] if k] + ) + ) tmdbinfo = await self._async_match_tmdb_with_names( meta_names=meta_names, year=meta.year, mtype=mtype or meta.type, - season=meta.begin_season + season=meta.begin_season, ) if tmdbinfo: # 合季季后返回 - tmdbinfo['season'] = meta.begin_season + tmdbinfo["season"] = meta.begin_season return tmdbinfo async def async_get_tmdbinfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]: @@ -1378,19 +1618,21 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 年份 year = self._extract_year_from_bangumi(bangumiinfo) # 识别TMDB媒体信息 - meta_names = list(dict.fromkeys([k for k in [meta_cn.name, - meta.name] if k])) + meta_names = list( + dict.fromkeys([k for k in [meta_cn.name, meta.name] if k]) + ) tmdbinfo = await self._async_match_tmdb_with_names( meta_names=meta_names, year=year, mtype=MediaType.TV, - season=meta.begin_season + season=meta.begin_season, ) return tmdbinfo return None - async def async_get_doubaninfo_by_tmdbid(self, tmdbid: int, mtype: MediaType = None, - season: Optional[int] = None) -> Optional[dict]: + async def async_get_doubaninfo_by_tmdbid( + self, tmdbid: int, mtype: MediaType = None, season: Optional[int] = None + ) -> Optional[dict]: """ 根据TMDBID获取豆瓣信息(异步版本) """ @@ -1403,10 +1645,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # IMDBID imdbid = tmdbinfo.get("external_ids", {}).get("imdb_id") return await self.async_match_doubaninfo( - name=name, - year=year, - mtype=mtype, - imdbid=imdbid + name=name, year=year, mtype=mtype, imdbid=imdbid ) return None @@ -1425,9 +1664,6 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): year = self._extract_year_from_bangumi(bangumiinfo) # 使用名称识别豆瓣媒体信息 return await self.async_match_doubaninfo( - name=meta.name, - year=year, - mtype=MediaType.TV, - season=meta.begin_season + name=meta.name, year=year, mtype=MediaType.TV, season=meta.begin_season ) return None diff --git a/app/modules/themoviedb/scraper.py b/app/modules/themoviedb/scraper.py index 3d3380fe..85037d39 100644 --- a/app/modules/themoviedb/scraper.py +++ b/app/modules/themoviedb/scraper.py @@ -31,8 +31,13 @@ class TmdbScraper: return TmdbApi(language=mediainfo.original_language) return self.default_tmdb - def get_metadata_nfo(self, meta: MetaBase, mediainfo: MediaInfo, - season: Optional[int] = None, episode: Optional[int] = None) -> Optional[str]: + def get_metadata_nfo( + self, + meta: MetaBase, + mediainfo: MediaInfo, + season: Optional[int] = None, + episode: Optional[int] = None, + ) -> Optional[str]: """ 获取NFO文件内容文本 :param meta: 元数据 @@ -47,17 +52,29 @@ class TmdbScraper: if season is not None: # 查询季信息 if mediainfo.episode_group: - seasoninfo = self.default_tmdb.get_tv_group_detail(mediainfo.episode_group, season=season) + seasoninfo = self.default_tmdb.get_tv_group_detail( + mediainfo.episode_group, season=season + ) else: - seasoninfo = self.default_tmdb.get_tv_season_detail(mediainfo.tmdb_id, season=season) + seasoninfo = self.default_tmdb.get_tv_season_detail( + mediainfo.tmdb_id, season=season + ) if episode: # 集元数据文件 - episodeinfo = self.__get_episode_detail(seasoninfo, meta.begin_episode) - doc = self.__gen_tv_episode_nfo_file(episodeinfo=episodeinfo, tmdbid=mediainfo.tmdb_id, - season=season, episode=episode) + episodeinfo = self.__get_episode_detail( + seasoninfo, meta.begin_episode + ) + doc = self.__gen_tv_episode_nfo_file( + episodeinfo=episodeinfo, + tmdbid=mediainfo.tmdb_id, + season=season, + episode=episode, + ) else: # 季元数据文件 - doc = self.__gen_tv_season_nfo_file(seasoninfo=seasoninfo, season=season) + doc = self.__gen_tv_season_nfo_file( + seasoninfo=seasoninfo, season=season + ) else: # 电视剧元数据文件 doc = self.__gen_tv_nfo_file(mediainfo=mediainfo) @@ -66,8 +83,12 @@ class TmdbScraper: return None - def get_metadata_img(self, mediainfo: MediaInfo, season: Optional[int] = None, - episode: Optional[int] = None) -> dict: + def get_metadata_img( + self, + mediainfo: MediaInfo, + season: Optional[int] = None, + episode: Optional[int] = None, + ) -> dict: """ 获取图片名称和url :param mediainfo: 媒体信息 @@ -80,19 +101,26 @@ class TmdbScraper: if episode: # 集的图片 if mediainfo.episode_group: - seasoninfo = self.original_tmdb(mediainfo).get_tv_group_detail(mediainfo.episode_group, season) + seasoninfo = self.original_tmdb(mediainfo).get_tv_group_detail( + mediainfo.episode_group, season + ) else: - seasoninfo = self.original_tmdb(mediainfo).get_tv_season_detail(mediainfo.tmdb_id, season) + seasoninfo = self.original_tmdb(mediainfo).get_tv_season_detail( + mediainfo.tmdb_id, season + ) if seasoninfo: episodeinfo = self.__get_episode_detail(seasoninfo, episode) if still_path := episodeinfo.get("still_path"): # TMDB集still图片 - still_name = f"episode-thumb-{episode}" + ext = Path(still_path).suffix + still_name = f"episode-thumb{ext}" still_url = settings.TMDB_IMAGE_URL(still_path) images[still_name] = still_url else: # 季的图片 - seasoninfo = self.original_tmdb(mediainfo).get_tv_season_detail(mediainfo.tmdb_id, season) + seasoninfo = self.original_tmdb(mediainfo).get_tv_season_detail( + mediainfo.tmdb_id, season + ) if seasoninfo: # TMDB季poster图片 poster_name, poster_url = self.get_season_poster(seasoninfo, season) @@ -102,21 +130,29 @@ class TmdbScraper: else: # 获取媒体信息中原有图片(TheMovieDb或Fanart) for attr_name, attr_value in vars(mediainfo).items(): - if attr_value \ - and attr_name.endswith("_path") \ - and attr_value \ - and isinstance(attr_value, str) \ - and attr_value.startswith("http"): - image_name = attr_name.replace("_path", "") + Path(attr_value).suffix + if ( + attr_value + and attr_name.endswith("_path") + and attr_value + and isinstance(attr_value, str) + and attr_value.startswith("http") + ): + image_name = ( + attr_name.replace("_path", "") + Path(attr_value).suffix + ) images[image_name] = attr_value # 替换原语言Poster if settings.TMDB_SCRAP_ORIGINAL_IMAGE: - _mediainfo = self.original_tmdb(mediainfo).get_info(mediainfo.type, mediainfo.tmdb_id) + _mediainfo = self.original_tmdb(mediainfo).get_info( + mediainfo.type, mediainfo.tmdb_id + ) if _mediainfo: for attr_name, attr_value in _mediainfo.items(): if attr_name.endswith("_path") and attr_value is not None: image_url = settings.TMDB_IMAGE_URL(attr_value) - image_name = attr_name.replace("_path", "") + Path(image_url).suffix + image_name = ( + attr_name.replace("_path", "") + Path(image_url).suffix + ) images[image_name] = image_url return images @@ -126,7 +162,7 @@ class TmdbScraper: 获取季的海报 """ # TMDB季poster图片 - sea_seq = str(season).rjust(2, '0') + sea_seq = str(season).rjust(2, "0") if poster_path := seasoninfo.get("poster_path"): # 后缀 ext = Path(poster_path).suffix @@ -151,19 +187,25 @@ class TmdbScraper: return {} @staticmethod - def __gen_common_nfo(mediainfo: MediaInfo, doc: minidom.Document, root: minidom.Element): + def __gen_common_nfo( + mediainfo: MediaInfo, doc: minidom.Document, root: minidom.Element + ): """ 生成公共NFO """ # TMDB DomUtils.add_node(doc, root, "tmdbid", mediainfo.tmdb_id or "") - uniqueid_tmdb = DomUtils.add_node(doc, root, "uniqueid", mediainfo.tmdb_id or "") + uniqueid_tmdb = DomUtils.add_node( + doc, root, "uniqueid", mediainfo.tmdb_id or "" + ) uniqueid_tmdb.setAttribute("type", "tmdb") uniqueid_tmdb.setAttribute("default", "true") # TVDB if mediainfo.tvdb_id: DomUtils.add_node(doc, root, "tvdbid", str(mediainfo.tvdb_id)) - uniqueid_tvdb = DomUtils.add_node(doc, root, "uniqueid", str(mediainfo.tvdb_id)) + uniqueid_tvdb = DomUtils.add_node( + doc, root, "uniqueid", str(mediainfo.tvdb_id) + ) uniqueid_tvdb.setAttribute("type", "tvdb") # IMDB if mediainfo.imdb_id: @@ -180,7 +222,9 @@ class TmdbScraper: xoutline.appendChild(doc.createCDATASection(mediainfo.overview or "")) # 导演 for director in mediainfo.directors: - xdirector = DomUtils.add_node(doc, root, "director", director.get("name") or "") + xdirector = DomUtils.add_node( + doc, root, "director", director.get("name") or "" + ) xdirector.setAttribute("tmdbid", str(director.get("id") or "")) # 演员 for actor in mediainfo.actors: @@ -188,12 +232,20 @@ class TmdbScraper: xactor = DomUtils.add_node(doc, root, "actor") DomUtils.add_node(doc, xactor, "name", actor.get("name") or "") DomUtils.add_node(doc, xactor, "type", "Actor") - DomUtils.add_node(doc, xactor, "role", actor.get("character") or actor.get("role") or "") + DomUtils.add_node( + doc, xactor, "role", actor.get("character") or actor.get("role") or "" + ) DomUtils.add_node(doc, xactor, "tmdbid", actor.get("id") or "") - if profile_path := actor.get('profile_path'): - DomUtils.add_node(doc, xactor, "thumb", settings.TMDB_IMAGE_URL(profile_path)) - DomUtils.add_node(doc, xactor, "profile", - f"https://www.themoviedb.org/person/{actor.get('id')}") + if profile_path := actor.get("profile_path"): + DomUtils.add_node( + doc, xactor, "thumb", settings.TMDB_IMAGE_URL(profile_path) + ) + DomUtils.add_node( + doc, + xactor, + "profile", + f"https://www.themoviedb.org/person/{actor.get('id')}", + ) # 风格 genres = mediainfo.genres or [] for genre in genres: @@ -215,9 +267,7 @@ class TmdbScraper: doc = minidom.Document() root = DomUtils.add_node(doc, doc, "movie") # 公共部分 - doc = self.__gen_common_nfo(mediainfo=mediainfo, - doc=doc, - root=root) + doc = self.__gen_common_nfo(mediainfo=mediainfo, doc=doc, root=root) # 标题 DomUtils.add_node(doc, root, "title", mediainfo.title or "") DomUtils.add_node(doc, root, "originaltitle", mediainfo.original_title or "") @@ -236,9 +286,7 @@ class TmdbScraper: doc = minidom.Document() root = DomUtils.add_node(doc, doc, "tvshow") # 公共部分 - doc = self.__gen_common_nfo(mediainfo=mediainfo, - doc=doc, - root=root) + doc = self.__gen_common_nfo(mediainfo=mediainfo, doc=doc, root=root) # 标题 DomUtils.add_node(doc, root, "title", mediainfo.title or "") DomUtils.add_node(doc, root, "originaltitle", mediainfo.original_title or "") @@ -266,22 +314,27 @@ class TmdbScraper: xoutline = DomUtils.add_node(doc, root, "outline") xoutline.appendChild(doc.createCDATASection(seasoninfo.get("overview") or "")) # 标题 - DomUtils.add_node(doc, root, "title", seasoninfo.get("name") or "季 %s" % season) + DomUtils.add_node( + doc, root, "title", seasoninfo.get("name") or "季 %s" % season + ) # 发行日期 DomUtils.add_node(doc, root, "premiered", seasoninfo.get("air_date") or "") DomUtils.add_node(doc, root, "releasedate", seasoninfo.get("air_date") or "") # 发行年份 - DomUtils.add_node(doc, root, "year", - seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "") + DomUtils.add_node( + doc, + root, + "year", + seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "", + ) # seasonnumber DomUtils.add_node(doc, root, "seasonnumber", str(season)) return doc @staticmethod - def __gen_tv_episode_nfo_file(tmdbid: int, - episodeinfo: dict, - season: int, - episode: int) -> minidom.Document: + def __gen_tv_episode_nfo_file( + tmdbid: int, episodeinfo: dict, season: int, episode: int + ) -> minidom.Document: """ 生成电视剧集的NFO描述文件 :param tmdbid: TMDBID @@ -300,7 +353,9 @@ class TmdbScraper: # 应与uniqueid一致 使用剧集id 否则jellyfin/emby会将此id覆盖上面的uniqueid DomUtils.add_node(doc, root, "tmdbid", str(episodeinfo.get("id"))) # 标题 - DomUtils.add_node(doc, root, "title", episodeinfo.get("name") or "第 %s 集" % episode) + DomUtils.add_node( + doc, root, "title", episodeinfo.get("name") or "第 %s 集" % episode + ) # 简介 xplot = DomUtils.add_node(doc, root, "plot") xplot.appendChild(doc.createCDATASection(episodeinfo.get("overview") or "")) @@ -309,8 +364,12 @@ class TmdbScraper: # 发布日期 DomUtils.add_node(doc, root, "aired", episodeinfo.get("air_date") or "") # 年份 - DomUtils.add_node(doc, root, "year", - episodeinfo.get("air_date")[:4] if episodeinfo.get("air_date") else "") + DomUtils.add_node( + doc, + root, + "year", + episodeinfo.get("air_date")[:4] if episodeinfo.get("air_date") else "", + ) # 季 DomUtils.add_node(doc, root, "season", str(season)) # 集 @@ -321,7 +380,9 @@ class TmdbScraper: directors = episodeinfo.get("crew") or [] for director in directors: if director.get("known_for_department") == "Directing": - xdirector = DomUtils.add_node(doc, root, "director", director.get("name") or "") + xdirector = DomUtils.add_node( + doc, root, "director", director.get("name") or "" + ) xdirector.setAttribute("tmdbid", str(director.get("id") or "")) # 演员 actors = episodeinfo.get("guest_stars") or [] @@ -331,8 +392,14 @@ class TmdbScraper: DomUtils.add_node(doc, xactor, "name", actor.get("name") or "") DomUtils.add_node(doc, xactor, "type", "Actor") DomUtils.add_node(doc, xactor, "tmdbid", actor.get("id") or "") - if profile_path := actor.get('profile_path'): - DomUtils.add_node(doc, xactor, "thumb", settings.TMDB_IMAGE_URL(profile_path)) - DomUtils.add_node(doc, xactor, "profile", - f"https://www.themoviedb.org/person/{actor.get('id')}") + if profile_path := actor.get("profile_path"): + DomUtils.add_node( + doc, xactor, "thumb", settings.TMDB_IMAGE_URL(profile_path) + ) + DomUtils.add_node( + doc, + xactor, + "profile", + f"https://www.themoviedb.org/person/{actor.get('id')}", + ) return doc