chore: move Auto_Bangumi/src -> Auto_Bangumi/backend/src, prepare for merge WebUI repo

This commit is contained in:
zthxxx
2023-06-06 22:33:00 +08:00
parent f8411f9c8a
commit 069a86faa0
99 changed files with 179 additions and 0 deletions

View File

@@ -0,0 +1 @@
from .title_parser import TitleParser

View File

@@ -0,0 +1,3 @@
from .raw_parser import raw_parser
from .tmdb_parser import tmdb_parser
from .torrent_parser import torrent_parser

View File

@@ -0,0 +1,15 @@
from module.network import RequestContent
def search_url(e):
    """Build the Bangumi (api.bgm.tv) subject-search URL for the title *e*.

    The title is percent-encoded before being placed in the URL path, so
    characters such as "/", "?" or "#" inside a title cannot change the
    structure of the request URL.
    """
    from urllib.parse import quote

    keyword = quote(str(e), safe="")
    return f"https://api.bgm.tv/search/subject/{keyword}?responseGroup=large"
def bgm_parser(title):
    """Search Bangumi for *title* and return the first entry of the response.

    Returns the element at index 0 of the decoded JSON payload, or ``None``
    when the payload is empty/falsy.
    """
    request_url = search_url(title)
    with RequestContent() as client:
        payload = client.get_json(request_url)
        return payload[0] if payload else None

View File

@@ -0,0 +1,181 @@
import logging
import re
from module.models import Episode
logger = logging.getLogger(__name__)
# First run of digits inside an episode marker; process() converts it to int.
EPISODE_RE = re.compile(r"\d+")
# Splits a release title into three groups: the text before the episode
# marker, the marker itself (" - 01", "[01]", "[01v2]", "第01话", "[第01话]",
# "[01END]", "EP01", ...) and everything after it.
TITLE_RE = re.compile(
r"(.*|\[.*])( -? \d+|\[\d+]|\[\d+.?[vV]\d]|第\d+[话話集]|\[第?\d+[话話集]]|\[\d+.?END]|[Ee][Pp]?\d+)(.*)"
)
# Tokens that find_tags() reports as the resolution tag.
RESOLUTION_RE = re.compile(r"1080|720|2160|4K")
# Tokens that find_tags() reports as the source tag.
SOURCE_RE = re.compile(r"B-Global|[Bb]aha|[Bb]ilibili|AT-X|Web")
# Tokens that find_tags() reports as the subtitle tag (checked before the
# resolution and source patterns).
SUB_RE = re.compile(r"[简繁日字幕]|CH|BIG5|GB")
# Any character that is not a word character, whitespace, CJK ideograph,
# hiragana, katakana or "-"; prefix_process() replaces each one with "/" to
# tokenise the title prefix.
PREFIX_RE = re.compile(r"[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff-]")
# Chinese numerals one to ten mapped to their integer values.  season_process()
# falls back to this table when int() cannot parse the numeral of a season
# marker such as "第二季".  The keys must be distinct, non-empty characters:
# with empty-string keys the literal collapses to a single entry.
CHINESE_NUMBER_MAP = {
    "一": 1,
    "二": 2,
    "三": 3,
    "四": 4,
    "五": 5,
    "六": 6,
    "七": 7,
    "八": 8,
    "九": 9,
    "十": 10,
}
def get_group(name: str) -> str:
    """Return the text that follows the first square bracket in *name*.

    The text runs up to the next "[" or "]".  Raises ``IndexError`` when
    *name* contains no square bracket at all.
    """
    # Treat "]" like "[" so a single plain split yields the same pieces as
    # splitting on either bracket character.
    pieces = name.replace("]", "[").split("[")
    return pieces[1]
def pre_process(raw_name: str) -> str:
    """Normalise lenticular brackets to ASCII square brackets.

    "【" becomes "[" and "】" becomes "]" so that later bracket-based
    parsing only has to deal with one bracket style.  The search strings
    must be non-empty: ``str.replace("", x)`` would insert *x* between
    every character of the title.
    """
    return raw_name.replace("【", "[").replace("】", "]")
