mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-13 18:11:03 +08:00
fix: Database, torrent_renamer fix #319, subtitle renamer fix #328, parser problem of language fix #327
This commit is contained in:
@@ -115,54 +115,27 @@ class BangumiDatabase(DataConnector):
|
||||
|
||||
def search_id(self, _id: int) -> BangumiData | None:
|
||||
condition = {"id": _id}
|
||||
value = self._search_data(table_name=self.__table_name, condition=condition)
|
||||
# self._cursor.execute(
|
||||
# """
|
||||
# SELECT * FROM bangumi WHERE id = :id
|
||||
# """,
|
||||
# {"id": _id},
|
||||
# )
|
||||
# values = self._cursor.fetchone()
|
||||
if value is None:
|
||||
dict_data = self._search_data(table_name=self.__table_name, condition=condition)
|
||||
if dict_data is None:
|
||||
return None
|
||||
keys = [x[0] for x in self._cursor.description]
|
||||
dict_data = dict(zip(keys, value))
|
||||
return self.__db_to_data(dict_data)
|
||||
|
||||
def search_official_title(self, official_title: str) -> BangumiData | None:
|
||||
value = self._search_data(
|
||||
dict_data = self._search_data(
|
||||
table_name=self.__table_name, condition={"official_title": official_title}
|
||||
)
|
||||
# self._cursor.execute(
|
||||
# """
|
||||
# SELECT * FROM bangumi WHERE official_title = :official_title
|
||||
# """,
|
||||
# {"official_title": official_title},
|
||||
# )
|
||||
# values = self._cursor.fetchone()
|
||||
if value is None:
|
||||
if dict_data is None:
|
||||
return None
|
||||
keys = [x[0] for x in self._cursor.description]
|
||||
dict_data = dict(zip(keys, value))
|
||||
return self.__db_to_data(dict_data)
|
||||
|
||||
def match_poster(self, bangumi_name: str) -> str:
|
||||
condition = f"INSTR({bangumi_name}, official_title) > 0"
|
||||
condition = {"_custom_condition": "INSTR(:bangumi_name, official_title) > 0"}
|
||||
keys = ["official_title", "poster_link"]
|
||||
data = self._search_data(
|
||||
table_name=self.__table_name,
|
||||
keys=keys,
|
||||
condition=condition,
|
||||
)
|
||||
# self._cursor.execute(
|
||||
# """
|
||||
# SELECT official_title, poster_link
|
||||
# FROM bangumi
|
||||
# WHERE INSTR(:bangumi_name, official_title) > 0
|
||||
# """,
|
||||
# {"bangumi_name": bangumi_name},
|
||||
# )
|
||||
# data = self._cursor.fetchone()
|
||||
if not data:
|
||||
return ""
|
||||
official_title, poster_link = data
|
||||
@@ -170,33 +143,26 @@ class BangumiDatabase(DataConnector):
|
||||
return ""
|
||||
return poster_link
|
||||
|
||||
@locked
|
||||
def match_list(self, torrent_list: list, rss_link: str) -> list:
|
||||
# Match title_raw in database
|
||||
keys = ["title_raw", "rss_link", "poster_link"]
|
||||
data = self._search_datas(
|
||||
match_datas = self._search_datas(
|
||||
table_name=self.__table_name,
|
||||
keys=keys,
|
||||
)
|
||||
# self._cursor.execute(
|
||||
# """
|
||||
# SELECT title_raw, rss_link, poster_link FROM bangumi
|
||||
# """
|
||||
# )
|
||||
# data = self._cursor.fetchall()
|
||||
if not data:
|
||||
if not match_datas:
|
||||
return torrent_list
|
||||
# Match title
|
||||
i = 0
|
||||
while i < len(torrent_list):
|
||||
torrent = torrent_list[i]
|
||||
for title_raw, rss_set, poster_link in data:
|
||||
if title_raw in torrent.name:
|
||||
if rss_link not in rss_set:
|
||||
rss_set += "," + rss_link
|
||||
self.update_rss(title_raw, rss_set)
|
||||
if not poster_link:
|
||||
self.update_poster(title_raw, torrent.poster_link)
|
||||
for match_data in match_datas:
|
||||
if match_data.get("title_raw") in torrent.name:
|
||||
if rss_link not in match_data.get("rss_link"):
|
||||
match_data["rss_link"] += f",{rss_link}"
|
||||
self.update_rss(match_data.get("title_raw"), match_data.get("rss_link"))
|
||||
if not match_data.get("poster_link"):
|
||||
self.update_poster(match_data.get("title_raw"), torrent.poster_link)
|
||||
torrent_list.pop(i)
|
||||
break
|
||||
else:
|
||||
@@ -205,18 +171,12 @@ class BangumiDatabase(DataConnector):
|
||||
|
||||
def not_complete(self) -> list[BangumiData]:
|
||||
# Find eps_complete = False
|
||||
condition = "eps_complete = 0"
|
||||
data = self._search_datas(
|
||||
condition = {"eps_complete": 0}
|
||||
dict_data = self._search_datas(
|
||||
table_name=self.__table_name,
|
||||
condition=condition,
|
||||
)
|
||||
|
||||
self._cursor.execute(
|
||||
"""
|
||||
SELECT * FROM bangumi WHERE eps_collect = 0
|
||||
"""
|
||||
)
|
||||
return self.__fetch_data()
|
||||
return [self.__db_to_data(x) for x in dict_data]
|
||||
|
||||
def not_added(self) -> list[BangumiData]:
|
||||
self._cursor.execute(
|
||||
@@ -254,4 +214,4 @@ class BangumiDatabase(DataConnector):
|
||||
for data in data_list:
|
||||
if self.__check_exist(data):
|
||||
return True
|
||||
return False
|
||||
return False
|
||||
@@ -94,23 +94,16 @@ class DataConnector:
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
|
||||
def _delete_all(self, table_name: str):
|
||||
self._cursor.execute(f"DELETE FROM {table_name}")
|
||||
self._conn.commit()
|
||||
|
||||
|
||||
def _search_data(self, table_name: str, keys: list[str] | None, condition: str) -> dict:
|
||||
if keys is None:
|
||||
self._cursor.execute(f"SELECT * FROM {table_name} WHERE {condition}")
|
||||
else:
|
||||
self._cursor.execute(
|
||||
f"SELECT {', '.join(keys)} FROM {table_name} WHERE {condition}"
|
||||
)
|
||||
return dict(zip(keys, self._cursor.fetchone()))
|
||||
def _delete(self, table_name: str, condition: dict):
|
||||
condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()])
|
||||
self._cursor.execute(f"DELETE FROM {table_name} WHERE {condition_sql}", condition)
|
||||
self._conn.commit()
|
||||
|
||||
|
||||
def _search_datas(self, table_name: str, keys: list[str] | None, condition: str = None) -> list[dict]:
|
||||
def _search(self, table_name: str, keys: list[str] | None = None, condition: dict = None):
|
||||
if keys is None:
|
||||
select_sql = "*"
|
||||
else:
|
||||
@@ -118,9 +111,24 @@ class DataConnector:
|
||||
if condition is None:
|
||||
self._cursor.execute(f"SELECT {select_sql} FROM {table_name}")
|
||||
else:
|
||||
self._cursor.execute(
|
||||
f"SELECT {select_sql} FROM {table_name} WHERE {condition}"
|
||||
custom_condition = condition.pop("_custom_condition", None)
|
||||
condition_sql = " AND ".join([f"{key} = :{key}" for key in condition.keys()]) + (
|
||||
f" AND {custom_condition}" if custom_condition else ""
|
||||
)
|
||||
self._cursor.execute(
|
||||
f"SELECT {select_sql} FROM {table_name} WHERE {condition_sql}", condition
|
||||
)
|
||||
|
||||
def _search_data(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> dict:
|
||||
if keys is None:
|
||||
keys = self.__get_table_columns(table_name)
|
||||
self._search(table_name, keys, condition)
|
||||
return dict(zip(keys, self._cursor.fetchone()))
|
||||
|
||||
def _search_datas(self, table_name: str, keys: list[str] | None = None, condition: dict = None) -> list[dict]:
|
||||
if keys is None:
|
||||
keys = self.__get_table_columns(table_name)
|
||||
self._search(table_name, keys, condition)
|
||||
return [dict(zip(keys, row)) for row in self._cursor.fetchall()]
|
||||
|
||||
def _table_exists(self, table_name: str) -> bool:
|
||||
@@ -130,6 +138,10 @@ class DataConnector:
|
||||
)
|
||||
return len(self._cursor.fetchall()) == 1
|
||||
|
||||
def __get_table_columns(self, table_name: str) -> list[str]:
|
||||
self._cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
return [column_info[1] for column_info in self._cursor.fetchall()]
|
||||
|
||||
@staticmethod
|
||||
def __python_to_sqlite_type(value) -> str:
|
||||
if isinstance(value, int):
|
||||
|
||||
@@ -179,3 +179,8 @@ def raw_parser(raw: str) -> Episode | None:
|
||||
return Episode(
|
||||
name_en, name_zh, name_jp, season, sr, episode, sub, group, dpi, source
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
title = "[动漫国字幕组&LoliHouse] THE MARGINAL SERVICE - 08 [WebRip 1080p HEVC-10bit AAC][简繁内封字幕]"
|
||||
print(raw_parser(title))
|
||||
|
||||
@@ -39,7 +39,8 @@ def is_animation(tv_id, language) -> bool:
|
||||
|
||||
|
||||
def get_season(seasons: list) -> int:
|
||||
for season in seasons:
|
||||
ss = sorted(seasons, key=lambda e: e.get("air_date"), reverse=True)
|
||||
for season in ss:
|
||||
if re.search(r"第 \d 季", season.get("season")) is not None:
|
||||
date = season.get("air_date").split("-")
|
||||
[year, _ , _] = date
|
||||
@@ -74,5 +75,5 @@ def tmdb_parser(title, language) -> TMDBInfo | None:
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
title = "鬼灭之刃"
|
||||
print(tmdb_parser(title, "zh"))
|
||||
title = "海盗战记"
|
||||
print(tmdb_parser(title, "zh").last_season)
|
||||
|
||||
@@ -13,7 +13,7 @@ RULES = [
|
||||
r"(.*) - (\d{1,4}(?!\d|p)|\d{1,4}\.\d{1,2}(?!\d|p))(?:v\d{1,2})?(?: )?(?:END)?(.*)",
|
||||
r"(.*)[\[\ E](\d{1,4}|\d{1,4}\.\d{1,2})(?:v\d{1,2})?(?: )?(?:END)?[\]\ ](.*)",
|
||||
r"(.*)\[(?:第)?(\d*\.*\d*)[话集話](?:END)?\](.*)",
|
||||
r"(.*)第(\d*\.*\d*)[话話集](?:END)?(.*)",
|
||||
r"(.*)第?(\d*\.*\d*)[话話集](?:END)?(.*)",
|
||||
r"(.*)(?:S\d{2})?EP?(\d+)(.*)",
|
||||
]
|
||||
|
||||
|
||||
@@ -49,7 +49,16 @@ class TitleParser:
|
||||
"jp": episode.title_jp,
|
||||
}
|
||||
title_raw = episode.title_en if episode.title_en else episode.title_zh
|
||||
official_title = titles[language] if titles[language] else titles["zh"]
|
||||
if titles[language]:
|
||||
official_title = titles[language]
|
||||
elif titles["zh"]:
|
||||
official_title = titles["zh"]
|
||||
elif titles["en"]:
|
||||
official_title = titles["en"]
|
||||
elif titles["jp"]:
|
||||
official_title = titles["jp"]
|
||||
else:
|
||||
official_title = title_raw
|
||||
_season = episode.season
|
||||
data = BangumiData(
|
||||
official_title=official_title,
|
||||
|
||||
Reference in New Issue
Block a user