feat:协程搜索 part1

This commit is contained in:
jxxghp
2025-07-31 20:51:39 +08:00
parent 673a03e656
commit 109164b673
4 changed files with 338 additions and 90 deletions

View File

@@ -16,23 +16,23 @@ router = APIRouter()
@router.get("/last", summary="查询搜索结果", response_model=List[schemas.Context])
def search_latest(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def search_latest(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询搜索结果
"""
torrents = SearchChain().last_search_results()
torrents = await SearchChain().async_last_search_results()
return [torrent.to_dict() for torrent in torrents]
@router.get("/media/{mediaid}", summary="精确搜索资源", response_model=schemas.Response)
def search_by_id(mediaid: str,
mtype: Optional[str] = None,
area: Optional[str] = "title",
title: Optional[str] = None,
year: Optional[str] = None,
season: Optional[str] = None,
sites: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def search_by_id(mediaid: str,
mtype: Optional[str] = None,
area: Optional[str] = "title",
title: Optional[str] = None,
year: Optional[str] = None,
season: Optional[str] = None,
sites: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据TMDBID/豆瓣ID精确搜索站点资源 tmdb:/douban:/bangumi:
"""
@@ -49,55 +49,59 @@ def search_by_id(mediaid: str,
else:
site_list = None
torrents = None
media_chain = MediaChain()
search_chain = SearchChain()
# 根据前缀识别媒体ID
if mediaid.startswith("tmdb:"):
tmdbid = int(mediaid.replace("tmdb:", ""))
if settings.RECOGNIZE_SOURCE == "douban":
# 通过TMDBID识别豆瓣ID
doubaninfo = MediaChain().get_doubaninfo_by_tmdbid(tmdbid=tmdbid, mtype=media_type)
doubaninfo = await media_chain.async_get_doubaninfo_by_tmdbid(tmdbid=tmdbid, mtype=media_type)
if doubaninfo:
torrents = SearchChain().search_by_id(doubanid=doubaninfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
torrents = await search_chain.async_search_by_id(doubanid=doubaninfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
else:
return schemas.Response(success=False, message="未识别到豆瓣媒体信息")
else:
torrents = SearchChain().search_by_id(tmdbid=tmdbid, mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
torrents = await search_chain.async_search_by_id(tmdbid=tmdbid, mtype=media_type, area=area,
season=media_season,
sites=site_list, cache_local=True)
elif mediaid.startswith("douban:"):
doubanid = mediaid.replace("douban:", "")
if settings.RECOGNIZE_SOURCE == "themoviedb":
# 通过豆瓣ID识别TMDBID
tmdbinfo = MediaChain().get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=media_type)
tmdbinfo = await media_chain.async_get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=media_type)
if tmdbinfo:
if tmdbinfo.get('season') and not media_season:
media_season = tmdbinfo.get('season')
torrents = SearchChain().search_by_id(tmdbid=tmdbinfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
torrents = await search_chain.async_search_by_id(tmdbid=tmdbinfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
else:
return schemas.Response(success=False, message="未识别到TMDB媒体信息")
else:
torrents = SearchChain().search_by_id(doubanid=doubanid, mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
torrents = await search_chain.async_search_by_id(doubanid=doubanid, mtype=media_type, area=area,
season=media_season,
sites=site_list, cache_local=True)
elif mediaid.startswith("bangumi:"):
bangumiid = int(mediaid.replace("bangumi:", ""))
if settings.RECOGNIZE_SOURCE == "themoviedb":
# 通过BangumiID识别TMDBID
tmdbinfo = MediaChain().get_tmdbinfo_by_bangumiid(bangumiid=bangumiid)
tmdbinfo = await media_chain.async_get_tmdbinfo_by_bangumiid(bangumiid=bangumiid)
if tmdbinfo:
torrents = SearchChain().search_by_id(tmdbid=tmdbinfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
torrents = await search_chain.async_search_by_id(tmdbid=tmdbinfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
else:
return schemas.Response(success=False, message="未识别到TMDB媒体信息")
else:
# 通过BangumiID识别豆瓣ID
doubaninfo = MediaChain().get_doubaninfo_by_bangumiid(bangumiid=bangumiid)
doubaninfo = await media_chain.async_get_doubaninfo_by_bangumiid(bangumiid=bangumiid)
if doubaninfo:
torrents = SearchChain().search_by_id(doubanid=doubaninfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
torrents = await search_chain.async_search_by_id(doubanid=doubaninfo.get("id"),
mtype=media_type, area=area, season=media_season,
sites=site_list, cache_local=True)
else:
return schemas.Response(success=False, message="未识别到豆瓣媒体信息")
else:
@@ -106,18 +110,18 @@ def search_by_id(mediaid: str,
mediaid=mediaid,
convert_type=settings.RECOGNIZE_SOURCE
)
event = eventmanager.send_event(ChainEventType.MediaRecognizeConvert, event_data)
event = await eventmanager.async_send_event(ChainEventType.MediaRecognizeConvert, event_data)
# 使用事件返回的上下文数据
if event and event.event_data:
event_data: MediaRecognizeConvertEventData = event.event_data
if event_data.media_dict:
search_id = event_data.media_dict.get("id")
if event_data.convert_type == "themoviedb":
torrents = SearchChain().search_by_id(tmdbid=search_id, mtype=media_type, area=area,
season=media_season, cache_local=True)
torrents = await search_chain.async_search_by_id(tmdbid=search_id, mtype=media_type, area=area,
season=media_season, cache_local=True)
elif event_data.convert_type == "douban":
torrents = SearchChain().search_by_id(doubanid=search_id, mtype=media_type, area=area,
season=media_season, cache_local=True)
torrents = await search_chain.async_search_by_id(doubanid=search_id, mtype=media_type, area=area,
season=media_season, cache_local=True)
else:
if not title:
return schemas.Response(success=False, message="未知的媒体ID")
@@ -130,14 +134,16 @@ def search_by_id(mediaid: str,
if media_season:
meta.type = MediaType.TV
meta.begin_season = media_season
mediainfo = MediaChain().recognize_media(meta=meta)
mediainfo = await media_chain.async_recognize_media(meta=meta)
if mediainfo:
if settings.RECOGNIZE_SOURCE == "themoviedb":
torrents = SearchChain().search_by_id(tmdbid=mediainfo.tmdb_id, mtype=media_type, area=area,
season=media_season, cache_local=True)
torrents = await search_chain.async_search_by_id(tmdbid=mediainfo.tmdb_id, mtype=media_type,
area=area,
season=media_season, cache_local=True)
else:
torrents = SearchChain().search_by_id(doubanid=mediainfo.douban_id, mtype=media_type, area=area,
season=media_season, cache_local=True)
torrents = await search_chain.async_search_by_id(doubanid=mediainfo.douban_id, mtype=media_type,
area=area,
season=media_season, cache_local=True)
# 返回搜索结果
if not torrents:
return schemas.Response(success=False, message="未搜索到任何资源")
@@ -146,16 +152,18 @@ def search_by_id(mediaid: str,
@router.get("/title", summary="模糊搜索资源", response_model=schemas.Response)
def search_by_title(keyword: Optional[str] = None,
page: Optional[int] = 0,
sites: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def search_by_title(keyword: Optional[str] = None,
page: Optional[int] = 0,
sites: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据名称模糊搜索站点资源,支持分页,关键词为空是返回首页资源
"""
torrents = SearchChain().search_by_title(title=keyword, page=page,
sites=[int(site) for site in sites.split(",") if site] if sites else None,
cache_local=True)
torrents = await SearchChain().async_search_by_title(
title=keyword, page=page,
sites=[int(site) for site in sites.split(",") if site] if sites else None,
cache_local=True
)
if not torrents:
return schemas.Response(success=False, message="未搜索到任何资源")
return schemas.Response(success=True, data=[torrent.to_dict() for torrent in torrents])

View File

@@ -7,6 +7,7 @@ from collections.abc import Callable
from pathlib import Path
from typing import Optional, Any, Tuple, List, Set, Union, Dict
import aiofiles
from qbittorrentapi import TorrentFilesList
from transmission_rpc import File
@@ -59,6 +60,32 @@ class ChainBase(metaclass=ABCMeta):
logger.error(f"加载缓存 {filename} 出错:{str(err)}")
return None
@staticmethod
async def async_load_cache(filename: str) -> Any:
"""
异步从本地加载缓存
"""
cache_path = settings.TEMP_PATH / filename
if cache_path.exists():
try:
async with aiofiles.open(cache_path, 'rb') as f:
content = await f.read()
return pickle.loads(content)
except Exception as err:
logger.error(f"加载缓存 {filename} 出错:{str(err)}")
return None
@staticmethod
async def async_save_cache(cache: Any, filename: str) -> None:
"""
异步保存缓存到本地
"""
try:
async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f:
await f.write(pickle.dumps(cache))
except Exception as err:
logger.error(f"保存缓存 {filename} 出错:{str(err)}")
@staticmethod
def save_cache(cache: Any, filename: str) -> None:
"""

View File

@@ -230,17 +230,15 @@ class MediaChain(ChainBase):
meta_names = list(dict.fromkeys([k for k in [meta_org.name,
meta.cn_name,
meta.en_name] if k]))
for name in meta_names:
tmdbinfo = self.match_tmdbinfo(
name=name,
year=meta.year,
mtype=mtype or meta.type,
season=meta.begin_season
)
if tmdbinfo:
# 合季季后返回
tmdbinfo['season'] = meta.begin_season
break
tmdbinfo = self._match_tmdb_with_names(
meta_names=meta_names,
year=meta.year,
mtype=mtype or meta.type,
season=meta.begin_season
)
if tmdbinfo:
# 合季季后返回
tmdbinfo['season'] = meta.begin_season
return tmdbinfo
def get_tmdbinfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]:
@@ -256,23 +254,17 @@ class MediaChain(ChainBase):
else:
meta_cn = meta = MetaInfo(title=bangumiinfo.get("name"))
# 年份
release_date = bangumiinfo.get("date") or bangumiinfo.get("air_date")
if release_date:
year = release_date[:4]
else:
year = None
year = self._extract_year_from_bangumi(bangumiinfo)
# 识别TMDB媒体信息
meta_names = list(dict.fromkeys([k for k in [meta_cn.name,
meta.name] if k]))
for name in meta_names:
tmdbinfo = self.match_tmdbinfo(
name=name,
year=year,
mtype=MediaType.TV,
season=meta.begin_season
)
if tmdbinfo:
return tmdbinfo
tmdbinfo = self._match_tmdb_with_names(
meta_names=meta_names,
year=year,
mtype=MediaType.TV,
season=meta.begin_season
)
return tmdbinfo
return None
def get_doubaninfo_by_tmdbid(self, tmdbid: int,
@@ -285,19 +277,7 @@ class MediaChain(ChainBase):
# 名称
name = tmdbinfo.get("title") or tmdbinfo.get("name")
# 年份
year = None
if tmdbinfo.get('release_date'):
year = tmdbinfo['release_date'][:4]
elif tmdbinfo.get('seasons') and season:
for seainfo in tmdbinfo['seasons']:
# 季
season_number = seainfo.get("season_number")
if not season_number:
continue
air_date = seainfo.get("air_date")
if air_date and season_number == season:
year = air_date[:4]
break
year = self._extract_year_from_tmdb(tmdbinfo, season)
# IMDBID
imdbid = tmdbinfo.get("external_ids", {}).get("imdb_id")
return self.match_doubaninfo(
@@ -320,11 +300,7 @@ class MediaChain(ChainBase):
else:
meta = MetaInfo(title=bangumiinfo.get("name"))
# 年份
release_date = bangumiinfo.get("date") or bangumiinfo.get("air_date")
if release_date:
year = release_date[:4]
else:
year = None
year = self._extract_year_from_bangumi(bangumiinfo)
# 使用名称识别豆瓣媒体信息
return self.match_doubaninfo(
name=meta.name,
@@ -889,3 +865,169 @@ class MediaChain(ChainBase):
logger.info(f"{content} 搜索到 {len(medias)} 条相关媒体信息")
# 识别的元数据,媒体信息列表
return meta, medias
@staticmethod
def _extract_year_from_bangumi(bangumiinfo: dict) -> Optional[str]:
"""
从Bangumi信息中提取年份
"""
release_date = bangumiinfo.get("date") or bangumiinfo.get("air_date")
if release_date:
return release_date[:4]
return None
@staticmethod
def _extract_year_from_tmdb(tmdbinfo: dict, season: Optional[int] = None) -> Optional[str]:
"""
从TMDB信息中提取年份
"""
year = None
if tmdbinfo.get('release_date'):
year = tmdbinfo['release_date'][:4]
elif tmdbinfo.get('seasons') and season:
for seainfo in tmdbinfo['seasons']:
season_number = seainfo.get("season_number")
if not season_number:
continue
air_date = seainfo.get("air_date")
if air_date and season_number == season:
year = air_date[:4]
break
return year
def _match_tmdb_with_names(self, meta_names: list, year: Optional[str],
mtype: MediaType, season: Optional[int] = None) -> Optional[dict]:
"""
使用名称列表匹配TMDB信息
"""
for name in meta_names:
tmdbinfo = self.match_tmdbinfo(
name=name,
year=year,
mtype=mtype,
season=season
)
if tmdbinfo:
return tmdbinfo
return None
async def _async_match_tmdb_with_names(self, meta_names: list, year: Optional[str],
mtype: MediaType, season: Optional[int] = None) -> Optional[dict]:
"""
使用名称列表匹配TMDB信息异步版本
"""
for name in meta_names:
tmdbinfo = await self.async_match_tmdbinfo(
name=name,
year=year,
mtype=mtype,
season=season
)
if tmdbinfo:
return tmdbinfo
return None
async def async_get_tmdbinfo_by_doubanid(self, doubanid: str, mtype: MediaType = None) -> Optional[dict]:
"""
根据豆瓣ID获取TMDB信息异步版本
"""
tmdbinfo = None
doubaninfo = await self.async_douban_info(doubanid=doubanid, mtype=mtype)
if doubaninfo:
# 优先使用原标题匹配
if doubaninfo.get("original_title"):
meta = MetaInfo(title=doubaninfo.get("title"))
meta_org = MetaInfo(title=doubaninfo.get("original_title"))
else:
meta_org = meta = MetaInfo(title=doubaninfo.get("title"))
# 年份
if doubaninfo.get("year"):
meta.year = doubaninfo.get("year")
# 处理类型
if isinstance(doubaninfo.get('media_type'), MediaType):
meta.type = doubaninfo.get('media_type')
else:
meta.type = MediaType.MOVIE if doubaninfo.get("type") == "movie" else MediaType.TV
# 匹配TMDB信息
meta_names = list(dict.fromkeys([k for k in [meta_org.name,
meta.cn_name,
meta.en_name] if k]))
tmdbinfo = await self._async_match_tmdb_with_names(
meta_names=meta_names,
year=meta.year,
mtype=mtype or meta.type,
season=meta.begin_season
)
if tmdbinfo:
# 合季季后返回
tmdbinfo['season'] = meta.begin_season
return tmdbinfo
async def async_get_tmdbinfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]:
"""
根据BangumiID获取TMDB信息异步版本
"""
bangumiinfo = await self.async_bangumi_info(bangumiid=bangumiid)
if bangumiinfo:
# 优先使用原标题匹配
if bangumiinfo.get("name_cn"):
meta = MetaInfo(title=bangumiinfo.get("name"))
meta_cn = MetaInfo(title=bangumiinfo.get("name_cn"))
else:
meta_cn = meta = MetaInfo(title=bangumiinfo.get("name"))
# 年份
year = self._extract_year_from_bangumi(bangumiinfo)
# 识别TMDB媒体信息
meta_names = list(dict.fromkeys([k for k in [meta_cn.name,
meta.name] if k]))
tmdbinfo = await self._async_match_tmdb_with_names(
meta_names=meta_names,
year=year,
mtype=MediaType.TV,
season=meta.begin_season
)
return tmdbinfo
return None
async def async_get_doubaninfo_by_tmdbid(self, tmdbid: int, mtype: MediaType = None,
season: Optional[int] = None) -> Optional[dict]:
"""
根据TMDBID获取豆瓣信息异步版本
"""
tmdbinfo = await self.async_tmdb_info(tmdbid=tmdbid, mtype=mtype)
if tmdbinfo:
# 名称
name = tmdbinfo.get("title") or tmdbinfo.get("name")
# 年份
year = self._extract_year_from_tmdb(tmdbinfo, season)
# IMDBID
imdbid = tmdbinfo.get("external_ids", {}).get("imdb_id")
return await self.async_match_doubaninfo(
name=name,
year=year,
mtype=mtype,
imdbid=imdbid
)
return None
async def async_get_doubaninfo_by_bangumiid(self, bangumiid: int) -> Optional[dict]:
"""
根据BangumiID获取豆瓣信息异步版本
"""
bangumiinfo = await self.async_bangumi_info(bangumiid=bangumiid)
if bangumiinfo:
# 优先使用中文标题匹配
if bangumiinfo.get("name_cn"):
meta = MetaInfo(title=bangumiinfo.get("name_cn"))
else:
meta = MetaInfo(title=bangumiinfo.get("name"))
# 年份
year = self._extract_year_from_bangumi(bangumiinfo)
# 使用名称识别豆瓣媒体信息
return await self.async_match_doubaninfo(
name=meta.name,
year=year,
mtype=MediaType.TV,
season=meta.begin_season
)
return None

View File

@@ -97,6 +97,77 @@ class SearchChain(ChainBase):
logger.error(f'加载搜索结果失败:{str(e)} - {traceback.format_exc()}')
return []
async def async_last_search_results(self) -> List[Context]:
"""
异步获取上次搜索结果
"""
# 读取本地文件缓存
content = await self.async_load_cache(self.__result_temp_file)
if not content:
return []
try:
return pickle.loads(content)
except Exception as e:
logger.error(f'加载搜索结果失败:{str(e)} - {traceback.format_exc()}')
return []
async def async_search_by_id(self, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
mtype: MediaType = None, area: Optional[str] = "title", season: Optional[int] = None,
sites: List[int] = None, cache_local: bool = False) -> List[Context]:
"""
根据TMDBID/豆瓣ID异步搜索资源精确匹配不过滤本地存在的资源
:param tmdbid: TMDB ID
:param doubanid: 豆瓣 ID
:param mtype: 媒体,电影 or 电视剧
:param area: 搜索范围title or imdbid
:param season: 季数
:param sites: 站点ID列表
:param cache_local: 是否缓存到本地
"""
mediainfo = await self.async_recognize_media(tmdbid=tmdbid, doubanid=doubanid, mtype=mtype)
if not mediainfo:
logger.error(f'{tmdbid} 媒体信息识别失败!')
return []
no_exists = None
if season:
no_exists = {
tmdbid or doubanid: {
season: NotExistMediaInfo(episodes=[])
}
}
# TODO async
results = self.process(mediainfo=mediainfo, sites=sites, area=area, no_exists=no_exists)
# 保存到本地文件
if cache_local:
await self.async_save_cache(pickle.dumps(results), self.__result_temp_file)
return results
async def async_search_by_title(self, title: str, page: Optional[int] = 0,
sites: List[int] = None, cache_local: Optional[bool] = False) -> List[Context]:
"""
根据标题异步搜索资源,不识别不过滤,直接返回站点内容
:param title: 标题,为空时返回所有站点首页内容
:param page: 页码
:param sites: 站点ID列表
:param cache_local: 是否缓存到本地
"""
if title:
logger.info(f'开始搜索资源,关键词:{title} ...')
else:
logger.info(f'开始浏览资源,站点:{sites} ...')
# 搜索 TODO async
torrents = self.__search_all_sites(keywords=[title], sites=sites, page=page) or []
if not torrents:
logger.warn(f'{title} 未搜索到资源')
return []
# 组装上下文
contexts = [Context(meta_info=MetaInfo(title=torrent.title, subtitle=torrent.description),
torrent_info=torrent) for torrent in torrents]
# 保存到本地文件
if cache_local:
await self.async_save_cache(pickle.dumps(contexts), self.__result_temp_file)
return contexts
def process(self, mediainfo: MediaInfo,
keyword: Optional[str] = None,
no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None,