def prefix_process(raw: str, group: str) -> str:
    """Strip the group name and season/region prefix tags from *raw*.

    Args:
        raw: the part of the title that precedes the episode marker.
        group: the group name previously extracted from the title.

    Returns:
        *raw* with the group name (plus one surrounding character on each
        side, normally its brackets) removed, and likewise every short tag
        containing "新番" / "番" and every tag containing "港澳台地区".
    """
    # The group name is arbitrary text taken from the title; escape it so
    # that characters like "+", "(" or "." are matched literally instead of
    # being interpreted as regex syntax (or raising re.error).
    raw = re.sub(f".{re.escape(group)}.", "", raw)
    # Tokenise on every character PREFIX_RE treats as a separator.
    tokens = [token for token in PREFIX_RE.sub("/", raw).split("/") if token != ""]
    if len(tokens) == 1:
        # No separator found: fall back to space-separated words.
        tokens = tokens[0].split(" ")
    for token in tokens:
        is_season_tag = re.search(r"新番|月?番", token) and len(token) <= 5
        if is_season_tag or re.search(r"港澳台地区", token):
            raw = re.sub(f".{re.escape(token)}.", "", raw)
    return raw
# Split a season marker out of the title text.
#
# Returns a 3-tuple (name, season_raw, season): the text with every season
# marker removed, the marker string as it was found, and the season number.
# When no marker is found the result is (text with "[" / "]" blanked, "", 1).
# NOTE(review): indentation is not visible in this view, so the scope of the
# final `break` (inside the `except`, after the `try`, or at loop level)
# cannot be confirmed here — verify against the original file.
def season_process(season_info: str):
name_season = season_info
# Disabled legacy handling of "新番" (new-season) tags, kept for reference:
# if re.search(r"新番|月?番", season_info):
# name_season = re.sub(".*新番.", "", season_info)
# # strip the "新番" tag
# name_season = re.sub(r"^[^]】]*[]】]", "", name_season).strip()
# Recognised markers: "S2", "Season 2", or "第" + exactly one character +
# "季"/"期" (so a two-digit Chinese-style marker is not matched).
season_rule = r"S\d{1,2}|Season \d{1,2}|[第].[季期]"
# Square brackets are replaced by spaces before searching.
name_season = re.sub(r"[\[\]]", " ", name_season)
seasons = re.findall(season_rule, name_season)
if not seasons:
return name_season, "", 1
name = re.sub(season_rule, "", name_season)
for season in seasons:
season_raw = season
# "S2" / "Season 2": drop the prefix and parse the digits.
if re.search(r"Season|S", season) is not None:
season = int(re.sub(r"Season|S", "", season))
break
elif re.search(r"[第 ].*[季期(部分)]|部分", season) is not None:
season_pro = re.sub(r"[第季期 ]", "", season)
try:
season = int(season_pro)
except ValueError:
# Not parseable by int(): look the numeral up in CHINESE_NUMBER_MAP
# (raises KeyError for a character that is not in the table).
season = CHINESE_NUMBER_MAP[season_pro]
break
return name, season_raw, season
def name_process(name: str):
    """Split a raw title into English, Chinese and Japanese names.

    Returns ``(name_en, name_zh, name_jp)``.  A slot is ``None`` when no
    fragment of that kind is found; for each slot the first qualifying
    fragment wins.
    """
    name = re.sub(r"[(]仅限港澳台地区[)]", "", name.strip())
    parts = [part for part in re.split(r"/|\s{2}|-\s{2}", name) if part != ""]
    if len(parts) == 1:
        # Primary separators found nothing: try "_" and then " - ".
        if re.search("_{1}", name):
            parts = name.split("_")
        elif re.search(" - {1}", name):
            parts = name.split("-")
    if len(parts) == 1:
        # Still a single fragment: peel off the first space-separated word
        # that starts with two or more CJK ideographs.
        words = parts[0].split(" ")
        for pos, word in enumerate(words):
            if re.search(r"^[\u4e00-\u9fa5]{2,}", word):
                remainder = words[:pos] + words[pos + 1:]
                parts = [word.strip(), " ".join(remainder).strip()]
                break
    # Checked in this order for every fragment; a fragment fills the first
    # still-empty slot whose pattern it matches.
    checks = (
        ("jp", r"[\u0800-\u4e00]{2,}"),
        ("zh", r"[\u4e00-\u9fa5]{2,}"),
        ("en", r"[a-zA-Z]{3,}"),
    )
    found = {"en": None, "zh": None, "jp": None}
    for part in parts:
        for slot, pattern in checks:
            if re.search(pattern, part) and not found[slot]:
                found[slot] = part.strip()
                break
    return found["en"], found["zh"], found["jp"]
