diff --git a/backend/src/module/api/__init__.py b/backend/src/module/api/__init__.py index 23b90263..8bd10e75 100644 --- a/backend/src/module/api/__init__.py +++ b/backend/src/module/api/__init__.py @@ -10,6 +10,7 @@ from .program import router as program_router from .rss import router as rss_router from .search import router as search_router from .setup import router as setup_router +from .notification import router as notification_router __all__ = "v1" @@ -25,3 +26,4 @@ v1.include_router(downloader_router) v1.include_router(rss_router) v1.include_router(search_router) v1.include_router(setup_router) +v1.include_router(notification_router) diff --git a/backend/src/module/api/notification.py b/backend/src/module/api/notification.py new file mode 100644 index 00000000..9c1acca6 --- /dev/null +++ b/backend/src/module/api/notification.py @@ -0,0 +1,117 @@ +"""Notification API endpoints.""" + +import logging +from typing import Optional + +from fastapi import APIRouter +from pydantic import BaseModel, Field + +from module.notification import NotificationManager +from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/notification", tags=["notification"]) + + +class TestProviderRequest(BaseModel): + """Request body for testing a saved provider by index.""" + + provider_index: int = Field(..., description="Index of the provider to test") + + +class TestProviderConfigRequest(BaseModel): + """Request body for testing an unsaved provider configuration.""" + + type: str = Field(..., description="Provider type") + enabled: bool = Field(True, description="Whether provider is enabled") + token: Optional[str] = Field(None, description="Auth token") + chat_id: Optional[str] = Field(None, description="Chat/channel ID") + webhook_url: Optional[str] = Field(None, description="Webhook URL") + server_url: Optional[str] = Field(None, description="Server URL") + device_key: Optional[str] = Field(None, description="Device key") + user_key: Optional[str] = Field(None, description="User key") + api_token: Optional[str] = Field(None, description="API token") + template: Optional[str] = Field(None, description="Custom template") + url: Optional[str] = Field(None, description="URL for generic webhook") + + +class TestResponse(BaseModel): + """Response for test notification endpoints.""" + + success: bool + message: str + message_zh: str = "" + message_en: str = "" + + +@router.post("/test", response_model=TestResponse) +async def test_provider(request: TestProviderRequest): + """Test a configured notification provider by its index. + + Sends a test notification using the provider at the specified index + in the current configuration. + """ + try: + manager = NotificationManager() + if request.provider_index >= len(manager): + return TestResponse( + success=False, + message=f"Invalid provider index: {request.provider_index}", + message_zh=f"无效的提供者索引: {request.provider_index}", + message_en=f"Invalid provider index: {request.provider_index}", + ) + + success, message = await manager.test_provider(request.provider_index) + return TestResponse( + success=success, + message=message, + message_zh="测试成功" if success else f"测试失败: {message}", + message_en="Test successful" if success else f"Test failed: {message}", + ) + except Exception as e: + logger.error(f"Failed to test provider: {e}") + return TestResponse( + success=False, + message=str(e), + message_zh=f"测试失败: {e}", + message_en=f"Test failed: {e}", + ) + + +@router.post("/test-config", response_model=TestResponse) +async def test_provider_config(request: TestProviderConfigRequest): + """Test an unsaved notification provider configuration. + + Useful for testing a provider before saving it to the configuration. + """ + try: + # Convert request to ProviderConfig + config = ProviderConfig( + type=request.type, + enabled=request.enabled, + token=request.token or "", + chat_id=request.chat_id or "", + webhook_url=request.webhook_url or "", + server_url=request.server_url or "", + device_key=request.device_key or "", + user_key=request.user_key or "", + api_token=request.api_token or "", + template=request.template, + url=request.url or "", + ) + + success, message = await NotificationManager.test_provider_config(config) + return TestResponse( + success=success, + message=message, + message_zh="测试成功" if success else f"测试失败: {message}", + message_en="Test successful" if success else f"Test failed: {message}", + ) + except Exception as e: + logger.error(f"Failed to test provider config: {e}") + return TestResponse( + success=False, + message=str(e), + message_zh=f"测试失败: {e}", + message_en=f"Test failed: {e}", + ) diff --git a/backend/src/module/api/setup.py b/backend/src/module/api/setup.py index 55c7b6e0..4161515f 100644 --- a/backend/src/module/api/setup.py +++ b/backend/src/module/api/setup.py @@ -8,7 +8,8 @@ from pydantic import BaseModel, Field from module.conf import VERSION, settings from module.models import Config, ResponseModel from module.network import RequestContent -from module.notification.notification import getClient +from module.notification import PROVIDER_REGISTRY +from module.models.config import NotificationProvider as ProviderConfig from module.security.jwt import get_password_hash logger = logging.getLogger(__name__) @@ -202,8 +203,8 @@ async def test_notification(req: TestNotificationRequest): """Send a test notification.""" _require_setup_needed() - NotifierClass = getClient(req.type) - if NotifierClass is None: + provider_cls = PROVIDER_REGISTRY.get(req.type.lower()) + if provider_cls is None: return TestResultResponse( success=False, message_en=f"Unknown notification type: {req.type}", @@ -211,30 +212,27 @@ async def test_notification(req: TestNotificationRequest): ) try: - notifier = NotifierClass(token=req.token, chat_id=req.chat_id) - async with notifier: - # Send a simple test message - data = {"chat_id": req.chat_id, "text": "AutoBangumi 通知测试成功!"} - if req.type.lower() == "telegram": - resp = await notifier.post_data(notifier.message_url, data) - if resp.status_code == 200: - return TestResultResponse( - success=True, - message_en="Test notification sent successfully.", - message_zh="测试通知发送成功。", - ) - else: - return TestResultResponse( - success=False, - message_en="Failed to send test notification.", - message_zh="测试通知发送失败。", - ) - else: - # For other providers, just verify the notifier can be created + # Create provider config + config = ProviderConfig( + type=req.type, + enabled=True, + token=req.token, + chat_id=req.chat_id, + ) + provider = provider_cls(config) + async with provider: + success, message = await provider.test() + if success: return TestResultResponse( success=True, - message_en="Notification configuration is valid.", - message_zh="通知配置有效。", + message_en="Test notification sent successfully.", + message_zh="测试通知发送成功。", + ) + else: + return TestResultResponse( + success=False, + message_en=f"Failed to send test notification: {message}", + message_zh=f"测试通知发送失败:{message}", ) except Exception as e: logger.error(f"[Setup] Notification test failed: {e}") @@ -275,9 +273,14 @@ async def complete_setup(req: SetupCompleteRequest): if req.notification_enable: config_dict["notification"] = { "enable": True, - "type": req.notification_type, - "token": req.notification_token, - "chat_id": req.notification_chat_id, + "providers": [ + { + "type": req.notification_type, + "enabled": True, + "token": req.notification_token, + "chat_id": req.notification_chat_id, + } + ], } settings.save(config_dict) diff --git a/backend/src/module/conf/const.py b/backend/src/module/conf/const.py index 606dfdfb..c4383f71 100644 --- a/backend/src/module/conf/const.py +++ b/backend/src/module/conf/const.py @@ -36,7 +36,7 @@ DEFAULT_SETTINGS = { "username": "", "password": "", }, - "notification": {"enable": False, "type": "telegram", "token": "", "chat_id": ""}, + "notification": {"enable": False, "providers": []}, "experimental_openai": { "enable": False, "api_key": "", diff --git a/backend/src/module/core/sub_thread.py b/backend/src/module/core/sub_thread.py index d59bb9f7..bc49d4c8 100644 --- a/backend/src/module/core/sub_thread.py +++ b/backend/src/module/core/sub_thread.py @@ -4,7 +4,7 @@ import logging from module.conf import settings from module.downloader import DownloadClient from module.manager import Renamer, TorrentManager, eps_complete -from module.notification import PostNotification +from module.notification import NotificationManager from module.rss import RSSAnalyser, RSSEngine from .offset_scanner import OffsetScanner @@ -66,10 +66,9 @@ class RenameThread(ProgramStatus): async with Renamer() as renamer: renamed_info = await renamer.rename() if settings.notification.enable and renamed_info: - async with PostNotification() as notifier: - await asyncio.gather( - *[notifier.send_msg(info) for info in renamed_info] - ) + manager = NotificationManager() + for info in renamed_info: + await manager.send_all(info) try: await asyncio.wait_for( self.stop_event.wait(), diff --git a/backend/src/module/models/config.py b/backend/src/module/models/config.py index 7a00426b..63de5924 100644 --- a/backend/src/module/models/config.py +++ b/backend/src/module/models/config.py @@ -1,7 +1,7 @@ from os.path import expandvars -from typing import Literal +from typing import Literal, Optional -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, Field, field_validator, model_validator class Program(BaseModel): @@ -68,19 +68,106 @@ class Proxy(BaseModel): return expandvars(self.password_) +class NotificationProvider(BaseModel): + """Configuration for a single notification provider.""" + + type: str = Field(..., description="Provider type (telegram, discord, bark, etc.)") + enabled: bool = Field(True, description="Whether this provider is enabled") + + # Common fields (with env var expansion) + token_: Optional[str] = Field(None, alias="token", description="Auth token") + chat_id_: Optional[str] = Field(None, alias="chat_id", description="Chat/channel ID") + + # Provider-specific fields + webhook_url_: Optional[str] = Field( + None, alias="webhook_url", description="Webhook URL for discord/wecom" + ) + server_url_: Optional[str] = Field( + None, alias="server_url", description="Server URL for gotify/bark" + ) + device_key_: Optional[str] = Field( + None, alias="device_key", description="Device key for bark" + ) + user_key_: Optional[str] = Field( + None, alias="user_key", description="User key for pushover" + ) + api_token_: Optional[str] = Field( + None, alias="api_token", description="API token for pushover" + ) + template: Optional[str] = Field( + None, description="Custom template for webhook provider" + ) + url_: Optional[str] = Field( + None, alias="url", description="URL for generic webhook provider" + ) + + @property + def token(self) -> str: + return expandvars(self.token_) if self.token_ else "" + + @property + def chat_id(self) -> str: + return expandvars(self.chat_id_) if self.chat_id_ else "" + + @property + def webhook_url(self) -> str: + return expandvars(self.webhook_url_) if self.webhook_url_ else "" + + @property + def server_url(self) -> str: + return expandvars(self.server_url_) if self.server_url_ else "" + + @property + def device_key(self) -> str: + return expandvars(self.device_key_) if self.device_key_ else "" + + @property + def user_key(self) -> str: + return expandvars(self.user_key_) if self.user_key_ else "" + + @property + def api_token(self) -> str: + return expandvars(self.api_token_) if self.api_token_ else "" + + @property + def url(self) -> str: + return expandvars(self.url_) if self.url_ else "" + + class Notification(BaseModel): - enable: bool = Field(False, description="Enable notification") - type: str = Field("telegram", description="Notification type") - token_: str = Field("", alias="token", description="Notification token") - chat_id_: str = Field("", alias="chat_id", description="Notification chat id") + """Notification configuration supporting multiple providers.""" + + enable: bool = Field(False, description="Enable notification system") + providers: list[NotificationProvider] = Field( + default_factory=list, description="List of notification providers" + ) + + # Legacy fields for backward compatibility (deprecated) + type: Optional[str] = Field(None, description="[Deprecated] Use providers instead") + token_: Optional[str] = Field(None, alias="token", description="[Deprecated]") + chat_id_: Optional[str] = Field(None, alias="chat_id", description="[Deprecated]") @property - def token(self): - return expandvars(self.token_) + def token(self) -> str: + return expandvars(self.token_) if self.token_ else "" @property - def chat_id(self): - return expandvars(self.chat_id_) + def chat_id(self) -> str: + return expandvars(self.chat_id_) if self.chat_id_ else "" + + @model_validator(mode="after") + def migrate_legacy_config(self) -> "Notification": + """Auto-migrate old single-provider config to new format.""" + if self.type and not self.providers: + # Old format detected, migrate to new format + legacy_provider = NotificationProvider( + type=self.type, + enabled=True, + token=self.token_ or "", + chat_id=self.chat_id_ or "", + ) + self.providers = [legacy_provider] + return self class ExperimentalOpenAI(BaseModel): diff --git a/backend/src/module/notification/__init__.py b/backend/src/module/notification/__init__.py index e6a713ae..b8d1b55c 100644 --- a/backend/src/module/notification/__init__.py +++ b/backend/src/module/notification/__init__.py @@ -1 +1,11 @@ from .notification import PostNotification +from .manager import NotificationManager +from .base import NotificationProvider +from .providers import PROVIDER_REGISTRY + +__all__ = [ + "PostNotification", + "NotificationManager", + "NotificationProvider", + "PROVIDER_REGISTRY", +] diff --git a/backend/src/module/notification/base.py b/backend/src/module/notification/base.py new file mode 100644 index 00000000..c2ebf8bf --- /dev/null +++ b/backend/src/module/notification/base.py @@ -0,0 +1,51 @@ +"""Base class for notification providers.""" + +from abc import ABC, abstractmethod + +from module.models.bangumi import Notification +from module.network import RequestContent + + +class NotificationProvider(RequestContent, ABC): + """Abstract base class for notification providers. + + All notification providers must inherit from this class and implement + the send() and test() methods. + """ + + @abstractmethod + async def send(self, notification: Notification) -> bool: + """Send a notification. + + Args: + notification: The notification data containing anime info. + + Returns: + True if the notification was sent successfully, False otherwise. + """ + pass + + @abstractmethod + async def test(self) -> tuple[bool, str]: + """Test the notification provider configuration. + + Returns: + A tuple of (success, message) where success is True if the test + passed and message contains details about the result. + """ + pass + + def _format_message(self, notify: Notification) -> str: + """Format the default notification message. + + Args: + notify: The notification data. + + Returns: + Formatted message string. + """ + return ( + f"番剧名称:{notify.official_title}\n" + f"季度: 第{notify.season}季\n" + f"更新集数: 第{notify.episode}集" + ) diff --git a/backend/src/module/notification/manager.py b/backend/src/module/notification/manager.py new file mode 100644 index 00000000..2cc672b5 --- /dev/null +++ b/backend/src/module/notification/manager.py @@ -0,0 +1,132 @@ +"""Notification manager for handling multiple providers.""" + +import asyncio +import logging +from typing import TYPE_CHECKING + +from module.conf import settings +from module.database import Database +from module.models.bangumi import Notification + +if TYPE_CHECKING: + from module.notification.base import NotificationProvider + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class NotificationManager: + """Manager for handling notifications across multiple providers.""" + + def __init__(self): + self.providers: list["NotificationProvider"] = [] + self._load_providers() + + def _load_providers(self): + """Initialize providers from configuration.""" + from module.notification.providers import PROVIDER_REGISTRY + + for cfg in settings.notification.providers: + if not cfg.enabled: + continue + + provider_cls = PROVIDER_REGISTRY.get(cfg.type.lower()) + if provider_cls: + try: + provider = provider_cls(cfg) + self.providers.append(provider) + logger.debug(f"Loaded notification provider: {cfg.type}") + except Exception as e: + logger.warning(f"Failed to load provider {cfg.type}: {e}") + else: + logger.warning(f"Unknown notification provider type: {cfg.type}") + + async def _get_poster(self, notification: Notification): + """Fetch poster path from database if not already set.""" + if notification.poster_path: + return + + def _get_poster_sync(): + with Database() as db: + data = db.bangumi.search_official_title(notification.official_title) + if data: + notification.poster_path = data.poster_link + + await asyncio.to_thread(_get_poster_sync) + + async def send_all(self, notification: Notification): + """Send notification to all enabled providers. + + Args: + notification: The notification data to send. + """ + if not self.providers: + logger.debug("No notification providers configured") + return + + # Fetch poster if needed + await self._get_poster(notification) + + # Send to all providers in parallel + async def send_to_provider(provider: "NotificationProvider"): + try: + async with provider: + await provider.send(notification) + logger.debug( + f"Sent notification via {provider.__class__.__name__}: " + f"{notification.official_title}" + ) + except Exception as e: + logger.warning( + f"Failed to send notification via {provider.__class__.__name__}: {e}" + ) + + await asyncio.gather( + *[send_to_provider(p) for p in self.providers], + return_exceptions=True, + ) + + async def test_provider(self, index: int) -> tuple[bool, str]: + """Test a specific provider by index. + + Args: + index: The index of the provider in the providers list. + + Returns: + A tuple of (success, message). + """ + if index < 0 or index >= len(self.providers): + return False, f"Invalid provider index: {index}" + + provider = self.providers[index] + try: + async with provider: + return await provider.test() + except Exception as e: + return False, f"Test failed: {e}" + + @staticmethod + async def test_provider_config(config: "ProviderConfig") -> tuple[bool, str]: + """Test a provider configuration without saving it. + + Args: + config: The provider configuration to test. + + Returns: + A tuple of (success, message). + """ + from module.notification.providers import PROVIDER_REGISTRY + + provider_cls = PROVIDER_REGISTRY.get(config.type.lower()) + if not provider_cls: + return False, f"Unknown provider type: {config.type}" + + try: + provider = provider_cls(config) + async with provider: + return await provider.test() + except Exception as e: + return False, f"Test failed: {e}" + + def __len__(self) -> int: + return len(self.providers) diff --git a/backend/src/module/notification/providers/__init__.py b/backend/src/module/notification/providers/__init__.py new file mode 100644 index 00000000..6dd3b6e8 --- /dev/null +++ b/backend/src/module/notification/providers/__init__.py @@ -0,0 +1,40 @@ +"""Notification providers registry.""" + +from typing import TYPE_CHECKING + +from module.notification.providers.telegram import TelegramProvider +from module.notification.providers.discord import DiscordProvider +from module.notification.providers.bark import BarkProvider +from module.notification.providers.server_chan import ServerChanProvider +from module.notification.providers.wecom import WecomProvider +from module.notification.providers.gotify import GotifyProvider +from module.notification.providers.pushover import PushoverProvider +from module.notification.providers.webhook import WebhookProvider + +if TYPE_CHECKING: + from module.notification.base import NotificationProvider + +# Registry mapping provider type names to their classes +PROVIDER_REGISTRY: dict[str, type["NotificationProvider"]] = { + "telegram": TelegramProvider, + "discord": DiscordProvider, + "bark": BarkProvider, + "server-chan": ServerChanProvider, + "serverchan": ServerChanProvider, # Alternative name + "wecom": WecomProvider, + "gotify": GotifyProvider, + "pushover": PushoverProvider, + "webhook": WebhookProvider, +} + +__all__ = [ + "PROVIDER_REGISTRY", + "TelegramProvider", + "DiscordProvider", + "BarkProvider", + "ServerChanProvider", + "WecomProvider", + "GotifyProvider", + "PushoverProvider", + "WebhookProvider", +] diff --git a/backend/src/module/notification/providers/bark.py b/backend/src/module/notification/providers/bark.py new file mode 100644 index 00000000..7d0f5b58 --- /dev/null +++ b/backend/src/module/notification/providers/bark.py @@ -0,0 +1,55 @@ +"""Bark notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class BarkProvider(NotificationProvider): + """Bark (iOS) notification provider.""" + + DEFAULT_SERVER = "https://api.day.app" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + # Support both legacy token field and new device_key field + self.device_key = config.device_key or config.token + server_url = config.server_url or self.DEFAULT_SERVER + self.notification_url = f"{server_url.rstrip('/')}/push" + + async def send(self, notification: Notification) -> bool: + """Send notification via Bark.""" + text = self._format_message(notification) + data = { + "title": notification.official_title, + "body": text, + "icon": notification.poster_path or "", + "device_key": self.device_key, + } + + resp = await self.post_data(self.notification_url, data) + logger.debug(f"Bark notification: {resp.status_code}") + return resp.status_code == 200 + + async def test(self) -> tuple[bool, str]: + """Test Bark configuration by sending a test notification.""" + data = { + "title": "AutoBangumi", + "body": "通知测试成功!\nNotification test successful!", + "device_key": self.device_key, + } + try: + resp = await self.post_data(self.notification_url, data) + if resp.status_code == 200: + return True, "Bark test notification sent successfully" + else: + return False, f"Bark API returned status {resp.status_code}" + except Exception as e: + return False, f"Bark test failed: {e}" diff --git a/backend/src/module/notification/providers/discord.py b/backend/src/module/notification/providers/discord.py new file mode 100644 index 00000000..2119e14d --- /dev/null +++ b/backend/src/module/notification/providers/discord.py @@ -0,0 +1,61 @@ +"""Discord notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class DiscordProvider(NotificationProvider): + """Discord webhook notification provider.""" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + self.webhook_url = config.webhook_url + + async def send(self, notification: Notification) -> bool: + """Send notification via Discord webhook.""" + embed = { + "title": f"📺 {notification.official_title}", + "description": ( + f"**季度:** 第{notification.season}季\n" + f"**集数:** 第{notification.episode}集" + ), + "color": 0x00BFFF, # Deep Sky Blue + } + + # Add poster as thumbnail if available + if notification.poster_path and notification.poster_path != "https://mikanani.me": + embed["thumbnail"] = {"url": notification.poster_path} + + data = { + "embeds": [embed], + } + + resp = await self.post_data(self.webhook_url, data) + logger.debug(f"Discord notification: {resp.status_code}") + return resp.status_code in (200, 204) + + async def test(self) -> tuple[bool, str]: + """Test Discord webhook by sending a test message.""" + embed = { + "title": "AutoBangumi 通知测试", + "description": "通知测试成功!\nNotification test successful!", + "color": 0x00FF00, # Green + } + data = {"embeds": [embed]} + + try: + resp = await self.post_data(self.webhook_url, data) + if resp.status_code in (200, 204): + return True, "Discord test message sent successfully" + else: + return False, f"Discord API returned status {resp.status_code}" + except Exception as e: + return False, f"Discord test failed: {e}" diff --git a/backend/src/module/notification/providers/gotify.py b/backend/src/module/notification/providers/gotify.py new file mode 100644 index 00000000..3656ce9f --- /dev/null +++ b/backend/src/module/notification/providers/gotify.py @@ -0,0 +1,63 @@ +"""Gotify notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class GotifyProvider(NotificationProvider): + """Gotify notification provider.""" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + server_url = config.server_url.rstrip("/") + self.token = config.token + self.notification_url = f"{server_url}/message?token={self.token}" + + async def send(self, notification: Notification) -> bool: + """Send notification via Gotify.""" + message = self._format_message(notification) + + # Build extras for markdown support and image + extras = { + "client::display": {"contentType": "text/markdown"}, + } + + if notification.poster_path and notification.poster_path != "https://mikanani.me": + extras["client::notification"] = { + "bigImageUrl": notification.poster_path, + } + + data = { + "title": notification.official_title, + "message": message, + "priority": 5, + "extras": extras, + } + + resp = await self.post_data(self.notification_url, data) + logger.debug(f"Gotify notification: {resp.status_code}") + return resp.status_code == 200 + + async def test(self) -> tuple[bool, str]: + """Test Gotify configuration by sending a test message.""" + data = { + "title": "AutoBangumi 通知测试", + "message": "通知测试成功!\nNotification test successful!", + "priority": 5, + } + try: + resp = await self.post_data(self.notification_url, data) + if resp.status_code == 200: + return True, "Gotify test message sent successfully" + else: + return False, f"Gotify API returned status {resp.status_code}" + except Exception as e: + return False, f"Gotify test failed: {e}" diff --git a/backend/src/module/notification/providers/pushover.py b/backend/src/module/notification/providers/pushover.py new file mode 100644 index 00000000..a233a167 --- /dev/null +++ b/backend/src/module/notification/providers/pushover.py @@ -0,0 +1,69 @@ +"""Pushover notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class PushoverProvider(NotificationProvider): + """Pushover notification provider.""" + + API_URL = "https://api.pushover.net/1/messages.json" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + self.user_key = config.user_key + self.api_token = config.api_token + + async def send(self, notification: Notification) -> bool: + """Send notification via Pushover.""" + message = self._format_message(notification) + + data = { + "token": self.api_token, + "user": self.user_key, + "title": notification.official_title, + "message": message, + "html": 0, + } + + # Add poster as supplementary URL if available + if notification.poster_path and notification.poster_path != "https://mikanani.me": + data["url"] = notification.poster_path + data["url_title"] = "查看海报" + + resp = await self.post_data(self.API_URL, data) + logger.debug(f"Pushover notification: {resp.status_code}") + return resp.status_code == 200 + + async def test(self) -> tuple[bool, str]: + """Test Pushover configuration by sending a test message.""" + data = { + "token": self.api_token, + "user": self.user_key, + "title": "AutoBangumi 通知测试", + "message": "通知测试成功!\nNotification test successful!", + } + try: + resp = await self.post_data(self.API_URL, data) + if resp.status_code == 200: + return True, "Pushover test message sent successfully" + else: + # Try to parse error message from response + try: + error_data = resp.json() + errors = error_data.get("errors", []) + if errors: + return False, f"Pushover error: {', '.join(errors)}" + except Exception: + pass + return False, f"Pushover API returned status {resp.status_code}" + except Exception as e: + return False, f"Pushover test failed: {e}" diff --git a/backend/src/module/notification/providers/server_chan.py b/backend/src/module/notification/providers/server_chan.py new file mode 100644 index 00000000..362fcec5 --- /dev/null +++ b/backend/src/module/notification/providers/server_chan.py @@ -0,0 +1,48 @@ +"""Server Chan notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class ServerChanProvider(NotificationProvider): + """Server Chan (Server酱) notification provider for WeChat.""" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + token = config.token + self.notification_url = f"https://sctapi.ftqq.com/{token}.send" + + async def send(self, notification: Notification) -> bool: + """Send notification via Server Chan.""" + text = self._format_message(notification) + data = { + "title": notification.official_title, + "desp": text, + } + + resp = await self.post_data(self.notification_url, data) + logger.debug(f"ServerChan notification: {resp.status_code}") + return resp.status_code == 200 + + async def test(self) -> tuple[bool, str]: + """Test Server Chan configuration by sending a test message.""" + data = { + "title": "AutoBangumi 通知测试", + "desp": "通知测试成功!\nNotification test successful!", + } + try: + resp = await self.post_data(self.notification_url, data) + if resp.status_code == 200: + return True, "Server Chan test message sent successfully" + else: + return False, f"Server Chan API returned status {resp.status_code}" + except Exception as e: + return False, f"Server Chan test failed: {e}" diff --git a/backend/src/module/notification/providers/telegram.py b/backend/src/module/notification/providers/telegram.py new file mode 100644 index 00000000..ebd6f2fa --- /dev/null +++ b/backend/src/module/notification/providers/telegram.py @@ -0,0 +1,58 @@ +"""Telegram notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider +from module.utils import load_image + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + + +class TelegramProvider(NotificationProvider): + """Telegram Bot notification provider.""" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + token = config.token + self.chat_id = config.chat_id + self.photo_url = f"https://api.telegram.org/bot{token}/sendPhoto" + self.message_url = f"https://api.telegram.org/bot{token}/sendMessage" + + async def send(self, notification: Notification) -> bool: + """Send notification via Telegram.""" + text = self._format_message(notification) + data = { + "chat_id": self.chat_id, + "caption": text, + "text": text, + "disable_notification": True, + } + + photo = load_image(notification.poster_path) + if photo: + resp = await self.post_files(self.photo_url, data, files={"photo": photo}) + else: + resp = await self.post_data(self.message_url, data) + + logger.debug(f"Telegram notification: {resp.status_code}") + return resp.status_code == 200 + + async def test(self) -> tuple[bool, str]: + """Test Telegram configuration by sending a test message.""" + data = { + "chat_id": self.chat_id, + "text": "AutoBangumi 通知测试成功!\nNotification test successful!", + } + try: + resp = await self.post_data(self.message_url, data) + if resp.status_code == 200: + return True, "Telegram test message sent successfully" + else: + return False, f"Telegram API returned status {resp.status_code}" + except Exception as e: + return False, f"Telegram test failed: {e}" diff --git a/backend/src/module/notification/providers/webhook.py b/backend/src/module/notification/providers/webhook.py new file mode 100644 index 00000000..980a9182 --- /dev/null +++ b/backend/src/module/notification/providers/webhook.py @@ -0,0 +1,98 @@ +"""Generic webhook notification provider.""" + +import json +import logging +import re +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + +# Default template for webhook payload +DEFAULT_TEMPLATE = json.dumps( + { + "title": "{{title}}", + "season": "{{season}}", + "episode": "{{episode}}", + "poster_url": "{{poster_url}}", + }, + ensure_ascii=False, +) + + +class WebhookProvider(NotificationProvider): + """Generic webhook notification provider with customizable templates.""" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + self.url = config.url + self.template = config.template or DEFAULT_TEMPLATE + + def _render_template(self, notification: Notification) -> dict: + """Render the template with notification data. + + Supported variables: + - {{title}} - Anime title + - {{season}} - Season number + - {{episode}} - Episode number + - {{poster_url}} - Poster image URL + """ + rendered = self.template + + # Replace template variables + replacements = { + "{{title}}": notification.official_title, + "{{season}}": str(notification.season), + "{{episode}}": str(notification.episode), + "{{poster_url}}": notification.poster_path or "", + } + + for pattern, value in replacements.items(): + # Escape special characters for JSON string values + escaped_value = value.replace("\\", "\\\\").replace('"', '\\"') + rendered = rendered.replace(pattern, escaped_value) + + try: + return json.loads(rendered) + except json.JSONDecodeError as e: + logger.warning(f"Invalid webhook template JSON: {e}") + # Fallback to default structure + return { + "title": notification.official_title, + "season": notification.season, + "episode": notification.episode, + "poster_url": notification.poster_path or "", + } + + async def send(self, notification: Notification) -> bool: + """Send notification via generic webhook.""" + data = self._render_template(notification) + + resp = await self.post_data(self.url, data) + logger.debug(f"Webhook notification: {resp.status_code}") + # Accept any 2xx status code as success + return 200 <= resp.status_code < 300 + + async def test(self) -> tuple[bool, str]: + """Test webhook by sending a test payload.""" + test_notification = Notification( + official_title="AutoBangumi 通知测试", + season=1, + episode=1, + poster_path="", + ) + data = self._render_template(test_notification) + + try: + resp = await self.post_data(self.url, data) + if 200 <= resp.status_code < 300: + return True, "Webhook test request sent successfully" + else: + return False, f"Webhook returned status {resp.status_code}" + except Exception as e: + return False, f"Webhook test failed: {e}" diff --git a/backend/src/module/notification/providers/wecom.py b/backend/src/module/notification/providers/wecom.py new file mode 100644 index 00000000..ed33b1b0 --- /dev/null +++ b/backend/src/module/notification/providers/wecom.py @@ -0,0 +1,68 @@ +"""WeChat Work (企业微信) notification provider.""" + +import logging +from typing import TYPE_CHECKING + +from module.models.bangumi import Notification +from module.notification.base import NotificationProvider + +if TYPE_CHECKING: + from module.models.config import NotificationProvider as ProviderConfig + +logger = logging.getLogger(__name__) + +# Default fallback image for when no poster is available +DEFAULT_POSTER = ( + "https://article.biliimg.com/bfs/article/" + "d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png" +) + + +class WecomProvider(NotificationProvider): + """WeChat Work (企业微信) notification provider using news message format.""" + + def __init__(self, config: "ProviderConfig"): + super().__init__() + # Support both webhook_url and legacy chat_id field + self.notification_url = config.webhook_url or config.chat_id + self.token = config.token + + async def send(self, notification: Notification) -> bool: + """Send notification via WeChat Work.""" + title = f"【番剧更新】{notification.official_title}" + msg = self._format_message(notification) + + # Use default poster if none available or if it's just the base Mikan URL + picurl = notification.poster_path + if not picurl or picurl == "https://mikanani.me": + picurl = DEFAULT_POSTER + + data = { + "key": self.token, + "type": "news", + "title": title, + "msg": msg, + "picurl": picurl, + } + + resp = await self.post_data(self.notification_url, data) + logger.debug(f"Wecom notification: {resp.status_code}") + return resp.status_code == 200 + + async def test(self) -> tuple[bool, str]: + """Test WeChat Work configuration by sending a test message.""" + data = { + "key": self.token, + "type": "news", + "title": "AutoBangumi 通知测试", + "msg": "通知测试成功!\nNotification test successful!", + "picurl": DEFAULT_POSTER, + } + try: + resp = await self.post_data(self.notification_url, data) + if resp.status_code == 200: + return True, "WeChat Work test message sent successfully" + else: + return False, f"WeChat Work API returned status {resp.status_code}" + except Exception as e: + return False, f"WeChat Work test failed: {e}" diff --git a/backend/src/test/test_config.py b/backend/src/test/test_config.py index 057dc3cd..081474ab 100644 --- a/backend/src/test/test_config.py +++ b/backend/src/test/test_config.py @@ -61,10 +61,10 @@ class TestConfigDefaults: assert config.proxy.type == "http" def test_notification_defaults(self): - """Notification is disabled by default.""" + """Notification is disabled by default with empty providers.""" config = Config() assert config.notification.enable is False - assert config.notification.type == "telegram" + assert config.notification.providers == [] # --------------------------------------------------------------------------- diff --git a/backend/src/test/test_notification.py b/backend/src/test/test_notification.py index 0391ef1d..a9e005b1 100644 --- a/backend/src/test/test_notification.py +++ b/backend/src/test/test_notification.py @@ -1,132 +1,332 @@ -"""Tests for notification: client factory, send_msg, poster lookup.""" +"""Tests for notification: provider registry, manager, and provider implementations.""" import pytest from unittest.mock import AsyncMock, patch, MagicMock from module.models import Notification -from module.notification.notification import getClient, PostNotification +from module.models.config import NotificationProvider as ProviderConfig +from module.notification import PROVIDER_REGISTRY, NotificationManager +from module.notification.providers import ( + TelegramProvider, + DiscordProvider, + BarkProvider, + ServerChanProvider, + WecomProvider, + GotifyProvider, + PushoverProvider, + WebhookProvider, +) # --------------------------------------------------------------------------- -# getClient factory +# Provider Registry # --------------------------------------------------------------------------- -class TestGetClient: +class TestProviderRegistry: def test_telegram(self): - """Returns TelegramNotification for 'telegram' type.""" - from module.notification.plugin import TelegramNotification + """Registry contains TelegramProvider for 'telegram' type.""" + assert PROVIDER_REGISTRY["telegram"] is TelegramProvider - result = getClient("telegram") - assert result is TelegramNotification + def test_discord(self): + """Registry contains DiscordProvider for 'discord' type.""" + assert PROVIDER_REGISTRY["discord"] is DiscordProvider def test_bark(self): - """Returns BarkNotification for 'bark' type.""" - from module.notification.plugin import BarkNotification - - result = getClient("bark") - assert result is BarkNotification + """Registry contains BarkProvider for 'bark' type.""" + assert PROVIDER_REGISTRY["bark"] is BarkProvider def test_server_chan(self): - """Returns ServerChanNotification for 'server-chan' type.""" - from module.notification.plugin import ServerChanNotification - - result = getClient("server-chan") - assert result is ServerChanNotification + """Registry contains ServerChanProvider for 'server-chan' type.""" + assert PROVIDER_REGISTRY["server-chan"] is ServerChanProvider + assert PROVIDER_REGISTRY["serverchan"] is ServerChanProvider def test_wecom(self): - """Returns WecomNotification for 'wecom' type.""" - from module.notification.plugin import WecomNotification + """Registry contains WecomProvider for 'wecom' type.""" + assert PROVIDER_REGISTRY["wecom"] is WecomProvider - result = getClient("wecom") - assert result is WecomNotification + def test_gotify(self): + """Registry contains GotifyProvider for 'gotify' type.""" + assert PROVIDER_REGISTRY["gotify"] is GotifyProvider + + def test_pushover(self): + """Registry contains PushoverProvider for 'pushover' type.""" + assert PROVIDER_REGISTRY["pushover"] is PushoverProvider + + def test_webhook(self): + """Registry contains WebhookProvider for 'webhook' type.""" + assert PROVIDER_REGISTRY["webhook"] is WebhookProvider def test_unknown_type(self): """Returns None for unknown notification type.""" - result = getClient("unknown_service") + result = PROVIDER_REGISTRY.get("unknown_service") assert result is None - def test_case_insensitive(self): - """Type matching is case-insensitive.""" - from module.notification.plugin import TelegramNotification - - assert getClient("Telegram") is TelegramNotification - assert getClient("TELEGRAM") is TelegramNotification - # --------------------------------------------------------------------------- -# PostNotification +# NotificationManager # --------------------------------------------------------------------------- -class TestPostNotification: +class TestNotificationManager: @pytest.fixture - def mock_notifier(self): - """Create a mocked notifier instance.""" - notifier = AsyncMock() - notifier.post_msg = AsyncMock() - notifier.__aenter__ = AsyncMock(return_value=notifier) - notifier.__aexit__ = AsyncMock(return_value=False) - return notifier + def mock_settings(self): + """Mock settings with notification providers.""" + with patch("module.notification.manager.settings") as mock: + mock.notification.providers = [] + yield mock - @pytest.fixture - def post_notification(self, mock_notifier): - """Create PostNotification with mocked notifier.""" - with patch("module.notification.notification.settings") as mock_settings: - mock_settings.notification.type = "telegram" - mock_settings.notification.token = "test_token" - mock_settings.notification.chat_id = "12345" - with patch( - "module.notification.notification.getClient" - ) as mock_get_client: - MockClass = MagicMock() - MockClass.return_value = mock_notifier - mock_get_client.return_value = MockClass - pn = PostNotification() - pn.notifier = mock_notifier - return pn + def test_empty_providers(self, mock_settings): + """Manager handles empty provider list.""" + manager = NotificationManager() + assert len(manager) == 0 + + def test_load_single_provider(self, mock_settings): + """Manager loads a single enabled provider.""" + config = ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123") + mock_settings.notification.providers = [config] + + manager = NotificationManager() + assert len(manager) == 1 + assert isinstance(manager.providers[0], TelegramProvider) + + def test_skip_disabled_provider(self, mock_settings): + """Manager skips disabled providers.""" + config = ProviderConfig(type="telegram", enabled=False, token="test", chat_id="123") + mock_settings.notification.providers = [config] + + manager = NotificationManager() + assert len(manager) == 0 + + def test_load_multiple_providers(self, mock_settings): + """Manager loads multiple enabled providers.""" + configs = [ + ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123"), + ProviderConfig(type="discord", enabled=True, webhook_url="https://discord.com/webhook"), + ProviderConfig(type="bark", enabled=True, device_key="device123"), + ] + mock_settings.notification.providers = configs + + manager = NotificationManager() + assert len(manager) == 3 + assert isinstance(manager.providers[0], TelegramProvider) + assert isinstance(manager.providers[1], DiscordProvider) + assert isinstance(manager.providers[2], BarkProvider) + + def test_skip_unknown_provider(self, mock_settings): + """Manager skips unknown provider types.""" + configs = [ + ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123"), + ProviderConfig(type="unknown_service", enabled=True), + ] + mock_settings.notification.providers = configs + + manager = NotificationManager() + assert len(manager) == 1 + + async def test_send_all(self, mock_settings): + """Manager sends to all providers.""" + configs = [ + ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123"), + ProviderConfig(type="discord", enabled=True, webhook_url="https://discord.com/webhook"), + ] + mock_settings.notification.providers = configs + + manager = NotificationManager() + + # Mock the providers + for provider in manager.providers: + provider.send = AsyncMock(return_value=True) + provider.__aenter__ = AsyncMock(return_value=provider) + provider.__aexit__ = AsyncMock(return_value=None) - async def test_send_msg_success(self, post_notification, mock_notifier): - """send_msg calls notifier.post_msg and succeeds.""" notify = Notification(official_title="Test Anime", season=1, episode=5) - with patch.object(PostNotification, "_get_poster_sync"): - result = await post_notification.send_msg(notify) + with patch.object(manager, "_get_poster", new_callable=AsyncMock): + await manager.send_all(notify) - mock_notifier.post_msg.assert_called_once_with(notify) + for provider in manager.providers: + provider.send.assert_called_once_with(notify) - async def test_send_msg_failure_no_crash(self, post_notification, mock_notifier): - """send_msg catches exceptions and returns False.""" - mock_notifier.post_msg.side_effect = Exception("Network error") + async def test_test_provider(self, mock_settings): + """Manager can test a specific provider.""" + config = ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123") + mock_settings.notification.providers = [config] + + manager = NotificationManager() + + # Mock the provider's test method + manager.providers[0].test = AsyncMock(return_value=(True, "Test successful")) + manager.providers[0].__aenter__ = AsyncMock(return_value=manager.providers[0]) + manager.providers[0].__aexit__ = AsyncMock(return_value=None) + + success, message = await manager.test_provider(0) + assert success is True + assert message == "Test successful" + + async def test_test_provider_invalid_index(self, mock_settings): + """Manager handles invalid provider index.""" + mock_settings.notification.providers = [] + manager = NotificationManager() + + success, message = await manager.test_provider(5) + assert success is False + assert "Invalid provider index" in message + + +# --------------------------------------------------------------------------- +# Provider Implementations +# --------------------------------------------------------------------------- + + +class TestTelegramProvider: + @pytest.fixture + def provider(self): + config = ProviderConfig(type="telegram", enabled=True, token="test_token", chat_id="12345") + return TelegramProvider(config) + + async def test_send_with_photo(self, provider): + """Sends photo when poster available.""" + notify = Notification( + official_title="Test Anime", season=1, episode=5, poster_path="/path/to/poster.jpg" + ) + + with patch.object(provider, "post_files", new_callable=AsyncMock) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + with patch("module.notification.providers.telegram.load_image") as mock_load: + mock_load.return_value = b"image_data" + result = await provider.send(notify) + + assert result is True + mock_post.assert_called_once() + + async def test_send_without_photo(self, provider): + """Sends text when no poster available.""" notify = Notification(official_title="Test Anime", season=1, episode=5) - with patch.object(PostNotification, "_get_poster_sync"): - result = await post_notification.send_msg(notify) + with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + with patch("module.notification.providers.telegram.load_image") as mock_load: + mock_load.return_value = None + result = await provider.send(notify) - assert result is False + assert result is True + mock_post.assert_called_once() - def test_get_poster_sync_sets_path(self): - """_get_poster_sync queries DB and sets poster_path on notification.""" - notify = Notification(official_title="My Anime", season=1, episode=1) + async def test_test_success(self, provider): + """Test method sends test message.""" + with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + success, message = await provider.test() - with patch("module.notification.notification.Database") as MockDB: - mock_db = MagicMock() - mock_db.bangumi.match_poster.return_value = "/posters/my_anime.jpg" - MockDB.return_value.__enter__ = MagicMock(return_value=mock_db) - MockDB.return_value.__exit__ = MagicMock(return_value=False) - PostNotification._get_poster_sync(notify) + assert success is True + assert "successfully" in message.lower() - assert notify.poster_path == "/posters/my_anime.jpg" - def test_get_poster_sync_empty_when_not_found(self): - """_get_poster_sync sets empty string when no poster found in DB.""" - notify = Notification(official_title="Unknown", season=1, episode=1) +class TestDiscordProvider: + @pytest.fixture + def provider(self): + config = ProviderConfig( + type="discord", enabled=True, webhook_url="https://discord.com/api/webhooks/123" + ) + return DiscordProvider(config) - with patch("module.notification.notification.Database") as MockDB: - mock_db = MagicMock() - mock_db.bangumi.match_poster.return_value = "" - MockDB.return_value.__enter__ = MagicMock(return_value=mock_db) - MockDB.return_value.__exit__ = MagicMock(return_value=False) - PostNotification._get_poster_sync(notify) + async def test_send(self, provider): + """Sends embed message.""" + notify = Notification( + official_title="Test Anime", season=1, episode=5, poster_path="https://example.com/poster.jpg" + ) - assert notify.poster_path == "" + with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post: + mock_post.return_value = MagicMock(status_code=204) + result = await provider.send(notify) + + assert result is True + call_args = mock_post.call_args[0] + assert "embeds" in call_args[1] + + +class TestBarkProvider: + @pytest.fixture + def provider(self): + config = ProviderConfig(type="bark", enabled=True, device_key="device123") + return BarkProvider(config) + + async def test_send(self, provider): + """Sends push notification.""" + notify = Notification(official_title="Test Anime", season=1, episode=5) + + with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + result = await provider.send(notify) + + assert result is True + call_args = mock_post.call_args[0] + assert "device_key" in call_args[1] + + +class TestWebhookProvider: + @pytest.fixture + def provider(self): + config = ProviderConfig( + type="webhook", + enabled=True, + url="https://example.com/webhook", + template='{"anime": "{{title}}", "ep": {{episode}}}', + ) + return WebhookProvider(config) + + def test_render_template(self, provider): + """Template rendering replaces variables.""" + notify = Notification(official_title="Test Anime", season=1, episode=5) + result = provider._render_template(notify) + + assert result["anime"] == "Test Anime" + assert result["ep"] == 5 + + async def test_send(self, provider): + """Sends custom payload.""" + notify = Notification(official_title="Test Anime", season=1, episode=5) + + with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + result = await provider.send(notify) + + assert result is True + + +# --------------------------------------------------------------------------- +# Config Migration +# --------------------------------------------------------------------------- + + +class TestConfigMigration: + def test_legacy_config_migration(self): + """Old single-provider config migrates to new format.""" + from module.models.config import Notification as NotificationConfig + + # Old format + old_config = NotificationConfig( + enable=True, + type="telegram", + token="old_token", + chat_id="old_chat_id", + ) + + # Should have migrated to new format + assert len(old_config.providers) == 1 + assert old_config.providers[0].type == "telegram" + assert old_config.providers[0].enabled is True + + def test_new_config_no_migration(self): + """New format with providers doesn't trigger migration.""" + from module.models.config import Notification as NotificationConfig + + provider = ProviderConfig(type="discord", enabled=True, webhook_url="https://discord.com/webhook") + new_config = NotificationConfig( + enable=True, + providers=[provider], + ) + + assert len(new_config.providers) == 1 + assert new_config.providers[0].type == "discord" diff --git a/webui/src/api/notification.ts b/webui/src/api/notification.ts new file mode 100644 index 00000000..e1d6d5f9 --- /dev/null +++ b/webui/src/api/notification.ts @@ -0,0 +1,51 @@ +import type { NotificationProviderConfig, NotificationType } from '#/config'; +import type { TupleToUnion } from '#/utils'; + +export interface TestProviderRequest { + provider_index: number; +} + +export interface TestProviderConfigRequest { + type: TupleToUnion; + enabled?: boolean; + token?: string; + chat_id?: string; + webhook_url?: string; + server_url?: string; + device_key?: string; + user_key?: string; + api_token?: string; + template?: string; + url?: string; +} + +export interface TestResponse { + success: boolean; + message: string; + message_zh: string; + message_en: string; +} + +export const apiNotification = { + /** + * Test a configured provider by index + */ + async testProvider(request: TestProviderRequest) { + const { data } = await axios.post( + 'api/v1/notification/test', + request + ); + return { data }; + }, + + /** + * Test an unsaved provider configuration + */ + async testProviderConfig(request: TestProviderConfigRequest) { + const { data } = await axios.post( + 'api/v1/notification/test-config', + request + ); + return { data }; + }, +}; diff --git a/webui/src/components/setting/config-notification.vue b/webui/src/components/setting/config-notification.vue index 08d4b9d2..25912776 100644 --- a/webui/src/components/setting/config-notification.vue +++ b/webui/src/components/setting/config-notification.vue @@ -1,64 +1,479 @@