mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-02 18:22:39 +08:00
- Introduced UpdateSiteCookieTool to the toolset for managing site cookies. - Updated __all__ exports in init.py and factory.py to include UpdateSiteCookieTool. - Added async_get method in SiteOper for asynchronous retrieval of individual site records, improving database interaction.
329 lines
10 KiB
Python
329 lines
10 KiB
Python
from datetime import datetime
|
|
from typing import List, Tuple, Optional
|
|
|
|
from app.db import DbOper
|
|
from app.db.models import SiteIcon
|
|
from app.db.models.site import Site
|
|
from app.db.models.sitestatistic import SiteStatistic
|
|
from app.db.models.siteuserdata import SiteUserData
|
|
|
|
|
|
class SiteOper(DbOper):
|
|
"""
|
|
站点管理
|
|
"""
|
|
|
|
def add(self, **kwargs) -> Tuple[bool, str]:
|
|
"""
|
|
新增站点
|
|
"""
|
|
site = Site(**kwargs)
|
|
if not site.get_by_domain(self._db, kwargs.get("domain")):
|
|
site.create(self._db)
|
|
return True, "新增站点成功"
|
|
return False, "站点已存在"
|
|
|
|
def get(self, sid: int) -> Site:
|
|
"""
|
|
查询单个站点
|
|
"""
|
|
return Site.get(self._db, sid)
|
|
|
|
async def async_get(self, sid: int) -> Site:
|
|
"""
|
|
异步查询单个站点
|
|
"""
|
|
return await Site.async_get(self._db, sid)
|
|
|
|
def list(self) -> List[Site]:
|
|
"""
|
|
获取站点列表
|
|
"""
|
|
return Site.list(self._db)
|
|
|
|
async def async_list(self) -> List[Site]:
|
|
"""
|
|
异步获取站点列表
|
|
"""
|
|
return await Site.async_list(self._db)
|
|
|
|
def list_order_by_pri(self) -> List[Site]:
|
|
"""
|
|
获取站点列表
|
|
"""
|
|
return Site.list_order_by_pri(self._db)
|
|
|
|
def list_active(self) -> List[Site]:
|
|
"""
|
|
按状态获取站点列表
|
|
"""
|
|
return Site.get_actives(self._db)
|
|
|
|
async def async_list_active(self) -> List[Site]:
|
|
"""
|
|
异步按状态获取站点列表
|
|
"""
|
|
return await Site.async_get_actives(self._db)
|
|
|
|
def delete(self, sid: int):
|
|
"""
|
|
删除站点
|
|
"""
|
|
Site.delete(self._db, sid)
|
|
|
|
def update(self, sid: int, payload: dict) -> Site:
|
|
"""
|
|
更新站点
|
|
"""
|
|
site = Site.get(self._db, sid)
|
|
site.update(self._db, payload)
|
|
return site
|
|
|
|
def get_by_domain(self, domain: str) -> Site:
|
|
"""
|
|
按域名获取站点
|
|
"""
|
|
return Site.get_by_domain(self._db, domain)
|
|
|
|
async def async_get_by_domain(self, domain: str) -> Site:
|
|
"""
|
|
异步按域名获取站点
|
|
"""
|
|
return await Site.async_get_by_domain(self._db, domain)
|
|
|
|
async def async_get_by_name(self, name: str) -> Site:
|
|
"""
|
|
异步按名称获取站点
|
|
"""
|
|
return await Site.async_get_by_name(self._db, name)
|
|
|
|
def get_domains_by_ids(self, ids: List[int]) -> List[str]:
|
|
"""
|
|
按ID获取站点域名
|
|
"""
|
|
return Site.get_domains_by_ids(self._db, ids)
|
|
|
|
def exists(self, domain: str) -> bool:
|
|
"""
|
|
判断站点是否存在
|
|
"""
|
|
return Site.get_by_domain(self._db, domain) is not None
|
|
|
|
def update_cookie(self, domain: str, cookies: str) -> Tuple[bool, str]:
|
|
"""
|
|
更新站点Cookie
|
|
"""
|
|
site = Site.get_by_domain(self._db, domain)
|
|
if not site:
|
|
return False, "站点不存在"
|
|
site.update(self._db, {
|
|
"cookie": cookies
|
|
})
|
|
return True, "更新站点Cookie成功"
|
|
|
|
def update_rss(self, domain: str, rss: str) -> Tuple[bool, str]:
|
|
"""
|
|
更新站点rss
|
|
"""
|
|
site = Site.get_by_domain(self._db, domain)
|
|
if not site:
|
|
return False, "站点不存在"
|
|
site.update(self._db, {
|
|
"rss": rss
|
|
})
|
|
return True, "更新站点RSS地址成功"
|
|
|
|
def update_userdata(self, domain: str, name: str, payload: dict) -> Tuple[bool, str]:
|
|
"""
|
|
更新站点用户数据
|
|
"""
|
|
# 当前系统日期
|
|
current_day = datetime.now().strftime('%Y-%m-%d')
|
|
current_time = datetime.now().strftime('%H:%M:%S')
|
|
payload.update({
|
|
"domain": domain,
|
|
"name": name,
|
|
"updated_day": current_day,
|
|
"updated_time": current_time,
|
|
"err_msg": payload.get("err_msg") or ""
|
|
})
|
|
# 按站点+天判断是否存在数据
|
|
siteuserdatas = SiteUserData.get_by_domain(self._db, domain=domain, workdate=current_day)
|
|
if siteuserdatas:
|
|
# 存在则更新
|
|
if not payload.get("err_msg"):
|
|
siteuserdatas[0].update(self._db, payload)
|
|
else:
|
|
# 不存在则插入
|
|
SiteUserData(**payload).create(self._db)
|
|
return True, "更新站点用户数据成功"
|
|
|
|
def get_userdata(self) -> List[SiteUserData]:
|
|
"""
|
|
获取站点用户数据
|
|
"""
|
|
return SiteUserData.list(self._db)
|
|
|
|
def get_userdata_by_domain(self, domain: str, workdate: Optional[str] = None) -> List[SiteUserData]:
|
|
"""
|
|
获取站点用户数据
|
|
"""
|
|
return SiteUserData.get_by_domain(self._db, domain=domain, workdate=workdate)
|
|
|
|
def get_userdata_by_date(self, date: str) -> List[SiteUserData]:
|
|
"""
|
|
获取站点用户数据
|
|
"""
|
|
return SiteUserData.get_by_date(self._db, date)
|
|
|
|
def get_userdata_latest(self) -> List[SiteUserData]:
|
|
"""
|
|
获取站点最新数据
|
|
"""
|
|
return SiteUserData.get_latest(self._db)
|
|
|
|
def get_icon_by_domain(self, domain: str) -> SiteIcon:
|
|
"""
|
|
按域名获取站点图标
|
|
"""
|
|
return SiteIcon.get_by_domain(self._db, domain)
|
|
|
|
def update_icon(self, name: str, domain: str, icon_url: str, icon_base64: str) -> bool:
|
|
"""
|
|
更新站点图标
|
|
"""
|
|
icon_base64 = f"data:image/ico;base64,{icon_base64}" if icon_base64 else ""
|
|
siteicon = self.get_icon_by_domain(domain)
|
|
if not siteicon:
|
|
SiteIcon(name=name, domain=domain, url=icon_url, base64=icon_base64).create(self._db)
|
|
elif icon_base64:
|
|
siteicon.update(self._db, {
|
|
"url": icon_url,
|
|
"base64": icon_base64
|
|
})
|
|
return True
|
|
|
|
def success(self, domain: str, seconds: Optional[int] = None):
|
|
"""
|
|
站点访问成功
|
|
"""
|
|
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
sta = SiteStatistic.get_by_domain(self._db, domain)
|
|
if sta:
|
|
# 使用深复制确保 note 是全新的字典对象
|
|
note = dict(sta.note) if sta.note else {}
|
|
avg_seconds = None
|
|
|
|
if seconds is not None:
|
|
note[lst_date] = seconds or 1
|
|
avg_times = len(note.keys())
|
|
if avg_times > 10:
|
|
note = dict(sorted(note.items(), key=lambda x: x[0], reverse=True)[:10])
|
|
avg_seconds = sum([v for v in note.values()]) // avg_times
|
|
|
|
sta.update(self._db, {
|
|
"success": sta.success + 1,
|
|
"seconds": avg_seconds or sta.seconds,
|
|
"lst_state": 0,
|
|
"lst_mod_date": lst_date,
|
|
"note": note
|
|
})
|
|
else:
|
|
note = {}
|
|
if seconds is not None:
|
|
note = {
|
|
lst_date: seconds or 1
|
|
}
|
|
SiteStatistic(
|
|
domain=domain,
|
|
success=1,
|
|
fail=0,
|
|
seconds=seconds or 1,
|
|
lst_state=0,
|
|
lst_mod_date=lst_date,
|
|
note=note
|
|
).create(self._db)
|
|
|
|
def fail(self, domain: str):
|
|
"""
|
|
站点访问失败
|
|
"""
|
|
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
sta = SiteStatistic.get_by_domain(self._db, domain)
|
|
if sta:
|
|
sta.update(self._db, {
|
|
"fail": sta.fail + 1,
|
|
"lst_state": 1,
|
|
"lst_mod_date": lst_date
|
|
})
|
|
else:
|
|
SiteStatistic(
|
|
domain=domain,
|
|
success=0,
|
|
fail=1,
|
|
lst_state=1,
|
|
lst_mod_date=lst_date
|
|
).create(self._db)
|
|
|
|
async def async_success(self, domain: str, seconds: Optional[int] = None):
|
|
"""
|
|
异步站点访问成功
|
|
"""
|
|
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
sta = await SiteStatistic.async_get_by_domain(self._db, domain)
|
|
if sta:
|
|
# 使用深复制确保 note 是全新的字典对象
|
|
note = dict(sta.note) if sta.note else {}
|
|
avg_seconds = None
|
|
|
|
if seconds is not None:
|
|
note[lst_date] = seconds or 1
|
|
avg_times = len(note.keys())
|
|
if avg_times > 10:
|
|
note = dict(sorted(note.items(), key=lambda x: x[0], reverse=True)[:10])
|
|
avg_seconds = sum([v for v in note.values()]) // avg_times
|
|
|
|
await sta.async_update(self._db, {
|
|
"success": sta.success + 1,
|
|
"seconds": avg_seconds or sta.seconds,
|
|
"lst_state": 0,
|
|
"lst_mod_date": lst_date,
|
|
"note": note
|
|
})
|
|
else:
|
|
note = {}
|
|
if seconds is not None:
|
|
note = {
|
|
lst_date: seconds or 1
|
|
}
|
|
await SiteStatistic(
|
|
domain=domain,
|
|
success=1,
|
|
fail=0,
|
|
seconds=seconds or 1,
|
|
lst_state=0,
|
|
lst_mod_date=lst_date,
|
|
note=note
|
|
).async_create(self._db)
|
|
|
|
async def async_fail(self, domain: str):
|
|
"""
|
|
异步站点访问失败
|
|
"""
|
|
lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
sta = await SiteStatistic.async_get_by_domain(self._db, domain)
|
|
if sta:
|
|
await sta.async_update(self._db, {
|
|
"fail": sta.fail + 1,
|
|
"lst_state": 1,
|
|
"lst_mod_date": lst_date
|
|
})
|
|
else:
|
|
await SiteStatistic(
|
|
domain=domain,
|
|
success=0,
|
|
fail=1,
|
|
lst_state=1,
|
|
lst_mod_date=lst_date
|
|
).async_create(self._db)
|