def find_tags(other):
    """Pick subtitle, resolution and source tags out of the trailing text.

    Brackets and parentheses in *other* are treated as spaces and the text is
    split on single spaces.  Each non-empty token is assigned to the first
    category whose pattern it matches (subtitle, then resolution, then
    source); a later token of the same category overwrites an earlier one.

    Returns ``(sub, resolution, source)`` with the subtitle tag passed
    through ``clean_sub``; unmatched categories are ``None``.
    """
    sub = resolution = source = None
    tokens = re.sub(r"[\[\]()]", " ", other).split(" ")
    for token in tokens:
        if token == "":
            continue
        if SUB_RE.search(token):
            sub = token
        elif RESOLUTION_RE.search(token):
            resolution = token
        elif SOURCE_RE.search(token):
            source = token
    return clean_sub(sub), resolution, source
def clean_sub(sub: str | None) -> str | None:
if sub is None:
return sub
return re.sub(r"_MP4|_MKV", "", sub)
def process(raw_title: str):
    """Parse a raw release title into its components.

    Returns:
        A 10-tuple ``(name_en, name_zh, name_jp, season, season_raw,
        episode, sub, dpi, source, group)``, or ``None`` when ``TITLE_RE``
        does not match, i.e. the title has no recognisable episode marker.
        ``raw_parser`` treats ``None`` as "cannot analyse".
    """
    raw_title = raw_title.strip()
    # Normalise bracket characters before any pattern matching.
    content_title = pre_process(raw_title)
    # Name of the translation (fansub) group.
    group = get_group(content_title)
    match_obj = TITLE_RE.match(content_title)
    if match_obj is None:
        # Without this guard the code below fails with AttributeError on
        # ``None.groups()`` and the caller's ``is None`` check never fires.
        return None
    season_info, episode_info, other = (part.strip() for part in match_obj.groups())
    # Strip the group name and similar prefix tags.
    process_raw = prefix_process(season_info, group)
    # Extract the season marker ("第n季", "S2", ...).
    raw_name, season_raw, season = season_process(process_raw)
    # Split the remaining text into per-language names; on failure the
    # empty-string defaults are kept (best-effort, as before).
    name_en, name_zh, name_jp = "", "", ""
    try:
        name_en, name_zh, name_jp = name_process(raw_name)
    except ValueError:
        logger.debug("Cannot split names from %r", raw_name)
    # Episode number: first run of digits in the marker, 0 when absent.
    raw_episode = EPISODE_RE.search(episode_info)
    episode = int(raw_episode.group()) if raw_episode is not None else 0
    # Whatever follows the episode marker carries the tags.
    sub, dpi, source = find_tags(other)
    return (
        name_en,
        name_zh,
        name_jp,
        season,
        season_raw,
        episode,
        sub,
        dpi,
        source,
        group,
    )
def raw_parser(raw: str) -> Episode | None:
    """Parse *raw* with ``process`` and wrap the result in an ``Episode``.

    Logs an error and returns ``None`` when ``process`` returns ``None``.
    """
    parsed = process(raw)
    if parsed is not None:
        (
            name_en,
            name_zh,
            name_jp,
            season,
            season_raw,
            episode,
            sub,
            dpi,
            source,
            group,
        ) = parsed
        # Episode takes the group before the resolution and source, unlike
        # the order of the tuple returned by process().
        return Episode(
            name_en,
            name_zh,
            name_jp,
            season,
            season_raw,
            episode,
            sub,
            group,
            dpi,
            source,
        )
    logger.error(f"Parser cannot analyse {raw}")
    return None

