Files
Auto_Bangumi/backend/src/module/database/bangumi.py
2023-09-17 16:23:38 +08:00

163 lines
5.9 KiB
Python

import logging
from sqlmodel import Session, select, delete, or_, and_
from sqlalchemy.sql import func
from typing import Optional
from module.models import Bangumi, BangumiUpdate
logger = logging.getLogger(__name__)
class BangumiDatabase:
def __init__(self, session: Session):
self.session = session
def add(self, data: Bangumi):
self.session.add(data)
self.session.commit()
logger.debug(f"[Database] Insert {data.official_title} into database.")
def add_all(self, datas: list[Bangumi]):
self.session.add_all(datas)
self.session.commit()
logger.debug(f"[Database] Insert {len(datas)} bangumi into database.")
def update(self, data: Bangumi | BangumiUpdate, _id: int = None) -> bool:
if _id and isinstance(data, BangumiUpdate):
db_data = self.session.get(Bangumi, _id)
elif isinstance(data, Bangumi):
db_data = self.session.get(Bangumi, data.id)
else:
return False
if not db_data:
return False
bangumi_data = data.dict(exclude_unset=True)
for key, value in bangumi_data.items():
setattr(db_data, key, value)
self.session.add(db_data)
self.session.commit()
self.session.refresh(db_data)
logger.debug(f"[Database] Update {data.official_title}")
return True
def update_all(self, datas: list[Bangumi]):
self.session.add_all(datas)
self.session.commit()
logger.debug(f"[Database] Update {len(datas)} bangumi.")
def update_rss(self, title_raw, rss_set: str):
# Update rss and added
statement = select(Bangumi).where(Bangumi.title_raw == title_raw)
bangumi = self.session.exec(statement).first()
bangumi.rss_link = rss_set
bangumi.added = False
self.session.add(bangumi)
self.session.commit()
self.session.refresh(bangumi)
logger.debug(f"[Database] Update {title_raw} rss_link to {rss_set}.")
def update_poster(self, title_raw, poster_link: str):
statement = select(Bangumi).where(Bangumi.title_raw == title_raw)
bangumi = self.session.exec(statement).first()
bangumi.poster_link = poster_link
self.session.add(bangumi)
self.session.commit()
self.session.refresh(bangumi)
logger.debug(f"[Database] Update {title_raw} poster_link to {poster_link}.")
def delete_one(self, _id: int):
statement = select(Bangumi).where(Bangumi.id == _id)
bangumi = self.session.exec(statement).first()
self.session.delete(bangumi)
self.session.commit()
logger.debug(f"[Database] Delete bangumi id: {_id}.")
def delete_all(self):
statement = delete(Bangumi)
self.session.exec(statement)
self.session.commit()
def search_all(self) -> list[Bangumi]:
statement = select(Bangumi)
return self.session.exec(statement).all()
def search_id(self, _id: int) -> Optional[Bangumi]:
statement = select(Bangumi).where(Bangumi.id == _id)
bangumi = self.session.exec(statement).first()
if bangumi is None:
logger.warning(f"[Database] Cannot find bangumi id: {_id}.")
return None
else:
logger.debug(f"[Database] Find bangumi id: {_id}.")
return self.session.exec(statement).first()
def match_poster(self, bangumi_name: str) -> str:
# Use like to match
statement = select(Bangumi).where(
func.instr(bangumi_name, Bangumi.official_title) > 0
)
data = self.session.exec(statement).first()
if data:
return data.poster_link
else:
return ""
def match_list(self, torrent_list: list, rss_link: str) -> list:
match_datas = self.search_all()
if not match_datas:
return torrent_list
# Match title
i = 0
while i < len(torrent_list):
torrent = torrent_list[i]
for match_data in match_datas:
if match_data.title_raw in torrent.name:
if rss_link not in match_data.rss_link:
match_data.rss_link += f",{rss_link}"
self.update_rss(match_data.title_raw, match_data.rss_link)
# if not match_data.poster_link:
# self.update_poster(match_data.title_raw, torrent.poster_link)
torrent_list.pop(i)
break
else:
i += 1
return torrent_list
def match_torrent(self, torrent_name: str) -> Optional[Bangumi]:
statement = select(Bangumi).where(
and_(
func.instr(torrent_name, Bangumi.title_raw) > 0,
Bangumi.deleted == False,
)
)
return self.session.exec(statement).first()
def not_complete(self) -> list[Bangumi]:
# Find eps_complete = False
condition = select(Bangumi).where(Bangumi.eps_collect == False)
datas = self.session.exec(condition).all()
return datas
def not_added(self) -> list[Bangumi]:
conditions = select(Bangumi).where(
or_(
Bangumi.added == 0, Bangumi.rule_name is None, Bangumi.save_path is None
)
)
datas = self.session.exec(conditions).all()
return datas
def disable_rule(self, _id: int):
statement = select(Bangumi).where(Bangumi.id == _id)
bangumi = self.session.exec(statement).first()
bangumi.deleted = True
self.session.add(bangumi)
self.session.commit()
self.session.refresh(bangumi)
logger.debug(f"[Database] Disable rule {bangumi.title_raw}.")
def search_rss(self, rss_link: str) -> list[Bangumi]:
statement = select(Bangumi).where(func.instr(rss_link, Bangumi.rss_link) > 0)
return self.session.exec(statement).all()