mirror of https://github.com/jxxghp/MoviePilot.git, synced 2026-02-13 07:25:53 +08:00
- Included UpdateSubscribeTool and QuerySiteUserdataTool in the tool definitions of MoviePilotToolFactory to enhance subscription management and user data querying capabilities.
- Updated the factory.py file to reflect the addition of these new tools.
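A minimal sketch of what the factory.py registration might look like after this change, assuming MoviePilotToolFactory keeps a flat list of tool classes (the list name and surrounding code are illustrative, not the actual MoviePilot source):

    class MoviePilotToolFactory:
        # Tool classes exposed to the agent (illustrative registry)
        tool_classes = [
            ...,  # previously registered tools
            UpdateSubscribeTool,
            QuerySiteUserdataTool,
        ]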
137 lines
6.5 KiB
Python
"""查询站点用户数据工具"""
|
|
|
|
import json
|
|
from typing import Optional, Type
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.agent.tools.base import MoviePilotTool
|
|
from app.db import AsyncSessionFactory
|
|
from app.db.models.site import Site
|
|
from app.db.models.siteuserdata import SiteUserData
|
|
from app.log import logger
|
|
|
|
|


class QuerySiteUserdataInput(BaseModel):
    """Input parameter model for the site user data query tool."""
    explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
    site_id: int = Field(..., description="The ID of the site to query user data for")
    workdate: Optional[str] = Field(None, description="Work date to query (optional, format: 'YYYY-MM-DD', if not specified returns latest data)")
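
# Example arguments as an agent might emit them (illustrative values only):
#   {"explanation": "User asked for their share ratio on site 1",
#    "site_id": 1, "workdate": "2026-02-13"}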


class QuerySiteUserdataTool(MoviePilotTool):
    name: str = "query_site_userdata"
    description: str = "Query user data for a specific site including username, user level, upload/download statistics, seeding information, bonus points, and other account details. Supports querying data for a specific date or latest data."
    args_schema: Type[BaseModel] = QuerySiteUserdataInput

    def get_tool_message(self, **kwargs) -> Optional[str]:
        """Build a user-friendly progress message from the query parameters."""
        site_id = kwargs.get("site_id")
        workdate = kwargs.get("workdate")

        # e.g. "正在查询站点 #1 的用户数据 (最新数据)"
        message = f"正在查询站点 #{site_id} 的用户数据"
        if workdate:
            message += f" (日期: {workdate})"
        else:
            message += " (最新数据)"

        return message

    async def run(self, site_id: int, workdate: Optional[str] = None, **kwargs) -> str:
        logger.info(f"执行工具: {self.name}, 参数: site_id={site_id}, workdate={workdate}")

        try:
            # Open an async database session
            async with AsyncSessionFactory() as db:
                # Look up the site by ID
                site = await Site.async_get(db, site_id)
                if not site:
                    return json.dumps({
                        "success": False,
                        "message": f"站点不存在: {site_id}"
                    }, ensure_ascii=False)

                # Fetch the site's user data records by domain
                user_data_list = await SiteUserData.async_get_by_domain(
                    db,
                    domain=site.domain,
                    workdate=workdate
                )

                if not user_data_list:
                    return json.dumps({
                        "success": False,
                        "message": f"站点 {site.name} ({site.domain}) 暂无用户数据",
                        "site_id": site_id,
                        "site_name": site.name,
                        "site_domain": site.domain,
                        "workdate": workdate
                    }, ensure_ascii=False)

                # Assemble the structured result
                result = {
                    "success": True,
                    "site_id": site_id,
                    "site_name": site.name,
                    "site_domain": site.domain,
                    "workdate": workdate,
                    "data_count": len(user_data_list),
                    "user_data": []
                }
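
                # Illustrative shape of the serialized result (all values invented):
                # {"success": true, "site_id": 1, "site_name": "...", "data_count": 1,
                #  "user_data": [{"username": "...", "upload_gb": 123.45, ...}]}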

                for user_data in user_data_list:
                    # Convert byte counts to GiB (1024 ** 3) for readability
                    upload_gb = user_data.upload / (1024 ** 3) if user_data.upload else 0
                    download_gb = user_data.download / (1024 ** 3) if user_data.download else 0
                    seeding_size_gb = user_data.seeding_size / (1024 ** 3) if user_data.seeding_size else 0
                    leeching_size_gb = user_data.leeching_size / (1024 ** 3) if user_data.leeching_size else 0

                    user_data_dict = {
                        "domain": user_data.domain,
                        "name": user_data.name,
                        "username": user_data.username,
                        "userid": user_data.userid,
                        "user_level": user_data.user_level,
                        "join_at": user_data.join_at,
                        "bonus": user_data.bonus,
                        "upload": user_data.upload,
                        "upload_gb": round(upload_gb, 2),
                        "download": user_data.download,
                        "download_gb": round(download_gb, 2),
                        "ratio": round(user_data.ratio, 2) if user_data.ratio else 0,
                        "seeding": int(user_data.seeding) if user_data.seeding else 0,
                        "leeching": int(user_data.leeching) if user_data.leeching else 0,
                        "seeding_size": user_data.seeding_size,
                        "seeding_size_gb": round(seeding_size_gb, 2),
                        "leeching_size": user_data.leeching_size,
                        "leeching_size_gb": round(leeching_size_gb, 2),
                        "seeding_info": user_data.seeding_info if user_data.seeding_info else [],
                        "message_unread": user_data.message_unread,
                        "message_unread_contents": user_data.message_unread_contents if user_data.message_unread_contents else [],
                        "err_msg": user_data.err_msg,
                        "updated_day": user_data.updated_day,
                        "updated_time": user_data.updated_time
                    }
                    result["user_data"].append(user_data_dict)

                # If there are multiple records, keep only the most recent one
                # (sorted by update day/time, descending)
                if len(result["user_data"]) > 1:
                    result["user_data"].sort(
                        key=lambda x: (x.get("updated_day", ""), x.get("updated_time", "")),
                        reverse=True
                    )
                    result["message"] = f"找到 {len(result['user_data'])} 条数据,显示最新的一条"
                    result["user_data"] = [result["user_data"][0]]

                return json.dumps(result, ensure_ascii=False, indent=2)

        except Exception as e:
            error_message = f"查询站点用户数据失败: {str(e)}"
            logger.error(error_message, exc_info=True)
            return json.dumps({
                "success": False,
                "message": error_message,
                "site_id": site_id
            }, ensure_ascii=False)
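

# Usage sketch (illustrative, not part of the tool). Assuming MoviePilotTool
# lets callers await run() directly, an agent-side invocation might look like:
#
#   tool = QuerySiteUserdataTool()
#   payload = await tool.run(site_id=1, workdate="2026-02-13")
#   data = json.loads(payload)
#   if data["success"]:
#       print(data["user_data"][0]["username"])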