Merge remote-tracking branch 'origin/aio-refactor' into aio-refactor

# Conflicts:
#	backend/src/module/parser/analyser/tmdb_parser.py
EstrellaXD
2024-01-02 21:27:52 +08:00
16 changed files with 196 additions and 157 deletions

View File

@@ -2,4 +2,5 @@
ruff
black
pre-commit
pytest
pytest
pytest-asyncio
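The new pytest-asyncio dependency lets pytest collect and run `async def` tests, which the refactored test files below rely on. A minimal sketch (test name is hypothetical):

import asyncio
import pytest

@pytest.mark.asyncio
async def test_event_loop_runs():
    # pytest-asyncio provides the event loop that drives this coroutine
    assert await asyncio.sleep(0, result=42) == 42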

View File

@@ -7,8 +7,6 @@ fastapi==0.97.0
h11==0.14.0
idna==3.4
pydantic~=1.10
PySocks==1.7.1
qbittorrent-api==2023.9.53
httpx[http2,socks]==0.25.0
six==1.16.0
sniffio==1.3.0

View File

@@ -25,7 +25,7 @@ async def get_rss():
)
async def add_rss(rss: RSSItem):
with RSSEngine() as engine:
result = engine.add_rss(rss.url, rss.name, rss.aggregate, rss.parser)
result = await engine.add_rss(rss.url, rss.name, rss.aggregate, rss.parser)
return u_response(result)
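Because `RSSEngine.add_rss` is now a coroutine, the route handler must await it or it would return the coroutine object instead of the result. A self-contained sketch of the pattern (model fields trimmed, route path assumed):

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class RSSItem(BaseModel):
    # trimmed stand-in for the project's RSSItem model
    url: str
    aggregate: bool = True

@app.post("/api/v1/rss/add")
async def add_rss(rss: RSSItem):
    # the async engine call would be awaited here
    return {"added": rss.url, "aggregate": rss.aggregate}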

View File

@@ -1,5 +1,6 @@
import threading
import time
import asyncio
from module.conf import settings
from module.downloader import DownloadClient
@@ -18,18 +19,12 @@ class RSSThread(ProgramStatus):
)
self.analyser = RSSAnalyser()
async def __loop_mission(self):
async with RSSEngine() as engine:
await engine.rss_checker(self.analyser, self.stop_event)
def rss_loop(self):
while not self.stop_event.is_set():
with DownloadClient() as client, RSSEngine() as engine:
# Analyse RSS
rss_list = engine.rss.search_aggregate()
for rss in rss_list:
self.analyser.rss_to_data(rss, engine)
# Run RSS Engine
engine.refresh_rss(client)
if settings.bangumi_manage.eps_complete:
eps_complete()
self.stop_event.wait(settings.program.rss_time)
asyncio.run(self.__loop_mission())
def rss_start(self):
self.rss_thread.start()
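The refactor collapses the synchronous polling loop into a single coroutine and drives it with `asyncio.run` inside the worker thread. A standalone sketch of that pattern (class name and interval are hypothetical):

import asyncio
import threading

class PollingThread:
    def __init__(self):
        self.stop_event = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    async def _loop_mission(self):
        while not self.stop_event.is_set():
            # async work (RSS checking, downloading) happens here
            await asyncio.sleep(1)

    def _run(self):
        # asyncio.run creates, runs, and closes an event loop owned by this thread
        asyncio.run(self._loop_mission())

    def start(self):
        self.thread.start()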

View File

@@ -6,6 +6,21 @@ from ..exceptions import ConflictError, AuthorizationError
logger = logging.getLogger(__name__)
QB_API_URL = {
"login": "/api/v2/auth/login",
"logout": "/api/v2/auth/logout",
"version": "/api/v2/app/version",
"setPreferences": "/api/v2/app/setPreferences",
"createCategory": "/api/v2/torrents/createCategory",
"info": "/api/v2/torrents/info",
"add": "/api/v2/torrents/add",
"delete": "/api/v2/torrents/delete",
"renameFile": "/api/v2/torrents/renameFile",
"setLocation": "/api/v2/torrents/setLocation",
"setCategory": "/api/v2/torrents/setCategory",
"addTags": "/api/v2/torrents/addTags",
}
class QbDownloader:
def __init__(self, host: str, username: str, password: str, ssl: bool):
@@ -16,33 +31,38 @@ class QbDownloader:
async def auth(self):
resp = await self._client.post(
url="/api/v2/auth/login",
url=QB_API_URL["login"],
data={"username": self.username, "password": self.password},
timeout=5,
)
return resp.text == "Ok."
async def logout(self):
logout_api = "/api/v2/auth/logout"
await self._client.post(url=logout_api, timeout=5)
resp = await self._client.post(
url=QB_API_URL["logout"],
timeout=5
)
return resp.text
async def check_host(self):
try:
await self._client.get(
url="/api/v2/app/version",
url=QB_API_URL["version"],
timeout=5
)
return True
except httpx.RequestError:
except (httpx.RequestError, httpx.TimeoutException):
return False
async def prefs_init(self, prefs):
prefs_api = "/api/v2/app/setPreferences"
await self._client.post(url=prefs_api, data=prefs)
await self._client.post(
url=QB_API_URL["setPreferences"],
data=prefs
)
async def add_category(self, category):
await self._client.post(
url="/api/v2/torrents/createCategory",
url=QB_API_URL["createCategory"],
data={"category": category},
timeout=5,
)
@@ -54,7 +74,7 @@ class QbDownloader:
"tag": tag,
}
torrent_info = await self._client.get(
url="/api/v2/torrents/info",
url=QB_API_URL["info"],
params=data,
)
return torrent_info.json()
@@ -69,7 +89,7 @@ class QbDownloader:
"use_auto_torrent_management": False,
}
resp = await self._client.post(
url="/api/v2/torrents/add",
url=QB_API_URL["add"],
data=data,
)
return resp.status_code == 200
@@ -80,7 +100,7 @@ class QbDownloader:
"deleteFiles": True,
}
resp = await self._client.post(
url="/api/v2/torrents/delete",
url=QB_API_URL["delete"],
data=data,
)
return resp.status_code == 200
@@ -92,7 +112,7 @@ class QbDownloader:
"newPath": new_path,
}
resp = await self._client.post(
url="/api/v2/torrents/renameFile",
url=QB_API_URL["renameFile"],
data=data,
)
return resp.status_code == 200
@@ -103,7 +123,7 @@ class QbDownloader:
"location": new_location,
}
resp = await self._client.post(
url="/api/v2/torrents/setLocation",
url=QB_API_URL["setLocation"],
data=data,
)
return resp.status_code == 200
@@ -114,7 +134,7 @@ class QbDownloader:
"hashes": _hash,
}
resp = await self._client.post(
url="/api/v2/torrents/setCategory",
url=QB_API_URL["setCategory"],
data=data,
)
return resp.status_code == 200
@@ -125,7 +145,7 @@ class QbDownloader:
"tags": tag,
}
resp = await self._client.post(
url="/api/v2/torrents/addTags",
url=QB_API_URL["addTags"],
data=data,
)
return resp.status_code == 200
@@ -133,12 +153,14 @@ class QbDownloader:
async def __aenter__(self):
self._client = httpx.AsyncClient(
base_url=self.host,
trust_env=self.ssl,
)
while not await self.check_host():
logger.warning(f"[Downloader] Failed to connect to {self.host}, retry in 30 seconds.")
await asyncio.sleep(30)
if not await self.auth():
await self._client.aclose()
logger.error(f"[Downloader] Downloader authorize error. Please check your username/password.")
raise AuthorizationError("Failed to login to qbittorrent.")
return self
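The context-manager entry now blocks until the WebUI is reachable, then authenticates. A standalone sketch of that connect-retry-then-login flow, reusing two of the real qBittorrent WebUI endpoints from the table above (class name is hypothetical):

import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)

QB_API_URL = {
    "login": "/api/v2/auth/login",
    "version": "/api/v2/app/version",
}

class QbSession:
    def __init__(self, host: str, username: str, password: str):
        self.host, self.username, self.password = host, username, password

    async def check_host(self) -> bool:
        try:
            await self._client.get(url=QB_API_URL["version"], timeout=5)
            return True
        except (httpx.RequestError, httpx.TimeoutException):
            return False

    async def auth(self) -> bool:
        resp = await self._client.post(
            url=QB_API_URL["login"],
            data={"username": self.username, "password": self.password},
            timeout=5,
        )
        return resp.text == "Ok."

    async def __aenter__(self):
        self._client = httpx.AsyncClient(base_url=self.host)
        while not await self.check_host():
            logger.warning("Cannot reach %s, retrying in 30 seconds.", self.host)
            await asyncio.sleep(30)
        if not await self.auth():
            await self._client.aclose()
            raise RuntimeError("Failed to log in to qBittorrent.")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._client.aclose()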

View File

@@ -11,10 +11,8 @@ logger = logging.getLogger(__name__)
def getClient():
# TODO: support multiple downloaders
type = settings.downloader.type
if type == "qbittorrent":
if settings.downloader.type == "qbittorrent":
from .client.qb_downloader import QbDownloader
return QbDownloader
else:
logger.error(f"[Downloader] Unsupported downloader type: {type}")

View File

@@ -0,0 +1,16 @@
import logging

from module.conf import settings

logger = logging.getLogger(__name__)

def set_proxy():
auth = f"{settings.proxy.username}:{settings.proxy.password}@" \
if settings.proxy.username else \
""
if "http" in settings.proxy.type:
proxy = f"{settings.proxy.type}://{auth}{settings.proxy.host}:{settings.proxy.port}"
elif settings.proxy.type == "socks5":
proxy = f"socks5://{auth}{settings.proxy.host}:{settings.proxy.port}"
else:
proxy = None
logger.error(f"[Network] Unsupported proxy type: {settings.proxy.type}")
return proxy
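For reference, a standalone version of the same URL-building logic, with a hypothetical stand-in for `settings.proxy` showing the strings it produces:

from dataclasses import dataclass

@dataclass
class ProxyConf:
    # hypothetical stand-in for settings.proxy
    type: str
    host: str
    port: int
    username: str = ""
    password: str = ""

def build_proxy_url(conf: ProxyConf) -> str | None:
    auth = f"{conf.username}:{conf.password}@" if conf.username else ""
    if "http" in conf.type or conf.type == "socks5":
        return f"{conf.type}://{auth}{conf.host}:{conf.port}"
    return None  # unsupported type

assert build_proxy_url(ProxyConf("http", "127.0.0.1", 7890, "u", "p")) == "http://u:p@127.0.0.1:7890"
assert build_proxy_url(ProxyConf("socks5", "127.0.0.1", 1080)) == "socks5://127.0.0.1:1080"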

View File

@@ -11,17 +11,22 @@ from .site import rss_parser
logger = logging.getLogger(__name__)
def gen_filter():
return "|".join(settings.rss.filter)
class RequestContent(RequestURL):
async def get_torrents(
self,
_url: str,
_filter: str = "|".join(settings.rss_parser.filter),
_filter: str = gen_filter(),
limit: int = None,
retry: int = 3,
) -> list[Torrent]:
soup = await self.get_xml(_url, retry)
if soup:
torrent_titles, torrent_urls, torrent_homepage = rss_parser(soup)
feeds = await self.get_xml(_url, retry)
if feeds:
torrent_titles, torrent_urls, torrent_homepage = rss_parser(feeds)
torrents: list[Torrent] = []
for _title, torrent_url, homepage in zip(
torrent_titles, torrent_urls, torrent_homepage
@@ -30,12 +35,9 @@ class RequestContent(RequestURL):
torrents.append(
Torrent(name=_title, url=torrent_url, homepage=homepage)
)
if isinstance(limit, int):
if len(torrents) >= limit:
break
return torrents
return torrents if limit is None else torrents[:limit]
else:
logger.warning(f"[Network] Failed to get torrents: {_url}")
logger.error(f"[Network] Torrents list is empty: {_url}")
return []
async def get_xml(self, _url, retry: int = 3) -> xml.etree.ElementTree.Element:
@@ -49,14 +51,8 @@ class RequestContent(RequestURL):
if req:
return req.json()
async def post_json(self, _url, data: dict) -> dict:
return await self.post_url(_url, data)
async def post_data(self, _url, data: dict) -> dict:
return await self.post_json(_url, data)
async def post_files(self, _url, data: dict, files: dict) -> dict:
return await self.post_form(_url, data, files)
async def post_data(self, _url, data: dict, files: dict[str, bytes] = None) -> dict:
return await self.post_url(_url, data, files)
async def get_html(self, _url):
req = await self.get_url(_url)
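The `gen_filter` helper joins the configured filter terms into one regex alternation; presumably the parser applies it with `re` to match or skip titles (an assumption, since the consuming code is not shown in this diff). A small illustration with hypothetical filter terms:

import re

filters = ["720", "HEVC"]  # hypothetical settings.rss.filter contents
gen_filter = "|".join(filters)  # -> "720|HEVC"

assert re.search(gen_filter, "[Sub] Show - 01 [720p]") is not None
assert re.search(gen_filter, "[Sub] Show - 01 [1080p]") is None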

View File

@@ -1,100 +1,74 @@
import asyncio
import logging
import time
import httpx
from .proxy import set_proxy
from module.conf import settings
logger = logging.getLogger(__name__)
def retry_async(times=3):
def decorator(func):
async def wrapper(*args, **kwargs):
# args[0] is `self` for bound methods, so the URL is the next positional argument
url = kwargs.get("url", args[1] if len(args) > 1 else None)
for _ in range(times):
try:
resp = await func(*args, **kwargs)
logger.debug(f"[Network] Successfully connected to {url}")
return resp
except httpx.RequestError:
if _ < times - 1:
await asyncio.sleep(5)  # retry after a 5-second delay
logger.debug(
f"[Network] Cannot connect to {url}. Wait for 5 seconds."
)
except Exception as e:
logger.debug(e)
logger.error(f"[Network] Cannot connect to {url}")
break
return None
return wrapper
return decorator
class RequestURL:
def __init__(self):
self.header = {"user-agent": "Mozilla/5.0", "Accept": "application/xml"}
self.proxy = set_proxy() if settings.proxy.enable else None
async def get_url(self, url, retry=3):
try_time = 0
while True:
try:
req = await self.client.get(url=url, headers=self.header, timeout=5)
req.raise_for_status()
return req
except httpx.RequestError:
logger.warning(
f"[Network] Cannot connect to {url}. Wait for 5 seconds."
)
try_time += 1
if try_time >= retry:
break
time.sleep(5)
except Exception as e:
logger.debug(e)
break
logger.error(f"[Network] Failed connecting to {url}")
logger.warning("[Network] Please check DNS/Connection settings")
return None
@retry_async()
async def get_url(self, url):
req = await self.client.get(url=url)
req.raise_for_status()
return req
async def post_url(self, url: str, data: dict, retry=3):
try_time = 0
while True:
try:
req = await self.client.post(
url=url, headers=self.header, data=data, timeout=5
)
req.raise_for_status()
return req
except httpx.RequestError:
logger.warning(
f"[Network] Cannot connect to {url}. Wait for 5 seconds."
)
try_time += 1
if try_time >= retry:
break
time.sleep(5)
except Exception as e:
logger.debug(e)
break
logger.error(f"[Network] Failed connecting to {url}")
logger.warning("[Network] Please check DNS/Connection settings")
return None
@retry_async()
async def post_url(self, url: str, data: dict, files: dict[str, bytes] = None):
req = await self.client.post(url=url, data=data, files=files)
req.raise_for_status()
return req
async def check_url(self, url: str):
if "://" not in url:
url = f"http://{url}"
try:
req = await self.client.get(url=url, headers=self.header, timeout=5)
req = await self.client.get(url=url)
req.raise_for_status()
return True
except httpx.RequestError:
logger.debug(f"[Network] Cannot connect to {url}.")
return False
async def post_form(self, url: str, data: dict, files):
try:
req = await self.client.post(
url=url, headers=self.header, data=data, files=files, timeout=5
)
req.raise_for_status()
return req
except httpx.RequestError:
logger.warning(f"[Network] Cannot connect to {url}.")
return None
async def __aenter__(self):
proxy = None
if settings.proxy.enable:
auth = f"{settings.proxy.username}:{settings.proxy.password}@" \
if settings.proxy.username else \
""
if "http" in settings.proxy.type:
proxy = f"{settings.proxy.type}://{auth}{settings.proxy.host}:{settings.proxy.port}"
elif settings.proxy.type == "socks5":
proxy = f"socks5://{auth}{settings.proxy.host}:{settings.proxy.port}"
else:
logger.error(f"[Network] Unsupported proxy type: {settings.proxy.type}")
self.client = httpx.AsyncClient(
http2=True,
proxies=proxy,
http2=True, proxies=self.proxy, headers=self.header, timeout=5
)
return self
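With headers, proxy, and timeout set once on the `AsyncClient`, the per-request `headers=`/`timeout=` arguments scattered through the old methods become unnecessary. A minimal sketch of that consolidation:

import httpx

def make_client(proxy: str | None = None) -> httpx.AsyncClient:
    # every get/post through this client inherits these defaults
    return httpx.AsyncClient(
        http2=True,
        proxies=proxy,
        headers={"user-agent": "Mozilla/5.0", "Accept": "application/xml"},
        timeout=5,
    )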

View File

@@ -33,9 +33,8 @@ def info_url(e, key):
async def is_animation(tv_id, language, req) -> bool:
url_info = info_url(tv_id, language)
type_id = await req.get_json(url_info)
type_id = type_id.get("genres")
for type in type_id:
type_ids = await req.get_json(url_info)
for type in type_ids["genres"]:
if type.get("id") == 16:
return True
return False
@@ -59,18 +58,18 @@ def get_season(seasons: list) -> tuple[int, str]:
async def tmdb_parser(title, language, test: bool = False) -> TMDBInfo | None:
async with RequestContent() as req:
url = search_url(title)
contents = await req.get_json(url)
contents = contents.get("results")
json_contents = await req.get_json(url)
contents = json_contents.get("results")
if len(contents) == 0:
url = search_url(title.replace(" ", ""))
contents = (await req.get_json(url)).get("results")
# check whether the series is animation
if contents:
for content in contents:
_id = content["id"]
if await is_animation(_id, language, req):
id = content["id"]
if await is_animation(id, language, req):
break
url_info = info_url(_id, language)
url_info = info_url(id, language)
info_content = await req.get_json(url_info)
season = [
{
@@ -95,7 +94,7 @@ async def tmdb_parser(title, language, test: bool = False) -> TMDBInfo | None:
else:
poster_link = None
return TMDBInfo(
_id,
id,
official_title,
original_title,
season,
@@ -108,12 +107,4 @@ async def tmdb_parser(title, language, test: bool = False) -> TMDBInfo | None:
if __name__ == "__main__":
import asyncio
async def parse(title, language):
info = await tmdb_parser(title, language)
for key, value in info.__dict__.items():
print(key, value)
asyncio.run(parse("葬送的芙莉莲", "jp"))
print(asyncio.run(tmdb_parser("魔法禁书目录", "zh")))
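The animation check keys off TMDB's genre id 16 ("Animation"); in isolation the check is just:

def is_animation_genres(genres: list[dict]) -> bool:
    # TMDB assigns id 16 to the "Animation" genre
    return any(genre.get("id") == 16 for genre in genres)

assert is_animation_genres([{"id": 16, "name": "Animation"}])
assert not is_animation_genres([{"id": 18, "name": "Drama"}])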

View File

@@ -1,11 +1,13 @@
import logging
import asyncio
import re
from typing import Optional
from typing import Optional, Callable
from module.database import Database, engine
from module.downloader import DownloadClient
from module.models import Bangumi, ResponseModel, RSSItem, Torrent
from module.network import RequestContent
from module.conf import settings
logger = logging.getLogger(__name__)
@@ -15,6 +17,23 @@ class RSSEngine(Database):
super().__init__(_engine)
self._to_refresh = False
async def rss_checker(self, callback: Callable[[list[Torrent]], None]):
torrent_pool = []
torrent_name_pool = []
while True:
rss_items = self.rss.search_active()
if rss_items:
for item in rss_items:
torrents = await self.pull_rss(item)
for torrent in torrents:
if torrent.name not in torrent_name_pool:
torrent_pool.append(torrent)
torrent_name_pool.append(torrent.name)
if torrent_pool:
callback(torrent_pool)
torrent_pool.clear()
await asyncio.sleep(settings.rss.interval)
@staticmethod
async def _get_torrents(rss: RSSItem) -> list[Torrent]:
async with RequestContent() as req:
@@ -94,8 +113,8 @@ class RSSEngine(Database):
msg_zh="删除 RSS 成功。",
)
def pull_rss(self, rss_item: RSSItem) -> list[Torrent]:
torrents = self._get_torrents(rss_item)
async def pull_rss(self, rss_item: RSSItem) -> list[Torrent]:
torrents = await self._get_torrents(rss_item)
new_torrents = self.torrent.check_new(torrents)
return new_torrents
@@ -110,22 +129,21 @@ class RSSEngine(Database):
return matched
return None
def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None):
async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None):
# Get All RSS Items
if not rss_id:
rss_items: list[RSSItem] = self.rss.search_active()
else:
rss_item = self.rss.search_id(rss_id)
rss_items = [rss_item] if rss_item else []
rss_items = [self.rss.search_id(rss_id)]
# From RSS Items, get all torrents
logger.debug(f"[Engine] Get {len(rss_items)} RSS items")
for rss_item in rss_items:
new_torrents = self.pull_rss(rss_item)
new_torrents = await self.pull_rss(rss_item)
# Get all enabled bangumi data
for torrent in new_torrents:
matched_data = self.match_torrent(torrent)
if matched_data:
if client.add_torrent(torrent, matched_data):
if await client.add_torrent(torrent, matched_data):
logger.debug(f"[Engine] Add torrent {torrent.name} to client")
torrent.downloaded = True
# Add all torrents to database

View File

@@ -0,0 +1,24 @@
import asyncio
from typing import Callable
from module.models import RSSItem, Torrent
from module.network import RequestContent
from module.conf import settings
async def rss_checker(rss: list[RSSItem], callback: Callable[[list[Torrent]], None]):
torrent_pool = []
torrent_name_pool = []
while True:
async with RequestContent() as req:
for item in rss:
torrents = await req.get_torrents(item.url)
for torrent in torrents:
if torrent.name not in torrent_name_pool:
torrent_pool.append(torrent)
torrent_name_pool.append(torrent.name)
if torrent_pool:
callback(torrent_pool)
torrent_pool.clear()
await asyncio.sleep(settings.rss.interval)
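The checker's torrent pool and name pool form a seen-set deduplicator around a periodic fetch. The same loop in isolation, with hypothetical names:

import asyncio
from typing import Callable

async def poll_new(
    fetch: Callable[[], list[str]],
    on_new: Callable[[list[str]], None],
    interval: float,
) -> None:
    seen: set[str] = set()
    while True:
        # only names not seen in earlier rounds are batched to the callback
        batch = [name for name in fetch() if name not in seen]
        if batch:
            seen.update(batch)
            on_new(batch)
        await asyncio.sleep(interval)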

View File

@@ -7,8 +7,8 @@ from module.utils import save_image
from module.network import RequestContent
def from_30_to_31():
with RSSEngine() as db:
async def from_30_to_31():
async with RSSEngine() as db:
db.migrate()
# Update poster link
bangumis = db.bangumi.search_all()
@@ -29,16 +29,16 @@ def from_30_to_31():
aggregate = True
else:
aggregate = False
db.add_rss(rss_link=rss, aggregate=aggregate)
await db.add_rss(rss_link=rss, aggregate=aggregate)
def cache_image():
with RSSEngine() as db, RequestContent() as req:
async def cache_image():
async with RSSEngine() as db, RequestContent() as req:
bangumis = db.bangumi.search_all()
for bangumi in bangumis:
if bangumi.poster_link:
# Hash local path
img = req.get_content(bangumi.poster_link)
img = await req.get_content(bangumi.poster_link)
suffix = bangumi.poster_link.split(".")[-1]
img_path = save_image(img, suffix)
bangumi.poster_link = img_path

View File

@@ -46,7 +46,8 @@ def test_bangumi_database():
# match torrent
result = db.bangumi.match_torrent(
"[Lilith-Raws] 无职转生,到了异世界就拿出真本事 / Mushoku Tensei - 11 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]"
"[Lilith-Raws] 无职转生,到了异世界就拿出真本事 / Mushoku Tensei - 11 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]",
"test",
)
assert result.official_title == "无职转生到了异世界就拿出真本事II"

View File

@@ -1,18 +1,20 @@
import pytest
from module.rss.engine import RSSEngine
from .test_database import engine as e
@pytest.mark.asyncio
async def test_rss_engine():
with RSSEngine(e) as engine:
rss_link = "https://mikanani.me/RSS/Bangumi?bangumiId=2353&subgroupid=552"
await engine.add_rss(rss_link, aggregate=False)
resp = await engine.add_rss(rss_link, aggregate=False)
assert resp.status
result = engine.rss.search_active()
assert result[1].name == "Mikan Project - 无职转生~到了异世界就拿出真本事~"
assert result[0].name == "Mikan Project - 无职转生~到了异世界就拿出真本事~"
new_torrents = engine.pull_rss(result[1])
new_torrents = await engine.pull_rss(result[1])
torrent = new_torrents[0]
assert torrent.name == "[Lilith-Raws] 无职转生,到了异世界就拿出真本事 / Mushoku Tensei - 11 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]"

View File

@@ -1,6 +1,9 @@
import pytest
from module.parser.analyser.tmdb_parser import tmdb_parser
@pytest.mark.asyncio
async def test_tmdb_parser():
bangumi_title = "海盗战记"
bangumi_year = "2019"