From 109164b673258963eee45b34f91fc746e2bc22e0 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 20:51:39 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=8D=8F=E7=A8=8B=E6=90=9C?= =?UTF-8?q?=E7=B4=A2=20part1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/search.py | 102 ++++++++-------- app/chain/__init__.py | 27 +++++ app/chain/media.py | 228 +++++++++++++++++++++++++++++------- app/chain/search.py | 71 +++++++++++ 4 files changed, 338 insertions(+), 90 deletions(-) diff --git a/app/api/endpoints/search.py b/app/api/endpoints/search.py index d69d45a1..ec6a9e90 100644 --- a/app/api/endpoints/search.py +++ b/app/api/endpoints/search.py @@ -16,23 +16,23 @@ router = APIRouter() @router.get("/last", summary="查询搜索结果", response_model=List[schemas.Context]) -def search_latest(_: schemas.TokenPayload = Depends(verify_token)) -> Any: +async def search_latest(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询搜索结果 """ - torrents = SearchChain().last_search_results() + torrents = await SearchChain().async_last_search_results() return [torrent.to_dict() for torrent in torrents] @router.get("/media/{mediaid}", summary="精确搜索资源", response_model=schemas.Response) -def search_by_id(mediaid: str, - mtype: Optional[str] = None, - area: Optional[str] = "title", - title: Optional[str] = None, - year: Optional[str] = None, - season: Optional[str] = None, - sites: Optional[str] = None, - _: schemas.TokenPayload = Depends(verify_token)) -> Any: +async def search_by_id(mediaid: str, + mtype: Optional[str] = None, + area: Optional[str] = "title", + title: Optional[str] = None, + year: Optional[str] = None, + season: Optional[str] = None, + sites: Optional[str] = None, + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 根据TMDBID/豆瓣ID精确搜索站点资源 tmdb:/douban:/bangumi: """ @@ -49,55 +49,59 @@ def search_by_id(mediaid: str, else: site_list = None torrents = None + media_chain = MediaChain() + search_chain = SearchChain() # 根据前缀识别媒体ID if mediaid.startswith("tmdb:"): tmdbid = int(mediaid.replace("tmdb:", "")) if settings.RECOGNIZE_SOURCE == "douban": # 通过TMDBID识别豆瓣ID - doubaninfo = MediaChain().get_doubaninfo_by_tmdbid(tmdbid=tmdbid, mtype=media_type) + doubaninfo = await media_chain.async_get_doubaninfo_by_tmdbid(tmdbid=tmdbid, mtype=media_type) if doubaninfo: - torrents = SearchChain().search_by_id(doubanid=doubaninfo.get("id"), - mtype=media_type, area=area, season=media_season, - sites=site_list, cache_local=True) + torrents = await search_chain.async_search_by_id(doubanid=doubaninfo.get("id"), + mtype=media_type, area=area, season=media_season, + sites=site_list, cache_local=True) else: return schemas.Response(success=False, message="未识别到豆瓣媒体信息") else: - torrents = SearchChain().search_by_id(tmdbid=tmdbid, mtype=media_type, area=area, season=media_season, - sites=site_list, cache_local=True) + torrents = await search_chain.async_search_by_id(tmdbid=tmdbid, mtype=media_type, area=area, + season=media_season, + sites=site_list, cache_local=True) elif mediaid.startswith("douban:"): doubanid = mediaid.replace("douban:", "") if settings.RECOGNIZE_SOURCE == "themoviedb": # 通过豆瓣ID识别TMDBID - tmdbinfo = MediaChain().get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=media_type) + tmdbinfo = await media_chain.async_get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=media_type) if tmdbinfo: if tmdbinfo.get('season') and not media_season: media_season = tmdbinfo.get('season') - torrents = SearchChain().search_by_id(tmdbid=tmdbinfo.get("id"), - mtype=media_type, area=area, season=media_season, - sites=site_list, cache_local=True) + torrents = await search_chain.async_search_by_id(tmdbid=tmdbinfo.get("id"), + mtype=media_type, area=area, season=media_season, + sites=site_list, cache_local=True) else: return schemas.Response(success=False, message="未识别到TMDB媒体信息") else: - torrents = SearchChain().search_by_id(doubanid=doubanid, mtype=media_type, area=area, season=media_season, - sites=site_list, cache_local=True) + torrents = await search_chain.async_search_by_id(doubanid=doubanid, mtype=media_type, area=area, + season=media_season, + sites=site_list, cache_local=True) elif mediaid.startswith("bangumi:"): bangumiid = int(mediaid.replace("bangumi:", "")) if settings.RECOGNIZE_SOURCE == "themoviedb": # 通过BangumiID识别TMDBID - tmdbinfo = MediaChain().get_tmdbinfo_by_bangumiid(bangumiid=bangumiid) + tmdbinfo = await media_chain.async_get_tmdbinfo_by_bangumiid(bangumiid=bangumiid) if tmdbinfo: - torrents = SearchChain().search_by_id(tmdbid=tmdbinfo.get("id"), - mtype=media_type, area=area, season=media_season, - sites=site_list, cache_local=True) + torrents = await search_chain.async_search_by_id(tmdbid=tmdbinfo.get("id"), + mtype=media_type, area=area, season=media_season, + sites=site_list, cache_local=True) else: return schemas.Response(success=False, message="未识别到TMDB媒体信息") else: # 通过BangumiID识别豆瓣ID - doubaninfo = MediaChain().get_doubaninfo_by_bangumiid(bangumiid=bangumiid) + doubaninfo = await media_chain.async_get_doubaninfo_by_bangumiid(bangumiid=bangumiid) if doubaninfo: - torrents = SearchChain().search_by_id(doubanid=doubaninfo.get("id"), - mtype=media_type, area=area, season=media_season, - sites=site_list, cache_local=True) + torrents = await search_chain.async_search_by_id(doubanid=doubaninfo.get("id"), + mtype=media_type, area=area, season=media_season, + sites=site_list, cache_local=True) else: return schemas.Response(success=False, message="未识别到豆瓣媒体信息") else: @@ -106,18 +110,18 @@ def search_by_id(mediaid: str, mediaid=mediaid, convert_type=settings.RECOGNIZE_SOURCE ) - event = eventmanager.send_event(ChainEventType.MediaRecognizeConvert, event_data) + event = await eventmanager.async_send_event(ChainEventType.MediaRecognizeConvert, event_data) # 使用事件返回的上下文数据 if event and event.event_data: event_data: MediaRecognizeConvertEventData = event.event_data if event_data.media_dict: search_id = event_data.media_dict.get("id") if event_data.convert_type == "themoviedb": - torrents = SearchChain().search_by_id(tmdbid=search_id, mtype=media_type, area=area, - season=media_season, cache_local=True) + torrents = await search_chain.async_search_by_id(tmdbid=search_id, mtype=media_type, area=area, + season=media_season, cache_local=True) elif event_data.convert_type == "douban": - torrents = SearchChain().search_by_id(doubanid=search_id, mtype=media_type, area=area, - season=media_season, cache_local=True) + torrents = await search_chain.async_search_by_id(doubanid=search_id, mtype=media_type, area=area, + season=media_season, cache_local=True) else: if not title: return schemas.Response(success=False, message="未知的媒体ID") @@ -130,14 +134,16 @@ def search_by_id(mediaid: str, if media_season: meta.type = MediaType.TV meta.begin_season = media_season - mediainfo = MediaChain().recognize_media(meta=meta) + mediainfo = await media_chain.async_recognize_media(meta=meta) if mediainfo: if settings.RECOGNIZE_SOURCE == "themoviedb": - torrents = SearchChain().search_by_id(tmdbid=mediainfo.tmdb_id, mtype=media_type, area=area, - season=media_season, cache_local=True) + torrents = await search_chain.async_search_by_id(tmdbid=mediainfo.tmdb_id, mtype=media_type, + area=area, + season=media_season, cache_local=True) else: - torrents = SearchChain().search_by_id(doubanid=mediainfo.douban_id, mtype=media_type, area=area, - season=media_season, cache_local=True) + torrents = await search_chain.async_search_by_id(doubanid=mediainfo.douban_id, mtype=media_type, + area=area, + season=media_season, cache_local=True) # 返回搜索结果 if not torrents: return schemas.Response(success=False, message="未搜索到任何资源") @@ -146,16 +152,18 @@ def search_by_id(mediaid: str, @router.get("/title", summary="模糊搜索资源", response_model=schemas.Response) -def search_by_title(keyword: Optional[str] = None, - page: Optional[int] = 0, - sites: Optional[str] = None, - _: schemas.TokenPayload = Depends(verify_token)) -> Any: +async def search_by_title(keyword: Optional[str] = None, + page: Optional[int] = 0, + sites: Optional[str] = None, + _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 根据名称模糊搜索站点资源,支持分页,关键词为空是返回首页资源 """ - torrents = SearchChain().search_by_title(title=keyword, page=page, - sites=[int(site) for site in sites.split(",") if site] if sites else None, - cache_local=True) + torrents = await SearchChain().async_search_by_title( + title=keyword, page=page, + sites=[int(site) for site in sites.split(",") if site] if sites else None, + cache_local=True + ) if not torrents: return schemas.Response(success=False, message="未搜索到任何资源") return schemas.Response(success=True, data=[torrent.to_dict() for torrent in torrents]) diff --git a/app/chain/__init__.py b/app/chain/__init__.py index bdae04d7..5104cb6c 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -7,6 +7,7 @@ from collections.abc import Callable from pathlib import Path from typing import Optional, Any, Tuple, List, Set, Union, Dict +import aiofiles from qbittorrentapi import TorrentFilesList from transmission_rpc import File @@ -59,6 +60,32 @@ class ChainBase(metaclass=ABCMeta): logger.error(f"加载缓存 {filename} 出错:{str(err)}") return None + @staticmethod + async def async_load_cache(filename: str) -> Any: + """ + 异步从本地加载缓存 + """ + cache_path = settings.TEMP_PATH / filename + if cache_path.exists(): + try: + async with aiofiles.open(cache_path, 'rb') as f: + content = await f.read() + return pickle.loads(content) + except Exception as err: + logger.error(f"加载缓存 {filename} 出错:{str(err)}") + return None + + @staticmethod + async def async_save_cache(cache: Any, filename: str) -> None: + """ + 异步保存缓存到本地 + """ + try: + async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f: + await f.write(pickle.dumps(cache)) + except Exception as err: + logger.error(f"保存缓存 {filename} 出错:{str(err)}") + @staticmethod def save_cache(cache: Any, filename: str) -> None: """ diff --git a/app/chain/media.py b/app/chain/media.py index 2abd7017..2d4cf352 100644 --- a/app/chain/media.py +++ b/app/chain/media.py @@ -230,17 +230,15 @@ class MediaChain(ChainBase): meta_names = list(dict.fromkeys([k for k in [meta_org.name, meta.cn_name, meta.en_name] if k])) - for name in meta_names: - tmdbinfo = self.match_tmdbinfo( - name=name, - year=meta.year, - mtype=mtype or meta.type, - season=meta.begin_season - ) - if tmdbinfo: - # 合季季后返回 - tmdbinfo['season'] = meta.begin_season - break + tmdbinfo = self._match_tmdb_with_names( + meta_names=meta_names, + year=meta.year, + mtype=mtype or meta.type, + season=meta.begin_season + ) + if tmdbinfo: + # 合季季后返回 + tmdbinfo['season'] = meta.begin_season return tmdbinfo def get_tmdbinfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]: @@ -256,23 +254,17 @@ class MediaChain(ChainBase): else: meta_cn = meta = MetaInfo(title=bangumiinfo.get("name")) # 年份 - release_date = bangumiinfo.get("date") or bangumiinfo.get("air_date") - if release_date: - year = release_date[:4] - else: - year = None + year = self._extract_year_from_bangumi(bangumiinfo) # 识别TMDB媒体信息 meta_names = list(dict.fromkeys([k for k in [meta_cn.name, meta.name] if k])) - for name in meta_names: - tmdbinfo = self.match_tmdbinfo( - name=name, - year=year, - mtype=MediaType.TV, - season=meta.begin_season - ) - if tmdbinfo: - return tmdbinfo + tmdbinfo = self._match_tmdb_with_names( + meta_names=meta_names, + year=year, + mtype=MediaType.TV, + season=meta.begin_season + ) + return tmdbinfo return None def get_doubaninfo_by_tmdbid(self, tmdbid: int, @@ -285,19 +277,7 @@ class MediaChain(ChainBase): # 名称 name = tmdbinfo.get("title") or tmdbinfo.get("name") # 年份 - year = None - if tmdbinfo.get('release_date'): - year = tmdbinfo['release_date'][:4] - elif tmdbinfo.get('seasons') and season: - for seainfo in tmdbinfo['seasons']: - # 季 - season_number = seainfo.get("season_number") - if not season_number: - continue - air_date = seainfo.get("air_date") - if air_date and season_number == season: - year = air_date[:4] - break + year = self._extract_year_from_tmdb(tmdbinfo, season) # IMDBID imdbid = tmdbinfo.get("external_ids", {}).get("imdb_id") return self.match_doubaninfo( @@ -320,11 +300,7 @@ class MediaChain(ChainBase): else: meta = MetaInfo(title=bangumiinfo.get("name")) # 年份 - release_date = bangumiinfo.get("date") or bangumiinfo.get("air_date") - if release_date: - year = release_date[:4] - else: - year = None + year = self._extract_year_from_bangumi(bangumiinfo) # 使用名称识别豆瓣媒体信息 return self.match_doubaninfo( name=meta.name, @@ -889,3 +865,169 @@ class MediaChain(ChainBase): logger.info(f"{content} 搜索到 {len(medias)} 条相关媒体信息") # 识别的元数据,媒体信息列表 return meta, medias + + @staticmethod + def _extract_year_from_bangumi(bangumiinfo: dict) -> Optional[str]: + """ + 从Bangumi信息中提取年份 + """ + release_date = bangumiinfo.get("date") or bangumiinfo.get("air_date") + if release_date: + return release_date[:4] + return None + + @staticmethod + def _extract_year_from_tmdb(tmdbinfo: dict, season: Optional[int] = None) -> Optional[str]: + """ + 从TMDB信息中提取年份 + """ + year = None + if tmdbinfo.get('release_date'): + year = tmdbinfo['release_date'][:4] + elif tmdbinfo.get('seasons') and season: + for seainfo in tmdbinfo['seasons']: + season_number = seainfo.get("season_number") + if not season_number: + continue + air_date = seainfo.get("air_date") + if air_date and season_number == season: + year = air_date[:4] + break + return year + + def _match_tmdb_with_names(self, meta_names: list, year: Optional[str], + mtype: MediaType, season: Optional[int] = None) -> Optional[dict]: + """ + 使用名称列表匹配TMDB信息 + """ + for name in meta_names: + tmdbinfo = self.match_tmdbinfo( + name=name, + year=year, + mtype=mtype, + season=season + ) + if tmdbinfo: + return tmdbinfo + return None + + async def _async_match_tmdb_with_names(self, meta_names: list, year: Optional[str], + mtype: MediaType, season: Optional[int] = None) -> Optional[dict]: + """ + 使用名称列表匹配TMDB信息(异步版本) + """ + for name in meta_names: + tmdbinfo = await self.async_match_tmdbinfo( + name=name, + year=year, + mtype=mtype, + season=season + ) + if tmdbinfo: + return tmdbinfo + return None + + async def async_get_tmdbinfo_by_doubanid(self, doubanid: str, mtype: MediaType = None) -> Optional[dict]: + """ + 根据豆瓣ID获取TMDB信息(异步版本) + """ + tmdbinfo = None + doubaninfo = await self.async_douban_info(doubanid=doubanid, mtype=mtype) + if doubaninfo: + # 优先使用原标题匹配 + if doubaninfo.get("original_title"): + meta = MetaInfo(title=doubaninfo.get("title")) + meta_org = MetaInfo(title=doubaninfo.get("original_title")) + else: + meta_org = meta = MetaInfo(title=doubaninfo.get("title")) + # 年份 + if doubaninfo.get("year"): + meta.year = doubaninfo.get("year") + # 处理类型 + if isinstance(doubaninfo.get('media_type'), MediaType): + meta.type = doubaninfo.get('media_type') + else: + meta.type = MediaType.MOVIE if doubaninfo.get("type") == "movie" else MediaType.TV + # 匹配TMDB信息 + meta_names = list(dict.fromkeys([k for k in [meta_org.name, + meta.cn_name, + meta.en_name] if k])) + tmdbinfo = await self._async_match_tmdb_with_names( + meta_names=meta_names, + year=meta.year, + mtype=mtype or meta.type, + season=meta.begin_season + ) + if tmdbinfo: + # 合季季后返回 + tmdbinfo['season'] = meta.begin_season + return tmdbinfo + + async def async_get_tmdbinfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]: + """ + 根据BangumiID获取TMDB信息(异步版本) + """ + bangumiinfo = await self.async_bangumi_info(bangumiid=bangumiid) + if bangumiinfo: + # 优先使用原标题匹配 + if bangumiinfo.get("name_cn"): + meta = MetaInfo(title=bangumiinfo.get("name")) + meta_cn = MetaInfo(title=bangumiinfo.get("name_cn")) + else: + meta_cn = meta = MetaInfo(title=bangumiinfo.get("name")) + # 年份 + year = self._extract_year_from_bangumi(bangumiinfo) + # 识别TMDB媒体信息 + meta_names = list(dict.fromkeys([k for k in [meta_cn.name, + meta.name] if k])) + tmdbinfo = await self._async_match_tmdb_with_names( + meta_names=meta_names, + year=year, + mtype=MediaType.TV, + season=meta.begin_season + ) + return tmdbinfo + return None + + async def async_get_doubaninfo_by_tmdbid(self, tmdbid: int, mtype: MediaType = None, + season: Optional[int] = None) -> Optional[dict]: + """ + 根据TMDBID获取豆瓣信息(异步版本) + """ + tmdbinfo = await self.async_tmdb_info(tmdbid=tmdbid, mtype=mtype) + if tmdbinfo: + # 名称 + name = tmdbinfo.get("title") or tmdbinfo.get("name") + # 年份 + year = self._extract_year_from_tmdb(tmdbinfo, season) + # IMDBID + imdbid = tmdbinfo.get("external_ids", {}).get("imdb_id") + return await self.async_match_doubaninfo( + name=name, + year=year, + mtype=mtype, + imdbid=imdbid + ) + return None + + async def async_get_doubaninfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]: + """ + 根据BangumiID获取豆瓣信息(异步版本) + """ + bangumiinfo = await self.async_bangumi_info(bangumiid=bangumiid) + if bangumiinfo: + # 优先使用中文标题匹配 + if bangumiinfo.get("name_cn"): + meta = MetaInfo(title=bangumiinfo.get("name_cn")) + else: + meta = MetaInfo(title=bangumiinfo.get("name")) + # 年份 + year = self._extract_year_from_bangumi(bangumiinfo) + # 使用名称识别豆瓣媒体信息 + return await self.async_match_doubaninfo( + name=meta.name, + year=year, + mtype=MediaType.TV, + season=meta.begin_season + ) + return None diff --git a/app/chain/search.py b/app/chain/search.py index e4b32f58..105f35e9 100644 --- a/app/chain/search.py +++ b/app/chain/search.py @@ -97,6 +97,77 @@ class SearchChain(ChainBase): logger.error(f'加载搜索结果失败:{str(e)} - {traceback.format_exc()}') return [] + async def async_last_search_results(self) -> List[Context]: + """ + 异步获取上次搜索结果 + """ + # 读取本地文件缓存 + content = await self.async_load_cache(self.__result_temp_file) + if not content: + return [] + try: + return pickle.loads(content) + except Exception as e: + logger.error(f'加载搜索结果失败:{str(e)} - {traceback.format_exc()}') + return [] + + async def async_search_by_id(self, tmdbid: Optional[int] = None, doubanid: Optional[str] = None, + mtype: MediaType = None, area: Optional[str] = "title", season: Optional[int] = None, + sites: List[int] = None, cache_local: bool = False) -> List[Context]: + """ + 根据TMDBID/豆瓣ID异步搜索资源,精确匹配,不过滤本地存在的资源 + :param tmdbid: TMDB ID + :param doubanid: 豆瓣 ID + :param mtype: 媒体,电影 or 电视剧 + :param area: 搜索范围,title or imdbid + :param season: 季数 + :param sites: 站点ID列表 + :param cache_local: 是否缓存到本地 + """ + mediainfo = await self.async_recognize_media(tmdbid=tmdbid, doubanid=doubanid, mtype=mtype) + if not mediainfo: + logger.error(f'{tmdbid} 媒体信息识别失败!') + return [] + no_exists = None + if season: + no_exists = { + tmdbid or doubanid: { + season: NotExistMediaInfo(episodes=[]) + } + } + # TODO async + results = self.process(mediainfo=mediainfo, sites=sites, area=area, no_exists=no_exists) + # 保存到本地文件 + if cache_local: + await self.async_save_cache(pickle.dumps(results), self.__result_temp_file) + return results + + async def async_search_by_title(self, title: str, page: Optional[int] = 0, + sites: List[int] = None, cache_local: Optional[bool] = False) -> List[Context]: + """ + 根据标题异步搜索资源,不识别不过滤,直接返回站点内容 + :param title: 标题,为空时返回所有站点首页内容 + :param page: 页码 + :param sites: 站点ID列表 + :param cache_local: 是否缓存到本地 + """ + if title: + logger.info(f'开始搜索资源,关键词:{title} ...') + else: + logger.info(f'开始浏览资源,站点:{sites} ...') + # 搜索 TODO async + torrents = self.__search_all_sites(keywords=[title], sites=sites, page=page) or [] + if not torrents: + logger.warn(f'{title} 未搜索到资源') + return [] + # 组装上下文 + contexts = [Context(meta_info=MetaInfo(title=torrent.title, subtitle=torrent.description), + torrent_info=torrent) for torrent in torrents] + # 保存到本地文件 + if cache_local: + await self.async_save_cache(pickle.dumps(contexts), self.__result_temp_file) + return contexts + def process(self, mediainfo: MediaInfo, keyword: Optional[str] = None, no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None,