feat: new orm module

This commit is contained in:
EstrellaXD
2023-06-14 09:49:16 +08:00
parent 86088a3370
commit 76112ad3e1
6 changed files with 230 additions and 0 deletions

View File

@@ -0,0 +1 @@
from .connector import Connector

View File

@@ -0,0 +1,44 @@
import sqlite3
from .delete import Delete
from .insert import Insert
from .select import Select
from .update import Update
from module.conf import DATA_PATH
class Connector:
def __init__(self, database: str = DATA_PATH, table_name: str = None, data: dict = None):
self._conn = sqlite3.connect(database)
self._cursor = self._conn.cursor()
self.update = Update(self, table_name, data)
self.insert = Insert(self, table_name, data)
self.select = Select(self, table_name, data)
self.delete = Delete(self, table_name, data)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self._conn.close()
def execute(self, sql: str, params: tuple = None):
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
self._conn.commit()
def executemany(self, sql: str, params: list[tuple]):
self._cursor.executemany(sql, params)
self._conn.commit()
def fetchall(self):
return self._cursor.fetchall()
def fetchone(self):
return self._cursor.fetchone()
def fetchmany(self, size: int):
return self._cursor.fetchmany(size)

View File

@@ -0,0 +1,26 @@
class Delete:
def __init__(self, connector: Connector, table_name: str, data: dict):
self.db = connector
self._table_name = table_name
self._data = data
def one(self, _id: int) -> bool:
self.db.execute(
f"""
DELETE FROM {self._table_name}
WHERE id = :id
""",
{"id": _id},
)
return True
def all(self):
self.db.execute(
f"""
DELETE FROM {self._table_name}
""",
)
return True

View File

@@ -0,0 +1,43 @@
from .connector import Connector
class Insert:
def __init__(self, db: Connector, table_name: str, data: dict):
self.db = db
self._table_name = table_name
self._columns = data.items()
def __gen_id(self) -> int:
self.db.execute(f"SELECT MAX(id) FROM {self._table_name}")
max_id = self.db.fetchone()[0]
if max_id is None:
return 1
return max_id + 1
def one(self, data: dict) -> bool:
_id = self.__gen_id()
data["id"] = _id
columns = ", ".join(data.keys())
placeholders = ", ".join([f":{key}" for key in data.keys()])
self.db.execute(
f"""
INSERT INTO {self._table_name} ({columns})
VALUES ({placeholders})
""",
data,
)
return True
def list(self, data: list[dict]):
columns = ", ".join(data[0].keys())
placeholders = ", ".join([f":{key}" for key in data[0].keys()])
self.db.executemany(
f"""
INSERT INTO {self._table_name} ({columns})
VALUES ({placeholders})
""",
data,
)
return True

View File

@@ -0,0 +1,48 @@
class Select:
def __init__(self, connector: Connector, table_name: str, data: dict):
self._connector = connector
self._table_name = table_name
self._data = data
def id(self, _id: int):
self._connector.execute(
f"""
SELECT * FROM {self._table_name}
WHERE id = :id
""",
{"id": _id},
)
return self._connector.fetchone()
def all(self):
self._connector.execute(
f"""
SELECT * FROM {self._table_name}
""",
)
return self._connector.fetchall()
def one(self, keys: list[str], values: list[str]):
columns = ", ".join(keys)
placeholders = ", ".join([f":{key}" for key in keys])
self._connector.execute(
f"""
SELECT {columns} FROM {self._table_name}
WHERE {placeholders}
""",
dict(zip(keys, values)),
)
return self._connector.fetchone()
def list(self, keys: list[str], values: list[str]):
columns = ", ".join(keys)
placeholders = ", ".join([f":{key}" for key in keys])
self._connector.execute(
f"""
SELECT {columns} FROM {self._table_name}
WHERE {placeholders}
""",
dict(zip(keys, values)),
)
return self._connector.fetchall()

View File

@@ -0,0 +1,68 @@
import logging
from .connector import Connector
logger = logging.getLogger(__name__)
class Update:
def __init__(self, db: Connector, table_name: str, data: dict):
self.db = db
self._table_name = table_name
self._columns = data.items()
def table(self):
columns = ", ".join(
[
f"{key} {self.__python_to_sqlite_type(value)}"
for key, value in self._columns
]
)
create_table_sql = f"CREATE TABLE IF NOT EXISTS {self._table_name} ({columns});"
self.db.execute(create_table_sql)
self.db.execute(f"PRAGMA table_info({self._table_name})")
existing_columns = {
column_info[1]: column_info for column_info in self.db.fetchall()
}
for key, value in self._columns:
if key not in existing_columns:
insert_column = self.__python_to_sqlite_type(value)
if value is None:
value = "NULL"
add_column_sql = f"ALTER TABLE {self._table_name} ADD COLUMN {key} {insert_column} DEFAULT {value};"
self.db.execute(add_column_sql)
logger.debug(f"Create / Update table {self._table_name}.")
def one(self, data: dict) -> bool:
_id = data.pop("id")
set_sql = ", ".join([f"{key} = :{key}" for key in data.keys()])
self.db.execute(
f"""
UPDATE {self._table_name}
SET {set_sql}
WHERE id = {_id}
""",
data,
)
logger.debug(f"Update {_id} in {self._table_name}.")
return True
def list(self, data: list[dict]):
for item in data:
self.one(item)
@staticmethod
def __python_to_sqlite_type(value) -> str:
if isinstance(value, int):
return "INTEGER NOT NULL"
elif isinstance(value, float):
return "REAL NOT NULL"
elif isinstance(value, str):
return "TEXT NOT NULL"
elif isinstance(value, bool):
return "INTEGER NOT NULL"
elif isinstance(value, list):
return "TEXT NOT NULL"
elif value is None:
return "TEXT"
else:
raise ValueError(f"Unsupported data type: {type(value)}")