mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-24 18:40:03 +08:00
主程序迁移完成
This commit is contained in:
@@ -1 +1 @@
|
||||
from .operator import DataOperator
|
||||
from .bangumi import BangumiDatabase
|
||||
|
||||
@@ -6,12 +6,11 @@ from module.models import BangumiData
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataOperator(DataConnector):
|
||||
class BangumiDatabase(DataConnector):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.__update_table()
|
||||
|
||||
def __update_table(self):
|
||||
def update_table(self):
|
||||
table_name = "bangumi"
|
||||
db_data = self.__data_to_db(BangumiData())
|
||||
columns = ", ".join([f"{key} {self.__python_to_sqlite_type(value)}" for key, value in db_data.items()])
|
||||
@@ -63,6 +62,14 @@ class DataOperator(DataConnector):
|
||||
db_data[key] = item.split(",")
|
||||
return BangumiData(**db_data)
|
||||
|
||||
def __fetch_data(self) -> list[BangumiData] | None:
|
||||
values = self._cursor.fetchall()
|
||||
if values is None:
|
||||
return None
|
||||
keys = [x[0] for x in self._cursor.description]
|
||||
dict_data = [dict(zip(keys, value)) for value in values]
|
||||
return [self.__db_to_data(x) for x in dict_data]
|
||||
|
||||
def insert(self, data: BangumiData):
|
||||
db_data = self.__data_to_db(data)
|
||||
columns = ", ".join(db_data.keys())
|
||||
@@ -86,6 +93,12 @@ class DataOperator(DataConnector):
|
||||
self._conn.commit()
|
||||
return self._cursor.rowcount == 1
|
||||
|
||||
def update_list(self, data: list[BangumiData]):
|
||||
db_data = [self.__data_to_db(x) for x in data]
|
||||
update_columns = ", ".join([f"{key} = :{key}" for key in db_data[0].keys() if key != "id"])
|
||||
self._cursor.executemany(f"UPDATE bangumi SET {update_columns} WHERE id = :id", db_data)
|
||||
self._conn.commit()
|
||||
|
||||
def update_rss(self, title_raw, rss_set: str):
|
||||
# Update rss and added
|
||||
self._cursor.execute(
|
||||
@@ -190,19 +203,22 @@ class DataOperator(DataConnector):
|
||||
self.update_rss(title_raw, rss_set)
|
||||
return titles
|
||||
|
||||
def get_to_complete(self) -> list[BangumiData] | None:
|
||||
def not_complete(self) -> list[BangumiData] | None:
|
||||
# Find eps_complete = False
|
||||
self._cursor.execute(
|
||||
"""
|
||||
SELECT * FROM bangumi WHERE eps_collect = 1
|
||||
SELECT * FROM bangumi WHERE eps_collect = 0
|
||||
"""
|
||||
)
|
||||
values = self._cursor.fetchall()
|
||||
if values is None:
|
||||
return None
|
||||
keys = [x[0] for x in self._cursor.description]
|
||||
dict_data = [dict(zip(keys, value)) for value in values]
|
||||
return [self.__db_to_data(x) for x in dict_data]
|
||||
return self.__fetch_data()
|
||||
|
||||
def not_added(self) -> list[BangumiData] | None:
|
||||
self._cursor.execute(
|
||||
"""
|
||||
SELECT * FROM bangumi WHERE added = 0
|
||||
"""
|
||||
)
|
||||
return self.__fetch_data()
|
||||
|
||||
def gen_id(self) -> int:
|
||||
self._cursor.execute(
|
||||
Reference in New Issue
Block a user