Complete database operator

This commit is contained in:
EstrellaXD
2023-04-29 11:52:08 +08:00
parent 2a1d2e3e17
commit 3e0ce6ef0d
3 changed files with 154 additions and 14 deletions

View File

@@ -3,7 +3,7 @@ from .config import settings, VERSION
TMDB_API = "32b19d6a05b512190a056fa4e747cbbc"
DATA_PATH = "data/data.json"
DATA_PATH = "data/data.db"
class RSSLink(str):

View File

@@ -1,16 +1,40 @@
from sqlite3 import Cursor
import os
import sqlite3
from module.conf import settings, DATA_PATH
from module.conf import DATA_PATH
class DataConnector:
def __init__(self):
pass
if not os.path.isfile(DATA_PATH):
os.makedirs(os.path.dirname(DATA_PATH), exist_ok=True)
self._conn = sqlite3.connect(DATA_PATH)
self._cursor = self._conn.cursor()
self._cursor.execute(
"""
CREATE TABLE IF NOT EXISTS bangumi (
id INTEGER PRIMARY KEY,
official_title TEXT NOT NULL,
title_raw TEXT NOT NULL,
season INTEGER NOT NULL,
season_raw TEXT NOT NULL,
subtitle TEXT,
group_name TEXT,
source TEXT,
dpi TEXT,
eps_collect INTEGER NOT NULL,
offset INTEGER NOT NULL,
filter TEXT NOT NULL
);
"""
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
self._conn.commit()
self._conn.close()

View File

@@ -1,27 +1,143 @@
from .connector import DataConnector
from module.database.connector import DataConnector
from module.models import BangumiData
class DataOperator(DataConnector):
def data_to_db(self, data: BangumiData) -> dict:
db_data = data.dict()
for key, value in db_data.items():
if isinstance(value, bool):
db_data[key] = int(value)
elif isinstance(value, list):
db_data[key] = ",".join(value)
return db_data
def db_to_data(self, db_data: dict) -> BangumiData:
for key, item in db_data.items():
if isinstance(item, int):
if key not in ["id", "offset", "season"]:
db_data[key] = bool(item)
elif key == "filter":
db_data[key] = item.split(",")
return BangumiData(**db_data)
def insert(self, data: BangumiData):
pass
db_data = self.data_to_db(data)
self._cursor.execute('''
INSERT INTO bangumi (
id,
official_title,
title_raw,
season,
season_raw,
subtitle,
group_name,
source,
dpi,
eps_collect,
offset,
filter
) VALUES (
:id,
:official_title,
:title_raw,
:season,
:season_raw,
:subtitle,
:group,
:source,
:dpi,
:eps_collect,
:offset,
:filter
)
''', db_data)
def insert_list(self, data: list[BangumiData]):
pass
db_data = [self.data_to_db(x) for x in data]
self._cursor.executemany('''
INSERT INTO bangumi (
id,
official_title,
title_raw,
season,
season_raw,
subtitle,
group_name,
source,
dpi,
eps_collect,
offset,
filter
) VALUES (
:id,
:official_title,
:title_raw,
:season,
:season_raw,
:subtitle,
:group,
:source,
:dpi,
:eps_collect,
:offset,
:filter
)
''', db_data)
def update(self, data: BangumiData) -> bool:
pass
def search(self, id: int) -> bool:
pass
db_data = self.data_to_db(data)
self._cursor.execute('''
UPDATE bangumi SET
official_title = :official_title,
title_raw = :title_raw,
season = :season,
season_raw = :season_raw,
subtitle = :subtitle,
group_name = :group,
source = :source,
dpi = :dpi,
eps_collect = :eps_collect,
offset = :offset,
filter = :filter
WHERE id = :id
''', db_data)
return self._cursor.rowcount == 1
def search(self, _id: int) -> BangumiData | None:
self._cursor.execute('''
SELECT * FROM bangumi WHERE id = :id
''', {"id": _id})
values = self._cursor.fetchone()
if values is None:
return None
keys = [x[0] for x in self._cursor.description]
dict_data = dict(zip(keys, values))
return self.db_to_data(dict_data)
def match_title(self, title: str) -> bool:
# Select all title_raw
self._cursor.execute('''
SELECT title_raw FROM bangumi
''')
title_raws = [x[0] for x in self._cursor.fetchall()]
# Match title
for title_raw in title_raws:
if title_raw in title:
return True
return False
def gen_id(self) -> int:
return 1
self._cursor.execute('''
SELECT id FROM bangumi ORDER BY id DESC LIMIT 1
''')
return self._cursor.fetchone()[0] + 1
if __name__ == '__main__':
with DataOperator() as op:
a = op.match_title("THE IDOLM@STR CINDERELLA GIRLS U149")
print(a)