View File

@@ -0,0 +1,91 @@
import re
import time
from dataclasses import dataclass
from module.conf import TMDB_API
from module.network import RequestContent
# Summary of one TMDB TV entry, as assembled by tmdb_parser().
@dataclass
class TMDBInfo:
# "id" of the selected search result
id: int
# "name" field of the details response
title: str
# "original_name" field of the details response
original_title: str
# one dict per season with the keys "season" (the season's name),
# "air_date" and "poster_path"
season: list[dict]
# value returned by get_season(); NOTE(review): get_season can fall through
# without returning, so this may be None despite the annotation
last_season: int
# first "-"-separated component of "first_air_date", stored as a string
year: str
# Maps the short language keys accepted by info_url() to the value sent as
# the `language` query parameter.
LANGUAGE = {"zh": "zh-CN", "jp": "ja-JP", "en": "en-US"}
def search_url(e):
    """Build the TMDB TV-search URL for the title *e*.

    The title is percent-encoded before being used as the ``query`` value,
    so characters such as "&", "#" or "=" inside a title cannot split or
    truncate the query string.
    """
    from urllib.parse import quote

    query = quote(str(e), safe="")
    return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={query}&include_adult=false"
def info_url(e, key):
    """Build the TMDB TV-details URL for the id *e*.

    *key* selects the response language through ``LANGUAGE``; a key that is
    not in that table raises ``KeyError``.
    """
    language_code = LANGUAGE[key]
    return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={language_code}"
def is_animation(tv_id, language) -> bool:
    """Return True when the TMDB entry *tv_id* lists a genre with id 16.

    The details document is fetched for *language* and its "genres" list is
    scanned; 16 is the genre id this module treats as animation.
    """
    details_url = info_url(tv_id, language)
    with RequestContent() as client:
        genres = client.get_json(details_url)["genres"]
        return any(genre.get("id") == 16 for genre in genres)
def get_season(seasons: list) -> int:
for season in seasons:
if re.search(r"\d 季", season.get("season")) is not None:
date = season.get("air_date").split("-")
[year, _, _] = date
now_year = time.localtime().tm_year
if int(year) <= now_year:
return int(re.findall(r"\d", season.get("season"))[0])
def tmdb_parser(title, language) -> TMDBInfo | None:
    """Search TMDB for *title* and summarise the selected TV entry.

    Args:
        title: title to search for; when the first search has no results
            it is retried with all spaces removed.
        language: key of ``LANGUAGE`` used for the details requests.

    Returns:
        A ``TMDBInfo`` for the chosen result, or ``None`` when both
        searches return no results.
    """
    with RequestContent() as req:
        contents = req.get_json(search_url(title)).get("results")
        if not contents:
            # Covers both an empty list and a missing "results" key.
            contents = req.get_json(search_url(title.replace(" ", ""))).get(
                "results"
            )
        if not contents:
            return None
        # Select the first result that is an animation.
        # NOTE(review): when no result qualifies, the loop ends with the id
        # of the last result and that entry is used — kept as in the
        # original; confirm this fallback is intended.
        for content in contents:
            tv_id = content["id"]
            if is_animation(tv_id, language):
                break
        info_content = req.get_json(info_url(tv_id, language))
        season = [
            {
                "season": s.get("name"),
                "air_date": s.get("air_date"),
                "poster_path": s.get("poster_path"),
            }
            for s in info_content.get("seasons")
        ]
        last_season = get_season(season)
        original_title = info_content.get("original_name")
        official_title = info_content.get("name")
        # A missing first air date yields an empty year instead of raising
        # AttributeError on None.
        first_air_date = info_content.get("first_air_date") or ""
        year_number = first_air_date.split("-")[0]
        return TMDBInfo(
            tv_id,
            official_title,
            original_title,
            season,
            last_season,
            str(year_number),
        )
