fix async apis

This commit is contained in:
jxxghp
2025-08-01 20:27:22 +08:00
parent cc69d3b8d1
commit e32b6e07b4
7 changed files with 79 additions and 44 deletions

View File

@@ -9,14 +9,14 @@ from app.core.config import settings
from app.core.context import MediaInfo
from app.core.metainfo import MetaInfo
from app.db.models import User
from app.db.user_oper import get_current_active_superuser
from app.db.user_oper import get_current_active_superuser, get_current_active_superuser_async
from app.utils.crypto import HashUtils
router = APIRouter()
@router.get("/cache", summary="获取种子缓存", response_model=schemas.Response)
def torrents_cache(_: User = Depends(get_current_active_superuser)):
async def torrents_cache(_: User = Depends(get_current_active_superuser_async)):
"""
获取当前种子缓存数据
"""
@@ -24,9 +24,9 @@ def torrents_cache(_: User = Depends(get_current_active_superuser)):
# 获取spider和rss两种缓存
if settings.SUBSCRIBE_MODE == "rss":
cache_info = torrents_chain.get_torrents("rss")
cache_info = await torrents_chain.async_get_torrents("rss")
else:
cache_info = torrents_chain.get_torrents("spider")
cache_info = await torrents_chain.async_get_torrents("spider")
# 统计信息
torrent_count = sum(len(torrents) for torrents in cache_info.values())
@@ -62,9 +62,8 @@ def torrents_cache(_: User = Depends(get_current_active_superuser)):
})
@router.delete("/cache/{domain}/{torrent_hash}", summary="删除指定种子缓存",
response_model=schemas.Response)
def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_active_superuser)):
@router.delete("/cache/{domain}/{torrent_hash}", summary="删除指定种子缓存", response_model=schemas.Response)
async def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_active_superuser_async)):
"""
删除指定的种子缓存
:param domain: 站点域名
@@ -76,7 +75,7 @@ def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_a
try:
# 获取当前缓存
cache_data = torrents_chain.get_torrents()
cache_data = await torrents_chain.async_get_torrents()
if domain not in cache_data:
return schemas.Response(success=False, message=f"站点 {domain} 缓存不存在")
@@ -92,7 +91,7 @@ def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_a
return schemas.Response(success=False, message="未找到指定的种子")
# 保存更新后的缓存
torrents_chain.save_cache(cache_data, torrents_chain.cache_file)
await torrents_chain.async_save_cache(cache_data, torrents_chain.cache_file)
return schemas.Response(success=True, message="种子删除成功")
except Exception as e:
@@ -100,14 +99,14 @@ def delete_cache(domain: str, torrent_hash: str, _: User = Depends(get_current_a
@router.delete("/cache", summary="清理种子缓存", response_model=schemas.Response)
def clear_cache(_: User = Depends(get_current_active_superuser)):
async def clear_cache(_: User = Depends(get_current_active_superuser_async)):
"""
清理所有种子缓存
"""
torrents_chain = TorrentsChain()
try:
torrents_chain.clear_torrents()
await torrents_chain.async_clear_torrents()
return schemas.Response(success=True, message="种子缓存清理完成")
except Exception as e:
return schemas.Response(success=False, message=f"清理失败:{str(e)}")
@@ -135,9 +134,9 @@ def refresh_cache(_: User = Depends(get_current_active_superuser)):
@router.post("/cache/reidentify/{domain}/{torrent_hash}", summary="重新识别种子", response_model=schemas.Response)
def reidentify_cache(domain: str, torrent_hash: str,
async def reidentify_cache(domain: str, torrent_hash: str,
tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
_: User = Depends(get_current_active_superuser)):
_: User = Depends(get_current_active_superuser_async)):
"""
重新识别指定的种子
:param domain: 站点域名
@@ -152,7 +151,7 @@ def reidentify_cache(domain: str, torrent_hash: str,
try:
# 获取当前缓存
cache_data = torrents_chain.get_torrents()
cache_data = await torrents_chain.async_get_torrents()
if domain not in cache_data:
return schemas.Response(success=False, message=f"站点 {domain} 缓存不存在")
@@ -168,14 +167,13 @@ def reidentify_cache(domain: str, torrent_hash: str,
return schemas.Response(success=False, message="未找到指定的种子")
# 重新识别
meta = MetaInfo(title=target_context.torrent_info.title,
subtitle=target_context.torrent_info.description)
meta = MetaInfo(title=target_context.torrent_info.title, subtitle=target_context.torrent_info.description)
if tmdbid or doubanid:
# 手动指定媒体信息
mediainfo = MediaChain().recognize_media(meta=meta, tmdbid=tmdbid, doubanid=doubanid)
mediainfo = await media_chain.async_recognize_media(meta=meta, tmdbid=tmdbid, doubanid=doubanid)
else:
# 自动重新识别
mediainfo = media_chain.recognize_by_meta(meta)
mediainfo = await media_chain.async_recognize_by_meta(meta)
if not mediainfo:
# 创建空的媒体信息
@@ -188,7 +186,7 @@ def reidentify_cache(domain: str, torrent_hash: str,
target_context.media_info = mediainfo
# 保存更新后的缓存
torrents_chain.save_cache(cache_data, TorrentsChain().cache_file)
await torrents_chain.async_save_cache(cache_data, TorrentsChain().cache_file)
return schemas.Response(success=True, message="重新识别完成", data={
"media_name": mediainfo.title if mediainfo else "",

View File

@@ -59,7 +59,7 @@ def query_name(path: str, filetype: str,
@router.get("/queue", summary="查询整理队列", response_model=List[schemas.TransferJob])
def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询整理队列
:param _: Token校验
@@ -68,7 +68,7 @@ def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
@router.delete("/queue", summary="从整理队列中删除任务", response_model=schemas.Response)
def remove_queue(fileitem: schemas.FileItem, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def remove_queue(fileitem: schemas.FileItem, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询整理队列
:param fileitem: 文件项

View File

@@ -1,6 +1,7 @@
from typing import Any, List, Annotated
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from app import schemas
@@ -9,7 +10,7 @@ from app.chain.tvdb import TvdbChain
from app.chain.subscribe import SubscribeChain
from app.core.metainfo import MetaInfo
from app.core.security import verify_apikey
from app.db import get_db
from app.db import get_db, get_async_db
from app.db.models.subscribe import Subscribe
from app.schemas import RadarrMovie, SonarrSeries
from app.schemas.types import MediaType
@@ -19,7 +20,7 @@ arr_router = APIRouter(tags=['servarr'])
@arr_router.get("/system/status", summary="系统状态")
def arr_system_status(_: Annotated[str, Depends(verify_apikey)]) -> Any:
async def arr_system_status(_: Annotated[str, Depends(verify_apikey)]) -> Any:
"""
模拟Radarr、Sonarr系统状态
"""
@@ -73,7 +74,7 @@ def arr_system_status(_: Annotated[str, Depends(verify_apikey)]) -> Any:
@arr_router.get("/qualityProfile", summary="质量配置")
def arr_qualityProfile(_: Annotated[str, Depends(verify_apikey)]) -> Any:
async def arr_qualityProfile(_: Annotated[str, Depends(verify_apikey)]) -> Any:
"""
模拟Radarr、Sonarr质量配置
"""
@@ -114,7 +115,7 @@ def arr_qualityProfile(_: Annotated[str, Depends(verify_apikey)]) -> Any:
@arr_router.get("/rootfolder", summary="根目录")
def arr_rootfolder(_: Annotated[str, Depends(verify_apikey)]) -> Any:
async def arr_rootfolder(_: Annotated[str, Depends(verify_apikey)]) -> Any:
"""
模拟Radarr、Sonarr根目录
"""
@@ -130,7 +131,7 @@ def arr_rootfolder(_: Annotated[str, Depends(verify_apikey)]) -> Any:
@arr_router.get("/tag", summary="标签")
def arr_tag(_: Annotated[str, Depends(verify_apikey)]) -> Any:
async def arr_tag(_: Annotated[str, Depends(verify_apikey)]) -> Any:
"""
模拟Radarr、Sonarr标签
"""
@@ -143,7 +144,7 @@ def arr_tag(_: Annotated[str, Depends(verify_apikey)]) -> Any:
@arr_router.get("/languageprofile", summary="语言")
def arr_languageprofile(_: Annotated[str, Depends(verify_apikey)]) -> Any:
async def arr_languageprofile(_: Annotated[str, Depends(verify_apikey)]) -> Any:
"""
模拟Radarr、Sonarr语言
"""
@@ -169,7 +170,7 @@ def arr_languageprofile(_: Annotated[str, Depends(verify_apikey)]) -> Any:
@arr_router.get("/movie", summary="所有订阅电影", response_model=List[schemas.RadarrMovie])
def arr_movies(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any:
async def arr_movies(_: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
"""
查询Rardar电影
"""
@@ -240,7 +241,7 @@ def arr_movies(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends(
"""
# 查询所有电影订阅
result = []
subscribes = Subscribe.list(db)
subscribes = await Subscribe.async_list(db)
for subscribe in subscribes:
if subscribe.type != MediaType.MOVIE.value:
continue
@@ -306,11 +307,11 @@ def arr_movie_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: S
@arr_router.get("/movie/{mid}", summary="电影订阅详情", response_model=schemas.RadarrMovie)
def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any:
async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
"""
查询Rardar电影订阅
"""
subscribe = Subscribe.get(db, mid)
subscribe = await Subscribe.async_get(db, mid)
if subscribe:
return RadarrMovie(
id=subscribe.id,
@@ -363,13 +364,13 @@ def arr_add_movie(_: Annotated[str, Depends(verify_apikey)],
@arr_router.delete("/movie/{mid}", summary="删除电影订阅", response_model=schemas.Response)
def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any:
async def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
"""
删除Rardar电影订阅
"""
subscribe = Subscribe.get(db, mid)
subscribe = await Subscribe.async_get(db, mid)
if subscribe:
subscribe.delete(db, mid)
await subscribe.async_delete(db, mid)
return schemas.Response(success=True)
else:
raise HTTPException(
@@ -379,7 +380,7 @@ def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Se
@arr_router.get("/series", summary="所有剧集", response_model=List[schemas.SonarrSeries])
def arr_series(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any:
async def arr_series(_: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
"""
查询Sonarr剧集
"""
@@ -487,7 +488,7 @@ def arr_series(_: Annotated[str, Depends(verify_apikey)], db: Session = Depends(
"""
# 查询所有电视剧订阅
result = []
subscribes = Subscribe.list(db)
subscribes = await Subscribe.async_list(db)
for subscribe in subscribes:
if subscribe.type != MediaType.TV.value:
continue
@@ -605,11 +606,11 @@ def arr_series_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db:
@arr_router.get("/series/{tid}", summary="剧集详情")
def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any:
async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
"""
查询Sonarr剧集
"""
subscribe = Subscribe.get(db, tid)
subscribe = await Subscribe.async_get(db, tid)
if subscribe:
return SonarrSeries(
id=subscribe.id,
@@ -691,13 +692,13 @@ def arr_update_series(tv: schemas.SonarrSeries) -> Any:
@arr_router.delete("/series/{tid}", summary="删除剧集订阅")
def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], db: Session = Depends(get_db)) -> Any:
async def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
"""
删除Sonarr剧集订阅
"""
subscribe = Subscribe.get(db, tid)
subscribe = await Subscribe.async_get(db, tid)
if subscribe:
subscribe.delete(db, tid)
await subscribe.async_delete(db, tid)
return schemas.Response(success=True)
else:
raise HTTPException(

View File

@@ -51,12 +51,12 @@ cookie_router = APIRouter(route_class=GzipRoute,
@cookie_router.get("/", response_class=PlainTextResponse)
def get_root():
async def get_root():
return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud"
@cookie_router.post("/", response_class=PlainTextResponse)
def post_root():
async def post_root():
return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud"

View File

@@ -8,6 +8,7 @@ from pathlib import Path
from typing import Optional, Any, Tuple, List, Set, Union, Dict
import aiofiles
from aiopath import AsyncPath
from qbittorrentapi import TorrentFilesList
from transmission_rpc import File
@@ -106,6 +107,18 @@ class ChainBase(metaclass=ABCMeta):
if cache_path.exists():
cache_path.unlink()
@staticmethod
async def async_remove_cache(filename: str) -> None:
"""
异步删除本地缓存
"""
cache_path = AsyncPath(settings.TEMP_PATH) / filename
if await cache_path.exists():
try:
await cache_path.unlink()
except Exception as err:
logger.error(f"异步删除缓存 {filename} 出错:{str(err)}")
@staticmethod
def __is_valid_empty(ret):
"""

View File

@@ -60,6 +60,21 @@ class TorrentsChain(ChainBase):
else:
return self.load_cache(self._rss_file) or {}
async def async_get_torrents(self, stype: Optional[str] = None) -> Dict[str, List[Context]]:
"""
异步获取当前缓存的种子
:param stype: 强制指定缓存类型spider:爬虫缓存rss:rss缓存
"""
if not stype:
stype = settings.SUBSCRIBE_MODE
# 异步读取缓存
if stype == 'spider':
return await self.async_load_cache(self._spider_file) or {}
else:
return await self.async_load_cache(self._rss_file) or {}
def clear_torrents(self):
"""
清理种子缓存数据
@@ -69,6 +84,15 @@ class TorrentsChain(ChainBase):
self.remove_cache(self._rss_file)
logger.info(f'种子缓存数据清理完成')
async def async_clear_torrents(self):
"""
异步清理种子缓存数据
"""
logger.info(f'开始异步清理种子缓存数据 ...')
await self.async_remove_cache(self._spider_file)
await self.async_remove_cache(self._rss_file)
logger.info(f'异步种子缓存数据清理完成')
def browse(self, domain: str, keyword: Optional[str] = None, cat: Optional[str] = None,
page: Optional[int] = 0) -> List[TorrentInfo]:
"""

View File

@@ -1,4 +1,3 @@
import gc
import queue
import re
import threading