From 3fc2c7d6cc3fb9212d64de16786c1b7c0d97faee Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 21:26:55 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=8D=8F=E7=A8=8B=E6=90=9C?= =?UTF-8?q?=E7=B4=A2=20part2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/__init__.py | 15 +++ app/chain/search.py | 283 +++++++++++++++++++++++++++++++++++++++++- app/db/models/site.py | 17 ++- app/db/site_oper.py | 12 ++ 4 files changed, 321 insertions(+), 6 deletions(-) diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 5104cb6c..74a8289a 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -630,6 +630,21 @@ class ChainBase(metaclass=ABCMeta): return self.run_module("search_torrents", site=site, keywords=keywords, mtype=mtype, page=page) + async def async_search_torrents(self, site: dict, + keywords: List[str], + mtype: Optional[MediaType] = None, + page: Optional[int] = 0) -> List[TorrentInfo]: + """ + 异步搜索一个站点的种子资源 + :param site: 站点 + :param keywords: 搜索关键词列表 + :param mtype: 媒体类型 + :param page: 页码 + :return: 资源列表 + """ + return await self.async_run_module("search_torrents", site=site, keywords=keywords, + mtype=mtype, page=page) + def refresh_torrents(self, site: dict, keyword: Optional[str] = None, cat: Optional[str] = None, page: Optional[int] = 0) -> List[TorrentInfo]: """ diff --git a/app/chain/search.py b/app/chain/search.py index 105f35e9..e369091f 100644 --- a/app/chain/search.py +++ b/app/chain/search.py @@ -1,3 +1,4 @@ +import asyncio import pickle import traceback from concurrent.futures import ThreadPoolExecutor, as_completed @@ -135,8 +136,7 @@ class SearchChain(ChainBase): season: NotExistMediaInfo(episodes=[]) } } - # TODO async - results = self.process(mediainfo=mediainfo, sites=sites, area=area, no_exists=no_exists) + results = await self.async_process(mediainfo=mediainfo, sites=sites, area=area, no_exists=no_exists) # 保存到本地文件 if cache_local: await
self.async_save_cache(pickle.dumps(results), self.__result_temp_file) @@ -155,8 +155,8 @@ class SearchChain(ChainBase): logger.info(f'开始搜索资源,关键词:{title} ...') else: logger.info(f'开始浏览资源,站点:{sites} ...') - # 搜索 TODO async - torrents = self.__search_all_sites(keywords=[title], sites=sites, page=page) or [] + # 搜索 + torrents = await self.__async_search_all_sites(keywords=[title], sites=sites, page=page) or [] if not torrents: logger.warn(f'{title} 未搜索到资源') return [] @@ -355,6 +355,193 @@ class SearchChain(ChainBase): # 返回 return contexts + async def async_process(self, mediainfo: MediaInfo, + keyword: Optional[str] = None, + no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None, + sites: List[int] = None, + rule_groups: List[str] = None, + area: Optional[str] = "title", + custom_words: List[str] = None, + filter_params: Dict[str, str] = None) -> List[Context]: + """ + 根据媒体信息异步搜索种子资源,精确匹配,应用过滤规则,同时根据no_exists过滤本地已存在的资源 + :param mediainfo: 媒体信息 + :param keyword: 搜索关键词 + :param no_exists: 缺失的媒体信息 + :param sites: 站点ID列表,为空时搜索所有站点 + :param rule_groups: 过滤规则组名称列表 + :param area: 搜索范围,title or imdbid + :param custom_words: 自定义识别词列表 + :param filter_params: 过滤参数 + """ + + def __do_filter(torrent_list: List[TorrentInfo]) -> List[TorrentInfo]: + """ + 执行优先级过滤 + """ + return self.filter_torrents(rule_groups=rule_groups, + torrent_list=torrent_list, + mediainfo=mediainfo) or [] + + # 豆瓣标题处理 + if not mediainfo.tmdb_id: + meta = MetaInfo(title=mediainfo.title) + mediainfo.title = meta.name + mediainfo.season = meta.begin_season + logger.info(f'开始搜索资源,关键词:{keyword or mediainfo.title} ...') + + # 补充媒体信息 + if not mediainfo.names: + mediainfo: MediaInfo = await self.async_recognize_media(mtype=mediainfo.type, + tmdbid=mediainfo.tmdb_id, + doubanid=mediainfo.douban_id) + if not mediainfo: + logger.error(f'媒体信息识别失败!') + return [] + + # 缺失的季集 + mediakey = mediainfo.tmdb_id or mediainfo.douban_id + if no_exists and no_exists.get(mediakey): + # 过滤剧集 + season_episodes = {sea: info.episodes 
+ for sea, info in no_exists[mediakey].items()} + elif mediainfo.season: + # 豆瓣只搜索当前季 + season_episodes = {mediainfo.season: []} + else: + season_episodes = None + + # 搜索关键词 + if keyword: + keywords = [keyword] + else: + # 去重去空,但要保持顺序 + keywords = list(dict.fromkeys([k for k in [mediainfo.title, + mediainfo.original_title, + mediainfo.en_title, + mediainfo.hk_title, + mediainfo.tw_title, + mediainfo.sg_title] if k])) + + # 执行搜索 + torrents: List[TorrentInfo] = await self.__async_search_all_sites( + mediainfo=mediainfo, + keywords=keywords, + sites=sites, + area=area + ) + if not torrents: + logger.warn(f'{keyword or mediainfo.title} 未搜索到资源') + return [] + + # 开始新进度 + progress = ProgressHelper() + progress.start(ProgressKey.Search) + + # 开始过滤 + progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...', + key=ProgressKey.Search) + # 匹配订阅附加参数 + if filter_params: + logger.info(f'开始附加参数过滤,附加参数:{filter_params} ...') + torrents = [torrent for torrent in torrents if TorrentHelper().filter_torrent(torrent, filter_params)] + # 开始过滤规则过滤 + if rule_groups is None: + # 取搜索过滤规则 + rule_groups: List[str] = SystemConfigOper().get(SystemConfigKey.SearchFilterRuleGroups) + if rule_groups: + logger.info(f'开始过滤规则/剧集过滤,使用规则组:{rule_groups} ...') + torrents = __do_filter(torrents) + if not torrents: + logger.warn(f'{keyword or mediainfo.title} 没有符合过滤规则的资源') + return [] + logger.info(f"过滤规则/剧集过滤完成,剩余 {len(torrents)} 个资源") + + # 过滤完成 + progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search) + + # 总数 + _total = len(torrents) + # 已处理数 + _count = 0 + + # 开始匹配 + _match_torrents = [] + torrenthelper = TorrentHelper() + try: + # 英文标题应该在别名/原标题中,不需要再匹配 + logger.info(f"开始匹配结果 标题:{mediainfo.title},原标题:{mediainfo.original_title},别名:{mediainfo.names}") + progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search) + for torrent in torrents: + if global_vars.is_system_stopped: + break + _count += 1 + progress.update(value=(_count / _total) * 
96, + text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...', + key=ProgressKey.Search) + if not torrent.title: + continue + + # 识别元数据 + torrent_meta = MetaInfo(title=torrent.title, subtitle=torrent.description, + custom_words=custom_words) + if torrent.title != torrent_meta.org_string: + logger.info(f"种子名称应用识别词后发生改变:{torrent.title} => {torrent_meta.org_string}") + # 季集数过滤 + if season_episodes \ + and not torrenthelper.match_season_episodes(torrent=torrent, + meta=torrent_meta, + season_episodes=season_episodes): + continue + # 比对IMDBID + if torrent.imdbid \ + and mediainfo.imdb_id \ + and torrent.imdbid == mediainfo.imdb_id: + logger.info(f'{mediainfo.title} 通过IMDBID匹配到资源:{torrent.site_name} - {torrent.title}') + _match_torrents.append((torrent, torrent_meta)) + continue + + # 比对种子 + if torrenthelper.match_torrent(mediainfo=mediainfo, + torrent_meta=torrent_meta, + torrent=torrent): + # 匹配成功 + _match_torrents.append((torrent, torrent_meta)) + continue + # 匹配完成 + logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源") + progress.update(value=97, + text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源', + key=ProgressKey.Search) + + # 去掉mediainfo中多余的数据 + mediainfo.clear() + # 组装上下文 + contexts = [Context(torrent_info=t[0], + media_info=mediainfo, + meta_info=t[1]) for t in _match_torrents] + finally: + torrents.clear() + del torrents + _match_torrents.clear() + del _match_torrents + + # 排序 + progress.update(value=99, + text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...', + key=ProgressKey.Search) + contexts = torrenthelper.sort_torrents(contexts) + + # 结束进度 + logger.info(f'搜索完成,共 {len(contexts)} 个资源') + progress.update(value=100, + text=f'搜索完成,共 {len(contexts)} 个资源', + key=ProgressKey.Search) + progress.end(ProgressKey.Search) + + # 返回 + return contexts + def __search_all_sites(self, keywords: List[str], mediainfo: Optional[MediaInfo] = None, sites: List[int] = None, @@ -440,6 +627,94 @@ class SearchChain(ChainBase): # 返回 return results + async def __async_search_all_sites(self, 
keywords: List[str], + mediainfo: Optional[MediaInfo] = None, + sites: List[int] = None, + page: Optional[int] = 0, + area: Optional[str] = "title") -> Optional[List[TorrentInfo]]: + """ + 异步搜索多个站点 + :param mediainfo: 识别的媒体信息 + :param keywords: 搜索关键词列表 + :param sites: 指定站点ID列表,如有则只搜索指定站点,否则搜索所有站点 + :param page: 搜索页码 + :param area: 搜索区域 title or imdbid + :return: 资源列表 + """ + # 未开启的站点不搜索 + indexer_sites = [] + + # 配置的索引站点 + if not sites: + sites = SystemConfigOper().get(SystemConfigKey.IndexerSites) or [] + + for indexer in await SitesHelper().async_get_indexers(): + # 检查站点索引开关 + if not sites or indexer.get("id") in sites: + indexer_sites.append(indexer) + if not indexer_sites: + logger.warn('未开启任何有效站点,无法搜索资源') + return [] + + # 开始进度 + progress = ProgressHelper() + progress.start(ProgressKey.Search) + # 开始计时 + start_time = datetime.now() + # 总数 + total_num = len(indexer_sites) + # 完成数 + finish_count = 0 + # 更新进度 + progress.update(value=0, + text=f"开始搜索,共 {total_num} 个站点 ...", + key=ProgressKey.Search) + # 结果集 + results = [] + + # 创建异步任务列表 + tasks = [] + for site in indexer_sites: + if area == "imdbid": + # 搜索IMDBID + task = self.async_search_torrents(site=site, + keywords=[mediainfo.imdb_id] if mediainfo else None, + mtype=mediainfo.type if mediainfo else None, + page=page) + else: + # 搜索标题 + task = self.async_search_torrents(site=site, + keywords=keywords, + mtype=mediainfo.type if mediainfo else None, + page=page) + tasks.append(task) + + # 使用asyncio.as_completed来处理并发任务 + for future in asyncio.as_completed(tasks): + if global_vars.is_system_stopped: + break + finish_count += 1 + result = await future + if result: + results.extend(result) + logger.info(f"站点搜索进度:{finish_count} / {total_num}") + progress.update(value=finish_count / total_num * 100, + text=f"正在搜索{keywords or ''},已完成 {finish_count} / {total_num} 个站点 ...", + key=ProgressKey.Search) + + # 计算耗时 + end_time = datetime.now() + # 更新进度 + progress.update(value=100, + text=f"站点搜索完成,有效资源数:{len(results)},总耗时
{(end_time - start_time).seconds} 秒", + key=ProgressKey.Search) + logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") + # 结束进度 + progress.end(ProgressKey.Search) + + # 返回 + return results + @eventmanager.register(EventType.SiteDeleted) def remove_site(self, event: Event): """ diff --git a/app/db/models/site.py b/app/db/models/site.py index 3fa6ddd0..6aad03a1 100644 --- a/app/db/models/site.py +++ b/app/db/models/site.py @@ -1,9 +1,10 @@ from datetime import datetime -from sqlalchemy import Boolean, Column, Integer, String, Sequence, JSON +from sqlalchemy import Boolean, Column, Integer, String, Sequence, JSON, select +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session -from app.db import db_query, db_update, Base +from app.db import db_query, db_update, Base, async_db_query class Site(Base): @@ -59,11 +60,23 @@ class Site(Base): def get_by_domain(cls, db: Session, domain: str): return db.query(cls).filter(cls.domain == domain).first() + @classmethod + @async_db_query + async def async_get_by_domain(cls, db: AsyncSession, domain: str): + result = await db.execute(select(cls).where(cls.domain == domain)) + return result.scalar_one_or_none() + @classmethod @db_query def get_actives(cls, db: Session): return db.query(cls).filter(cls.is_active == 1).all() + @classmethod + @async_db_query + async def async_get_actives(cls, db: AsyncSession): + result = await db.execute(select(cls).where(cls.is_active == 1)) + return result.all() + @classmethod @db_query def list_order_by_pri(cls, db: Session): diff --git a/app/db/site_oper.py b/app/db/site_oper.py index 77766c1e..da377fee 100644 --- a/app/db/site_oper.py +++ b/app/db/site_oper.py @@ -47,6 +47,12 @@ class SiteOper(DbOper): """ return Site.get_actives(self._db) + async def async_list_active(self) -> List[Site]: + """ + 异步按状态获取站点列表 + """ + return await Site.async_get_actives(self._db) + def delete(self, sid: int): """ 删除站点 @@ -67,6 +73,12 @@ class 
SiteOper(DbOper): """ return Site.get_by_domain(self._db, domain) + async def async_get_by_domain(self, domain: str) -> Site: + """ + 异步按域名获取站点 + """ + return await Site.async_get_by_domain(self._db, domain) + def get_domains_by_ids(self, ids: List[int]) -> List[str]: """ 按ID获取站点域名