Change Data Model

This commit is contained in:
EstrellaXD
2023-04-26 18:14:17 +08:00
parent b2fc5c2bba
commit 7dc6d8d8a5
15 changed files with 127 additions and 86 deletions

View File

@@ -22,7 +22,6 @@ jobs:
with:
python-version: "3.11"
- name: Install dependencies
working-directory: ./src
run: |
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi

View File

@@ -8,7 +8,7 @@ ENV TZ=Asia/Shanghai \
WORKDIR /app
COPY src/requirements.txt .
COPY requirements.txt .
RUN python3 -m pip install --upgrade pip \
&& pip install -r requirements.txt --no-cache-dir

View File

@@ -50,9 +50,10 @@ def reset_rule():
return api_func.reset_rule()
@router.get("api/v1/removeRule/{bangumi_title}")
def remove_rule(bangumi_title: str):
return api_func.remove_rule(bangumi_title)
@router.get("/api/v1/removeRule/{bangumi_id}")
def remove_rule(bangumi_id: str):
    """Remove the download rule for the bangumi with the given id.

    The path parameter arrives as a string and is coerced to ``int``
    before being handed to the API layer.
    """
    # Leading "/" added: the sibling route "/api/v1/collection" starts with
    # "/", and Starlette requires routed paths to begin with "/".
    bangumi_id = int(bangumi_id)
    return api_func.remove_rule(bangumi_id)
@router.post("/api/v1/collection", tags=["download"])

View File

@@ -3,11 +3,12 @@ import time
import logging
from module.conf import settings, setup_logger, LOG_PATH, DATA_PATH, RSSLink, VERSION
from module.utils import json_config
from module.utils import load_program_data, save_program_data, json_config
from module.core import DownloadClient
from module.manager import Renamer, FullSeasonGet
from module.rss import RSSAnalyser
from module.models import ProgramData
logger = logging.getLogger(__name__)
@@ -19,42 +20,35 @@ def reset_log():
os.remove(LOG_PATH)
def load_data_file():
def load_data_file() -> ProgramData:
empty_data = ProgramData(
rss_link=RSS_LINK,
data_version=settings.data_version,
)
if not os.path.exists(DATA_PATH):
bangumi_data = {
"rss_link": RSS_LINK,
"data_version": settings.data_version,
"bangumi_info": []
}
program_data = empty_data
save_program_data(DATA_PATH, program_data)
logger.info("Building data information...")
else:
bangumi_data = json_config.load(DATA_PATH)
if bangumi_data["data_version"] != settings.data_version or bangumi_data["rss_link"] != RSS_LINK:
bangumi_data = {
"rss_link": RSS_LINK,
"data_version": settings.data_version,
"bangumi_info": []
}
program_data = load_program_data(DATA_PATH)
if program_data.rss_link != RSS_LINK or program_data.data_version != settings.data_version:
program_data = empty_data
logger.info("Rebuilding data information...")
return bangumi_data
return program_data
def save_data_file(bangumi_data):
json_config.save(DATA_PATH, bangumi_data)
logger.debug("Saved")
def main_process(bangumi_data, download_client: DownloadClient):
def main_process(program_data: ProgramData, download_client: DownloadClient):
rename = Renamer(download_client)
rss_analyser = RSSAnalyser()
while True:
times = 0
if settings.rss_parser.enable:
rss_analyser.run(bangumi_data["bangumi_info"], download_client)
if settings.bangumi_manage.eps_complete and bangumi_data["bangumi_info"] != []:
FullSeasonGet().eps_complete(bangumi_data["bangumi_info"], download_client)
rss_analyser.run(program_data.bangumi_info, download_client)
if settings.bangumi_manage.eps_complete and program_data.bangumi_info != []:
FullSeasonGet().eps_complete(program_data.bangumi_info, download_client)
logger.info("Running....")
save_data_file(bangumi_data)
save_program_data(DATA_PATH, program_data)
while times < settings.program.rename_times:
if settings.bangumi_manage.enable:
rename.rename()

View File

@@ -1,13 +1,12 @@
from .log import setup_logger, LOG_PATH
from .config import settings, VERSION
import re
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = "data/data.json"
class RSSLink(str):
def __new__(cls):
if "://" not in settings.rss_parser.custom_url:

