diff --git a/app/api/endpoints/torrent.py b/app/api/endpoints/torrent.py index 2fd400f6..84945ada 100644 --- a/app/api/endpoints/torrent.py +++ b/app/api/endpoints/torrent.py @@ -9,14 +9,14 @@ from app.core.config import settings from app.core.context import MediaInfo from app.core.metainfo import MetaInfo from app.db.models import User -from app.db.user_oper import get_current_active_superuser +from app.db.user_oper import get_current_active_superuser, get_current_active_superuser_async from app.utils.crypto import HashUtils router = APIRouter() @router.get("/cache", summary="获取种子缓存", response_model=schemas.Response) -def torrents_cache(_: User = Depends(get_current_active_superuser)): +async def torrents_cache(_: User = Depends(get_current_active_superuser_async)): """ 获取当前种子缓存数据 """ @@ -24,9 +24,9 @@ def torrents_cache(_: User = Depends(get_current_active_superuser)): # 获取spider和rss两种缓存 if settings.SUBSCRIBE_MODE == "rss": - cache_info = torrents_chain.get_torrents("rss") + cache_info = await torrents_chain.async_get_torrents("rss") else: - cache_info = torrents_chain.get_torrents("spider") + cache_info = await torrents_chain.async_get_torrents("spider") # 统计信息 torrent_count = sum(len(torrents) for torrents in cache_info.values()) @@ -62,9 +62,8 @@ def torrents_cache(_: User = Depends(get_current_active_superuser)): }) -@router.delete("/cache/{domain}/{torrent_hash}", summary="删除指定种子缓存", - response_model=schemas.Response) -def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_active_superuser)): +@router.delete("/cache/{domain}/{torrent_hash}", summary="删除指定种子缓存", response_model=schemas.Response) +async def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_active_superuser_async)): """ 删除指定的种子缓存 :param domain: 站点域名 @@ -76,7 +75,7 @@ def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_a try: # 获取当前缓存 - cache_data = torrents_chain.get_torrents() + cache_data = await torrents_chain.async_get_torrents() if domain not in cache_data: return schemas.Response(success=False, message=f"站点 {domain} 缓存不存在") @@ -92,7 +91,7 @@ def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_a return schemas.Response(success=False, message="未找到指定的种子") # 保存更新后的缓存 - torrents_chain.save_cache(cache_data, torrents_chain.cache_file) + await torrents_chain.async_save_cache(cache_data, torrents_chain.cache_file) return schemas.Response(success=True, message="种子删除成功") except Exception as e: @@ -100,14 +99,14 @@ def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_a @router.delete("/cache", summary="清理种子缓存", response_model=schemas.Response) -def clear_cache(_: User = Depends(get_current_active_superuser)): +async def clear_cache(_: User = Depends(get_current_active_superuser_async)): """ 清理所有种子缓存 """ torrents_chain = TorrentsChain() try: - torrents_chain.clear_torrents() + await torrents_chain.async_clear_torrents() return schemas.Response(success=True, message="种子缓存清理完成") except Exception as e: return schemas.Response(success=False, message=f"清理失败:{str(e)}") @@ -135,9 +134,9 @@ def refresh_cache(_: User = Depends(get_current_active_superuser)): @router.post("/cache/reidentify/{domain}/{torrent_hash}", summary="重新识别种子", response_model=schemas.Response) -def reidentify_cache(domain: str, torrent_hash: str, +async def reidentify_cache(domain: str, torrent_hash: str, tmdbid: Optional[int] = None, doubanid: Optional[str] = None, - _: User = Depends(get_current_active_superuser)): + _: User = Depends(get_current_active_superuser_async)): """ 重新识别指定的种子 :param domain: 站点域名 @@ -152,7 +151,7 @@ def reidentify_cache(domain: str, torrent_hash: str, try: # 获取当前缓存 - cache_data = torrents_chain.get_torrents() + cache_data = await torrents_chain.async_get_torrents() if domain not in cache_data: return schemas.Response(success=False, message=f"站点 {domain} 缓存不存在") @@ -168,14 +167,13 @@ def reidentify_cache(domain: str, torrent_hash: str, return schemas.Response(success=False, message="未找到指定的种子") # 重新识别 - meta = MetaInfo(title=target_context.torrent_info.title, - subtitle=target_context.torrent_info.description) + meta = MetaInfo(title=target_context.torrent_info.title, subtitle=target_context.torrent_info.description) if tmdbid or doubanid: # 手动指定媒体信息 - mediainfo = MediaChain().recognize_media(meta=meta, tmdbid=tmdbid, doubanid=doubanid) + mediainfo = await media_chain.async_recognize_media(meta=meta, tmdbid=tmdbid, doubanid=doubanid) else: # 自动重新识别 - mediainfo = media_chain.recognize_by_meta(meta) + mediainfo = await media_chain.async_recognize_by_meta(meta) if not mediainfo: # 创建空的媒体信息 @@ -188,7 +186,7 @@ def reidentify_cache(domain: str, torrent_hash: str, target_context.media_info = mediainfo # 保存更新后的缓存 - torrents_chain.save_cache(cache_data, TorrentsChain().cache_file) + await torrents_chain.async_save_cache(cache_data, TorrentsChain().cache_file) return schemas.Response(success=True, message="重新识别完成", data={ "media_name": mediainfo.title if mediainfo else "", diff --git a/app/api/endpoints/transfer.py b/app/api/endpoints/transfer.py index 698dc7fb..ca327c9b 100644 --- a/app/api/endpoints/transfer.py +++ b/app/api/endpoints/transfer.py @@ -59,7 +59,7 @@ def query_name(path: str, filetype: str, @router.get("/queue", summary="查询整理队列", response_model=List[schemas.TransferJob]) -def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any: +async def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询整理队列 :param _: Token校验 @@ -68,7 +68,7 @@ def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any: @router.delete("/queue", summary="从整理队列中删除任务", response_model=schemas.Response) -def remove_queue(fileitem: schemas.FileItem, _: schemas.TokenPayload = Depends(verify_token)) -> Any: +async def remove_queue(fileitem: schemas.FileItem, _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询整理队列 :param fileitem: 文件项 diff --git a/app/api/servarr.py b/app/api/servarr.py index 37120d8a..d4efc9d6 100644 --- a/app/api/servarr.py +++ b/app/api/servarr.py @@ -1,6 +1,7 @@ from typing import Any, List, Annotated from fastapi import APIRouter, HTTPException, Depends +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session from app import schemas @@ -9,7 +10,7 @@ from app.chain.tvdb import TvdbChain from app.chain.subscribe import SubscribeChain from app.core.metainfo import MetaInfo from app.core.security import verify_apikey -from app.db import get_db +from app.db import get_db, get_async_db from app.db.models.subscribe import Subscribe from app.schemas import RadarrMovie, SonarrSeries from app.schemas.types import MediaType @@ -19,7 +20,7 @@ arr_router = APIRouter(tags=['servarr']) @arr_router.get("/system/status", summary="系统状态") -def arr_system_status(_: Annotated[str, Depends(verify_apikey)]) -> Any: +async def arr_system_status(_: Annotated[str, Depends(verify_apikey)]) -> Any: """ 模拟Radarr、Sonarr系统状态 """ @@ -73,7 +74,7 @@ def arr_system_status(_: Annotated[str, Depends(verify_apikey)]) -> Any: @arr_router.get("/qualityProfile", summary="质量配置") -def arr_qualityProfile(_: Annotated[str, Depends(verify_apikey)]) -> Any: +async def arr_qualityProfile(_: Annotated[str, Depends(verify_apikey)]) -> Any: """ 模拟Radarr、Sonarr质量配置 """ @@ -114,7 +115,7 @@ def arr_qualityProfile(_: Annotated[str, Depends(verify_apikey)]) -> Any: @arr_router.get("/rootfolder", summary="根目录") -def arr_rootfolder(_: Annotated[str, Depends(verify_apikey)]) -> Any: +async def arr_rootfolder(_: Annotated[str, Depends(verify_apikey)]) -> Any: """ 模拟Radarr、Sonarr根目录 """ @@ -130,7 +131,7 @@ def arr_rootfolder(_: Annotated[str, Depends(verify_apikey)]) -> Any: @arr_router.get("/tag", summary="标签") -def arr_tag(_: Annotated[str, Depends(verify_apikey)]) -> Any: +async def arr_tag(_: Annotated[str, Depends(verify_apikey)]) -> Any: """ 模拟Radarr、Sonarr标签 """ @@ -143,7 +144,7 @@ def arr_tag(_: Annotated[str, Depends(verify_apikey)]) -> Any: @arr_router.get("/languageprofile", summary="语言") -def arr_languageprofile(_: Annotated[str, Depends(verify_apikey)]) -> Any: +async def arr_languageprofile(_: Annotated[str, Depends(verify_apikey)]) -> Any: """ 模拟Radarr、Sonarr语言 """ @@ -169,7 +170,7 @@ def arr_languageprofile(_: Annotated[str, Depends(verify_apikey)]) -> Any: @arr_router.get("/movie", summary="所有订阅电影", response_model=List[schemas.RadarrMovie]) -def arr_movies(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any: +async def arr_movies(_: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: """ 查询Rardar电影 """ @@ -240,7 +241,7 @@ def arr_movies(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends( """ # 查询所有电影订阅 result = [] - subscribes = Subscribe.list(db) + subscribes = await Subscribe.async_list(db) for subscribe in subscribes: if subscribe.type != MediaType.MOVIE.value: continue @@ -306,11 +307,11 @@ def arr_movie_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: S @arr_router.get("/movie/{mid}", summary="电影订阅详情", response_model=schemas.RadarrMovie) -def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any: +async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: """ 查询Rardar电影订阅 """ - subscribe = Subscribe.get(db, mid) + subscribe = await Subscribe.async_get(db, mid) if subscribe: return RadarrMovie( id=subscribe.id, @@ -363,13 +364,13 @@ def arr_add_movie(_: Annotated[str, Depends(verify_apikey)], @arr_router.delete("/movie/{mid}", summary="删除电影订阅", response_model=schemas.Response) -def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any: +async def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: """ 删除Rardar电影订阅 """ - subscribe = Subscribe.get(db, mid) + subscribe = await Subscribe.async_get(db, mid) if subscribe: - subscribe.delete(db, mid) + await subscribe.async_delete(db, mid) return schemas.Response(success=True) else: raise HTTPException( @@ -379,7 +380,7 @@ def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Se @arr_router.get("/series", summary="所有剧集", response_model=List[schemas.SonarrSeries]) -def arr_series(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any: +async def arr_series(_: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: """ 查询Sonarr剧集 """ @@ -487,7 +488,7 @@ def arr_series(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends( """ # 查询所有电视剧订阅 result = [] - subscribes = Subscribe.list(db) + subscribes = await Subscribe.async_list(db) for subscribe in subscribes: if subscribe.type != MediaType.TV.value: continue @@ -605,11 +606,11 @@ def arr_series_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: @arr_router.get("/series/{tid}", summary="剧集详情") -def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any: +async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: """ 查询Sonarr剧集 """ - subscribe = Subscribe.get(db, tid) + subscribe = await Subscribe.async_get(db, tid) if subscribe: return SonarrSeries( id=subscribe.id, @@ -691,13 +692,13 @@ def arr_update_series(tv: schemas.SonarrSeries) -> Any: @arr_router.delete("/series/{tid}", summary="删除剧集订阅") -def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any: +async def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: """ 删除Sonarr剧集订阅 """ - subscribe = Subscribe.get(db, tid) + subscribe = await Subscribe.async_get(db, tid) if subscribe: - subscribe.delete(db, tid) + await subscribe.async_delete(db, tid) return schemas.Response(success=True) else: raise HTTPException( diff --git a/app/api/servcookie.py b/app/api/servcookie.py index 39cdb071..cbd394a8 100644 --- a/app/api/servcookie.py +++ b/app/api/servcookie.py @@ -51,12 +51,12 @@ cookie_router = APIRouter(route_class=GzipRoute, @cookie_router.get("/", response_class=PlainTextResponse) -def get_root(): +async def get_root(): return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud" @cookie_router.post("/", response_class=PlainTextResponse) -def post_root(): +async def post_root(): return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud" diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 6aa172e6..ede046d5 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -8,6 +8,7 @@ from pathlib import Path from typing import Optional, Any, Tuple, List, Set, Union, Dict import aiofiles +from aiopath import AsyncPath from qbittorrentapi import TorrentFilesList from transmission_rpc import File @@ -106,6 +107,18 @@ class ChainBase(metaclass=ABCMeta): if cache_path.exists(): cache_path.unlink() + @staticmethod + async def async_remove_cache(filename: str) -> None: + """ + 异步删除本地缓存 + """ + cache_path = AsyncPath(settings.TEMP_PATH) / filename + if await cache_path.exists(): + try: + await cache_path.unlink() + except Exception as err: + logger.error(f"异步删除缓存 {filename} 出错:{str(err)}") + @staticmethod def __is_valid_empty(ret): """ diff --git a/app/chain/torrents.py b/app/chain/torrents.py index 4b89b6a6..78d8443c 100644 --- a/app/chain/torrents.py +++ b/app/chain/torrents.py @@ -60,6 +60,21 @@ class TorrentsChain(ChainBase): else: return self.load_cache(self._rss_file) or {} + async def async_get_torrents(self, stype: Optional[str] = None) -> Dict[str, List[Context]]: + """ + 异步获取当前缓存的种子 + :param stype: 强制指定缓存类型,spider:爬虫缓存,rss:rss缓存 + """ + + if not stype: + stype = settings.SUBSCRIBE_MODE + + # 异步读取缓存 + if stype == 'spider': + return await self.async_load_cache(self._spider_file) or {} + else: + return await self.async_load_cache(self._rss_file) or {} + def clear_torrents(self): """ 清理种子缓存数据 @@ -69,6 +84,15 @@ class TorrentsChain(ChainBase): self.remove_cache(self._rss_file) logger.info(f'种子缓存数据清理完成') + async def async_clear_torrents(self): + """ + 异步清理种子缓存数据 + """ + logger.info(f'开始异步清理种子缓存数据 ...') + await self.async_remove_cache(self._spider_file) + await self.async_remove_cache(self._rss_file) + logger.info(f'异步种子缓存数据清理完成') + def browse(self, domain: str, keyword: Optional[str] = None, cat: Optional[str] = None, page: Optional[int] = 0) -> List[TorrentInfo]: """ diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 901488ce..29958793 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1,4 +1,3 @@ -import gc import queue import re import threading