feat(notification): redesign system to support multiple providers

- Add NotificationProvider base class with send() and test() methods
- Add NotificationManager for handling multiple providers simultaneously
- Add new providers: Discord, Gotify, Pushover, generic Webhook
- Migrate existing providers (Telegram, Bark, Server Chan, WeChat Work) to new architecture
- Add API endpoints for testing providers (/notification/test, /notification/test-config)
- Auto-migrate legacy single-provider configs to new multi-provider format
- Update WebUI with card-based multi-provider settings UI
- Add test button for each provider in settings
- Generic webhook supports template variables: {{title}}, {{season}}, {{episode}}, {{poster_url}}

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Estrella Pan
2026-01-28 20:58:42 +01:00
parent 5eb21bfcfa
commit 48bf570697
25 changed files with 1865 additions and 190 deletions

View File

@@ -10,6 +10,7 @@ from .program import router as program_router
from .rss import router as rss_router
from .search import router as search_router
from .setup import router as setup_router
from .notification import router as notification_router
__all__ = "v1"
@@ -25,3 +26,4 @@ v1.include_router(downloader_router)
v1.include_router(rss_router)
v1.include_router(search_router)
v1.include_router(setup_router)
v1.include_router(notification_router)

View File

@@ -0,0 +1,117 @@
"""Notification API endpoints."""
import logging
from typing import Optional
from fastapi import APIRouter
from pydantic import BaseModel, Field
from module.notification import NotificationManager
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/notification", tags=["notification"])
class TestProviderRequest(BaseModel):
"""Request body for testing a saved provider by index."""
provider_index: int = Field(..., description="Index of the provider to test")
class TestProviderConfigRequest(BaseModel):
"""Request body for testing an unsaved provider configuration."""
type: str = Field(..., description="Provider type")
enabled: bool = Field(True, description="Whether provider is enabled")
token: Optional[str] = Field(None, description="Auth token")
chat_id: Optional[str] = Field(None, description="Chat/channel ID")
webhook_url: Optional[str] = Field(None, description="Webhook URL")
server_url: Optional[str] = Field(None, description="Server URL")
device_key: Optional[str] = Field(None, description="Device key")
user_key: Optional[str] = Field(None, description="User key")
api_token: Optional[str] = Field(None, description="API token")
template: Optional[str] = Field(None, description="Custom template")
url: Optional[str] = Field(None, description="URL for generic webhook")
class TestResponse(BaseModel):
"""Response for test notification endpoints."""
success: bool
message: str
message_zh: str = ""
message_en: str = ""
@router.post("/test", response_model=TestResponse)
async def test_provider(request: TestProviderRequest):
"""Test a configured notification provider by its index.
Sends a test notification using the provider at the specified index
in the current configuration.
"""
try:
manager = NotificationManager()
if request.provider_index >= len(manager):
return TestResponse(
success=False,
message=f"Invalid provider index: {request.provider_index}",
message_zh=f"无效的提供者索引: {request.provider_index}",
message_en=f"Invalid provider index: {request.provider_index}",
)
success, message = await manager.test_provider(request.provider_index)
return TestResponse(
success=success,
message=message,
message_zh="测试成功" if success else f"测试失败: {message}",
message_en="Test successful" if success else f"Test failed: {message}",
)
except Exception as e:
logger.error(f"Failed to test provider: {e}")
return TestResponse(
success=False,
message=str(e),
message_zh=f"测试失败: {e}",
message_en=f"Test failed: {e}",
)
@router.post("/test-config", response_model=TestResponse)
async def test_provider_config(request: TestProviderConfigRequest):
"""Test an unsaved notification provider configuration.
Useful for testing a provider before saving it to the configuration.
"""
try:
# Convert request to ProviderConfig
config = ProviderConfig(
type=request.type,
enabled=request.enabled,
token=request.token or "",
chat_id=request.chat_id or "",
webhook_url=request.webhook_url or "",
server_url=request.server_url or "",
device_key=request.device_key or "",
user_key=request.user_key or "",
api_token=request.api_token or "",
template=request.template,
url=request.url or "",
)
success, message = await NotificationManager.test_provider_config(config)
return TestResponse(
success=success,
message=message,
message_zh="测试成功" if success else f"测试失败: {message}",
message_en="Test successful" if success else f"Test failed: {message}",
)
except Exception as e:
logger.error(f"Failed to test provider config: {e}")
return TestResponse(
success=False,
message=str(e),
message_zh=f"测试失败: {e}",
message_en=f"Test failed: {e}",
)

View File

@@ -8,7 +8,8 @@ from pydantic import BaseModel, Field
from module.conf import VERSION, settings
from module.models import Config, ResponseModel
from module.network import RequestContent
from module.notification.notification import getClient
from module.notification import PROVIDER_REGISTRY
from module.models.config import NotificationProvider as ProviderConfig
from module.security.jwt import get_password_hash
logger = logging.getLogger(__name__)
@@ -202,8 +203,8 @@ async def test_notification(req: TestNotificationRequest):
"""Send a test notification."""
_require_setup_needed()
NotifierClass = getClient(req.type)
if NotifierClass is None:
provider_cls = PROVIDER_REGISTRY.get(req.type.lower())
if provider_cls is None:
return TestResultResponse(
success=False,
message_en=f"Unknown notification type: {req.type}",
@@ -211,30 +212,27 @@ async def test_notification(req: TestNotificationRequest):
)
try:
notifier = NotifierClass(token=req.token, chat_id=req.chat_id)
async with notifier:
# Send a simple test message
data = {"chat_id": req.chat_id, "text": "AutoBangumi 通知测试成功!"}
if req.type.lower() == "telegram":
resp = await notifier.post_data(notifier.message_url, data)
if resp.status_code == 200:
return TestResultResponse(
success=True,
message_en="Test notification sent successfully.",
message_zh="测试通知发送成功。",
)
else:
return TestResultResponse(
success=False,
message_en="Failed to send test notification.",
message_zh="测试通知发送失败。",
)
else:
# For other providers, just verify the notifier can be created
# Create provider config
config = ProviderConfig(
type=req.type,
enabled=True,
token=req.token,
chat_id=req.chat_id,
)
provider = provider_cls(config)
async with provider:
success, message = await provider.test()
if success:
return TestResultResponse(
success=True,
message_en="Notification configuration is valid.",
message_zh="通知配置有效",
message_en="Test notification sent successfully.",
message_zh="测试通知发送成功",
)
else:
return TestResultResponse(
success=False,
message_en=f"Failed to send test notification: {message}",
message_zh=f"测试通知发送失败:{message}",
)
except Exception as e:
logger.error(f"[Setup] Notification test failed: {e}")
@@ -275,9 +273,14 @@ async def complete_setup(req: SetupCompleteRequest):
if req.notification_enable:
config_dict["notification"] = {
"enable": True,
"type": req.notification_type,
"token": req.notification_token,
"chat_id": req.notification_chat_id,
"providers": [
{
"type": req.notification_type,
"enabled": True,
"token": req.notification_token,
"chat_id": req.notification_chat_id,
}
],
}
settings.save(config_dict)

View File

