Finished new version of rss parser.

This commit is contained in:
EstrellaXD
2023-05-09 15:34:58 +08:00
parent 3ac4381e31
commit f4469bf892
9 changed files with 154 additions and 270 deletions

View File

@@ -1,11 +1,11 @@
import logging
from module.models import Config
from .config import settings
LOG_PATH = "data/log.txt"
def setup_logger(settings: Config):
def setup_logger():
level = logging.DEBUG if settings.log.debug_enable else logging.INFO
logging.addLevelName(logging.DEBUG, "DEBUG:")
logging.addLevelName(logging.INFO, "INFO:")

View File

@@ -11,29 +11,6 @@ class DataConnector:
os.makedirs(os.path.dirname(DATA_PATH), exist_ok=True)
self._conn = sqlite3.connect(DATA_PATH)
self._cursor = self._conn.cursor()
self._cursor.execute(
"""
CREATE TABLE IF NOT EXISTS bangumi (
id INTEGER PRIMARY KEY,
official_title TEXT NOT NULL,
year INTEGER,
title_raw TEXT NOT NULL,
season INTEGER NOT NULL,
season_raw TEXT NOT NULL,
group_name TEXT,
dpi TEXT,
source TEXT,
subtitle TEXT,
eps_collect INTEGER NOT NULL,
offset INTEGER NOT NULL,
filter TEXT NOT NULL,
rss_link TEXT NOT NULL,
poster_link TEXT,
added INTEGER NOT NULL
);
"""
)
self._conn.commit()
# Context-manager entry: hand the connector itself back to the `with` block.
def __enter__(self):
return self

View File

