Files
MoviePilot/app/db/systemconfig_oper.py
2025-06-03 23:08:58 +08:00

108 lines
3.5 KiB
Python

from typing import Any, Union
from app.db import DbOper
from app.db.models.systemconfig import SystemConfig
from app.schemas.types import SystemConfigKey, EventType
from app.utils.singleton import Singleton
from app.schemas import ConfigChangeEventData
class SystemConfigOper(DbOper, metaclass=Singleton):
    """
    Accessor for system settings.

    Settings are persisted through the ``SystemConfig`` model and mirrored in
    an in-memory dict so reads do not touch the database. ``set`` and
    ``delete`` also publish ``EventType.ConfigChanged`` events describing the
    change.
    """

    # In-memory cache of settings (key -> value). Declared on the class, so
    # it is shared by every instance.
    __SYSTEMCONF: dict = {}

    def __init__(self):
        """
        Load all stored settings into the in-memory cache.
        """
        super().__init__()
        for item in SystemConfig.list(self._db):
            self.__SYSTEMCONF[item.key] = item.value

    @staticmethod
    def __key_name(key):
        """
        Return the plain key for ``key``.

        ``SystemConfigKey`` members are unwrapped to their ``.value``; any
        other input (including ``None``) is returned unchanged.
        """
        return key.value if isinstance(key, SystemConfigKey) else key

    @staticmethod
    def __notify_change(key: str, old_value: Any, new_value: Any, change_type: str) -> None:
        """
        Publish a ``ConfigChanged`` event.

        :param key: setting key that changed
        :param old_value: cached value before the change
        :param new_value: value after the change
        :param change_type: one of "add", "update" or "delete"
        """
        # Imported at call time, exactly as the original code did at each
        # call site. NOTE(review): presumably this avoids a circular import
        # with app.core.event - confirm before hoisting to module level.
        from app.core.event import eventmanager
        eventmanager.send_event(
            etype=EventType.ConfigChanged,
            data=ConfigChangeEventData(
                key=key,
                old_value=old_value,
                new_value=new_value,
                change_type=change_type,
            ),
        )

    def set(self, key: Union[str, SystemConfigKey], value: Any):
        """
        Store a system setting.

        The cache is always updated with ``value``. In the database:

        * no existing row: a row is created and an "add" event is sent;
        * existing row, truthy ``value``: the row is updated and an "update"
          event is sent only when the value differs from the cached one;
        * existing row, falsy ``value``: the row is deleted and a "delete"
          event is sent.

        :param key: setting key, as a string or ``SystemConfigKey``
        :param value: value to store
        """
        key = self.__key_name(key)
        # Previous value, needed for the change event
        old_value = self.__SYSTEMCONF.get(key)
        # Update the cache first
        self.__SYSTEMCONF[key] = value

        conf = SystemConfig.get_by_key(self._db, key)
        if not conf:
            conf = SystemConfig(key=key, value=value)
            conf.create(self._db)
            self.__notify_change(key, old_value, value, "add")
            return

        if not value:
            # NOTE(review): a falsy value removes the stored row but stays in
            # the cache, so cache and database differ until the next reload.
            # Kept as-is because callers may rely on it.
            conf.delete(self._db, conf.id)
            self.__notify_change(key, old_value, value, "delete")
            return

        conf.update(self._db, {"value": value})
        if old_value != value:
            self.__notify_change(key, old_value, value, "update")

    def get(self, key: Union[str, SystemConfigKey, None] = None) -> Any:
        """
        Read a system setting from the cache.

        :param key: setting key, as a string or ``SystemConfigKey``; when
            omitted or empty, the whole cache dict is returned
        :return: the cached value, ``None`` when the key is not cached, or
            the cache dict itself (not a copy) when no key is given
        """
        key = self.__key_name(key)
        if not key:
            return self.__SYSTEMCONF
        return self.__SYSTEMCONF.get(key)

    def all(self):
        """
        Return all cached system settings.

        :return: the cache dict itself when it has entries, otherwise a new
            empty dict
        """
        return self.__SYSTEMCONF or {}

    def delete(self, key: Union[str, SystemConfigKey]):
        """
        Remove a system setting from the cache and the database.

        A "delete" event is always sent, whether or not the key was present.

        :param key: setting key, as a string or ``SystemConfigKey``
        :return: always ``True``
        """
        key = self.__key_name(key)
        # Drop from the cache, remembering the old value for the event
        old_value = self.__SYSTEMCONF.pop(key, None)
        # Drop from the database
        conf = SystemConfig.get_by_key(self._db, key)
        if conf:
            conf.delete(self._db, conf.id)
        self.__notify_change(key, old_value, None, "delete")
        return True

    def __del__(self):
        """
        Close the database handle, if any, when the object is finalised.
        """
        # getattr guards against a partially-initialised instance: if
        # __init__ failed before `_db` was assigned, a plain attribute read
        # here would raise AttributeError during finalisation.
        db = getattr(self, "_db", None)
        if db:
            db.close()