- refactor
- change config from config.ini to config.json
This commit is contained in:
EstrellaXD
2023-03-04 20:18:31 +08:00
parent a7735ac366
commit f31acc70ff
49 changed files with 242 additions and 455 deletions

View File

@@ -0,0 +1 @@
from .title_parser import TitleParser

View File

@@ -0,0 +1,4 @@
from .raw_parser import RawParser
from .rename_parser import DownloadParser
from .tmdb_parser import TMDBMatcher

View File

@@ -0,0 +1,17 @@
from module.network import RequestContent
class BgmAPI:
def __init__(self):
self.search_url = lambda e: \
f"https://api.bgm.tv/search/subject/{e}?type=2"
self.info_url = lambda e: \
f"https://api.bgm.tv/subject/{e}"
self._request = RequestContent()
def search(self, title):
url = self.search_url(title)
contents = self._request.get_json(url)["list"]
if contents.__len__() == 0:
return None
return contents[0]["name"], contents[0]["name_cn"]

View File

@@ -0,0 +1,168 @@
import logging
import re
from dataclasses import dataclass
# from parser.episode import Episode
logger = logging.getLogger(__name__)
EPISODE_RE = re.compile(r"\d+")
TITLE_RE = re.compile(
r"(.*|\[.*])( -? \d+|\[\d+]|\[\d+.?[vV]\d{1}]|[第]\d+[话話集]|\[\d+.?END])(.*)"
)
RESOLUTION_RE = re.compile(r"1080|720|2160|4K")
SOURCE_RE = re.compile(r"B-Global|[Bb]aha|[Bb]ilibili|AT-X|Web")
SUB_RE = re.compile(r"[简繁日字幕]|CH|BIG5|GB")
CHINESE_NUMBER_MAP = {
"": 1,
"": 2,
"": 3,
"": 4,
"": 5,
"": 6,
"": 7,
"": 8,
"": 9,
"": 10,
}
@dataclass
class Episode:
title_en: str or None
title_zh: str or None
title_jp: str or None
season: int
season_raw: str
episode: int
sub: str
group: str
resolution: str
source: str
class RawParser:
@staticmethod
def get_group(name: str) -> str:
return re.split(r"[\[\]]", name)[1]
@staticmethod
def pre_process(raw_name: str) -> str:
return raw_name.replace("", "[").replace("", "]")
@staticmethod
def season_process(season_info: str):
if re.search(r"新番|月?番", season_info):
name_season = re.sub(".*新番.", "", season_info)
else:
name_season = re.sub(r"^[^]】]*[]】]", "", season_info).strip()
season_rule = r"S\d{1,2}|Season \d{1,2}|[第].[季期]"
name_season = re.sub(r"[\[\]]", " ", name_season)
seasons = re.findall(season_rule, name_season)
if not seasons:
return name_season, "", 1
name = re.sub(season_rule, "", name_season)
for season in seasons:
season_raw = season
if re.search(r"S|Season", season) is not None:
season = int(re.sub(r"S|Season", "", season))
break
elif re.search(r"[第 ].*[季期]", season) is not None:
season_pro = re.sub(r"[第季期 ]", "", season)
try:
season = int(season_pro)
except ValueError:
season = CHINESE_NUMBER_MAP[season_pro]
break
return name, season_raw, season
@staticmethod
def name_process(name: str):
name_en, name_zh, name_jp = None, None, None
name = name.strip()
name = re.sub(r"[(]仅限港澳台地区[)]", "", name)
split = re.split("/|\s{2}|-\s{2}", name)
while "" in split:
split.remove("")
if len(split) == 1:
if re.search("_{1}", name) is not None:
split = re.split("_", name)
elif re.search(" - {1}", name) is not None:
split = re.split("-", name)
if len(split) == 1:
split_space = name.split(" ")
for idx, item in enumerate(split_space):
if re.search(r"^[\u4e00-\u9fa5]{2,}", item) is not None:
split_space.remove(item)
split = [item.strip(), " ".join(split_space).strip()]
break
for item in split:
if re.search(r"[\u0800-\u4e00]{2,}", item) and not name_jp:
name_jp = item.strip()
elif re.search(r"[\u4e00-\u9fa5]{2,}", item) and not name_zh:
name_zh = item.strip()
elif re.search(r"[a-zA-Z]{3,}", item) and not name_en:
name_en = item.strip()
return name_en, name_zh, name_jp
@staticmethod
def find_tags(other):
elements = re.sub(r"[\[\]()]", " ", other).split(" ")
# find CHT
sub, resolution, source = None, None, None
for element in filter(lambda x: x != "", elements):
if SUB_RE.search(element):
sub = element
elif RESOLUTION_RE.search(element):
resolution = element
elif SOURCE_RE.search(element):
source = element
return RawParser.clean_sub(sub), resolution, source
@staticmethod
def clean_sub(sub: str | None) -> str | None:
if sub is None:
return sub
return re.sub(r"_MP4|_MKV", "", sub)
def process(self, raw_title: str):
raw_title = raw_title.strip()
content_title = self.pre_process(raw_title) # 预处理标题
group = self.get_group(content_title) # 翻译组的名字
match_obj = TITLE_RE.match(content_title) # 处理标题
season_info, episode_info, other = list(map(
lambda x: x.strip(), match_obj.groups()
))
raw_name, season_raw, season = self.season_process(season_info) # 处理 第n季
name_en, name_zh, name_jp = "", "", ""
try:
name_en, name_zh, name_jp = self.name_process(raw_name) # 处理 名字
except ValueError:
pass
# 处理 集数
raw_episode = EPISODE_RE.search(episode_info)
episode = 0
if raw_episode is not None:
episode = int(raw_episode.group())
sub, dpi, source = self.find_tags(other) # 剩余信息处理
return name_en, name_zh, name_jp, season, season_raw, episode, sub, dpi, source, group
def analyse(self, raw: str) -> Episode or None:
try:
ret = self.process(raw)
if ret is None:
return None
name_en, name_zh, name_jp, season, sr, episode, \
sub, dpi, source, group = ret
except Exception as e:
logger.error(f"Parser cannot analyse {raw} {e}")
return None
return Episode(name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source)
if __name__ == "__main__":
test = RawParser()
test_txt = "[梦蓝字幕组]New Doraemon 哆啦A梦新番[716][2022.07.23][AVC][10080P][GB_JP]"
ep = test.analyse(test_txt)
print(f"en:{ep.title_en}, zh:{ep.title_zh}, jp:{ep.title_jp}, group:{ep.group}")