View File

@@ -37,7 +37,7 @@ class APIProcess:
if not self._client.authed:
self._client.auth()
data = self.link_process(link)
self._client.add_rss_feed(link, data.get("official_title"))
self._client.add_rss_feed(link, data.official_title)
self._client.set_rule(data, link)
return data
@@ -49,14 +49,15 @@ class APIProcess:
return "Success"
@staticmethod
def remove_rule(name):
def remove_rule(_id: int):
datas = json_config.load(DATA_PATH)["bangumi_info"]
for data in datas:
if re.search(name.lower(), data["title_raw"].lower()):
if data["id"] == _id:
datas.remove(data)
json_config.save(DATA_PATH, datas)
return "Success"
return "Not matched"
break
json_config.save(DATA_PATH, datas)
return "Success"
@staticmethod
def add_rule(title, season):

View File

@@ -3,8 +3,8 @@ import logging
import os
from module.downloader import getClient
from module.conf import settings, RSSLink
from module.models import BangumiData
RSS_LINK = RSSLink()
@@ -36,8 +36,8 @@ class DownloadClient:
prefs = self.client.get_app_prefs()
settings.downloader.path = os.path.join(prefs["save_path"], "Bangumi")
def set_rule(self, info: dict, rss_link):
official_name, raw_name, season, group = info["official_title"], info["title_raw"], info["season"], info["group"]
def set_rule(self, info: BangumiData, rss_link):
official_name, raw_name, season, group = info.official_title, info.title_raw, info.season, info.group
rule = {
"enable": True,
"mustContain": raw_name,
@@ -76,13 +76,12 @@ class DownloadClient:
self.client.rss_add_feed(url=rss_link, item_path=item_path)
logger.info("Add RSS Feed successfully.")
def add_rules(self, bangumi_info, rss_link=RSS_LINK):
def add_rules(self, bangumi_info: list[BangumiData], rss_link=RSS_LINK):
logger.debug("Start adding rules.")
for info in bangumi_info:
if not info["added"]:
if not info.added:
self.set_rule(info, rss_link)
info["added"] = True
# logger.info("to rule.")
info.added = True
logger.debug("Finished.")
def get_torrent_info(self, category="Bangumi"):

View File

@@ -6,6 +6,7 @@ from module.conf import settings
from module.network import RequestContent
from module.core.download_client import DownloadClient
from module.models import BangumiData
logger = logging.getLogger(__name__)
SEARCH_KEY = ["group", "title_raw", "season_raw", "subtitle", "source", "dpi"]
@@ -17,51 +18,55 @@ class FullSeasonGet:
pass
@staticmethod
def init_eps_complete_search_str(data: dict):
test = [data.get(key).strip() for key in SEARCH_KEY if data.get(key) is not None]
def init_eps_complete_search_str(data: BangumiData):
    """Build the '+'-joined keyword string used to search a full season.

    Collects every non-None value of the SEARCH_KEY fields from *data*,
    joins them with '+', then replaces any remaining non-word character
    with '+' so the result is safe to embed in a search URL.
    """
    # Serialize once — data.dict() is loop-invariant, so hoist it out of
    # the per-key loop instead of rebuilding the dict every iteration.
    data_dict = data.dict()
    fields = [data_dict[key] for key in SEARCH_KEY if data_dict[key] is not None]
    search_str_pre = "+".join(fields)
    search_str = re.sub(r"[\W_ ]", "+", search_str_pre)
    return search_str
def get_season_torrents(self, data: dict):
def get_season_torrents(self, data: BangumiData):
keyword = self.init_eps_complete_search_str(data)
with RequestContent() as req:
torrents = req.get_torrents(f"{CUSTOM_URL}/RSS/Search?searchstr={keyword}")
return torrents
@staticmethod
def collect_season_torrents(data: dict, torrents):
def collect_season_torrents(data: BangumiData, torrents):
downloads = []
for torrent in torrents:
download_info = {
"url": torrent.torrent_link,
"save_path": os.path.join(
settings.downloader.path,
data["official_title"],
f"Season {data['season']}")
data.official_title,
f"Season {data.season}")
}
downloads.append(download_info)
return downloads
def download_eps(self, data, download_client: DownloadClient):
logger.info(f"Start collecting {data['official_title']} Season {data['season']}...")
def download_eps(self, data: BangumiData, download_client: DownloadClient):
logger.info(f"Start collecting {data.official_title} Season {data.season}...")
torrents = self.get_season_torrents(data)
downloads = self.collect_season_torrents(data, torrents)
for download in downloads:
download_client.add_torrent(download)
logger.info("Completed!")
data["eps_collect"] = False
data.eps_collect = False
def eps_complete(self, bangumi_info, download_client: DownloadClient):
def eps_complete(self, bangumi_info: list[BangumiData], download_client: DownloadClient):
for data in bangumi_info:
if data["eps_collect"]:
if data.eps_collect:
self.download_eps(data, download_client)
def download_collection(self, data, link, download_client: DownloadClient):
def download_collection(self, data: BangumiData, link, download_client: DownloadClient):
with RequestContent() as req:
torrents = req.get_torrents(link)
downloads = self.collect_season_torrents(data, torrents)
logger.info(f"Starting download {data.get('official_title')}")
logger.info(f"Starting download {data.official_title} Season {data.season}...")
for download in downloads:
download_client.add_torrent(download)
logger.info("Completed!")

