fix rename_method env transfer

sample raw parser func
This commit is contained in:
EstrellaXD
2023-04-27 16:17:47 +08:00
parent e2fa43bae2
commit cca5745a73
9 changed files with 144 additions and 158 deletions

View File

@@ -2,8 +2,7 @@ name: Release Drafter
on:
pull_request:
tags:
- '\d+\.\d+\.\d+'
permissions:
contents: read

View File

@@ -77,7 +77,7 @@ ENV_TO_ATTR = {
},
"bangumi_manage": {
"AB_RENAME": ("enable", lambda e: e.lower() in ("true", "1", "t")),
"AB_METHOD": "method",
"AB_METHOD": ("method", lambda e: e.lower()),
"AB_GROUP_TAG": ("group_tag", lambda e: e.lower() in ("true", "1", "t")),
"AB_EP_COMPLETE": ("eps_complete", lambda e: e.lower() in ("true", "1", "t")),
"AB_REMOVE_BAD_BT": ("remove_bad_torrent", lambda e: e.lower() in ("true", "1", "t")),

View File

@@ -1,2 +1,2 @@
from .eps_complete import FullSeasonGet
from .renamer import Renamer
from .renamer import Renamer

View File

@@ -5,7 +5,7 @@ import logging
from module.conf import settings
from module.network import RequestContent
from module.core.download_client import DownloadClient
from module.core import DownloadClient
from module.models import BangumiData
logger = logging.getLogger(__name__)

View File

@@ -3,7 +3,7 @@ from dataclasses import dataclass
class BangumiData(BaseModel):
id: int = Field(..., alias="id", title="番剧ID")
id: int | None = Field(None, alias="id", title="番剧ID")
official_title: str = Field(..., alias="official_title", title="番剧中文名")
year: int | None = Field(None, alias="year", title="番剧年份")
title_raw: str = Field(..., alias="title_raw", title="番剧原名")

View File

@@ -1,4 +1,4 @@
from .raw_parser import RawParser
from .raw_parser import raw_parser
from .rename_parser import DownloadParser
from .tmdb_parser import TMDBMatcher

View File

@@ -30,153 +30,139 @@ CHINESE_NUMBER_MAP = {
}
def get_group(name: str) -> str:
return re.split(r"[\[\]]", name)[1]
def pre_process(raw_name: str) -> str:
return raw_name.replace("", "[").replace("", "]")
class RawParser:
@staticmethod
def get_group(name: str) -> str:
return re.split(r"[\[\]]", name)[1]
@staticmethod
def pre_process(raw_name: str) -> str:
return raw_name.replace("", "[").replace("", "]")
def prefix_process(raw: str, group: str) -> str:
raw = re.sub(f".{group}.", "", raw)
raw_process = PREFIX_RE.sub("/", raw)
arg_group = raw_process.split("/")
for arg in arg_group:
if re.search(r"新番|月?番", arg) and len(arg) <= 5:
raw = re.sub(f".{arg}.", "", raw)
elif re.search(r"港澳台地区", arg):
raw = re.sub(f".{arg}.", "", raw)
return raw
@staticmethod
def prefix_process(raw: str, group: str) -> str:
raw = re.sub(f".{group}.", "", raw)
raw_process = PREFIX_RE.sub("/", raw)
arg_group = raw_process.split("/")
for arg in arg_group:
if re.search(r"新番|月?番", arg) and len(arg) <= 5:
raw = re.sub(f".{arg}.", "", raw)
elif re.search(r"港澳台地区", arg):
raw = re.sub(f".{arg}.", "", raw)
return raw
@staticmethod
def season_process(season_info: str):
name_season = season_info
# if re.search(r"新番|月?番", season_info):
# name_season = re.sub(".*新番.", "", season_info)
# # 去除「新番」信息
# name_season = re.sub(r"^[^]】]*[]】]", "", name_season).strip()
season_rule = r"S\d{1,2}|Season \d{1,2}|[第].[季期]"
name_season = re.sub(r"[\[\]]", " ", name_season)
seasons = re.findall(season_rule, name_season)
if not seasons:
return name_season, "", 1
name = re.sub(season_rule, "", name_season)
for season in seasons:
season_raw = season
if re.search(r"Season|S", season) is not None:
season = int(re.sub(r"Season|S", "", season))
def season_process(season_info: str):
name_season = season_info
# if re.search(r"新番|月?番", season_info):
# name_season = re.sub(".*新番.", "", season_info)
# # 去除「新番」信息
# name_season = re.sub(r"^[^]】]*[]】]", "", name_season).strip()
season_rule = r"S\d{1,2}|Season \d{1,2}|[第].[季期]"
name_season = re.sub(r"[\[\]]", " ", name_season)
seasons = re.findall(season_rule, name_season)
if not seasons:
return name_season, "", 1
name = re.sub(season_rule, "", name_season)
for season in seasons:
season_raw = season
if re.search(r"Season|S", season) is not None:
season = int(re.sub(r"Season|S", "", season))
break
elif re.search(r"[第 ].*[季期(部分)]|部分", season) is not None:
season_pro = re.sub(r"[第季期 ]", "", season)
try:
season = int(season_pro)
except ValueError:
season = CHINESE_NUMBER_MAP[season_pro]
break
elif re.search(r"[第 ].*[季期(部分)]|部分", season) is not None:
season_pro = re.sub(r"[第季期 ]", "", season)
try:
season = int(season_pro)
except ValueError:
season = CHINESE_NUMBER_MAP[season_pro]
break
return name, season_raw, season
@staticmethod
def name_process(name: str):
name_en, name_zh, name_jp = None, None, None
name = name.strip()
name = re.sub(r"[(]仅限港澳台地区[)]", "", name)
split = re.split("/|\s{2}|-\s{2}", name)
while "" in split:
split.remove("")
if len(split) == 1:
if re.search("_{1}", name) is not None:
split = re.split("_", name)
elif re.search(" - {1}", name) is not None:
split = re.split("-", name)
if len(split) == 1:
split_space = split[0].split(" ")
for idx, item in enumerate(split_space):
if re.search(r"^[\u4e00-\u9fa5]{2,}", item) is not None:
split_space.remove(item)
split = [item.strip(), " ".join(split_space).strip()]
break
for item in split:
if re.search(r"[\u0800-\u4e00]{2,}", item) and not name_jp:
name_jp = item.strip()
elif re.search(r"[\u4e00-\u9fa5]{2,}", item) and not name_zh:
name_zh = item.strip()
elif re.search(r"[a-zA-Z]{3,}", item) and not name_en:
name_en = item.strip()
return name_en, name_zh, name_jp
@staticmethod
def find_tags(other):
elements = re.sub(r"[\[\]()]", " ", other).split(" ")
# find CHT
sub, resolution, source = None, None, None
for element in filter(lambda x: x != "", elements):
if SUB_RE.search(element):
sub = element
elif RESOLUTION_RE.search(element):
resolution = element
elif SOURCE_RE.search(element):
source = element
return RawParser.clean_sub(sub), resolution, source
@staticmethod
def clean_sub(sub: str | None) -> str | None:
if sub is None:
return sub
return re.sub(r"_MP4|_MKV", "", sub)
def process(self, raw_title: str):
raw_title = raw_title.strip()
content_title = self.pre_process(raw_title)
# 预处理标题
group = self.get_group(content_title)
# 翻译组的名字
match_obj = TITLE_RE.match(content_title)
# 处理标题
season_info, episode_info, other = list(map(
lambda x: x.strip(), match_obj.groups()
))
process_raw = self.prefix_process(season_info, group)
# 处理 前缀
raw_name, season_raw, season = self.season_process(process_raw)
# 处理 第n季
name_en, name_zh, name_jp = "", "", ""
try:
name_en, name_zh, name_jp = self.name_process(raw_name)
# 处理 名字
except ValueError:
pass
# 处理 集数
raw_episode = EPISODE_RE.search(episode_info)
episode = 0
if raw_episode is not None:
episode = int(raw_episode.group())
sub, dpi, source = self.find_tags(other) # 剩余信息处理
return name_en, name_zh, name_jp, season, season_raw, episode, sub, dpi, source, group
def analyse(self, raw: str) -> Episode | None:
ret = self.process(raw)
if ret is None:
logger.error(f"Parser cannot analyse {raw}")
return None
name_en, name_zh, name_jp, season, sr, episode, \
sub, dpi, source, group = ret
return Episode(name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source)
return name, season_raw, season
def name_process(name: str):
name_en, name_zh, name_jp = None, None, None
name = name.strip()
name = re.sub(r"[(]仅限港澳台地区[)]", "", name)
split = re.split("/|\s{2}|-\s{2}", name)
while "" in split:
split.remove("")
if len(split) == 1:
if re.search("_{1}", name) is not None:
split = re.split("_", name)
elif re.search(" - {1}", name) is not None:
split = re.split("-", name)
if len(split) == 1:
split_space = split[0].split(" ")
for idx, item in enumerate(split_space):
if re.search(r"^[\u4e00-\u9fa5]{2,}", item) is not None:
split_space.remove(item)
split = [item.strip(), " ".join(split_space).strip()]
break
for item in split:
if re.search(r"[\u0800-\u4e00]{2,}", item) and not name_jp:
name_jp = item.strip()
elif re.search(r"[\u4e00-\u9fa5]{2,}", item) and not name_zh:
name_zh = item.strip()
elif re.search(r"[a-zA-Z]{3,}", item) and not name_en:
name_en = item.strip()
return name_en, name_zh, name_jp
def find_tags(other):
elements = re.sub(r"[\[\]()]", " ", other).split(" ")
# find CHT
sub, resolution, source = None, None, None
for element in filter(lambda x: x != "", elements):
if SUB_RE.search(element):
sub = element
elif RESOLUTION_RE.search(element):
resolution = element
elif SOURCE_RE.search(element):
source = element
return clean_sub(sub), resolution, source
def clean_sub(sub: str | None) -> str | None:
if sub is None:
return sub
return re.sub(r"_MP4|_MKV", "", sub)
def process(raw_title: str):
raw_title = raw_title.strip()
content_title = pre_process(raw_title)
# 预处理标题
group = get_group(content_title)
# 翻译组的名字
match_obj = TITLE_RE.match(content_title)
# 处理标题
season_info, episode_info, other = list(map(
lambda x: x.strip(), match_obj.groups()
))
process_raw = prefix_process(season_info, group)
# 处理 前缀
raw_name, season_raw, season = season_process(process_raw)
# 处理 第n季
name_en, name_zh, name_jp = "", "", ""
try:
name_en, name_zh, name_jp = name_process(raw_name)
# 处理 名字
except ValueError:
pass
# 处理 集数
raw_episode = EPISODE_RE.search(episode_info)
episode = 0
if raw_episode is not None:
episode = int(raw_episode.group())
sub, dpi, source = find_tags(other) # 剩余信息处理
return name_en, name_zh, name_jp, season, season_raw, episode, sub, dpi, source, group
def raw_parser(raw: str) -> Episode | None:
ret = process(raw)
if ret is None:
logger.error(f"Parser cannot analyse {raw}")
return None
name_en, name_zh, name_jp, season, sr, episode, \
sub, dpi, source, group = ret
return Episode(name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source)
if __name__ == '__main__':
test_list = [
"[Lilith-Raws] 关于我在无意间被隔壁的天使变成废柴这件事 / Otonari no Tenshi-sama - 09 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]",
"【幻樱字幕组】【4月新番】【古见同学有交流障碍症 第二季 Komi-san wa, Komyushou Desu. S02】【22】【GB_MP4】【1920X1080】",
"[百冬练习组&LoliHouse] BanG Dream! 少女乐团派对☆PICO FEVER / Garupa Pico: Fever! - 26 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕][END]"
]
parser = RawParser()
for l in test_list:
ep = parser.analyse(l)
print(f"en: {ep.title_en}, zh: {ep.title_zh}, jp: {ep.title_jp}, group: {ep.group}")

View File

@@ -1,6 +1,6 @@
import logging
from .analyser import RawParser, DownloadParser, TMDBMatcher
from .analyser import raw_parser, DownloadParser, TMDBMatcher
from module.conf import settings
from module.models import BangumiData
@@ -11,13 +11,9 @@ LANGUAGE = settings.rss_parser.language
class TitleParser:
def __init__(self):
self._raw_parser = RawParser()
self._download_parser = DownloadParser()
self._tmdb_parser = TMDBMatcher()
def raw_parser(self, raw: str):
return self._raw_parser.analyse(raw)
def download_parser(
self,
download_raw: str,
@@ -47,15 +43,20 @@ class TitleParser:
official_title = official_title if official_title else title
return official_title, tmdb_season
def return_data(self, _raw: str, _id: int) -> BangumiData:
def raw_parser(self, raw: str, _id: int | None = None) -> BangumiData:
try:
episode = self.raw_parser(_raw)
episode = raw_parser(raw)
titles = {
"zh": episode.title_zh,
"en": episode.title_en,
"jp": episode.title_jp
}
title_search = episode.title_zh if episode.title_zh else episode.title_en
title_raw = episode.title_en if episode.title_en else episode.title_zh
if settings.rss_parser.enable_tmdb:
official_title, _season = self.tmdb_parser(title_search, episode.season)
else:
official_title = title_search if LANGUAGE == "zh" else title_raw
official_title = titles[LANGUAGE] if titles[LANGUAGE] else titles["zh"]
_season = episode.season
data = BangumiData(
id=_id,
@@ -72,7 +73,7 @@ class TitleParser:
offset=0,
filter=settings.rss_parser.filter
)
logger.debug(f"RAW:{_raw} >> {episode.title_en}")
logger.debug(f"RAW:{raw} >> {episode.title_en}")
return data
except Exception as e:
logger.debug(e)

View File

@@ -37,7 +37,7 @@ class RSSAnalyser:
break
if extra_add:
_id += 1
data = self._title_analyser.return_data(raw_title, _id)
data = self._title_analyser.raw_parser(raw_title, _id)
if data is not None and data.official_title not in bangumi_info:
bangumi_info.append(data)
return bangumi_info
@@ -47,7 +47,7 @@ class RSSAnalyser:
rss_torrents = req.get_torrents(url, filter)
for torrent in rss_torrents:
try:
data = self._title_analyser.return_data(torrent.name, 9999)
data = self._title_analyser.raw_parser(torrent.name)
return data
except Exception as e:
logger.debug(e)