View File

@@ -0,0 +1,102 @@
import re
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class DownloadInfo:
name: str
season: int
suffix: str
file_name: str
folder_name: str
class DownloadParser:
def __init__(self):
self.rules = [
r"(.*) - (\d{1,4}|\d{1,4}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?(.*)",
r"(.*)[\[ E](\d{1,3}|\d{1,3}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?[\] ](.*)",
r"(.*)\[第(\d*\.*\d*)话(?:END)?\](.*)",
r"(.*)\[第(\d*\.*\d*)話(?:END)?\](.*)",
r"(.*)第(\d*\.*\d*)话(?:END)?(.*)",
r"(.*)第(\d*\.*\d*)話(?:END)?(.*)",
]
@staticmethod
def rename_init(name, folder_name, season, suffix) -> DownloadInfo:
n = re.split(r"[\[\]()【】()]", name)
suffix = suffix if suffix is not None else n[-1]
file_name = name.replace(f"[{n[1]}]", "")
if season < 10:
season = f"0{season}"
return DownloadInfo(name, season, suffix, file_name, folder_name)
def rename_normal(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.name, re.I)
if match_obj is not None:
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
new_name = f"{title} S{info.season}E{match_obj.group(2)}{match_obj.group(3)}"
return new_name
def rename_pn(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = re.sub(r"([Ss]|Season )\d{1,3}", "", match_obj.group(1)).strip()
title = title if title != "" else info.folder_name
new_name = re.sub(
r"[\[\]]",
"",
f"{title} S{info.season}E{match_obj.group(2)}{info.suffix}",
)
return new_name
def rename_advance(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
new_name = re.sub(
r"[\[\]]",
"",
f"{info.folder_name} S{info.season}E{match_obj.group(2)}{info.suffix}",
)
return new_name
def rename_no_season_pn(self, info: DownloadInfo):
for rule in self.rules:
match_obj = re.match(rule, info.file_name, re.I)
if match_obj is not None:
title = match_obj.group(1).strip()
new_name = re.sub(
r"[\[\]]",
"",
f"{title} E{match_obj.group(2)}{info.suffix}",
)
return new_name
@staticmethod
def rename_none(info: DownloadInfo):
return info.name
def download_rename(self, name, folder_name, season, suffix, method):
rename_info = self.rename_init(name, folder_name, season, suffix)
method_dict = {
"normal": self.rename_normal,
"pn": self.rename_pn,
"advance": self.rename_advance,
"no_season_pn": self.rename_no_season_pn,
"none": self.rename_none
}
return method_dict[method.lower()](rename_info)
if __name__ == "__main__":
name = "[Lilith-Raws] Tate no Yuusha no Nariagari S02 - 02 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]"
rename = DownloadParser()
new_name = rename.download_rename(name, "异世界舅舅2022", 1, ".mp4", "normal")
print(new_name)

View File

@@ -0,0 +1,73 @@
import re
import time
from dataclasses import dataclass
from module.network import RequestContent
from module.conf import TMDB_API
@dataclass
class TMDBInfo:
id: int
title_jp: str
title_zh: str
season: dict
last_season: int
year_number: int
class TMDBMatcher:
def __init__(self):
self.search_url = lambda e: \
f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
self.info_url = lambda e: \
f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language=zh-CN"
self._request = RequestContent()
def is_animation(self, tv_id) -> bool:
url_info = self.info_url(tv_id)
type_id = self._request.get_json(url_info)["genres"]
for type in type_id:
if type.get("id") == 16:
return True
return False
# def get_zh_title(self, id):
# alt_title_url = self.alt_title_url(id)
# titles = self._request.get_content(alt_title_url, content="json")
# for title in titles:
# if title["iso_3166_1"] == "CN":
# return title["title"]
# return None
@staticmethod
def get_season(seasons: list) -> int:
for season in seasons:
if re.search(r"\d 季", season.get("season")) is not None:
date = season.get("air_date").split("-")
[year, _ , _] = date
now_year = time.localtime().tm_year
if int(year) == now_year:
return int(re.findall(r"\d", season.get("season"))[0])
def tmdb_search(self, title) -> TMDBInfo:
url = self.search_url(title)
contents = self._request.get_json(url).get("results")
if contents.__len__() == 0:
url = self.search_url(title.replace(" ", ""))
contents = self._request.get_json(url).get("results")
# 判断动画
for content in contents:
id = content["id"]
if self.is_animation(id):
break
url_info = self.info_url(id)
info_content = self._request.get_json(url_info)
# 关闭链接
self._request.close()
season = [{"season": s.get("name"), "air_date": s.get("air_date")} for s in info_content.get("seasons")]
last_season = self.get_season(season)
title_jp = info_content.get("original_name")
title_zh = info_content.get("name")
year_number = info_content.get("first_air_date").split("-")[0]
return TMDBInfo(id, title_jp, title_zh, season, last_season, year_number)

View File

@@ -0,0 +1,52 @@
from thefuzz import fuzz
import logging
from utils import json_config
from conf import settings
logger = logging.getLogger(__name__)
class FuzzMatch:
def __init__(self):
try:
anidb_data = json_config.get(settings.anidb_url)
json_config.save(settings.anidb_path, anidb_data)
except Exception as e:
logger.debug(e)
logger.info(f"Fail to get anidb data, reading local data")
anidb_data = json_config.load(settings.anidb_path)
self.match_data = anidb_data
@staticmethod
def match(title_raw, info: dict):
compare_value = []
for tag in ["main", "en", "ja", "zh-Hans", "zh-Hant"]:
if info[tag] is not None:
a = fuzz.token_sort_ratio(title_raw.lower(), info[tag].lower())
compare_value.append(a)
for compare in info["other"]:
a = fuzz.token_sort_ratio(title_raw.lower(), compare.lower())
compare_value.append(a)
return max(compare_value)
def find_max_name(self, title_raw):
max_value = 0
max_info = None
for info in self.match_data:
a = self.match(title_raw, info)
if a > max_value:
max_value = a
max_info = info
return max_value, max_info["main"]
# logger.debug(max(value))
if __name__ == "__main__":
from conf.const_dev import DEV_SETTINGS
settings.init(DEV_SETTINGS)
f = FuzzMatch()
name = "勇者、辞职不干了"
value, title = f.find_max_name(name)
print(f"Raw Name: {name} \n"
f"Match Name: {title} \n"
f"Match Value: {value}")

View File

@@ -0,0 +1,64 @@
import logging
from .analyser import RawParser, DownloadParser, TMDBMatcher
from module.conf import settings
logger = logging.getLogger(__name__)
LANGUAGE = settings.rss_parser.language
class TitleParser:
def __init__(self):
self._raw_parser = RawParser()
self._download_parser = DownloadParser()
self._tmdb_parser = TMDBMatcher()
def raw_parser(self, raw: str):
return self._raw_parser.analyse(raw)
def download_parser(self, download_raw, folder_name, season, suffix, method=settings.bangumi_manage.method):
return self._download_parser.download_rename(download_raw, folder_name, season, suffix, method)
def tmdb_parser(self, title: str, season: int):
official_title, tmdb_season = None, None
try:
tmdb_info = self._tmdb_parser.tmdb_search(title)
logger.debug(f"TMDB Matched, official title is {tmdb_info.title_zh}")
except Exception as e:
logger.debug(e)
logger.warning("Not Matched with TMDB")
return title, season
if LANGUAGE == "zh":
official_title = f"{tmdb_info.title_zh} ({tmdb_info.year_number})"
elif LANGUAGE == "jp":
official_title = f"{tmdb_info.title_jp} ({tmdb_info.year_number})"
tmdb_season = tmdb_info.last_season if tmdb_info.last_season else season
official_title = official_title if official_title else title
return official_title, tmdb_season
def return_dict(self, _raw: str):
try:
episode = self.raw_parser(_raw)
title_search = episode.title_zh if episode.title_zh else episode.title_en
title_raw = episode.title_en if episode.title_en else episode.title_zh
if settings.rss_parser.enable_tmdb:
official_title, _season = self.tmdb_parser(title_search, episode.season)
else:
official_title = title_search if LANGUAGE == "zh" else title_raw
_season = episode.season
data = {
"official_title": official_title,
"title_raw": title_raw,
"season": _season,
"season_raw": episode.season_raw,
"group": episode.group,
"dpi": episode.resolution,
"source": episode.source,
"subtitle": episode.sub,
"added": False,
"eps_collect": True if episode.episode > 1 else False,
}
logger.debug(f"RAW:{_raw} >> {episode.title_en}")
return data
except Exception as e:
logger.debug(e)