# Manual smoke test: look up a sample title and print the resulting TMDBInfo.
# NOTE(review): presumably performs real requests through RequestContent and
# needs a valid TMDB_API key — confirm before running.
if __name__ == '__main__':
title = "鬼灭之刃"
print(tmdb_parser(title, "zh"))

View File

@@ -0,0 +1,100 @@
import logging
import ntpath as win_path
import os.path as unix_path
import re
from module.models import EpisodeFile, SubtitleFile
logger = logging.getLogger(__name__)
# Selects the path flavour used by split_path(): "Windows" uses ntpath,
# any other value uses os.path.
PLATFORM = "Unix"
# File-name patterns tried in order by torrent_parser().  In every pattern
# group 1 is the text before the episode marker (group name and title) and
# group 2 is the episode number; group 3 is the trailing text.
RULES = [
r"(.*) - (\d{1,4}(?!\d|p)|\d{1,4}\.\d{1,2}(?!\d|p))(?:v\d{1,2})?(?: )?(?:END)?(.*)",
r"(.*)[\[\ E](\d{1,4}|\d{1,4}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?[\]\ ](.*)",
r"(.*)\[(?:第)?(\d*\.*\d*)[话集話](?:END)?\](.*)",
r"(.*)第(\d*\.*\d*)[话話集](?:END)?(.*)",
r"(.*)(?:S\d{2})?EP?(\d+)(.*)",
]
# Language code -> substrings that identify that language in a subtitle file
# name.  get_subtitle_lang() returns the first key with a matching marker, so
# "zh-tw" is checked before "zh".  Every marker must be non-empty: an empty
# string is a substring of every file name and would match unconditionally.
SUBTITLE_LANG = {
    "zh-tw": ["TC", "CHT", "cht", "繁", "zh-tw"],
    "zh": ["SC", "CHS", "chs", "简", "zh"],
}
def split_path(torrent_path: str) -> str:
    """Return the last path component of *torrent_path*.

    The path is split with ``ntpath`` when ``PLATFORM`` is "Windows" and
    with ``os.path`` otherwise.
    """
    path_module = win_path if PLATFORM == "Windows" else unix_path
    return path_module.split(torrent_path)[-1]
def get_group(group_and_title) -> tuple[str | None, str]:
n = re.split(r"[\[\]()【】()]", group_and_title)
while "" in n:
n.remove("")
if len(n) > 1:
if re.match(r"\d+", n[1]):
return None, group_and_title
return n[0], n[1]
else:
return None, n[0]
def get_season_and_title(season_and_title) -> tuple[str, int]:
    """Split a season marker ("S2", "s2", "Season 2") off a title.

    Returns ``(title, season)``: the stripped title with every marker
    removed and the number of the first marker found, or 1 when there is
    none.  The search for the number ignores case, the removal does not.
    """
    found = re.search(r"([Ss]|Season )(\d{1,3})", season_and_title, re.I)
    season = int(found.group(2)) if found is not None else 1
    title = re.sub(r"([Ss]|Season )\d{1,3}", "", season_and_title).strip()
    return title, season
def get_subtitle_lang(subtitle_name: str) -> str:
    """Return the language key whose marker occurs in *subtitle_name*.

    The keys of ``SUBTITLE_LANG`` are checked in order and the first one
    with any marker contained in the name wins.  Falls through (returning
    ``None``) when no marker matches.
    """
    for language, markers in SUBTITLE_LANG.items():
        if any(marker in subtitle_name for marker in markers):
            return language
