diff --git a/app/api/endpoints/message.py b/app/api/endpoints/message.py index d429c96e..fafc285e 100644 --- a/app/api/endpoints/message.py +++ b/app/api/endpoints/message.py @@ -1,4 +1,5 @@ import json +import time from typing import Union, Any, List, Optional from fastapi import APIRouter, BackgroundTasks, Depends, Request @@ -13,16 +14,54 @@ from app.core.security import verify_token, verify_apitoken from app.db import get_async_db from app.db.models import User from app.db.message_oper import MessageOper +from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser from app.helper.service import ServiceConfigHelper from app.helper.webpush import is_webpush_subscription_gone from app.log import logger from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt -from app.schemas.types import MessageChannel +from app.schemas.types import MessageChannel, SystemConfigKey router = APIRouter() +def _normalize_notification_clear_timestamp(value: Any) -> int: + """ + 规范化通知清理时间戳。 + """ + try: + normalized_value = int(value or 0) + except (TypeError, ValueError): + return 0 + return normalized_value if normalized_value > 0 else 0 + + +def _get_notification_clear_before() -> schemas.NotificationClearBefore: + """ + 读取通知中心清理时间配置。 + """ + value = SystemConfigOper().get(SystemConfigKey.NotificationClearBefore) + if isinstance(value, dict): + return schemas.NotificationClearBefore( + all=_normalize_notification_clear_timestamp(value.get("all")), + system=_normalize_notification_clear_timestamp(value.get("system")), + media=_normalize_notification_clear_timestamp(value.get("media")), + ) + return schemas.NotificationClearBefore( + all=_normalize_notification_clear_timestamp(value), + ) + + +def _format_notification_clear_time(value: int) -> Optional[str]: + """ + 将清理时间戳转换为消息表使用的时间字符串。 + """ + if not value: + return None + timestamp = value / 1000 if value > 10000000000 else value + return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + + def start_message_chain(body: Any, form: Any, args: Any): """ 启动链式任务 @@ -140,10 +179,32 @@ async def get_notification_message( """ 获取系统发送的通知消息列表。 """ - messages = await MessageOper(db).async_list_sent_by_page(page=page, count=count) + clear_before = _get_notification_clear_before() + messages = await MessageOper(db).async_list_sent_by_page( + page=page, + count=count, + all_clear_before=_format_notification_clear_time(clear_before.all), + system_clear_before=_format_notification_clear_time(clear_before.system), + media_clear_before=_format_notification_clear_time(clear_before.media), + ) return [schemas.NotificationHistoryItem(**message.to_dict()) for message in messages] +@router.delete("/notification", summary="清理通知消息", response_model=schemas.Response) +async def clear_notification_message( + scope: schemas.NotificationClearScope = schemas.NotificationClearScope.All, + _: schemas.TokenPayload = Depends(verify_token), +): + """ + 记录通知中心清理时间,后续通知历史查询会在服务端过滤。 + """ + clear_before = _get_notification_clear_before() + value = clear_before.model_dump() + value[scope.value] = int(time.time() * 1000) + await SystemConfigOper().async_set(SystemConfigKey.NotificationClearBefore, value) + return schemas.Response(success=True, data={"clear_before": value}) + + def wechat_verify( echostr: str, msg_signature: str, diff --git a/app/db/message_oper.py b/app/db/message_oper.py index 29ddb0cb..d0a8065b 100644 --- a/app/db/message_oper.py +++ b/app/db/message_oper.py @@ -114,9 +114,21 @@ class MessageOper(DbOper): return await Message.async_list_by_page(self._db, page, count) async def async_list_sent_by_page( - self, page: Optional[int] = 1, count: Optional[int] = 30 + self, + page: Optional[int] = 1, + count: Optional[int] = 30, + all_clear_before: Optional[str] = None, + system_clear_before: Optional[str] = None, + media_clear_before: Optional[str] = None, ) -> list[Message]: """ 分页获取系统发送的通知消息。 """ - return await Message.async_list_sent_by_page(self._db, page, count) + return await Message.async_list_sent_by_page( + self._db, + page, + count, + all_clear_before=all_clear_before, + system_clear_before=system_clear_before, + media_clear_before=media_clear_before, + ) diff --git a/app/db/models/message.py b/app/db/models/message.py index f13b7380..1384b545 100644 --- a/app/db/models/message.py +++ b/app/db/models/message.py @@ -1,6 +1,6 @@ from typing import List, Optional -from sqlalchemy import Column, Integer, String, JSON, Index, select +from sqlalchemy import Column, Integer, String, JSON, Index, and_, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session @@ -81,14 +81,38 @@ class Message(Base): @classmethod @async_db_query async def async_list_sent_by_page( - cls, db: AsyncSession, page: Optional[int] = 1, count: Optional[int] = 30 + cls, + db: AsyncSession, + page: Optional[int] = 1, + count: Optional[int] = 30, + all_clear_before: Optional[str] = None, + system_clear_before: Optional[str] = None, + media_clear_before: Optional[str] = None, ) -> List["Message"]: """ 分页获取系统发送的通知消息。 """ + statement = select(cls).where(cls.action == 1) + if all_clear_before: + statement = statement.where(cls.reg_time > all_clear_before) + if system_clear_before: + statement = statement.where( + or_( + and_(cls.image.isnot(None), cls.image != ""), + cls.reg_time > system_clear_before, + ) + ) + if media_clear_before: + statement = statement.where( + or_( + cls.image.is_(None), + cls.image == "", + cls.reg_time > media_clear_before, + ) + ) + result = await db.execute( - select(cls) - .where(cls.action == 1) + statement .order_by(cls.reg_time.desc(), cls.id.desc()) .offset((page - 1) * count) .limit(count) diff --git a/app/schemas/message.py b/app/schemas/message.py index 7fdb6160..e02261c3 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -7,6 +7,32 @@ from pydantic import BaseModel, Field, field_validator from app.schemas.types import ContentType, NotificationType, MessageChannel +class NotificationClearScope(str, Enum): + """ + 通知中心清理范围。 + """ + + # 全部消息 + All = "all" + # 系统消息 + System = "system" + # 媒体消息 + Media = "media" + + +class NotificationClearBefore(BaseModel): + """ + 通知中心按范围记录的清理时间。 + """ + + # 全部消息清理时间 + all: int = 0 + # 系统消息清理时间 + system: int = 0 + # 媒体消息清理时间 + media: int = 0 + + class MessageResponse(BaseModel): """ 消息发送响应,包含消息ID等信息用于后续编辑 diff --git a/app/schemas/types.py b/app/schemas/types.py index 0c385fd0..0d2e1912 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -267,6 +267,8 @@ class SystemConfigKey(Enum): AIAgentConfig = "AIAgentConfig" # 通知消息格式模板 NotificationTemplates = "NotificationTemplates" + # 通知中心清理时间 + NotificationClearBefore = "NotificationClearBefore" # 刮削开关设置 ScrapingSwitchs = "ScrapingSwitchs" # 插件安装统计 diff --git a/skills/moviepilot-api/SKILL.md b/skills/moviepilot-api/SKILL.md index b51f9675..d2be6f64 100644 --- a/skills/moviepilot-api/SKILL.md +++ b/skills/moviepilot-api/SKILL.md @@ -420,7 +420,7 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as ` | POST | `/api/v1/torrent/cache/refresh` | Refresh torrent cache | | POST | `/api/v1/torrent/cache/reidentify/{domain}/{torrent_hash}` | Re-identify torrent. Params: `tmdbid`, `doubanid` | -### Message (6 endpoints) +### Message (8 endpoints) | Method | Path | Description | |--------|------|-------------| @@ -428,6 +428,8 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as ` | GET | `/api/v1/message/` | Callback verification. Params: `token`, `echostr`, `msg_signature`, `timestamp`, `nonce`, `source` | | POST | `/api/v1/message/web` | Send web message. Params: `text` (required) | | GET | `/api/v1/message/web` | Get web messages. Params: `page`, `count` | +| GET | `/api/v1/message/notification` | Get notification history. Params: `page`, `count`; server filters cleared history | +| DELETE | `/api/v1/message/notification` | Mark notification history as cleared. Params: `scope` (`all`, `system`, `media`) | | POST | `/api/v1/message/webpush/subscribe` | WebPush subscribe. Body: Subscription JSON | | POST | `/api/v1/message/webpush/send` | Send WebPush notification. Body: SubscriptionMessage JSON | diff --git a/tests/test_message_notifications.py b/tests/test_message_notifications.py index f7c915c8..fd1b9e28 100644 --- a/tests/test_message_notifications.py +++ b/tests/test_message_notifications.py @@ -1,15 +1,18 @@ +import asyncio import json from unittest.mock import Mock -from app.db import SessionFactory -from app.db.message_oper import MessageOper -from app.db.models.message import Message +from app.api.endpoints.message import clear_notification_message, get_notification_message from app.chain import ChainBase from app.core.context import Context, MediaInfo, TorrentInfo from app.core.meta import MetaBase +from app.db import AsyncSessionFactory, SessionFactory +from app.db.message_oper import MessageOper +from app.db.models.message import Message +from app.db.systemconfig_oper import SystemConfigOper from app.helper.message import MessageHelper -from app.schemas import Notification -from app.schemas.types import MediaType, NotificationType +from app.schemas import Notification, NotificationClearScope +from app.schemas.types import MediaType, NotificationType, SystemConfigKey def _clear_messages() -> None: @@ -19,6 +22,7 @@ def _clear_messages() -> None: with SessionFactory() as db: db.query(Message).delete() db.commit() + SystemConfigOper().delete(SystemConfigKey.NotificationClearBefore) def _reset_message_helper(helper: MessageHelper) -> None: @@ -30,6 +34,15 @@ def _reset_message_helper(helper: MessageHelper) -> None: helper._recent_notification_keys.clear() +def _set_message_time(title: str, reg_time: str) -> None: + """ + 调整测试消息时间,避免消息写入时的当前秒影响清理边界断言。 + """ + with SessionFactory() as db: + db.query(Message).filter(Message.title == title).update({"reg_time": reg_time}) + db.commit() + + def test_notification_history_only_lists_sent_messages() -> None: """ 通知历史应返回已发送消息,包含通过消息链登记的智能体消息。 @@ -58,6 +71,48 @@ def test_web_message_history_returns_all_messages() -> None: assert [message.title for message in messages] == ["普通通知", "用户消息", "智能体回复"] +def test_notification_clear_marker_filters_history_across_requests() -> None: + """ + 通知清理时间写入后端后,后续通知历史查询应直接返回过滤后的结果。 + """ + _clear_messages() + oper = MessageOper() + oper.add( + title="旧系统通知", + text="任务失败", + action=1, + mtype=NotificationType.Other, + ) + oper.add( + title="旧媒体通知", + text="影片入库", + image="https://example.com/poster.jpg", + action=1, + ) + _set_message_time("旧系统通知", "2026-01-01 00:00:00") + _set_message_time("旧媒体通知", "2026-01-01 00:00:00") + + asyncio.run(clear_notification_message(scope=NotificationClearScope.Media)) + + oper.add( + title="新媒体通知", + text="影片入库", + image="https://example.com/new.jpg", + action=1, + ) + _set_message_time("新媒体通知", "2999-01-01 00:00:00") + + async def _load_titles() -> list[str]: + """ + 通过异步接口读取通知标题。 + """ + async with AsyncSessionFactory() as db: + messages = await get_notification_message(db=db) + return [message.title for message in messages] + + assert asyncio.run(_load_titles()) == ["新媒体通知", "旧系统通知"] + + def test_system_helper_message_only_enters_sse_queue() -> None: """ 系统实时消息只进入前端 SSE 队列,不写入通知历史。