mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-13 18:11:03 +08:00
完善数据库拓展性
This commit is contained in:
@@ -13,34 +13,7 @@ class BangumiDatabase(DataConnector):
|
||||
def update_table(self):
|
||||
table_name = "bangumi"
|
||||
db_data = self.__data_to_db(BangumiData())
|
||||
columns = ", ".join([f"{key} {self.__python_to_sqlite_type(value)}" for key, value in db_data.items()])
|
||||
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns});"
|
||||
self._cursor.execute(create_table_sql)
|
||||
self._cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
existing_columns = {column_info[1]: column_info for column_info in self._cursor.fetchall()}
|
||||
for key, value in db_data.items():
|
||||
if key not in existing_columns:
|
||||
add_column_sql = f"ALTER TABLE {table_name} ADD COLUMN {key} {self.__python_to_sqlite_type(value)} DEFAULT {value};"
|
||||
self._cursor.execute(add_column_sql)
|
||||
self._conn.commit()
|
||||
logger.debug("Create / Update table bangumi.")
|
||||
|
||||
@staticmethod
|
||||
def __python_to_sqlite_type(value) -> str:
|
||||
if isinstance(value, int):
|
||||
return "INTEGER NOT NULL"
|
||||
elif isinstance(value, float):
|
||||
return "REAL NOT NULL"
|
||||
elif isinstance(value, str):
|
||||
return "TEXT NOT NULL"
|
||||
elif isinstance(value, bool):
|
||||
return "INTEGER NOT NULL"
|
||||
elif isinstance(value, list):
|
||||
return "TEXT NOT NULL"
|
||||
elif value is None:
|
||||
return "TEXT"
|
||||
else:
|
||||
raise ValueError(f"Unsupported data type: {type(value)}")
|
||||
self._update_table(table_name, db_data)
|
||||
|
||||
@staticmethod
|
||||
def __data_to_db(data: BangumiData) -> dict:
|
||||
|
||||
@@ -1,17 +1,50 @@
|
||||
import os
|
||||
import sqlite3
|
||||
|
||||
import logging
|
||||
|
||||
from module.conf import DATA_PATH
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataConnector:
|
||||
def __init__(self):
|
||||
if not os.path.isfile(DATA_PATH):
|
||||
os.makedirs(os.path.dirname(DATA_PATH), exist_ok=True)
|
||||
# Create folder if not exists
|
||||
if not os.path.exists(os.path.dirname(DATA_PATH)):
|
||||
os.makedirs(os.path.dirname(DATA_PATH))
|
||||
self._conn = sqlite3.connect(DATA_PATH)
|
||||
self._cursor = self._conn.cursor()
|
||||
|
||||
def _update_table(self, table_name: str, db_data: dict):
|
||||
columns = ", ".join([f"{key} {self.__python_to_sqlite_type(value)}" for key, value in db_data.items()])
|
||||
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns});"
|
||||
self._cursor.execute(create_table_sql)
|
||||
self._cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
existing_columns = {column_info[1]: column_info for column_info in self._cursor.fetchall()}
|
||||
for key, value in db_data.items():
|
||||
if key not in existing_columns:
|
||||
add_column_sql = f"ALTER TABLE {table_name} ADD COLUMN {key} {self.__python_to_sqlite_type(value)} DEFAULT {value};"
|
||||
self._cursor.execute(add_column_sql)
|
||||
self._conn.commit()
|
||||
logger.debug(f"Create / Update table {table_name}.")
|
||||
|
||||
@staticmethod
|
||||
def __python_to_sqlite_type(value) -> str:
|
||||
if isinstance(value, int):
|
||||
return "INTEGER NOT NULL"
|
||||
elif isinstance(value, float):
|
||||
return "REAL NOT NULL"
|
||||
elif isinstance(value, str):
|
||||
return "TEXT NOT NULL"
|
||||
elif isinstance(value, bool):
|
||||
return "INTEGER NOT NULL"
|
||||
elif isinstance(value, list):
|
||||
return "TEXT NOT NULL"
|
||||
elif value is None:
|
||||
return "TEXT"
|
||||
else:
|
||||
raise ValueError(f"Unsupported data type: {type(value)}")
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
@@ -8,7 +8,19 @@ from module.models import UserLogin
|
||||
|
||||
class AuthDB(DataConnector):
|
||||
def update_table(self):
|
||||
pass
|
||||
table_name = "user"
|
||||
db_data = self.__data_to_db(UserLogin())
|
||||
self._update_table(table_name, db_data)
|
||||
|
||||
@staticmethod
|
||||
def __data_to_db(data: UserLogin) -> dict:
|
||||
db_data = data.dict()
|
||||
db_data["password"] = get_password_hash(db_data["password"])
|
||||
return db_data
|
||||
|
||||
@staticmethod
|
||||
def __db_to_data(db_data: dict) -> UserLogin:
|
||||
return UserLogin(**db_data)
|
||||
|
||||
def auth_user(self, user: UserLogin) -> bool:
|
||||
username = user.username
|
||||
|
||||
@@ -17,8 +17,6 @@ class DownloadClient:
|
||||
def __init__(self):
|
||||
self.client = self.__getClient()
|
||||
self.authed = False
|
||||
self.download_path = settings.downloader.path
|
||||
self.group_tag = settings.bangumi_manage.group_tag
|
||||
|
||||
@staticmethod
|
||||
def __getClient():
|
||||
@@ -62,9 +60,9 @@ class DownloadClient:
|
||||
except Exception as e:
|
||||
logger.warning("Cannot add new category, maybe already exists.")
|
||||
logger.debug(e)
|
||||
if self.download_path == "":
|
||||
if settings.downloader.path == "":
|
||||
prefs = self.client.get_app_prefs()
|
||||
self.download_path = path.join(prefs["save_path"], "Bangumi")
|
||||
settings.downloader.path = path.join(prefs["save_path"], "Bangumi")
|
||||
|
||||
def set_rule(self, info: BangumiData):
|
||||
official_name = f"{info.official_title}({info.year})" if info.year else info.official_title
|
||||
@@ -88,13 +86,15 @@ class DownloadClient:
|
||||
"assignedCategory": "Bangumi",
|
||||
"savePath": str(
|
||||
path.join(
|
||||
self.download_path,
|
||||
settings.downloader.path,
|
||||
re.sub(r"[:/.]", " ", official_name).strip(),
|
||||
f"Season {season}",
|
||||
)
|
||||
),
|
||||
}
|
||||
rule_name = f"[{group}] {official_name}" if self.group_tag else official_name
|
||||
rule_name = f"[{group}] {official_name}" \
|
||||
if settings.bangumi_manage.group_tag \
|
||||
else official_name
|
||||
self.client.rss_set_rule(rule_name=f"{rule_name} S{season}", rule_def=rule)
|
||||
logger.info(f"Add {official_name} Season {season} to auto download rules.")
|
||||
|
||||
|
||||
0
src/module/rss/filter.py
Normal file
0
src/module/rss/filter.py
Normal file
Reference in New Issue
Block a user