def torrent_parser(
    torrent_path: str,
    torrent_name: str | None = None,
    season: int | None = None,
    file_type: str = "media",
) -> EpisodeFile | SubtitleFile:
    """Parse group, title, season and episode out of a torrent file name.

    Args:
        torrent_path: path of the file; its extension becomes ``suffix``.
        torrent_name: name to match against ``RULES``; when empty or
            ``None`` the last component of *torrent_path* is used instead.
        season: season to report; when falsy the season parsed from the
            title is used.
        file_type: "media" builds an ``EpisodeFile``, "subtitle" a
            ``SubtitleFile``.

    Returns:
        The object built from the first rule that matches.  Falls through
        (returning ``None``) when no rule matches or *file_type* is neither
        of the two known values.
    """
    media_path = split_path(torrent_path)
    # The string being matched is the same for every rule.
    target = torrent_name if torrent_name else media_path
    for rule in RULES:
        matched = re.match(rule, target, re.I)
        if not matched:
            continue
        group, title = get_group(matched.group(1))
        title, parsed_season = get_season_and_title(title)
        if not season:
            season = parsed_season
        episode = int(matched.group(2))
        suffix = unix_path.splitext(torrent_path)[-1]
        if file_type == "media":
            return EpisodeFile(
                media_path=torrent_path,
                group=group,
                title=title,
                season=season,
                episode=episode,
                suffix=suffix,
            )
        if file_type == "subtitle":
            return SubtitleFile(
                media_path=torrent_path,
                group=group,
                title=title,
                season=season,
                language=get_subtitle_lang(media_path),
                episode=episode,
                suffix=suffix,
            )

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,73 @@
import logging
from module.conf import settings
from module.models import BangumiData
from .analyser import raw_parser, tmdb_parser, torrent_parser
# Module-level logger used by the TitleParser methods.
logger = logging.getLogger(__name__)
class TitleParser:
    """Facade over the analyser functions that adds logging and fallbacks."""

    def __init__(self):
        """Nothing to set up; every parser is a static method."""

    @staticmethod
    def torrent_parser(
        torrent_path: str,
        torrent_name: str | None = None,
        season: int | None = None,
        file_type: str = "media",
    ):
        """Run the module-level ``torrent_parser``.

        Any exception is logged as a warning and ``None`` is returned.
        """
        try:
            parsed = torrent_parser(torrent_path, torrent_name, season, file_type)
        except Exception as e:
            logger.warning(f"Cannot parse {torrent_path} with error {e}")
            return None
        return parsed

    @staticmethod
    def tmdb_parser(title: str, season: int, language: str):
        """Resolve the official title, season and year through TMDB.

        Returns ``(official_title, season, year)``.  Without a TMDB match
        the inputs are returned unchanged with ``year`` set to ``None``;
        with a match, the given *season* is used only when TMDB reports no
        last season.
        """
        tmdb_info = tmdb_parser(title, language)
        if not tmdb_info:
            logger.warning(f"Cannot match {title} in TMDB. Use raw title instead.")
            logger.warning("Please change bangumi info manually.")
            return title, season, None
        logger.debug(f"TMDB Matched, official title is {tmdb_info.title}")
        tmdb_season = tmdb_info.last_season or season
        return tmdb_info.title, tmdb_season, tmdb_info.year

    @staticmethod
    def raw_parser(raw: str, rss_link: str) -> BangumiData | None:
        """Parse a raw RSS title into a ``BangumiData`` record.

        The official title is taken in the configured language, falling
        back to the Chinese title.  Any failure is logged and ``None`` is
        returned.
        """
        language = settings.rss_parser.language
        try:
            episode = raw_parser(raw)
            titles = {
                "zh": episode.title_zh,
                "en": episode.title_en,
                "jp": episode.title_jp,
            }
            title_raw = episode.title_en or episode.title_zh
            official_title = titles[language] or titles["zh"]
            data = BangumiData(
                official_title=official_title,
                title_raw=title_raw,
                season=episode.season,
                season_raw=episode.season_raw,
                group_name=episode.group,
                dpi=episode.resolution,
                source=episode.source,
                subtitle=episode.sub,
                # True only while the parsed episode number is at most 1.
                eps_collect=not (episode.episode > 1),
                offset=0,
                filter=settings.rss_parser.filter,
                rss_link=[rss_link],
            )
            logger.debug(f"RAW:{raw} >> {title_raw}")
        except Exception as e:
            logger.debug(e)
            logger.warning(f"Cannot parse {raw}.")
            return None
        return data