View File

@@ -5,7 +5,7 @@ from dataclasses import dataclass
class BangumiData(BaseModel):
id: int = Field(..., alias="id", title="番剧ID")
official_title: str = Field(..., alias="official_title", title="番剧中文名")
year: int = Field(..., alias="year", title="番剧年份")
year: int | None = Field(None, alias="year", title="番剧年份")
title_raw: str = Field(..., alias="title_raw", title="番剧原名")
season: int = Field(..., alias="season", title="番剧季度")
season_raw: str = Field(..., alias="season_raw", title="番剧季度原名")
@@ -19,6 +19,11 @@ class BangumiData(BaseModel):
filter: list[str] = Field(..., alias="filter", title="番剧过滤器")
class ProgramData(BaseModel):
    """Persisted program state: RSS source, data-file version and bangumi list."""

    rss_link: str = Field(..., alias="rss_link", title="RSS链接")
    data_version: float = Field(..., alias="data_version", title="数据版本")
    # default_factory is pydantic's documented idiom for mutable defaults;
    # a bare [] default relies on pydantic copying it per instance.
    bangumi_info: list[BangumiData] = Field(
        default_factory=list, alias="bangumi_info", title="番剧信息"
    )
@dataclass
class MatchRule:

View File

@@ -57,7 +57,7 @@ class Notification(BaseModel):
class Config(BaseModel):
data_version: float = Field(4.0, description="Data version")
data_version: float = Field(5.0, description="Data version")
program: Program = Program()
downloader: Downloader = Downloader()
rss_parser: RSSParser = RSSParser()

View File

@@ -3,7 +3,7 @@ import logging
from .analyser import RawParser, DownloadParser, TMDBMatcher
from module.conf import settings
from module.models import SeasonInfo
from module.models import BangumiData
logger = logging.getLogger(__name__)
LANGUAGE = settings.rss_parser.language
@@ -46,7 +46,7 @@ class TitleParser:
official_title = official_title if official_title else title
return official_title, tmdb_season
def return_dict(self, _raw: str) -> dict:
def return_data(self, _raw: str, _id: int) -> BangumiData:
try:
episode = self.raw_parser(_raw)
title_search = episode.title_zh if episode.title_zh else episode.title_en
@@ -56,20 +56,21 @@ class TitleParser:
else:
official_title = title_search if LANGUAGE == "zh" else title_raw
_season = episode.season
data = {
"official_title": official_title,
"title_raw": title_raw,
"season": _season,
"season_raw": episode.season_raw,
"group": episode.group,
"dpi": episode.resolution,
"source": episode.source,
"subtitle": episode.sub,
"added": False,
"eps_collect": True if episode.episode > 1 else False,
"offset": 0,
"filter": settings.rss_parser.filter
}
data = BangumiData(
id=_id,
official_title=official_title,
title_raw=title_raw,
season=_season,
season_raw=episode.season_raw,
group=episode.group,
dpi=episode.resolution,
source=episode.source,
subtitle=episode.sub,
added=False,
eps_collect=True if episode.episode > 1 else False,
offset=0,
filter=settings.rss_parser.filter
)
logger.debug(f"RAW:{_raw} >> {episode.title_en}")
return data
except Exception as e:

