refactor: polling_observer

wumode committed 2025-07-27 12:37:27 +08:00
parent 721648ffdf
commit 68c29d89c9
5 changed files with 69 additions and 47 deletions


@@ -1382,7 +1382,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
         mediainfo: MediaInfo = MediaChain().recognize_media(tmdbid=tmdbid, doubanid=doubanid,
                                                             mtype=mtype, episode_group=episode_group)
         if not mediainfo:
-            return False, f"Failed to recognize media info, tmdbid: {tmdbid}, doubanid: {doubanid}, type: {mtype.value}"
+            return (False,
+                    f"Failed to recognize media info, tmdbid: {tmdbid}, doubanid: {doubanid}, type: {mtype.value if mtype else None}")
         else:
             # Update media images
             self.obtain_images(mediainfo=mediainfo)
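
The rewritten return also guards the f-string against mtype being None, which previously raised AttributeError on mtype.value when no media type was supplied. A minimal illustration of the pattern (the enum and message below are illustrative, not the project's exact definitions):

from enum import Enum
from typing import Optional, Tuple

class MediaType(Enum):  # illustrative stand-in for the project's media type enum
    MOVIE = "movie"
    TV = "tv"

def describe_failure(tmdbid: int, mtype: Optional[MediaType]) -> Tuple[bool, str]:
    # Only dereference .value when mtype is present; otherwise print None
    return False, f"Failed to recognize media info, tmdbid: {tmdbid}, type: {mtype.value if mtype else None}"

print(describe_failure(123, None))   # works even when no media type was supplied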


@@ -313,6 +313,8 @@ class ConfigModel(BaseModel):
     WORKFLOW_STATISTIC_SHARE: bool = True
     # Whether to check folder modification times when comparing rclone snapshots
     RCLONE_SNAPSHOT_CHECK_FOLDER_MODTIME = True
+    # Whether to check folder modification times when comparing OpenList snapshots
+    OPENLIST_SNAPSHOT_CHECK_FOLDER_MODTIME = True

 class Settings(BaseSettings, ConfigModel, LogConfigModel):
     """


@@ -227,11 +227,13 @@ class StorageBase(metaclass=ABCMeta):
                     __snapshot_file(sub_file, current_depth + 1)
                 else:
                     # Record the file's full information for comparison
-                    files_info[_fileitm.path] = {
-                        'size': _fileitm.size or 0,
-                        'modify_time': getattr(_fileitm, 'modify_time', 0),
-                        'type': _fileitm.type
-                    }
+                    if getattr(_fileitm, 'modify_time', 0) > last_snapshot_time:
+                        files_info[_fileitm.path] = {
+                            'size': _fileitm.size or 0,
+                            'modify_time': getattr(_fileitm, 'modify_time', 0),
+                            'type': _fileitm.type
+                        }
         except Exception as e:
             logger.debug(f"Snapshot error for {_fileitm.path}: {e}")


@@ -31,6 +31,8 @@ class Alist(StorageBase, metaclass=WeakSingleton):
         "move": "Move",
     }

+    snapshot_check_folder_modtime = settings.OPENLIST_SNAPSHOT_CHECK_FOLDER_MODTIME
+
     def __init__(self):
         super().__init__()
@@ -586,6 +588,9 @@ class Alist(StorageBase, metaclass=WeakSingleton):
                 data=f,
             )
+            if resp is None:
+                logger.warn(f"[OpenList] Upload request for file {path} failed")
+                return None
             if resp.status_code != 200:
                 logger.warn(f"[OpenList] Upload request for file {path} failed, status code: {resp.status_code}")
                 return None
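
The new None check handles the case where the request helper returns no Response at all (for example on a connection error), which previously made the following resp.status_code access raise AttributeError. A small sketch of the pattern using requests (the upload helper and URL are placeholders, not the project's client):

from typing import Optional
import requests

def upload(url: str, data: bytes) -> Optional[requests.Response]:
    """Placeholder helper that returns None instead of raising on network errors."""
    try:
        return requests.put(url, data=data, timeout=30)
    except requests.RequestException:
        return None

resp = upload("https://example.com/api/fs/put", b"...")
if resp is None:
    print("upload request failed")                                # no response at all
elif resp.status_code != 200:
    print(f"upload failed, status code: {resp.status_code}")      # HTTP-level failure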


@@ -97,17 +97,22 @@ class Monitor(metaclass=Singleton):
         logger.info("Configuration change event triggered, re-initializing directory monitoring...")
         self.init()

-    def save_snapshot(self, storage: str, snapshot: Dict, file_count: int = 0):
+    def save_snapshot(self, storage: str, snapshot: Dict, file_count: int = 0,
+                      last_snapshot_time: Optional[float] = None):
         """
         Save the snapshot to a file
         :param storage: storage name
         :param snapshot: snapshot data
+        :param last_snapshot_time: timestamp of the previous snapshot
         :param file_count: number of files, used to adjust the monitoring interval
         """
         try:
             cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
+            snapshot_time = max((item.get('modify_time', 0) for item in snapshot.values()), default=None)
+            if snapshot_time is None:
+                snapshot_time = last_snapshot_time or time.time()
             snapshot_data = {
-                'timestamp': time.time(),
+                'timestamp': snapshot_time,
                 'file_count': file_count,
                 'snapshot': snapshot
             }
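
save_snapshot now stamps the cache with the newest modify_time found in the snapshot, falling back to the previous snapshot time (or the current time) when nothing was captured, so the stored timestamp can serve directly as the last_snapshot_time threshold of the next incremental pass. A standalone sketch of that timestamp choice (the cache file handling is simplified):

import json
import time
from pathlib import Path
from typing import Dict, Optional

def save_snapshot(cache_file: Path, snapshot: Dict[str, dict],
                  file_count: int = 0, last_snapshot_time: Optional[float] = None) -> None:
    # Prefer the newest modify_time actually seen; otherwise keep the old threshold or use "now"
    snapshot_time = max((item.get('modify_time', 0) for item in snapshot.values()), default=None)
    if snapshot_time is None:
        snapshot_time = last_snapshot_time or time.time()
    cache_file.write_text(json.dumps({
        'timestamp': snapshot_time,
        'file_count': file_count,
        'snapshot': snapshot,
    }, ensure_ascii=False))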
@@ -436,6 +441,7 @@ class Monitor(metaclass=Singleton):
         self._scheduler = BackgroundScheduler(timezone=settings.TZ)
         messagehelper = MessageHelper()
+        mon_storages = {}
         for mon_dir in monitor_dirs:
             if not mon_dir.library_path:
                 logger.warn(f"Skipping monitor config {mon_dir.download_path}: media library directory not set")
@@ -520,28 +526,33 @@ class Monitor(metaclass=Singleton):
                     messagehelper.put(f"Failed to start local directory monitoring: {mon_path}\nError: {err_msg}", title="Directory Monitoring")
             else:
-                # Remote directory monitoring - use adaptive interval
-                # First try loading an existing snapshot to get the file count
-                snapshot_data = self.load_snapshot(mon_dir.storage)
-                file_count = snapshot_data.get('file_count', 0) if snapshot_data else 0
-                interval = self.adjust_monitor_interval(file_count)
+                if not mon_storages.get(mon_dir.storage):
+                    mon_storages[mon_dir.storage] = []
+                mon_storages[mon_dir.storage].append(mon_path)
-                logger.info(f"Starting remote directory monitoring: {mon_path} [{mon_dir.storage}]")
-                logger.info("*** Important: remote directory monitoring only handles newly added and modified files; files that existed before monitoring started are not processed ***")
-                logger.info(f"Estimated file count: {file_count}, monitoring interval: {interval} minutes")
+        for storage, paths in mon_storages.items():
+            # Remote directory monitoring - use adaptive interval
+            # First try loading an existing snapshot to get the file count
+            snapshot_data = self.load_snapshot(storage)
+            file_count = snapshot_data.get('file_count', 0) if snapshot_data else 0
+            interval = self.adjust_monitor_interval(file_count)
+            for path in paths:
+                logger.info(f"Starting remote directory monitoring: {path} [{storage}]")
+                logger.info("*** Important: remote directory monitoring only handles newly added and modified files; files that existed before monitoring started are not processed ***")
+                logger.info(f"Estimated file count: {file_count}, monitoring interval: {interval} minutes")
-                self._scheduler.add_job(
-                    self.polling_observer,
-                    'interval',
-                    minutes=interval,
-                    kwargs={
-                        'storage': mon_dir.storage,
-                        'mon_path': mon_path
-                    },
-                    id=f"monitor_{mon_dir.storage}_{mon_dir.download_path}",
-                    replace_existing=True
-                )
-                logger.info(f"✓ Remote directory monitoring started: {mon_path} [interval: {interval} minutes]")
+            self._scheduler.add_job(
+                self.polling_observer,
+                'interval',
+                minutes=interval,
+                kwargs={
+                    'storage': storage,
+                    'mon_paths': paths
+                },
+                id=f"monitor_{storage}",
+                replace_existing=True
+            )
+            logger.info(f"✓ Remote directory monitoring started: [interval: {interval} minutes]")

         # Start the scheduler service
         if self._scheduler.get_jobs():
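
With the grouping above, one interval job is registered per storage (id f"monitor_{storage}") instead of one per directory, and polling_observer receives the full list of paths for that storage. A condensed, runnable sketch of that wiring with APScheduler (the storage names, paths and 5-minute interval are placeholders):

from collections import defaultdict
from pathlib import Path
from typing import List
from apscheduler.schedulers.background import BackgroundScheduler

def polling_observer(storage: str, mon_paths: List[Path]) -> None:
    # Stub: the real method snapshots every path of the storage in one pass
    print(f"polling {storage}: {[str(p) for p in mon_paths]}")

monitor_dirs = [("openlist", Path("/media/movies")), ("openlist", Path("/media/tv")), ("rclone", Path("/media/anime"))]

mon_storages = defaultdict(list)
for storage, path in monitor_dirs:
    mon_storages[storage].append(path)

scheduler = BackgroundScheduler()
for storage, paths in mon_storages.items():
    scheduler.add_job(polling_observer, 'interval', minutes=5,
                      kwargs={'storage': storage, 'mon_paths': paths},
                      id=f"monitor_{storage}", replace_existing=True)
scheduler.start()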
@@ -612,14 +623,12 @@ class Monitor(metaclass=Singleton):
             logger.debug(f"Importing {module_name}.{class_name} failed: {e}")
         return None

-    def polling_observer(self, storage: str, mon_path: Path):
+    def polling_observer(self, storage: str, mon_paths: List[Path]):
         """
         Polling-based monitoring (improved version)
         """
         with snapshot_lock:
             try:
-                logger.debug(f"Starting snapshot of {storage}:{mon_path} ...")
                 # Load the previous snapshot data
                 old_snapshot_data = self.load_snapshot(storage)
                 old_snapshot = old_snapshot_data.get('snapshot', {}) if old_snapshot_data else {}
@@ -627,21 +636,24 @@ class Monitor(metaclass=Singleton):
                 # Determine whether this is the first snapshot: check that the snapshot file exists and is valid
                 is_first_snapshot = old_snapshot_data is None
+                new_snapshot = {}
+                for mon_path in mon_paths:
+                    logger.debug(f"Starting snapshot of {storage}:{mon_path} ...")
-                # Generate the new snapshot (incremental mode)
-                new_snapshot = StorageChain().snapshot_storage(
-                    storage=storage,
-                    path=mon_path,
-                    last_snapshot_time=last_snapshot_time
-                )
-                if new_snapshot is None:
-                    logger.warn(f"Failed to take a snapshot of {storage}:{mon_path}")
-                    return
+                    # Generate the new snapshot (incremental mode)
+                    snapshot = StorageChain().snapshot_storage(
+                        storage=storage,
+                        path=mon_path,
+                        last_snapshot_time=last_snapshot_time
+                    )
+                    if snapshot is None:
+                        logger.warn(f"Failed to take a snapshot of {storage}:{mon_path}")
+                        continue
+                    new_snapshot.update(snapshot)
+                    file_count = len(snapshot)
+                    logger.info(f"{storage}:{mon_path} snapshot finished, found {file_count} files")
-                file_count = len(new_snapshot)
-                logger.info(f"{storage}:{mon_path} snapshot finished, found {file_count} files")
                 if not is_first_snapshot:
                     # Compare snapshots to find changes
                     changes = self.compare_snapshots(old_snapshot, new_snapshot)
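
Inside polling_observer the per-path snapshots are now merged into one dict before comparison, and a failed path is skipped with continue rather than aborting the whole run with return. A compact sketch of that merge-then-compare flow (take_snapshot and compare are simplified stand-ins for StorageChain().snapshot_storage() and self.compare_snapshots()):

from typing import Dict, List, Optional

def take_snapshot(storage: str, path: str) -> Optional[Dict[str, dict]]:
    # Stand-in for StorageChain().snapshot_storage(storage=..., path=..., last_snapshot_time=...)
    return {f"{path}/new.mkv": {'size': 1, 'modify_time': 2000.0}}

def compare(old: Dict[str, dict], new: Dict[str, dict]) -> Dict[str, List[str]]:
    added = [p for p in new if p not in old]
    modified = [p for p in new if p in old and new[p] != old[p]]
    return {'added': added, 'modified': modified}

def poll(storage: str, mon_paths: List[str], old_snapshot: Dict[str, dict]) -> None:
    new_snapshot: Dict[str, dict] = {}
    for path in mon_paths:
        snapshot = take_snapshot(storage, path)
        if snapshot is None:
            continue                       # skip a failed path instead of aborting the poll
        new_snapshot.update(snapshot)
    changes = compare(old_snapshot, new_snapshot)
    print(f"{storage}: {len(changes['added'])} added, {len(changes['modified'])} modified")

poll("openlist", ["/media/movies", "/media/tv"], old_snapshot={})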
@@ -662,15 +674,15 @@ class Monitor(metaclass=Singleton):
                     if changes['added'] or changes['modified']:
                         logger.info(
-                            f"{storage}:{mon_path} found {len(changes['added'])} added and {len(changes['modified'])} modified files")
+                            f"{storage} found {len(changes['added'])} added and {len(changes['modified'])} modified files")
                     else:
-                        logger.debug(f"{storage}:{mon_path} no file changes")
+                        logger.debug(f"{storage} no file changes")
                 else:
-                    logger.info(f"{storage}:{mon_path} first snapshot finished, {file_count} files in total")
+                    logger.info(f"{storage} first snapshot finished, {file_count} files in total")
                     logger.info("*** The first snapshot only establishes a baseline and does not process existing files; subsequent polls handle added and modified files ***")
                 # Save the new snapshot
-                self.save_snapshot(storage, new_snapshot, file_count)
+                self.save_snapshot(storage, new_snapshot, file_count, last_snapshot_time)
                 # Dynamically adjust the monitoring interval
                 new_interval = self.adjust_monitor_interval(file_count)
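
After each poll the interval is re-derived from the file count, so larger libraries are polled less aggressively. The actual thresholds inside adjust_monitor_interval are not shown in this diff; the mapping and the rescheduling call below are purely illustrative:

from apscheduler.schedulers.background import BackgroundScheduler

def adjust_monitor_interval(file_count: int) -> int:
    """Hypothetical mapping from library size to polling interval in minutes."""
    if file_count < 1_000:
        return 5
    if file_count < 10_000:
        return 15
    return 30

# Reschedule the existing per-storage job with the newly computed interval
scheduler = BackgroundScheduler()
scheduler.add_job(lambda: None, 'interval', minutes=5, id="monitor_openlist")
new_interval = adjust_monitor_interval(12_000)
scheduler.reschedule_job("monitor_openlist", trigger='interval', minutes=new_interval)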