@@ -7,8 +7,44 @@ logger = logging.getLogger(__name__)
class DataOperator(DataConnector):
def __init__(self):
# Open the SQLite connection/cursor via DataConnector, then make sure the
# bangumi table's columns match the current BangumiData model.
super().__init__()
self.__update_table()
# Create the bangumi table from a default BangumiData instance and add any
# column the model defines but the existing table lacks (simple forward
# migration; columns are never dropped or retyped).
def __update_table(self):
table_name = "bangumi"
# Default-constructed model supplies both the column names and example
# values used to infer SQLite column types.
db_data = self.__data_to_db(BangumiData())
columns = ", ".join([f"{key} {self.__python_to_sqlite_type(value)}" for key, value in db_data.items()])
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns});"
self._cursor.execute(create_table_sql)
# PRAGMA table_info row layout: (cid, name, type, notnull, dflt_value, pk).
self._cursor.execute(f"PRAGMA table_info({table_name})")
existing_columns = {column_info[1]: column_info for column_info in self._cursor.fetchall()}
for key, value in db_data.items():
if key not in existing_columns:
# NOTE(review): {value} is interpolated unquoted — a str default such as
# "official_title" yields invalid SQL (DEFAULT official_title); confirm
# every model default is numeric or otherwise SQL-safe.
add_column_sql = f"ALTER TABLE {table_name} ADD COLUMN {key} {self.__python_to_sqlite_type(value)} DEFAULT {value};"
self._cursor.execute(add_column_sql)
self._conn.commit()
logger.debug("Create / Update table bangumi.")
@staticmethod
def data_to_db(data: BangumiData) -> dict:
def __python_to_sqlite_type(value) -> str:
if isinstance(value, int):
return "INTEGER NOT NULL"
elif isinstance(value, float):
return "REAL NOT NULL"
elif isinstance(value, str):
return "TEXT NOT NULL"
elif isinstance(value, bool):
return "INTEGER NOT NULL"
elif isinstance(value, list):
return "TEXT NOT NULL"
elif value is None:
return "TEXT"
else:
raise ValueError(f"Unsupported data type: {type(value)}")
@staticmethod
def __data_to_db(data: BangumiData) -> dict:
db_data = data.dict()
for key, value in db_data.items():
if isinstance(value, bool):
@@ -18,7 +54,7 @@ class DataOperator(DataConnector):
return db_data
@staticmethod
def db_to_data(db_data: dict) -> BangumiData:
def __db_to_data(db_data: dict) -> BangumiData:
for key, item in db_data.items():
if isinstance(item, int):
if key not in ["id", "offset", "season", "year"]:
@@ -28,123 +64,28 @@ class DataOperator(DataConnector):
return BangumiData(**db_data)
def insert(self, data: BangumiData):
db_data = self.data_to_db(data)
self._cursor.execute(
"""
INSERT INTO bangumi (
id,
official_title,
year,
title_raw,
season,
season_raw,
group_name,
dpi,
source,
subtitle,
eps_collect,
offset,
filter,
rss_link,
poster_link,
added
) VALUES (
:id,
:official_title,
:year,
:title_raw,
:season,
:season_raw,
:group,
:dpi,
:source,
:subtitle,
:eps_collect,
:offset,
:filter,
:rss_link,
:poster_link,
:added
)
""",
db_data,
)
db_data = self.__data_to_db(data)
columns = ", ".join(db_data.keys())
values = ", ".join([f":{key}" for key in db_data.keys()])
self._cursor.execute(f"INSERT INTO bangumi ({columns}) VALUES ({values})", db_data)
logger.debug(f"Add {data.official_title} into database.")
self._conn.commit()
def insert_list(self, data: list[BangumiData]):
db_data = [self.data_to_db(x) for x in data]
self._cursor.executemany(
"""
INSERT INTO bangumi (
id,
official_title,
year,
title_raw,
season,
season_raw,
group_name,
dpi,
source,
subtitle,
eps_collect,
offset,
filter,
rss_link,
poster_link,
added
) VALUES (
:id,
:official_title,
:year,
:title_raw,
:season,
:season_raw,
:group,
:dpi,
:source,
:subtitle,
:eps_collect,
:offset,
:filter,
:rss_link,
:poster_link,
:added
)
""",
db_data,
)
db_data = [self.__data_to_db(x) for x in data]
columns = ", ".join(db_data[0].keys())
values = ", ".join([f":{key}" for key in db_data[0].keys()])
self._cursor.executemany(f"INSERT INTO bangumi ({columns}) VALUES ({values})", db_data)
logger.debug(f"Add {len(data)} bangumi into database.")
self._conn.commit()
def update(self, data: BangumiData) -> bool:
db_data = self.data_to_db(data)
self._cursor.execute(
"""
UPDATE bangumi SET
official_title = :official_title,
year = :year,
title_raw = :title_raw,
season = :season,
season_raw = :season_raw,
group_name = :group,
dpi = :dpi,
source = :source,
subtitle = :subtitle,
eps_collect = :eps_collect,
offset = :offset,
filter = :filter,
rss_link = :rss_link,
poster_link = :poster_link,
added = :added
WHERE id = :id
""",
db_data,
)
db_data = self.__data_to_db(data)
update_columns = ", ".join([f"{key} = :{key}" for key in db_data.keys() if key != "id"])
self._cursor.execute(f"UPDATE bangumi SET {update_columns} WHERE id = :id", db_data)
self._conn.commit()
return self._cursor.rowcount == 1
def update_column(self, title_raw: str, column: str, value: str):
def update_rss(self, title_raw, rss_set: str):
# Update rss and select all data
self._cursor.execute(
@@ -168,7 +109,7 @@ class DataOperator(DataConnector):
return None
keys = [x[0] for x in self._cursor.description]
dict_data = dict(zip(keys, values))
return self.db_to_data(dict_data)
return self.__db_to_data(dict_data)
def search_official_title(self, official_title: str) -> BangumiData | None:
self._cursor.execute(
@@ -182,7 +123,7 @@ class DataOperator(DataConnector):
return None
keys = [x[0] for x in self._cursor.description]
dict_data = dict(zip(keys, values))
return self.db_to_data(dict_data)
return self.__db_to_data(dict_data)
def match_official_title(self, title: str) -> bool:
self._cursor.execute(
@@ -260,7 +201,7 @@ class DataOperator(DataConnector):
return None
keys = [x[0] for x in self._cursor.description]
dict_data = [dict(zip(keys, value)) for value in values]
return [self.db_to_data(x) for x in dict_data]
return [self.__db_to_data(x) for x in dict_data]
def gen_id(self) -> int:
self._cursor.execute(
@@ -272,12 +213,3 @@ class DataOperator(DataConnector):
if data is None:
return 1
return data[0] + 1
if __name__ == '__main__':
with DataOperator() as op:
datas = op.get_to_complete()
_id = op.gen_id()
for data in datas:
print(data)
print(_id)

View File

@@ -5,7 +5,7 @@ from dataclasses import dataclass
class BangumiData(BaseModel):
id: int = Field(0, alias="id", title="番剧ID")
official_title: str = Field("official_title", alias="official_title", title="番剧中文名")
year: int | None = Field(None, alias="year", title="番剧年份")
year: str | None = Field(None, alias="year", title="番剧年份")
title_raw: str = Field("title_raw", alias="title_raw", title="番剧原名")
season: int = Field(1, alias="season", title="番剧季度")
season_raw: str | None = Field(None, alias="season_raw", title="番剧季度原名")
@@ -16,7 +16,7 @@ class BangumiData(BaseModel):
eps_collect: bool = Field(False, alias="eps_collect", title="是否已收集")
offset: int = Field(0, alias="offset", title="番剧偏移量")
filter: list[str] = Field(["720", "\\d+-\\d+"], alias="filter", title="番剧过滤器")
rss_link: list[str] = Field(..., alias="rss_link", title="番剧RSS链接")
rss_link: list[str] = Field([], alias="rss_link", title="番剧RSS链接")
poster_link: str | None = Field(None, alias="poster_link", title="番剧海报链接")
added: bool = Field(False, alias="added", title="是否已添加")
@@ -28,21 +28,6 @@ class Notification(BaseModel):
poster_link: str | None = Field(None, alias="poster_link", title="番剧海报链接")
@dataclass
class MatchRule:
    """Rule used to match a bangumi in a feed: a keyword plus its filters
    and the RSS link the rule applies to."""

    keyword: str
    filter: list
    rss_link: str


@dataclass
class GroupFilter:
    """Filter list bound to a single subtitle group name."""

    name: str
    filter: list
@dataclass
class Episode:
title_en: str | None

View File

@@ -23,7 +23,7 @@ class RSSParser(BaseModel):
type: str = Field("mikan", description="RSS parser type")
token: str = Field("token", description="RSS parser token")
custom_url: str = Field("mikanani.me", description="Custom RSS host url")
enable_tmdb: bool = Field(False, description="Enable TMDB")
parser_type: str = Field("parser", description="Parser type")
filter: list[str] = Field(["720", r"\d+-\d"], description="Filter")
language: str = "zh"

View File

@@ -1,3 +1,3 @@
from .raw_parser import raw_parser
from .torrent_parser import torrent_parser
from .tmdb_parser import TMDBMatcher
from .tmdb_parser import tmdb_parser

View File

@@ -9,65 +9,70 @@ from module.conf import TMDB_API
@dataclass
class TMDBInfo:
id: int
title_jp: str
title_zh: str
title: str
original_title: str
season: list[dict]
last_season: int
year_number: int
year: str
class TMDBMatcher:
def __init__(self):
self.search_url = lambda e: \
f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
self.info_url = lambda e: \
f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language=zh-CN"
# Maps the config's language codes to the locale strings the TMDB API expects.
LANGUAGE = {
    "zh": "zh-CN",
    "jp": "ja-JP",
    "en": "en-US",
}
def is_animation(self, tv_id) -> bool:
url_info = self.info_url(tv_id)
with RequestContent() as req:
type_id = req.get_json(url_info)["genres"]
for type in type_id:
if type.get("id") == 16:
return True
return False
def search_url(e):
    """Build the TMDB TV-search URL for query *e*.

    Converted from an assigned lambda to a def (PEP 8 E731); the callable
    interface is unchanged.
    """
    return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"


def info_url(e, key):
    """Build the TMDB TV-details URL for show id *e*, localized via LANGUAGE[key]."""
    return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}"
# def get_zh_title(self, id):
# alt_title_url = self.alt_title_url(id)
# titles = self._request.get_content(alt_title_url, content="json")
# for title in titles:
# if title["iso_3166_1"] == "CN":
# return title["title"]
# return None
@staticmethod
def get_season(seasons: list) -> int:
for season in seasons:
if re.search(r"\d 季", season.get("season")) is not None:
date = season.get("air_date").split("-")
[year, _ , _] = date
now_year = time.localtime().tm_year
if int(year) == now_year:
return int(re.findall(r"\d", season.get("season"))[0])
def is_animation(tv_id, language) -> bool:
    """Return True when the TMDB TV entry carries the Animation genre (id 16).

    Args:
        tv_id: TMDB show id.
        language: key into LANGUAGE for the localized details endpoint.
    """
    url_info = info_url(tv_id, language)
    with RequestContent() as req:
        genres = req.get_json(url_info)["genres"]
    # any() over a generator replaces the manual loop whose variable
    # shadowed the builtin `type`.
    return any(genre.get("id") == 16 for genre in genres)
def tmdb_search(self, title) -> TMDBInfo:
with RequestContent() as req:
url = self.search_url(title)
def get_season(seasons: list) -> int:
    """Return the number of the latest season that has already aired.

    Args:
        seasons: dicts with "season" (display name, e.g. "第 2 季") and
            "air_date" ("YYYY-MM-DD", or None for unaired seasons).

    Returns:
        The season number of the first qualifying entry, or None when no
        entry has a numbered name with an air date up to the current year.
    """
    now_year = time.localtime().tm_year
    for season in seasons:
        name = season.get("season")
        # Only entries whose display name carries a season number qualify.
        if re.search(r"\d 季", name) is None:
            continue
        air_date = season.get("air_date")
        # Guard: unaired seasons can carry a null air date.
        if not air_date:
            continue
        year = air_date.split("-")[0]
        if int(year) <= now_year:
            # \d+ (not \d): the old single-digit pattern truncated
            # season 12 to 1.
            return int(re.findall(r"\d+", name)[0])
def tmdb_parser(title, language) -> TMDBInfo | None:
with RequestContent() as req:
url = search_url(title)
contents = req.get_json(url).get("results")
if contents.__len__() == 0:
url = search_url(title.replace(" ", ""))
contents = req.get_json(url).get("results")
if contents.__len__() == 0:
url = self.search_url(title.replace(" ", ""))
contents = req.get_json(url).get("results")
# 判断动画
# 判断动画
if contents:
for content in contents:
id = content["id"]
if self.is_animation(id):
if is_animation(id, language):
break
url_info = self.info_url(id)
url_info = info_url(id, language)
info_content = req.get_json(url_info)
season = [{"season": s.get("name"), "air_date": s.get("air_date")} for s in info_content.get("seasons")]
last_season = get_season(season)
original_title = info_content.get("original_name")
official_title = info_content.get("name")
year_number = info_content.get("first_air_date").split("-")[0]
return TMDBInfo(id, official_title, original_title, season, last_season, str(year_number))
else:
return None
season = [{"season": s.get("name"), "air_date": s.get("air_date")} for s in info_content.get("seasons")]
last_season = self.get_season(season)
title_jp = info_content.get("original_name")
title_zh = info_content.get("name")
year_number = info_content.get("first_air_date").split("-")[0]
return TMDBInfo(id, title_jp, title_zh, season, last_season, year_number)
if __name__ == '__main__':
title = "鬼灭之刃+刀匠村篇"
print(tmdb_parser(title, "zh"))

View File

@@ -1,15 +1,16 @@
import logging
from .analyser import raw_parser, torrent_parser, TMDBMatcher
from .analyser import raw_parser, torrent_parser, tmdb_parser
from module.models import BangumiData, Config
from module.models import BangumiData
from module.conf import settings
logger = logging.getLogger(__name__)
class TitleParser:
def __init__(self):
self._tmdb_parser = TMDBMatcher()
pass
@staticmethod
def torrent_parser(
@@ -18,25 +19,22 @@ class TitleParser:
):
return torrent_parser(torrent_path, season)
def tmdb_parser(self, title: str, season: int, language: str):
official_title, tmdb_season = None, None
try:
tmdb_info = self._tmdb_parser.tmdb_search(title)
logger.debug(f"TMDB Matched, official title is {tmdb_info.title_zh}")
except Exception as e:
logger.debug(e)
logger.warning(f"{title} can not Matched with TMDB")
logger.info("Please change the bangumi info in webui")
return title, season
if language == "zh":
official_title = f"{tmdb_info.title_zh} ({tmdb_info.year_number})"
elif language == "jp":
official_title = f"{tmdb_info.title_jp} ({tmdb_info.year_number})"
tmdb_season = tmdb_info.last_season if tmdb_info.last_season else season
official_title = official_title if official_title else title
return official_title, tmdb_season
@staticmethod
def tmdb_parser(title: str, season: int, language: str):
    """Resolve title, season and year through TMDB, falling back to the
    raw values when TMDB has no match.

    Returns:
        (official_title, season, year) — year is None on a failed lookup.
    """
    # Calls the module-level tmdb_parser imported from .analyser; class
    # scope is not consulted inside a method body, so this is not recursive.
    tmdb_info = tmdb_parser(title, language)
    if not tmdb_info:
        # Guard clause: keep the caller-supplied values untouched.
        logger.warning(f"Cannot match {title} in TMDB. Use raw title instead.")
        logger.warning("Please change bangumi info manually.")
        return title, season, None
    logger.debug(f"TMDB Matched, official title is {tmdb_info.title}")
    matched_season = tmdb_info.last_season if tmdb_info.last_season else season
    return tmdb_info.title, matched_season, tmdb_info.year
def raw_parser(self, raw: str, rss_link: str, settings: Config, _id: int = 0) -> BangumiData:
@staticmethod
def raw_parser(raw: str, rss_link: str, _id: int = 0) -> BangumiData:
language = settings.rss_parser.language
try:
episode = raw_parser(raw)
@@ -45,7 +43,6 @@ class TitleParser:
"en": episode.title_en,
"jp": episode.title_jp,
}
title_search = episode.title_zh if episode.title_zh else episode.title_en
title_raw = episode.title_en if episode.title_en else episode.title_zh
official_title = titles[language] if titles[language] else titles["zh"]
_season = episode.season
@@ -59,13 +56,13 @@ class TitleParser:
dpi=episode.resolution,
source=episode.source,
subtitle=episode.sub,
eps_collect=True if episode.episode > 1 else False,
eps_collect=False if episode.episode > 1 else True,
offset=0,
filter=settings.rss_parser.filter,
rss_link=[rss_link],
)
logger.debug(f"RAW:{raw} >> {episode.title_en}")
logger.debug(f"RAW:{raw} >> {title_raw}")
return data
except Exception as e:
logger.debug(e)
print(e)
logger.warning(f"Cannot parse {raw}.")

View File

@@ -4,23 +4,21 @@ from module.network import RequestContent
from module.parser import TitleParser
from module.models import Config, BangumiData
from module.database import DataOperator
from module.core import DownloadClient
from module.conf import settings
logger = logging.getLogger(__name__)
class RSSAnalyser:
def __init__(self, settings: Config):
def __init__(self):
self._title_analyser = TitleParser()
self.settings = settings
def rss_to_datas(self, rss_link: str) -> list[BangumiData]:
def rss_to_data(self, rss_link: str, full_parse: bool = True) -> list[BangumiData]:
with RequestContent() as req:
rss_torrents = req.get_torrents(rss_link)
title_dict = {torrent.name: torrent.homepage for torrent in rss_torrents}
with DataOperator() as op:
new_dict = op.match_list(title_dict, rss_link)
print(new_dict)
if not new_dict:
logger.debug("No new title found.")
return []
@@ -30,54 +28,44 @@ class RSSAnalyser:
with RequestContent() as req:
for raw_title, homepage in new_dict.items():
data = self._title_analyser.raw_parser(
raw=raw_title, settings=self.settings, rss_link=rss_link, _id=_id
raw=raw_title, rss_link=rss_link, _id=_id
)
if data is not None:
if data and data.title_raw not in [i.title_raw for i in new_data]:
poster_link, official_title = req.get_mikan_info(homepage)
data.poster_link = poster_link
# Official title type
if self.settings.rss_parser.parser_type == "mikan":
if settings.rss_parser.parser_type == "mikan":
data.official_title = official_title
elif self.settings.rss_parser.parser_type == "tmdb":
official_title, year, season = self._title_analyser.tmdb_parser()
elif settings.rss_parser.parser_type == "tmdb":
official_title, season, year = self._title_analyser.tmdb_parser(
data.official_title,
data.season,
settings.rss_parser.language
)
data.official_title = official_title
data.year = year
data.season = season
else:
pass
if not full_parse:
op.insert(data)
return [data]
new_data.append(data)
_id += 1
logger.debug(f"New title found: {data.official_title}")
op.insert_list(new_data)
return new_data
def rss_to_data(self, url, _filter: bool = True) -> BangumiData:
with RequestContent() as req:
rss_torrents = req.get_torrents(url, _filter)
for torrent in rss_torrents:
try:
data = self._title_analyser.raw_parser(
torrent.name, settings=self.settings, rss_link=url
)
if data is not None:
with DataOperator() as op:
_id = op.gen_id()
data.id = _id
op.insert(data)
return data
except Exception as e:
logger.debug(e)
def run(self, rss_link: str):
logger.info("Start collecting RSS info.")
try:
self.rss_to_datas(rss_link)
self.rss_to_data(rss_link)
except Exception as e:
logger.debug(e)
if __name__ == '__main__':
from module.conf import settings, setup_logger
setup_logger(settings)
link = "https://mikanani.me/RSS/MyBangumi?token=Td8ceWZZv3s2OZm5ji9RoMer8vk5VS3xzC1Hmg8A26E%3d"
data = RSSAnalyser(settings).rss_to_datas(link)
from module.conf import setup_logger
setup_logger()
link = "https://mikanani.me/RSS/Bangumi?bangumiId=2906&subgroupid=552"
data = RSSAnalyser().rss_to_data(link)