View File

@@ -2,10 +2,9 @@ import re
import logging
from module.network import RequestContent
from module.parser import TitleParser
from module.conf import RSSLink
from module.core import DownloadClient
from module.models import BangumiData
logger = logging.getLogger(__name__)
RSS_LINK = RSSLink()
@@ -15,35 +14,46 @@ class RSSAnalyser:
def __init__(self):
self._title_analyser = TitleParser()
def rss_to_datas(self, bangumi_info: list) -> list:
def find_id(self, bangumi_info: list[BangumiData]) -> int:
    """Return the largest bangumi id in *bangumi_info*, never below 0.

    Callers use this to compute the next free id, so an empty list yields 0.
    """
    # Folding a 0 floor into max() reproduces the original manual scan
    # exactly, including the empty-list case.
    return max([0, *(info.id for info in bangumi_info)])
def rss_to_datas(self, bangumi_info: list[BangumiData]) -> list[BangumiData]:
with RequestContent() as req:
rss_torrents = req.get_torrents(RSS_LINK)
# Find largest bangumi id
_id = self.find_id(bangumi_info)
for torrent in rss_torrents:
raw_title = torrent.name
extra_add = True
if bangumi_info is not []:
for d in bangumi_info:
if re.search(d["title_raw"], raw_title) is not None:
logger.debug(f"Had added {d['title_raw']} in auto_download rule before")
for info in bangumi_info:
if re.search(info.title_raw, raw_title) is not None:
logger.debug(f"Had added {info.official_title} in auto_download rule before")
extra_add = False
break
if extra_add:
data = self._title_analyser.return_dict(raw_title)
if data is not None and data["official_title"] not in bangumi_info:
_id += 1
data = self._title_analyser.return_data(raw_title, _id)
if data is not None and data.official_title not in bangumi_info:
bangumi_info.append(data)
return bangumi_info
def rss_to_data(self, url, filter: bool = True) -> dict:
def rss_to_data(self, url, filter: bool = True) -> BangumiData:
with RequestContent() as req:
rss_torrents = req.get_torrents(url, filter)
for torrent in rss_torrents:
try:
data = self._title_analyser.return_dict(torrent.name)
data = self._title_analyser.return_data(torrent.name, 9999)
return data
except Exception as e:
logger.debug(e)
def run(self, bangumi_info: list, download_client: DownloadClient):
def run(self, bangumi_info: list[BangumiData], download_client: DownloadClient):
logger.info("Start collecting RSS info.")
try:
self.rss_to_datas(bangumi_info)

View File

@@ -0,0 +1 @@
from .bangumi_data import load_program_data, save_program_data

View File

@@ -0,0 +1,26 @@
import logging
from .json_config import save, load
from module.models import ProgramData
logger = logging.getLogger(__name__)
def load_program_data(path: str) -> ProgramData:
    """Load the persisted program data file at *path* into a ProgramData model.

    Falls back to an empty bangumi list when the stored payload cannot be
    parsed into the current model, keeping the old file's rss_link and
    data_version.
    """
    raw = load(path)
    try:
        data = ProgramData(**raw)
        logger.info("Data file loaded")
    # Pydantic reports bad/missing fields with ValidationError, which
    # subclasses ValueError; the original caught only TypeError, so a
    # dict-shaped but incompatible payload crashed instead of rebuilding.
    except (TypeError, ValueError):
        logger.warning("Data file is not compatible with the current version, rebuilding...")
        # NOTE(review): assumes the old payload still carries
        # rss_link/data_version — a totally foreign file would raise
        # KeyError here, same as before.
        data = ProgramData(
            rss_link=raw["rss_link"],
            data_version=raw["data_version"],
            bangumi_info=[],
        )
    return data
def save_program_data(path: str, data: ProgramData):
    """Serialize *data* and write it to the JSON data file at *path*."""
    payload = data.dict()
    save(path, payload)
    logger.debug("Data file saved")