Merge pull request #5352 from cddjr/fix_concurrency_systemconfig

This commit is contained in:
jxxghp
2026-01-13 16:58:21 +08:00
committed by GitHub
2 changed files with 100 additions and 41 deletions

View File

@@ -1,4 +1,6 @@
import asyncio
import copy
import threading
from typing import Any, Optional, Union
from app.db import DbOper
@@ -17,6 +19,8 @@ class SystemConfigOper(DbOper, metaclass=Singleton):
"""
super().__init__()
self.__SYSTEMCONF = {}
self._rlock = threading.RLock()
self._alock = asyncio.Lock()
for item in SystemConfig.list(self._db):
self.__SYSTEMCONF[item.key] = item.value
@@ -29,23 +33,24 @@ class SystemConfigOper(DbOper, metaclass=Singleton):
"""
if isinstance(key, SystemConfigKey):
key = key.value
# 旧值
old_value = self.__SYSTEMCONF.get(key)
# 更新内存(deepcopy避免内存共享)
self.__SYSTEMCONF[key] = copy.deepcopy(value)
conf = SystemConfig.get_by_key(self._db, key)
if conf:
if old_value != value:
if value:
conf.update(self._db, {"value": value})
else:
conf.delete(self._db, conf.id)
with self._rlock:
# 旧值
old_value = self.__SYSTEMCONF.get(key)
# 更新内存(deepcopy避免内存共享)
self.__SYSTEMCONF[key] = copy.deepcopy(value)
conf = SystemConfig.get_by_key(self._db, key)
if conf:
if old_value != value:
if value:
conf.update(self._db, {"value": value})
else:
conf.delete(self._db, conf.id)
return True
return None
else:
conf = SystemConfig(key=key, value=value)
conf.create(self._db)
return True
return None
else:
conf = SystemConfig(key=key, value=value)
conf.create(self._db)
return True
async def async_set(self, key: Union[str, SystemConfigKey], value: Any) -> Optional[bool]:
"""
@@ -56,22 +61,32 @@ class SystemConfigOper(DbOper, metaclass=Singleton):
"""
if isinstance(key, SystemConfigKey):
key = key.value
# 旧值
old_value = self.__SYSTEMCONF.get(key)
# 更新内存(deepcopy避免内存共享)
self.__SYSTEMCONF[key] = copy.deepcopy(value)
conf = await SystemConfig.async_get_by_key(self._db, key)
if conf:
if old_value != value:
async with self._alock:
conf = await SystemConfig.async_get_by_key(self._db, key)
# 确定是否需要更新数据库
needs_db_update = False
if conf:
if conf.value != value:
needs_db_update = True
else: # 记录不存在,总是需要创建/更新
needs_db_update = True
if not needs_db_update:
# 即使数据库值相同,也要确保缓存同步
with self._rlock:
self.__SYSTEMCONF[key] = copy.deepcopy(value)
return None
# 执行数据库更新
if conf:
if value:
conf.update(self._db, {"value": value})
await conf.async_update(self._db, {"value": value})
else:
conf.delete(self._db, conf.id)
return True
return None
else:
conf = SystemConfig(key=key, value=value)
await conf.async_create(self._db)
await conf.async_delete(self._db, conf.id)
else:
conf = SystemConfig(key=key, value=value)
await conf.async_create(self._db)
# 数据库更新成功后,再更新缓存
with self._rlock:
self.__SYSTEMCONF[key] = copy.deepcopy(value)
return True
def get(self, key: Union[str, SystemConfigKey] = None) -> Any:
@@ -82,15 +97,17 @@ class SystemConfigOper(DbOper, metaclass=Singleton):
key = key.value
if not key:
return self.all()
# 避免将__SYSTEMCONF内的值引用出去会导致set时误判没有变动
return copy.deepcopy(self.__SYSTEMCONF.get(key))
with self._rlock:
# 避免将__SYSTEMCONF内的值引用出去会导致set时误判没有变动
return copy.deepcopy(self.__SYSTEMCONF.get(key))
def all(self):
"""
获取所有系统设置
"""
# 避免将__SYSTEMCONF内的值引用出去会导致set时误判没有变动
return copy.deepcopy(self.__SYSTEMCONF)
with self._rlock:
# 避免将__SYSTEMCONF内的值引用出去会导致set时误判没有变动
return copy.deepcopy(self.__SYSTEMCONF)
def delete(self, key: Union[str, SystemConfigKey]) -> bool:
"""
@@ -98,10 +115,11 @@ class SystemConfigOper(DbOper, metaclass=Singleton):
"""
if isinstance(key, SystemConfigKey):
key = key.value
# 更新内存
self.__SYSTEMCONF.pop(key, None)
# 写入数据库
conf = SystemConfig.get_by_key(self._db, key)
if conf:
conf.delete(self._db, conf.id)
return True
with self._rlock:
# 更新内存
self.__SYSTEMCONF.pop(key, None)
# 写入数据库
conf = SystemConfig.get_by_key(self._db, key)
if conf:
conf.delete(self._db, conf.id)
return True

View File

@@ -0,0 +1,41 @@
"""2.2.2
Revision ID: 41ef1dd7467c
Revises: a946dae52526
Create Date: 2026-01-13 13:02:41.614029
"""
from app.db import ScopedSession
from app.db.models.systemconfig import SystemConfig
from app.log import logger
# revision identifiers, used by Alembic.
revision = "41ef1dd7467c"
down_revision = "a946dae52526"
branch_labels = None
depends_on = None
def upgrade() -> None:
# systemconfig表 去重
with ScopedSession() as db:
try:
seen_keys = set()
# 按ID降序查询以便保留最新的配置
for item in db.query(SystemConfig).order_by(SystemConfig.id.desc()).all():
if item.key in seen_keys:
logger.warn(
f"已删除重复的SystemConfig项{item.key} 值:{item.value}"
)
db.delete(item)
else:
seen_keys.add(item.key)
db.commit()
except Exception as e:
logger.error(e)
db.rollback()
def downgrade() -> None:
pass