@@ -36,7 +36,7 @@ DEFAULT_SETTINGS = {
"username": "",
"password": "",
},
"notification": {"enable": False, "type": "telegram", "token": "", "chat_id": ""},
"notification": {"enable": False, "providers": []},
"experimental_openai": {
"enable": False,
"api_key": "",

View File

@@ -4,7 +4,7 @@ import logging
from module.conf import settings
from module.downloader import DownloadClient
from module.manager import Renamer, TorrentManager, eps_complete
from module.notification import PostNotification
from module.notification import NotificationManager
from module.rss import RSSAnalyser, RSSEngine
from .offset_scanner import OffsetScanner
@@ -66,10 +66,9 @@ class RenameThread(ProgramStatus):
async with Renamer() as renamer:
renamed_info = await renamer.rename()
if settings.notification.enable and renamed_info:
async with PostNotification() as notifier:
await asyncio.gather(
*[notifier.send_msg(info) for info in renamed_info]
)
manager = NotificationManager()
for info in renamed_info:
await manager.send_all(info)
try:
await asyncio.wait_for(
self.stop_event.wait(),

View File

@@ -1,7 +1,7 @@
from os.path import expandvars
from typing import Literal
from typing import Literal, Optional
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field, field_validator, model_validator
class Program(BaseModel):
@@ -68,19 +68,106 @@ class Proxy(BaseModel):
return expandvars(self.password_)
class NotificationProvider(BaseModel):
"""Configuration for a single notification provider."""
type: str = Field(..., description="Provider type (telegram, discord, bark, etc.)")
enabled: bool = Field(True, description="Whether this provider is enabled")
# Common fields (with env var expansion)
token_: Optional[str] = Field(None, alias="token", description="Auth token")
chat_id_: Optional[str] = Field(None, alias="chat_id", description="Chat/channel ID")
# Provider-specific fields
webhook_url_: Optional[str] = Field(
None, alias="webhook_url", description="Webhook URL for discord/wecom"
)
server_url_: Optional[str] = Field(
None, alias="server_url", description="Server URL for gotify/bark"
)
device_key_: Optional[str] = Field(
None, alias="device_key", description="Device key for bark"
)
user_key_: Optional[str] = Field(
None, alias="user_key", description="User key for pushover"
)
api_token_: Optional[str] = Field(
None, alias="api_token", description="API token for pushover"
)
template: Optional[str] = Field(
None, description="Custom template for webhook provider"
)
url_: Optional[str] = Field(
None, alias="url", description="URL for generic webhook provider"
)
@property
def token(self) -> str:
return expandvars(self.token_) if self.token_ else ""
@property
def chat_id(self) -> str:
return expandvars(self.chat_id_) if self.chat_id_ else ""
@property
def webhook_url(self) -> str:
return expandvars(self.webhook_url_) if self.webhook_url_ else ""
@property
def server_url(self) -> str:
return expandvars(self.server_url_) if self.server_url_ else ""
@property
def device_key(self) -> str:
return expandvars(self.device_key_) if self.device_key_ else ""
@property
def user_key(self) -> str:
return expandvars(self.user_key_) if self.user_key_ else ""
@property
def api_token(self) -> str:
return expandvars(self.api_token_) if self.api_token_ else ""
@property
def url(self) -> str:
return expandvars(self.url_) if self.url_ else ""
class Notification(BaseModel):
enable: bool = Field(False, description="Enable notification")
type: str = Field("telegram", description="Notification type")
token_: str = Field("", alias="token", description="Notification token")
chat_id_: str = Field("", alias="chat_id", description="Notification chat id")
"""Notification configuration supporting multiple providers."""
enable: bool = Field(False, description="Enable notification system")
providers: list[NotificationProvider] = Field(
default_factory=list, description="List of notification providers"
)
# Legacy fields for backward compatibility (deprecated)
type: Optional[str] = Field(None, description="[Deprecated] Use providers instead")
token_: Optional[str] = Field(None, alias="token", description="[Deprecated]")
chat_id_: Optional[str] = Field(None, alias="chat_id", description="[Deprecated]")
@property
def token(self):
return expandvars(self.token_)
def token(self) -> str:
return expandvars(self.token_) if self.token_ else ""
@property
def chat_id(self):
return expandvars(self.chat_id_)
def chat_id(self) -> str:
return expandvars(self.chat_id_) if self.chat_id_ else ""
@model_validator(mode="after")
def migrate_legacy_config(self) -> "Notification":
"""Auto-migrate old single-provider config to new format."""
if self.type and not self.providers:
# Old format detected, migrate to new format
legacy_provider = NotificationProvider(
type=self.type,
enabled=True,
token=self.token_ or "",
chat_id=self.chat_id_ or "",
)
self.providers = [legacy_provider]
return self
class ExperimentalOpenAI(BaseModel):

View File

@@ -1 +1,11 @@
from .notification import PostNotification
from .manager import NotificationManager
from .base import NotificationProvider
from .providers import PROVIDER_REGISTRY
__all__ = [
"PostNotification",
"NotificationManager",
"NotificationProvider",
"PROVIDER_REGISTRY",
]

View File

@@ -0,0 +1,51 @@
"""Base class for notification providers."""
from abc import ABC, abstractmethod
from module.models.bangumi import Notification
from module.network import RequestContent
class NotificationProvider(RequestContent, ABC):
"""Abstract base class for notification providers.
All notification providers must inherit from this class and implement
the send() and test() methods.
"""
@abstractmethod
async def send(self, notification: Notification) -> bool:
"""Send a notification.
Args:
notification: The notification data containing anime info.
Returns:
True if the notification was sent successfully, False otherwise.
"""
pass
@abstractmethod
async def test(self) -> tuple[bool, str]:
"""Test the notification provider configuration.
Returns:
A tuple of (success, message) where success is True if the test
passed and message contains details about the result.
"""
pass
def _format_message(self, notify: Notification) -> str:
"""Format the default notification message.
Args:
notify: The notification data.
Returns:
Formatted message string.
"""
return (
f"番剧名称:{notify.official_title}\n"
f"季度: 第{notify.season}\n"
f"更新集数: 第{notify.episode}"
)

View File

@@ -0,0 +1,132 @@
"""Notification manager for handling multiple providers."""
import asyncio
import logging
from typing import TYPE_CHECKING
from module.conf import settings
from module.database import Database
from module.models.bangumi import Notification
if TYPE_CHECKING:
from module.notification.base import NotificationProvider
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class NotificationManager:
"""Manager for handling notifications across multiple providers."""
def __init__(self):
self.providers: list["NotificationProvider"] = []
self._load_providers()
def _load_providers(self):
"""Initialize providers from configuration."""
from module.notification.providers import PROVIDER_REGISTRY
for cfg in settings.notification.providers:
if not cfg.enabled:
continue
provider_cls = PROVIDER_REGISTRY.get(cfg.type.lower())
if provider_cls:
try:
provider = provider_cls(cfg)
self.providers.append(provider)
logger.debug(f"Loaded notification provider: {cfg.type}")
except Exception as e:
logger.warning(f"Failed to load provider {cfg.type}: {e}")
else:
logger.warning(f"Unknown notification provider type: {cfg.type}")
async def _get_poster(self, notification: Notification):
"""Fetch poster path from database if not already set."""
if notification.poster_path:
return
def _get_poster_sync():
with Database() as db:
data = db.bangumi.search_official_title(notification.official_title)
if data:
notification.poster_path = data.poster_link
await asyncio.to_thread(_get_poster_sync)
async def send_all(self, notification: Notification):
"""Send notification to all enabled providers.
Args:
notification: The notification data to send.
"""
if not self.providers:
logger.debug("No notification providers configured")
return
# Fetch poster if needed
await self._get_poster(notification)
# Send to all providers in parallel
async def send_to_provider(provider: "NotificationProvider"):
try:
async with provider:
await provider.send(notification)
logger.debug(
f"Sent notification via {provider.__class__.__name__}: "
f"{notification.official_title}"
)
except Exception as e:
logger.warning(
f"Failed to send notification via {provider.__class__.__name__}: {e}"
)
await asyncio.gather(
*[send_to_provider(p) for p in self.providers],
return_exceptions=True,
)
async def test_provider(self, index: int) -> tuple[bool, str]:
"""Test a specific provider by index.
Args:
index: The index of the provider in the providers list.
Returns:
A tuple of (success, message).
"""
if index < 0 or index >= len(self.providers):
return False, f"Invalid provider index: {index}"
provider = self.providers[index]
try:
async with provider:
return await provider.test()
except Exception as e:
return False, f"Test failed: {e}"
@staticmethod
async def test_provider_config(config: "ProviderConfig") -> tuple[bool, str]:
"""Test a provider configuration without saving it.
Args:
config: The provider configuration to test.
Returns:
A tuple of (success, message).
"""
from module.notification.providers import PROVIDER_REGISTRY
provider_cls = PROVIDER_REGISTRY.get(config.type.lower())
if not provider_cls:
return False, f"Unknown provider type: {config.type}"
try:
provider = provider_cls(config)
async with provider:
return await provider.test()
except Exception as e:
return False, f"Test failed: {e}"
def __len__(self) -> int:
return len(self.providers)

View File

@@ -0,0 +1,40 @@
"""Notification providers registry."""
from typing import TYPE_CHECKING
from module.notification.providers.telegram import TelegramProvider
from module.notification.providers.discord import DiscordProvider
from module.notification.providers.bark import BarkProvider
from module.notification.providers.server_chan import ServerChanProvider
from module.notification.providers.wecom import WecomProvider
from module.notification.providers.gotify import GotifyProvider
from module.notification.providers.pushover import PushoverProvider
from module.notification.providers.webhook import WebhookProvider
if TYPE_CHECKING:
from module.notification.base import NotificationProvider
# Registry mapping provider type names to their classes
PROVIDER_REGISTRY: dict[str, type["NotificationProvider"]] = {
"telegram": TelegramProvider,
"discord": DiscordProvider,
"bark": BarkProvider,
"server-chan": ServerChanProvider,
"serverchan": ServerChanProvider, # Alternative name
"wecom": WecomProvider,
"gotify": GotifyProvider,
"pushover": PushoverProvider,
"webhook": WebhookProvider,
}
__all__ = [
"PROVIDER_REGISTRY",
"TelegramProvider",
"DiscordProvider",
"BarkProvider",
"ServerChanProvider",
"WecomProvider",
"GotifyProvider",
"PushoverProvider",
"WebhookProvider",
]

View File

@@ -0,0 +1,55 @@
"""Bark notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class BarkProvider(NotificationProvider):
"""Bark (iOS) notification provider."""
DEFAULT_SERVER = "https://api.day.app"
def __init__(self, config: "ProviderConfig"):
super().__init__()
# Support both legacy token field and new device_key field
self.device_key = config.device_key or config.token
server_url = config.server_url or self.DEFAULT_SERVER
self.notification_url = f"{server_url.rstrip('/')}/push"
async def send(self, notification: Notification) -> bool:
"""Send notification via Bark."""
text = self._format_message(notification)
data = {
"title": notification.official_title,
"body": text,
"icon": notification.poster_path or "",
"device_key": self.device_key,
}
resp = await self.post_data(self.notification_url, data)
logger.debug(f"Bark notification: {resp.status_code}")
return resp.status_code == 200
async def test(self) -> tuple[bool, str]:
"""Test Bark configuration by sending a test notification."""
data = {
"title": "AutoBangumi",
"body": "通知测试成功!\nNotification test successful!",
"device_key": self.device_key,
}
try:
resp = await self.post_data(self.notification_url, data)
if resp.status_code == 200:
return True, "Bark test notification sent successfully"
else:
return False, f"Bark API returned status {resp.status_code}"
except Exception as e:
return False, f"Bark test failed: {e}"

View File

@@ -0,0 +1,61 @@
"""Discord notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class DiscordProvider(NotificationProvider):
"""Discord webhook notification provider."""
def __init__(self, config: "ProviderConfig"):
super().__init__()
self.webhook_url = config.webhook_url
async def send(self, notification: Notification) -> bool:
"""Send notification via Discord webhook."""
embed = {
"title": f"📺 {notification.official_title}",
"description": (
f"**季度:** 第{notification.season}\n"
f"**集数:** 第{notification.episode}"
),
"color": 0x00BFFF, # Deep Sky Blue
}
# Add poster as thumbnail if available
if notification.poster_path and notification.poster_path != "https://mikanani.me":
embed["thumbnail"] = {"url": notification.poster_path}
data = {
"embeds": [embed],
}
resp = await self.post_data(self.webhook_url, data)
logger.debug(f"Discord notification: {resp.status_code}")
return resp.status_code in (200, 204)
async def test(self) -> tuple[bool, str]:
"""Test Discord webhook by sending a test message."""
embed = {
"title": "AutoBangumi 通知测试",
"description": "通知测试成功!\nNotification test successful!",
"color": 0x00FF00, # Green
}
data = {"embeds": [embed]}
try:
resp = await self.post_data(self.webhook_url, data)
if resp.status_code in (200, 204):
return True, "Discord test message sent successfully"
else:
return False, f"Discord API returned status {resp.status_code}"
except Exception as e:
return False, f"Discord test failed: {e}"

View File

@@ -0,0 +1,63 @@
"""Gotify notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class GotifyProvider(NotificationProvider):
"""Gotify notification provider."""
def __init__(self, config: "ProviderConfig"):
super().__init__()
server_url = config.server_url.rstrip("/")
self.token = config.token
self.notification_url = f"{server_url}/message?token={self.token}"
async def send(self, notification: Notification) -> bool:
"""Send notification via Gotify."""
message = self._format_message(notification)
# Build extras for markdown support and image
extras = {
"client::display": {"contentType": "text/markdown"},
}
if notification.poster_path and notification.poster_path != "https://mikanani.me":
extras["client::notification"] = {
"bigImageUrl": notification.poster_path,
}
data = {
"title": notification.official_title,
"message": message,
"priority": 5,
"extras": extras,
}
resp = await self.post_data(self.notification_url, data)
logger.debug(f"Gotify notification: {resp.status_code}")
return resp.status_code == 200
async def test(self) -> tuple[bool, str]:
"""Test Gotify configuration by sending a test message."""
data = {
"title": "AutoBangumi 通知测试",
"message": "通知测试成功!\nNotification test successful!",
"priority": 5,
}
try:
resp = await self.post_data(self.notification_url, data)
if resp.status_code == 200:
return True, "Gotify test message sent successfully"
else:
return False, f"Gotify API returned status {resp.status_code}"
except Exception as e:
return False, f"Gotify test failed: {e}"

View File

@@ -0,0 +1,69 @@
"""Pushover notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class PushoverProvider(NotificationProvider):
"""Pushover notification provider."""
API_URL = "https://api.pushover.net/1/messages.json"
def __init__(self, config: "ProviderConfig"):
super().__init__()
self.user_key = config.user_key
self.api_token = config.api_token
async def send(self, notification: Notification) -> bool:
"""Send notification via Pushover."""
message = self._format_message(notification)
data = {
"token": self.api_token,
"user": self.user_key,
"title": notification.official_title,
"message": message,
"html": 0,
}
# Add poster as supplementary URL if available
if notification.poster_path and notification.poster_path != "https://mikanani.me":
data["url"] = notification.poster_path
data["url_title"] = "查看海报"
resp = await self.post_data(self.API_URL, data)
logger.debug(f"Pushover notification: {resp.status_code}")
return resp.status_code == 200
async def test(self) -> tuple[bool, str]:
"""Test Pushover configuration by sending a test message."""
data = {
"token": self.api_token,
"user": self.user_key,
"title": "AutoBangumi 通知测试",
"message": "通知测试成功!\nNotification test successful!",
}
try:
resp = await self.post_data(self.API_URL, data)
if resp.status_code == 200:
return True, "Pushover test message sent successfully"
else:
# Try to parse error message from response
try:
error_data = resp.json()
errors = error_data.get("errors", [])
if errors:
return False, f"Pushover error: {', '.join(errors)}"
except Exception:
pass
return False, f"Pushover API returned status {resp.status_code}"
except Exception as e:
return False, f"Pushover test failed: {e}"

View File

@@ -0,0 +1,48 @@
"""Server Chan notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class ServerChanProvider(NotificationProvider):
"""Server Chan (Server酱) notification provider for WeChat."""
def __init__(self, config: "ProviderConfig"):
super().__init__()
token = config.token
self.notification_url = f"https://sctapi.ftqq.com/{token}.send"
async def send(self, notification: Notification) -> bool:
"""Send notification via Server Chan."""
text = self._format_message(notification)
data = {
"title": notification.official_title,
"desp": text,
}
resp = await self.post_data(self.notification_url, data)
logger.debug(f"ServerChan notification: {resp.status_code}")
return resp.status_code == 200
async def test(self) -> tuple[bool, str]:
"""Test Server Chan configuration by sending a test message."""
data = {
"title": "AutoBangumi 通知测试",
"desp": "通知测试成功!\nNotification test successful!",
}
try:
resp = await self.post_data(self.notification_url, data)
if resp.status_code == 200:
return True, "Server Chan test message sent successfully"
else:
return False, f"Server Chan API returned status {resp.status_code}"
except Exception as e:
return False, f"Server Chan test failed: {e}"

View File

@@ -0,0 +1,58 @@
"""Telegram notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
from module.utils import load_image
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
class TelegramProvider(NotificationProvider):
"""Telegram Bot notification provider."""
def __init__(self, config: "ProviderConfig"):
super().__init__()
token = config.token
self.chat_id = config.chat_id
self.photo_url = f"https://api.telegram.org/bot{token}/sendPhoto"
self.message_url = f"https://api.telegram.org/bot{token}/sendMessage"
async def send(self, notification: Notification) -> bool:
"""Send notification via Telegram."""
text = self._format_message(notification)
data = {
"chat_id": self.chat_id,
"caption": text,
"text": text,
"disable_notification": True,
}
photo = load_image(notification.poster_path)
if photo:
resp = await self.post_files(self.photo_url, data, files={"photo": photo})
else:
resp = await self.post_data(self.message_url, data)
logger.debug(f"Telegram notification: {resp.status_code}")
return resp.status_code == 200
async def test(self) -> tuple[bool, str]:
"""Test Telegram configuration by sending a test message."""
data = {
"chat_id": self.chat_id,
"text": "AutoBangumi 通知测试成功!\nNotification test successful!",
}
try:
resp = await self.post_data(self.message_url, data)
if resp.status_code == 200:
return True, "Telegram test message sent successfully"
else:
return False, f"Telegram API returned status {resp.status_code}"
except Exception as e:
return False, f"Telegram test failed: {e}"

View File

@@ -0,0 +1,98 @@
"""Generic webhook notification provider."""
import json
import logging
import re
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
# Default template for webhook payload
DEFAULT_TEMPLATE = json.dumps(
{
"title": "{{title}}",
"season": "{{season}}",
"episode": "{{episode}}",
"poster_url": "{{poster_url}}",
},
ensure_ascii=False,
)
class WebhookProvider(NotificationProvider):
"""Generic webhook notification provider with customizable templates."""
def __init__(self, config: "ProviderConfig"):
super().__init__()
self.url = config.url
self.template = config.template or DEFAULT_TEMPLATE
def _render_template(self, notification: Notification) -> dict:
"""Render the template with notification data.
Supported variables:
- {{title}} - Anime title
- {{season}} - Season number
- {{episode}} - Episode number
- {{poster_url}} - Poster image URL
"""
rendered = self.template
# Replace template variables
replacements = {
"{{title}}": notification.official_title,
"{{season}}": str(notification.season),
"{{episode}}": str(notification.episode),
"{{poster_url}}": notification.poster_path or "",
}
for pattern, value in replacements.items():
# Escape special characters for JSON string values
escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
rendered = rendered.replace(pattern, escaped_value)
try:
return json.loads(rendered)
except json.JSONDecodeError as e:
logger.warning(f"Invalid webhook template JSON: {e}")
# Fallback to default structure
return {
"title": notification.official_title,
"season": notification.season,
"episode": notification.episode,
"poster_url": notification.poster_path or "",
}
async def send(self, notification: Notification) -> bool:
"""Send notification via generic webhook."""
data = self._render_template(notification)
resp = await self.post_data(self.url, data)
logger.debug(f"Webhook notification: {resp.status_code}")
# Accept any 2xx status code as success
return 200 <= resp.status_code < 300
async def test(self) -> tuple[bool, str]:
"""Test webhook by sending a test payload."""
test_notification = Notification(
official_title="AutoBangumi 通知测试",
season=1,
episode=1,
poster_path="",
)
data = self._render_template(test_notification)
try:
resp = await self.post_data(self.url, data)
if 200 <= resp.status_code < 300:
return True, "Webhook test request sent successfully"
else:
return False, f"Webhook returned status {resp.status_code}"
except Exception as e:
return False, f"Webhook test failed: {e}"

View File

@@ -0,0 +1,68 @@
"""WeChat Work (企业微信) notification provider."""
import logging
from typing import TYPE_CHECKING
from module.models.bangumi import Notification
from module.notification.base import NotificationProvider
if TYPE_CHECKING:
from module.models.config import NotificationProvider as ProviderConfig
logger = logging.getLogger(__name__)
# Default fallback image for when no poster is available
DEFAULT_POSTER = (
"https://article.biliimg.com/bfs/article/"
"d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png"
)
class WecomProvider(NotificationProvider):
"""WeChat Work (企业微信) notification provider using news message format."""
def __init__(self, config: "ProviderConfig"):
super().__init__()
# Support both webhook_url and legacy chat_id field
self.notification_url = config.webhook_url or config.chat_id
self.token = config.token
async def send(self, notification: Notification) -> bool:
"""Send notification via WeChat Work."""
title = f"【番剧更新】{notification.official_title}"
msg = self._format_message(notification)
# Use default poster if none available or if it's just the base Mikan URL
picurl = notification.poster_path
if not picurl or picurl == "https://mikanani.me":
picurl = DEFAULT_POSTER
data = {
"key": self.token,
"type": "news",
"title": title,
"msg": msg,
"picurl": picurl,
}
resp = await self.post_data(self.notification_url, data)
logger.debug(f"Wecom notification: {resp.status_code}")
return resp.status_code == 200
async def test(self) -> tuple[bool, str]:
"""Test WeChat Work configuration by sending a test message."""
data = {
"key": self.token,
"type": "news",
"title": "AutoBangumi 通知测试",
"msg": "通知测试成功!\nNotification test successful!",
"picurl": DEFAULT_POSTER,
}
try:
resp = await self.post_data(self.notification_url, data)
if resp.status_code == 200:
return True, "WeChat Work test message sent successfully"
else:
return False, f"WeChat Work API returned status {resp.status_code}"
except Exception as e:
return False, f"WeChat Work test failed: {e}"

View File

@@ -61,10 +61,10 @@ class TestConfigDefaults:
assert config.proxy.type == "http"
def test_notification_defaults(self):
"""Notification is disabled by default."""
"""Notification is disabled by default with empty providers."""
config = Config()
assert config.notification.enable is False
assert config.notification.type == "telegram"
assert config.notification.providers == []
# ---------------------------------------------------------------------------

View File

@@ -1,132 +1,332 @@
"""Tests for notification: client factory, send_msg, poster lookup."""
"""Tests for notification: provider registry, manager, and provider implementations."""
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from module.models import Notification
from module.notification.notification import getClient, PostNotification
from module.models.config import NotificationProvider as ProviderConfig
from module.notification import PROVIDER_REGISTRY, NotificationManager
from module.notification.providers import (
TelegramProvider,
DiscordProvider,
BarkProvider,
ServerChanProvider,
WecomProvider,
GotifyProvider,
PushoverProvider,
WebhookProvider,
)
# ---------------------------------------------------------------------------
# getClient factory
# Provider Registry
# ---------------------------------------------------------------------------
class TestGetClient:
class TestProviderRegistry:
def test_telegram(self):
"""Returns TelegramNotification for 'telegram' type."""
from module.notification.plugin import TelegramNotification
"""Registry contains TelegramProvider for 'telegram' type."""
assert PROVIDER_REGISTRY["telegram"] is TelegramProvider
result = getClient("telegram")
assert result is TelegramNotification
def test_discord(self):
"""Registry contains DiscordProvider for 'discord' type."""
assert PROVIDER_REGISTRY["discord"] is DiscordProvider
def test_bark(self):
"""Returns BarkNotification for 'bark' type."""
from module.notification.plugin import BarkNotification
result = getClient("bark")
assert result is BarkNotification
"""Registry contains BarkProvider for 'bark' type."""
assert PROVIDER_REGISTRY["bark"] is BarkProvider
def test_server_chan(self):
"""Returns ServerChanNotification for 'server-chan' type."""
from module.notification.plugin import ServerChanNotification
result = getClient("server-chan")
assert result is ServerChanNotification
"""Registry contains ServerChanProvider for 'server-chan' type."""
assert PROVIDER_REGISTRY["server-chan"] is ServerChanProvider
assert PROVIDER_REGISTRY["serverchan"] is ServerChanProvider
def test_wecom(self):
"""Returns WecomNotification for 'wecom' type."""
from module.notification.plugin import WecomNotification
"""Registry contains WecomProvider for 'wecom' type."""
assert PROVIDER_REGISTRY["wecom"] is WecomProvider
result = getClient("wecom")
assert result is WecomNotification
def test_gotify(self):
"""Registry contains GotifyProvider for 'gotify' type."""
assert PROVIDER_REGISTRY["gotify"] is GotifyProvider
def test_pushover(self):
"""Registry contains PushoverProvider for 'pushover' type."""
assert PROVIDER_REGISTRY["pushover"] is PushoverProvider
def test_webhook(self):
"""Registry contains WebhookProvider for 'webhook' type."""
assert PROVIDER_REGISTRY["webhook"] is WebhookProvider
def test_unknown_type(self):
"""Returns None for unknown notification type."""
result = getClient("unknown_service")
result = PROVIDER_REGISTRY.get("unknown_service")
assert result is None
def test_case_insensitive(self):
"""Type matching is case-insensitive."""
from module.notification.plugin import TelegramNotification
assert getClient("Telegram") is TelegramNotification
assert getClient("TELEGRAM") is TelegramNotification
# ---------------------------------------------------------------------------
# PostNotification
# NotificationManager
# ---------------------------------------------------------------------------
class TestPostNotification:
class TestNotificationManager:
@pytest.fixture
def mock_notifier(self):
"""Create a mocked notifier instance."""
notifier = AsyncMock()
notifier.post_msg = AsyncMock()
notifier.__aenter__ = AsyncMock(return_value=notifier)
notifier.__aexit__ = AsyncMock(return_value=False)
return notifier
def mock_settings(self):
"""Mock settings with notification providers."""
with patch("module.notification.manager.settings") as mock:
mock.notification.providers = []
yield mock
@pytest.fixture
def post_notification(self, mock_notifier):
"""Create PostNotification with mocked notifier."""
with patch("module.notification.notification.settings") as mock_settings:
mock_settings.notification.type = "telegram"
mock_settings.notification.token = "test_token"
mock_settings.notification.chat_id = "12345"
with patch(
"module.notification.notification.getClient"
) as mock_get_client:
MockClass = MagicMock()
MockClass.return_value = mock_notifier
mock_get_client.return_value = MockClass
pn = PostNotification()
pn.notifier = mock_notifier
return pn
def test_empty_providers(self, mock_settings):
"""Manager handles empty provider list."""
manager = NotificationManager()
assert len(manager) == 0
def test_load_single_provider(self, mock_settings):
"""Manager loads a single enabled provider."""
config = ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123")
mock_settings.notification.providers = [config]
manager = NotificationManager()
assert len(manager) == 1
assert isinstance(manager.providers[0], TelegramProvider)
def test_skip_disabled_provider(self, mock_settings):
"""Manager skips disabled providers."""
config = ProviderConfig(type="telegram", enabled=False, token="test", chat_id="123")
mock_settings.notification.providers = [config]
manager = NotificationManager()
assert len(manager) == 0
def test_load_multiple_providers(self, mock_settings):
"""Manager loads multiple enabled providers."""
configs = [
ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123"),
ProviderConfig(type="discord", enabled=True, webhook_url="https://discord.com/webhook"),
ProviderConfig(type="bark", enabled=True, device_key="device123"),
]
mock_settings.notification.providers = configs
manager = NotificationManager()
assert len(manager) == 3
assert isinstance(manager.providers[0], TelegramProvider)
assert isinstance(manager.providers[1], DiscordProvider)
assert isinstance(manager.providers[2], BarkProvider)
def test_skip_unknown_provider(self, mock_settings):
"""Manager skips unknown provider types."""
configs = [
ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123"),
ProviderConfig(type="unknown_service", enabled=True),
]
mock_settings.notification.providers = configs
manager = NotificationManager()
assert len(manager) == 1
async def test_send_all(self, mock_settings):
"""Manager sends to all providers."""
configs = [
ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123"),
ProviderConfig(type="discord", enabled=True, webhook_url="https://discord.com/webhook"),
]
mock_settings.notification.providers = configs
manager = NotificationManager()
# Mock the providers
for provider in manager.providers:
provider.send = AsyncMock(return_value=True)
provider.__aenter__ = AsyncMock(return_value=provider)
provider.__aexit__ = AsyncMock(return_value=None)
async def test_send_msg_success(self, post_notification, mock_notifier):
"""send_msg calls notifier.post_msg and succeeds."""
notify = Notification(official_title="Test Anime", season=1, episode=5)
with patch.object(PostNotification, "_get_poster_sync"):
result = await post_notification.send_msg(notify)
with patch.object(manager, "_get_poster", new_callable=AsyncMock):
await manager.send_all(notify)
mock_notifier.post_msg.assert_called_once_with(notify)
for provider in manager.providers:
provider.send.assert_called_once_with(notify)
async def test_send_msg_failure_no_crash(self, post_notification, mock_notifier):
"""send_msg catches exceptions and returns False."""
mock_notifier.post_msg.side_effect = Exception("Network error")
async def test_test_provider(self, mock_settings):
"""Manager can test a specific provider."""
config = ProviderConfig(type="telegram", enabled=True, token="test", chat_id="123")
mock_settings.notification.providers = [config]
manager = NotificationManager()
# Mock the provider's test method
manager.providers[0].test = AsyncMock(return_value=(True, "Test successful"))
manager.providers[0].__aenter__ = AsyncMock(return_value=manager.providers[0])
manager.providers[0].__aexit__ = AsyncMock(return_value=None)
success, message = await manager.test_provider(0)
assert success is True
assert message == "Test successful"
async def test_test_provider_invalid_index(self, mock_settings):
"""Manager handles invalid provider index."""
mock_settings.notification.providers = []
manager = NotificationManager()
success, message = await manager.test_provider(5)
assert success is False
assert "Invalid provider index" in message
# ---------------------------------------------------------------------------
# Provider Implementations
# ---------------------------------------------------------------------------
class TestTelegramProvider:
@pytest.fixture
def provider(self):
config = ProviderConfig(type="telegram", enabled=True, token="test_token", chat_id="12345")
return TelegramProvider(config)
async def test_send_with_photo(self, provider):
"""Sends photo when poster available."""
notify = Notification(
official_title="Test Anime", season=1, episode=5, poster_path="/path/to/poster.jpg"
)
with patch.object(provider, "post_files", new_callable=AsyncMock) as mock_post:
mock_post.return_value = MagicMock(status_code=200)
with patch("module.notification.providers.telegram.load_image") as mock_load:
mock_load.return_value = b"image_data"
result = await provider.send(notify)
assert result is True
mock_post.assert_called_once()
async def test_send_without_photo(self, provider):
"""Sends text when no poster available."""
notify = Notification(official_title="Test Anime", season=1, episode=5)
with patch.object(PostNotification, "_get_poster_sync"):
result = await post_notification.send_msg(notify)
with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post:
mock_post.return_value = MagicMock(status_code=200)
with patch("module.notification.providers.telegram.load_image") as mock_load:
mock_load.return_value = None
result = await provider.send(notify)
assert result is False
assert result is True
mock_post.assert_called_once()
def test_get_poster_sync_sets_path(self):
"""_get_poster_sync queries DB and sets poster_path on notification."""
notify = Notification(official_title="My Anime", season=1, episode=1)
async def test_test_success(self, provider):
"""Test method sends test message."""
with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post:
mock_post.return_value = MagicMock(status_code=200)
success, message = await provider.test()
with patch("module.notification.notification.Database") as MockDB:
mock_db = MagicMock()
mock_db.bangumi.match_poster.return_value = "/posters/my_anime.jpg"
MockDB.return_value.__enter__ = MagicMock(return_value=mock_db)
MockDB.return_value.__exit__ = MagicMock(return_value=False)
PostNotification._get_poster_sync(notify)
assert success is True
assert "successfully" in message.lower()
assert notify.poster_path == "/posters/my_anime.jpg"
def test_get_poster_sync_empty_when_not_found(self):
"""_get_poster_sync sets empty string when no poster found in DB."""
notify = Notification(official_title="Unknown", season=1, episode=1)
class TestDiscordProvider:
@pytest.fixture
def provider(self):
config = ProviderConfig(
type="discord", enabled=True, webhook_url="https://discord.com/api/webhooks/123"
)
return DiscordProvider(config)
with patch("module.notification.notification.Database") as MockDB:
mock_db = MagicMock()
mock_db.bangumi.match_poster.return_value = ""
MockDB.return_value.__enter__ = MagicMock(return_value=mock_db)
MockDB.return_value.__exit__ = MagicMock(return_value=False)
PostNotification._get_poster_sync(notify)
async def test_send(self, provider):
"""Sends embed message."""
notify = Notification(
official_title="Test Anime", season=1, episode=5, poster_path="https://example.com/poster.jpg"
)
assert notify.poster_path == ""
with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post:
mock_post.return_value = MagicMock(status_code=204)
result = await provider.send(notify)
assert result is True
call_args = mock_post.call_args[0]
assert "embeds" in call_args[1]
class TestBarkProvider:
@pytest.fixture
def provider(self):
config = ProviderConfig(type="bark", enabled=True, device_key="device123")
return BarkProvider(config)
async def test_send(self, provider):
"""Sends push notification."""
notify = Notification(official_title="Test Anime", season=1, episode=5)
with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post:
mock_post.return_value = MagicMock(status_code=200)
result = await provider.send(notify)
assert result is True
call_args = mock_post.call_args[0]
assert "device_key" in call_args[1]
class TestWebhookProvider:
@pytest.fixture
def provider(self):
config = ProviderConfig(
type="webhook",
enabled=True,
url="https://example.com/webhook",
template='{"anime": "{{title}}", "ep": {{episode}}}',
)
return WebhookProvider(config)
def test_render_template(self, provider):
"""Template rendering replaces variables."""
notify = Notification(official_title="Test Anime", season=1, episode=5)
result = provider._render_template(notify)
assert result["anime"] == "Test Anime"
assert result["ep"] == 5
async def test_send(self, provider):
"""Sends custom payload."""
notify = Notification(official_title="Test Anime", season=1, episode=5)
with patch.object(provider, "post_data", new_callable=AsyncMock) as mock_post:
mock_post.return_value = MagicMock(status_code=200)
result = await provider.send(notify)
assert result is True
# ---------------------------------------------------------------------------
# Config Migration
# ---------------------------------------------------------------------------
class TestConfigMigration:
def test_legacy_config_migration(self):
"""Old single-provider config migrates to new format."""
from module.models.config import Notification as NotificationConfig
# Old format
old_config = NotificationConfig(
enable=True,
type="telegram",
token="old_token",
chat_id="old_chat_id",
)
# Should have migrated to new format
assert len(old_config.providers) == 1
assert old_config.providers[0].type == "telegram"
assert old_config.providers[0].enabled is True
def test_new_config_no_migration(self):
"""New format with providers doesn't trigger migration."""
from module.models.config import Notification as NotificationConfig
provider = ProviderConfig(type="discord", enabled=True, webhook_url="https://discord.com/webhook")
new_config = NotificationConfig(
enable=True,
providers=[provider],
)
assert len(new_config.providers) == 1
assert new_config.providers[0].type == "discord"

View File

@@ -0,0 +1,51 @@
import type { NotificationProviderConfig, NotificationType } from '#/config';
import type { TupleToUnion } from '#/utils';
export interface TestProviderRequest {
provider_index: number;
}
export interface TestProviderConfigRequest {
type: TupleToUnion<NotificationType>;
enabled?: boolean;
token?: string;
chat_id?: string;
webhook_url?: string;
server_url?: string;
device_key?: string;
user_key?: string;
api_token?: string;
template?: string;
url?: string;
}
export interface TestResponse {
success: boolean;
message: string;
message_zh: string;
message_en: string;
}
export const apiNotification = {
/**
* Test a configured provider by index
*/
async testProvider(request: TestProviderRequest) {
const { data } = await axios.post<TestResponse>(
'api/v1/notification/test',
request
);
return { data };
},
/**
* Test an unsaved provider configuration
*/
async testProviderConfig(request: TestProviderConfigRequest) {
const { data } = await axios.post<TestResponse>(
'api/v1/notification/test-config',
request
);
return { data };
},
};

View File

@@ -1,64 +1,479 @@
<script lang="ts" setup>
import type { Notification, NotificationType } from '#/config';
import type { SettingItem } from '#/components';
import type { NotificationProviderConfig, NotificationType } from '#/config';
import type { TupleToUnion } from '#/utils';
import { apiNotification } from '@/api/notification';
const { t } = useMyI18n();
const { getSettingGroup } = useConfigStore();
const notification = getSettingGroup('notification');
const notificationType: NotificationType = [
'telegram',
'server-chan',
'bark',
'wecom',
const notificationRef = getSettingGroup('notification');
// Provider types with display names
const providerTypes: { value: TupleToUnion<NotificationType>; label: string }[] = [
{ value: 'telegram', label: 'Telegram' },
{ value: 'discord', label: 'Discord' },
{ value: 'bark', label: 'Bark' },
{ value: 'server-chan', label: 'Server Chan' },
{ value: 'wecom', label: 'WeChat Work' },
{ value: 'gotify', label: 'Gotify' },
{ value: 'pushover', label: 'Pushover' },
{ value: 'webhook', label: 'Webhook' },
];
const items: SettingItem<Notification>[] = [
{
configKey: 'enable',
label: () => t('config.notification_set.enable'),
type: 'switch',
bottomLine: true,
},
{
configKey: 'type',
label: () => t('config.notification_set.type'),
type: 'select',
css: 'w-140',
prop: {
items: notificationType,
// Provider field configurations
const providerFields: Record<
string,
{ key: keyof NotificationProviderConfig; label: string; placeholder: string }[]
> = {
telegram: [
{ key: 'token', label: 'Bot Token', placeholder: 'bot token' },
{ key: 'chat_id', label: 'Chat ID', placeholder: 'chat id' },
],
discord: [
{
key: 'webhook_url',
label: 'Webhook URL',
placeholder: 'https://discord.com/api/webhooks/...',
},
},
{
configKey: 'token',
label: () => t('config.notification_set.token'),
type: 'input',
prop: {
type: 'text',
placeholder: 'token',
],
bark: [
{ key: 'device_key', label: 'Device Key', placeholder: 'device key' },
{
key: 'server_url',
label: 'Server URL (optional)',
placeholder: 'https://api.day.app',
},
},
{
configKey: 'chat_id',
label: () => t('config.notification_set.chat_id'),
type: 'input',
prop: {
type: 'text',
placeholder: 'chat id',
],
'server-chan': [{ key: 'token', label: 'SendKey', placeholder: 'sendkey' }],
wecom: [
{ key: 'webhook_url', label: 'Webhook URL', placeholder: 'webhook url' },
{ key: 'token', label: 'Key', placeholder: 'key' },
],
gotify: [
{
key: 'server_url',
label: 'Server URL',
placeholder: 'https://gotify.example.com',
},
{ key: 'token', label: 'App Token', placeholder: 'app token' },
],
pushover: [
{ key: 'user_key', label: 'User Key', placeholder: 'user key' },
{ key: 'api_token', label: 'API Token', placeholder: 'api token' },
],
webhook: [
{
key: 'url',
label: 'Webhook URL',
placeholder: 'https://example.com/webhook',
},
{
key: 'template',
label: 'Template (JSON)',
placeholder: '{"title": "{{title}}", "episode": {{episode}}}',
},
],
};
// Dialog state
const showAddDialog = ref(false);
const showEditDialog = ref(false);
const editingIndex = ref(-1);
const newProvider = ref<NotificationProviderConfig>({
type: 'telegram',
enabled: true,
});
// Testing state
const testingIndex = ref(-1);
const testResult = ref<{ success: boolean; message: string } | null>(null);
// Computed properties to access notification settings
const notificationEnabled = computed({
get: () => notificationRef.value.enable,
set: (val) => {
notificationRef.value.enable = val;
},
];
});
const providers = computed({
get: () => notificationRef.value.providers || [],
set: (val) => {
notificationRef.value.providers = val;
},
});
// Initialize providers array if not exists
if (!notificationRef.value.providers) {
notificationRef.value.providers = [];
}
function getProviderLabel(type: string): string {
return providerTypes.find((p) => p.value === type)?.label || type;
}
function getProviderIcon(type: string): string {
const icons: Record<string, string> = {
telegram: 'i-simple-icons-telegram',
discord: 'i-simple-icons-discord',
bark: 'i-carbon-notification',
'server-chan': 'i-simple-icons-wechat',
wecom: 'i-simple-icons-wechat',
gotify: 'i-carbon-notification-filled',
pushover: 'i-carbon-mobile',
webhook: 'i-carbon-webhook',
};
return icons[type] || 'i-carbon-notification';
}
function openAddDialog() {
newProvider.value = {
type: 'telegram',
enabled: true,
};
testResult.value = null;
showAddDialog.value = true;
}
function openEditDialog(index: number) {
editingIndex.value = index;
newProvider.value = { ...providers.value[index] };
testResult.value = null;
showEditDialog.value = true;
}
function addProvider() {
const newProviders = [...providers.value, { ...newProvider.value }];
providers.value = newProviders;
showAddDialog.value = false;
}
function saveProvider() {
if (editingIndex.value >= 0) {
const newProviders = [...providers.value];
newProviders[editingIndex.value] = { ...newProvider.value };
providers.value = newProviders;
}
showEditDialog.value = false;
editingIndex.value = -1;
}
function removeProvider(index: number) {
const newProviders = providers.value.filter((_, i) => i !== index);
providers.value = newProviders;
}
function toggleProvider(index: number) {
const newProviders = [...providers.value];
newProviders[index] = {
...newProviders[index],
enabled: !newProviders[index].enabled,
};
providers.value = newProviders;
}
async function testProvider(index: number) {
testingIndex.value = index;
testResult.value = null;
try {
const response = await apiNotification.testProvider({ provider_index: index });
testResult.value = {
success: response.data.success,
message: response.data.message_zh || response.data.message,
};
} catch (error: any) {
testResult.value = {
success: false,
message: error.message || 'Test failed',
};
} finally {
testingIndex.value = -1;
}
}
async function testNewProvider() {
testingIndex.value = -999; // Special index for new provider
testResult.value = null;
try {
const response = await apiNotification.testProviderConfig(
newProvider.value as any
);
testResult.value = {
success: response.data.success,
message: response.data.message_zh || response.data.message,
};
} catch (error: any) {
testResult.value = {
success: false,
message: error.message || 'Test failed',
};
} finally {
testingIndex.value = -1;
}
}
function getFieldsForType(type: string) {
return providerFields[type] || [];
}
</script>
<template>
<ab-fold-panel :title="$t('config.notification_set.title')">
<div space-y-8>
<div space-y-4>
<!-- Global enable switch -->
<ab-setting
v-for="i in items"
:key="i.configKey"
v-bind="i"
v-model:data="notification[i.configKey]"
></ab-setting>
config-key="enable"
:label="() => t('config.notification_set.enable')"
type="switch"
v-model:data="notificationEnabled"
bottom-line
/>
<!-- Provider list -->
<div v-if="notificationEnabled" space-y-3>
<div
v-for="(provider, index) in providers"
:key="index"
class="provider-card"
:class="{ 'provider-disabled': !provider.enabled }"
>
<div flex items-center gap-3>
<div :class="getProviderIcon(provider.type)" text-xl />
<div flex-1>
<div font-medium>{{ getProviderLabel(provider.type) }}</div>
<div text-sm op-60>
{{
provider.enabled
? $t('config.notification_set.enabled')
: $t('config.notification_set.disabled')
}}
</div>
</div>
<div flex items-center gap-2>
<button
class="btn-icon"
:disabled="testingIndex === index"
:title="$t('config.notification_set.test')"
@click="testProvider(index)"
>
<div
v-if="testingIndex === index"
i-carbon-circle-dash
animate-spin
/>
<div v-else i-carbon-play />
</button>
<button
class="btn-icon"
:title="$t('config.notification_set.edit')"
@click="openEditDialog(index)"
>
<div i-carbon-edit />
</button>
<button
class="btn-icon"
:title="
provider.enabled
? $t('config.notification_set.disable')
: $t('config.notification_set.enable_provider')
"
@click="toggleProvider(index)"
>
<div
:class="provider.enabled ? 'i-carbon-view' : 'i-carbon-view-off'"
/>
</button>
<button
class="btn-icon btn-danger"
:title="$t('config.notification_set.remove')"
@click="removeProvider(index)"
>
<div i-carbon-trash-can />
</button>
</div>
</div>
</div>
<!-- Test result message -->
<div
v-if="testResult"
class="test-result"
:class="testResult.success ? 'test-success' : 'test-error'"
>
{{ testResult.message }}
</div>
<!-- Add provider button -->
<button class="btn-add" @click="openAddDialog">
<div i-carbon-add />
{{ $t('config.notification_set.add_provider') }}
</button>
</div>
</div>
<!-- Add Dialog -->
<ab-dialog
v-model:visible="showAddDialog"
:title="$t('config.notification_set.add_provider')"
@confirm="addProvider"
>
<div space-y-4>
<div>
<label class="field-label">{{ $t('config.notification_set.type') }}</label>
<select v-model="newProvider.type" class="field-input">
<option v-for="pt in providerTypes" :key="pt.value" :value="pt.value">
{{ pt.label }}
</option>
</select>
</div>
<div v-for="field in getFieldsForType(newProvider.type)" :key="field.key">
<label class="field-label">{{ field.label }}</label>
<input
v-if="field.key !== 'template'"
v-model="(newProvider as any)[field.key]"
:placeholder="field.placeholder"
class="field-input"
/>
<textarea
v-else
v-model="(newProvider as any)[field.key]"
:placeholder="field.placeholder"
class="field-input field-textarea"
rows="3"
/>
</div>
<div flex items-center justify-between>
<button
class="btn-test"
:disabled="testingIndex === -999"
@click="testNewProvider"
>
<div
v-if="testingIndex === -999"
i-carbon-circle-dash
animate-spin
/>
<div v-else i-carbon-play />
{{ $t('config.notification_set.test') }}
</button>
<div
v-if="testResult"
:class="testResult.success ? 'text-green-500' : 'text-red-500'"
text-sm
>
{{ testResult.message }}
</div>
</div>
</div>
</ab-dialog>
<!-- Edit Dialog -->
<ab-dialog
v-model:visible="showEditDialog"
:title="$t('config.notification_set.edit_provider')"
@confirm="saveProvider"
>
<div space-y-4>
<div>
<label class="field-label">{{ $t('config.notification_set.type') }}</label>
<select v-model="newProvider.type" class="field-input" disabled>
<option v-for="pt in providerTypes" :key="pt.value" :value="pt.value">
{{ pt.label }}
</option>
</select>
</div>
<div v-for="field in getFieldsForType(newProvider.type)" :key="field.key">
<label class="field-label">{{ field.label }}</label>
<input
v-if="field.key !== 'template'"
v-model="(newProvider as any)[field.key]"
:placeholder="field.placeholder"
class="field-input"
/>
<textarea
v-else
v-model="(newProvider as any)[field.key]"
:placeholder="field.placeholder"
class="field-input field-textarea"
rows="3"
/>
</div>
<div flex items-center justify-between>
<button
class="btn-test"
:disabled="testingIndex === -999"
@click="testNewProvider"
>
<div
v-if="testingIndex === -999"
i-carbon-circle-dash
animate-spin
/>
<div v-else i-carbon-play />
{{ $t('config.notification_set.test') }}
</button>
<div
v-if="testResult"
:class="testResult.success ? 'text-green-500' : 'text-red-500'"
text-sm
>
{{ testResult.message }}
</div>
</div>
</div>
</ab-dialog>
</ab-fold-panel>
</template>
<style scoped>
.provider-card {
@apply p-3 rounded-lg bg-gray-100 dark:bg-gray-800 transition-opacity;
}
.provider-disabled {
@apply opacity-50;
}
.btn-icon {
@apply p-2 rounded hover:bg-gray-200 dark:hover:bg-gray-700 transition-colors;
}
.btn-danger {
@apply hover:bg-red-100 dark:hover:bg-red-900 hover:text-red-500;
}
.btn-add {
@apply w-full p-3 rounded-lg border-2 border-dashed border-gray-300 dark:border-gray-600
flex items-center justify-center gap-2 hover:border-primary hover:text-primary
transition-colors cursor-pointer;
}
.btn-test {
@apply px-3 py-1.5 rounded bg-primary text-white flex items-center gap-2 hover:bg-primary-dark
disabled:opacity-50 disabled:cursor-not-allowed transition-colors;
}
.field-label {
@apply block text-sm font-medium mb-1;
}
.field-input {
@apply w-full p-2 rounded border border-gray-300 dark:border-gray-600 bg-white dark:bg-gray-800;
}
.field-textarea {
@apply resize-none font-mono text-sm;
}
.test-result {
@apply p-2 rounded text-sm;
}
.test-success {
@apply bg-green-100 dark:bg-green-900 text-green-700 dark:text-green-300;
}
.test-error {
@apply bg-red-100 dark:bg-red-900 text-red-700 dark:text-red-300;
}
</style>

View File

@@ -43,11 +43,22 @@
"web_port": "WebUI Port"
},
"notification_set": {
"chat_id": "Chat ID",
"enable": "Enable",
"title": "Notification Setting",
"enable": "Enable",
"enabled": "Enabled",
"disabled": "Disabled",
"type": "Type",
"token": "Token",
"type": "Type"
"chat_id": "Chat ID",
"add_provider": "Add Provider",
"edit_provider": "Edit Provider",
"remove": "Remove",
"edit": "Edit",
"test": "Test",
"enable_provider": "Enable",
"disable": "Disable",
"test_success": "Test successful",
"test_failed": "Test failed"
},
"parser_set": {
"enable": "Enable",

View File

@@ -43,11 +43,22 @@
"web_port": "网页端口"
},
"notification_set": {
"chat_id": "Chat ID",
"enable": "启用",
"title": "通知设置",
"enable": "启用",
"enabled": "已启用",
"disabled": "已禁用",
"type": "类型",
"token": "Token",
"type": "类型"
"chat_id": "Chat ID",
"add_provider": "添加通知服务",
"edit_provider": "编辑通知服务",
"remove": "删除",
"edit": "编辑",
"test": "测试",
"enable_provider": "启用",
"disable": "禁用",
"test_success": "测试成功",
"test_failed": "测试失败"
},
"parser_set": {
"enable": "启用",

View File

@@ -13,7 +13,16 @@ export type RenameMethod = ['normal', 'pn', 'advance', 'none'];
/** 代理类型 */
export type ProxyType = ['http', 'https', 'socks5'];
/** 通知类型 */
export type NotificationType = ['telegram', 'server-chan', 'bark', 'wecom'];
export type NotificationType = [
'telegram',
'discord',
'bark',
'server-chan',
'wecom',
'gotify',
'pushover',
'webhook',
];
/** OpenAI Model List */
export type OpenAIModel = ['gpt-4o', 'gpt-4o-mini', 'gpt-3.5-turbo'];
/** OpenAI API Type */
@@ -60,11 +69,30 @@ export interface Proxy {
username: string;
password: string;
}
/** Notification provider configuration */
export interface NotificationProviderConfig {
type: TupleToUnion<NotificationType>;
enabled: boolean;
// Common fields
token?: string;
chat_id?: string;
// Provider-specific fields
webhook_url?: string;
server_url?: string;
device_key?: string;
user_key?: string;
api_token?: string;
template?: string;
url?: string;
}
export interface Notification {
enable: boolean;
type: 'telegram' | 'server-chan' | 'bark' | 'wecom';
token: string;
chat_id: string;
providers: NotificationProviderConfig[];
// Legacy fields (deprecated, for backward compatibility)
type?: 'telegram' | 'server-chan' | 'bark' | 'wecom';
token?: string;
chat_id?: string;
}
export interface ExperimentalOpenAI {
enable: boolean;
@@ -131,9 +159,7 @@ export const initConfig: Config = {
},
notification: {
enable: false,
type: 'telegram',
token: '',
chat_id: '',
providers: [],
},
experimental_openai: {
enable: false,