Files
MoviePilot/app/db/site_oper.py
jxxghp eae1f8ee4d feat: add UpdateSiteCookieTool and enhance site operations
- Introduced UpdateSiteCookieTool to the toolset for managing site cookies.
- Updated __all__ exports in init.py and factory.py to include UpdateSiteCookieTool.
- Added async_get method in SiteOper for asynchronous retrieval of individual site records, improving database interaction.
2025-11-18 11:34:37 +08:00

329 lines
10 KiB
Python

from datetime import datetime
from typing import List, Tuple, Optional
from app.db import DbOper
from app.db.models import SiteIcon
from app.db.models.site import Site
from app.db.models.sitestatistic import SiteStatistic
from app.db.models.siteuserdata import SiteUserData
class SiteOper(DbOper):
"""
站点管理
"""
def add(self, **kwargs) -> Tuple[bool, str]:
"""
新增站点
"""
site = Site(**kwargs)
if not site.get_by_domain(self._db, kwargs.get("domain")):
site.create(self._db)
return True, "新增站点成功"
return False, "站点已存在"
def get(self, sid: int) -> Site:
"""
查询单个站点
"""
return Site.get(self._db, sid)
async def async_get(self, sid: int) -> Site:
"""
异步查询单个站点
"""
return await Site.async_get(self._db, sid)
def list(self) -> List[Site]:
"""
获取站点列表
"""
return Site.list(self._db)
async def async_list(self) -> List[Site]:
"""
异步获取站点列表
"""
return await Site.async_list(self._db)
def list_order_by_pri(self) -> List[Site]:
"""
获取站点列表
"""
return Site.list_order_by_pri(self._db)
def list_active(self) -> List[Site]:
"""
按状态获取站点列表
"""
return Site.get_actives(self._db)
async def async_list_active(self) -> List[Site]:
"""
异步按状态获取站点列表
"""
return await Site.async_get_actives(self._db)
def delete(self, sid: int):
"""
删除站点
"""
Site.delete(self._db, sid)
def update(self, sid: int, payload: dict) -> Site:
"""
更新站点
"""
site = Site.get(self._db, sid)
site.update(self._db, payload)
return site
def get_by_domain(self, domain: str) -> Site:
"""
按域名获取站点
"""
return Site.get_by_domain(self._db, domain)
async def async_get_by_domain(self, domain: str) -> Site:
"""
异步按域名获取站点
"""
return await Site.async_get_by_domain(self._db, domain)
async def async_get_by_name(self, name: str) -> Site:
"""
异步按名称获取站点
"""
return await Site.async_get_by_name(self._db, name)
def get_domains_by_ids(self, ids: List[int]) -> List[str]:
"""
按ID获取站点域名
"""
return Site.get_domains_by_ids(self._db, ids)
def exists(self, domain: str) -> bool:
"""
判断站点是否存在
"""
return Site.get_by_domain(self._db, domain) is not None
def update_cookie(self, domain: str, cookies: str) -> Tuple[bool, str]:
"""
更新站点Cookie
"""
site = Site.get_by_domain(self._db, domain)
if not site:
return False, "站点不存在"
site.update(self._db, {
"cookie": cookies
})
return True, "更新站点Cookie成功"
def update_rss(self, domain: str, rss: str) -> Tuple[bool, str]:
"""
更新站点rss
"""
site = Site.get_by_domain(self._db, domain)
if not site:
return False, "站点不存在"
site.update(self._db, {
"rss": rss
})
return True, "更新站点RSS地址成功"
def update_userdata(self, domain: str, name: str, payload: dict) -> Tuple[bool, str]:
"""
更新站点用户数据
"""
# 当前系统日期
current_day = datetime.now().strftime('%Y-%m-%d')
current_time = datetime.now().strftime('%H:%M:%S')
payload.update({
"domain": domain,
"name": name,
"updated_day": current_day,
"updated_time": current_time,
"err_msg": payload.get("err_msg") or ""
})
# 按站点+天判断是否存在数据
siteuserdatas = SiteUserData.get_by_domain(self._db, domain=domain, workdate=current_day)
if siteuserdatas:
# 存在则更新
if not payload.get("err_msg"):
siteuserdatas[0].update(self._db, payload)
else:
# 不存在则插入
SiteUserData(**payload).create(self._db)
return True, "更新站点用户数据成功"
def get_userdata(self) -> List[SiteUserData]:
"""
获取站点用户数据
"""
return SiteUserData.list(self._db)
def get_userdata_by_domain(self, domain: str, workdate: Optional[str] = None) -> List[SiteUserData]:
"""
获取站点用户数据
"""
return SiteUserData.get_by_domain(self._db, domain=domain, workdate=workdate)
def get_userdata_by_date(self, date: str) -> List[SiteUserData]:
"""
获取站点用户数据
"""
return SiteUserData.get_by_date(self._db, date)
def get_userdata_latest(self) -> List[SiteUserData]:
"""
获取站点最新数据
"""
return SiteUserData.get_latest(self._db)
def get_icon_by_domain(self, domain: str) -> SiteIcon:
"""
按域名获取站点图标
"""
return SiteIcon.get_by_domain(self._db, domain)
def update_icon(self, name: str, domain: str, icon_url: str, icon_base64: str) -> bool:
"""
更新站点图标
"""
icon_base64 = f"data:image/ico;base64,{icon_base64}" if icon_base64 else ""
siteicon = self.get_icon_by_domain(domain)
if not siteicon:
SiteIcon(name=name, domain=domain, url=icon_url, base64=icon_base64).create(self._db)
elif icon_base64:
siteicon.update(self._db, {
"url": icon_url,
"base64": icon_base64
})
return True
def success(self, domain: str, seconds: Optional[int] = None):
"""
站点访问成功
"""
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sta = SiteStatistic.get_by_domain(self._db, domain)
if sta:
# 使用深复制确保 note 是全新的字典对象
note = dict(sta.note) if sta.note else {}
avg_seconds = None
if seconds is not None:
note[lst_date] = seconds or 1
avg_times = len(note.keys())
if avg_times > 10:
note = dict(sorted(note.items(), key=lambda x: x[0], reverse=True)[:10])
avg_seconds = sum([v for v in note.values()]) // avg_times
sta.update(self._db, {
"success": sta.success + 1,
"seconds": avg_seconds or sta.seconds,
"lst_state": 0,
"lst_mod_date": lst_date,
"note": note
})
else:
note = {}
if seconds is not None:
note = {
lst_date: seconds or 1
}
SiteStatistic(
domain=domain,
success=1,
fail=0,
seconds=seconds or 1,
lst_state=0,
lst_mod_date=lst_date,
note=note
).create(self._db)
def fail(self, domain: str):
"""
站点访问失败
"""
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sta = SiteStatistic.get_by_domain(self._db, domain)
if sta:
sta.update(self._db, {
"fail": sta.fail + 1,
"lst_state": 1,
"lst_mod_date": lst_date
})
else:
SiteStatistic(
domain=domain,
success=0,
fail=1,
lst_state=1,
lst_mod_date=lst_date
).create(self._db)
async def async_success(self, domain: str, seconds: Optional[int] = None):
"""
异步站点访问成功
"""
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sta = await SiteStatistic.async_get_by_domain(self._db, domain)
if sta:
# 使用深复制确保 note 是全新的字典对象
note = dict(sta.note) if sta.note else {}
avg_seconds = None
if seconds is not None:
note[lst_date] = seconds or 1
avg_times = len(note.keys())
if avg_times > 10:
note = dict(sorted(note.items(), key=lambda x: x[0], reverse=True)[:10])
avg_seconds = sum([v for v in note.values()]) // avg_times
await sta.async_update(self._db, {
"success": sta.success + 1,
"seconds": avg_seconds or sta.seconds,
"lst_state": 0,
"lst_mod_date": lst_date,
"note": note
})
else:
note = {}
if seconds is not None:
note = {
lst_date: seconds or 1
}
await SiteStatistic(
domain=domain,
success=1,
fail=0,
seconds=seconds or 1,
lst_state=0,
lst_mod_date=lst_date,
note=note
).async_create(self._db)
async def async_fail(self, domain: str):
"""
异步站点访问失败
"""
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sta = await SiteStatistic.async_get_by_domain(self._db, domain)
if sta:
await sta.async_update(self._db, {
"fail": sta.fail + 1,
"lst_state": 1,
"lst_mod_date": lst_date
})
else:
await SiteStatistic(
domain=domain,
success=0,
fail=1,
lst_state=1,
lst_mod_date=lst_date
).async_create(self._db)