add monitor

This commit is contained in:
jxxghp
2024-07-03 17:46:35 +08:00
parent db26f2e108
commit 9f34be049d
7 changed files with 524 additions and 15 deletions

View File

@@ -58,8 +58,14 @@ class StorageChain(ChainBase):
""" """
return self.run_module("rename_file", fileitem=fileitem, name=name) return self.run_module("rename_file", fileitem=fileitem, name=name)
def snapshot_storage(self, fileitem: schemas.FileItem) -> Optional[Dict]: def get_file_item(self, storage: str, path: Path) -> Optional[schemas.FileItem]:
"""
根据路径获取文件项
"""
return self.run_module("get_file_item", storage=storage, path=path)
def snapshot_storage(self, storage: str, path: Path) -> Optional[Dict[str, float]]:
""" """
快照存储 快照存储
""" """
return self.run_module("snapshot_storage", fileitem=fileitem) return self.run_module("snapshot_storage", storage=storage, path=path)

View File

@@ -279,7 +279,7 @@ class PluginManager(metaclass=Singleton):
# 重新加载 # 重新加载
self.start(plugin_id) self.start(plugin_id)
def install_online_plugin(self): def sync(self):
""" """
安装本地不存在的在线插件 安装本地不存在的在线插件
""" """

View File

@@ -28,9 +28,11 @@ from app.helper.resource import ResourceHelper
from app.helper.sites import SitesHelper from app.helper.sites import SitesHelper
from app.helper.message import MessageHelper from app.helper.message import MessageHelper
from app.scheduler import Scheduler from app.scheduler import Scheduler
from app.monitor import Monitor
from app.command import Command, CommandChian from app.command import Command, CommandChian
from app.schemas import Notification, NotificationType from app.schemas import Notification, NotificationType
# App # App
App = FastAPI(title=settings.PROJECT_NAME, App = FastAPI(title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json") openapi_url=f"{settings.API_V1_STR}/openapi.json")
@@ -195,6 +197,8 @@ def shutdown_server():
DisplayHelper().stop() DisplayHelper().stop()
# 停止定时服务 # 停止定时服务
Scheduler().stop() Scheduler().stop()
# 停止监控
Monitor().stop()
# 停止线程池 # 停止线程池
ThreadHelper().shutdown() ThreadHelper().shutdown()
# 停止前端服务 # 停止前端服务
@@ -217,9 +221,11 @@ def start_module():
# 加载模块 # 加载模块
ModuleManager() ModuleManager()
# 安装在线插件 # 安装在线插件
PluginManager().install_online_plugin() PluginManager().sync()
# 加载插件 # 加载插件
PluginManager().start() PluginManager().start()
# 启动监控任务
Monitor()
# 启动定时服务 # 启动定时服务
Scheduler() Scheduler()
# 启动事件消费 # 启动事件消费

View File

@@ -173,15 +173,25 @@ class FileManagerModule(_ModuleBase):
return None return None
return storage_oper.upload(fileitem, path) return storage_oper.upload(fileitem, path)
def snapshot_storage(self, fileitem: FileItem) -> Optional[Dict]: def get_file_item(self, storage: str, path: Path) -> Optional[FileItem]:
"""
根据路径获取文件项
"""
storage_oper = self.__get_storage_oper(storage)
if not storage_oper:
logger.error(f"不支持 {storage} 的文件获取")
return None
return storage_oper.get_item(path)
def snapshot_storage(self, storage: str, path: Path) -> Optional[Dict[str, float]]:
""" """
快照存储 快照存储
""" """
storage_oper = self.__get_storage_oper(fileitem.storage) storage_oper = self.__get_storage_oper(storage)
if not storage_oper: if not storage_oper:
logger.error(f"不支持 {storage} 的快照处理") logger.error(f"不支持 {storage} 的快照处理")
return None return None
return storage_oper.snapshot(fileitem) return storage_oper.snapshot(path)
def transfer(self, fileitem: FileItem, meta: MetaBase, mediainfo: MediaInfo, def transfer(self, fileitem: FileItem, meta: MetaBase, mediainfo: MediaInfo,
transfer_type: str, target_storage: str = None, target_path: Path = None, transfer_type: str, target_storage: str = None, target_path: Path = None,

View File

@@ -138,9 +138,9 @@ class StorageBase(metaclass=ABCMeta):
""" """
pass pass
def snapshot(self, fileitm: schemas.FileItem) -> Dict[str, float]: def snapshot(self, path: Path) -> Dict[str, float]:
""" """
快照文件系统,输出所有层级文件信息 快照文件系统,输出所有层级文件信息(不含目录)
""" """
files_info = {} files_info = {}
@@ -148,12 +148,17 @@ class StorageBase(metaclass=ABCMeta):
""" """
递归获取文件信息 递归获取文件信息
""" """
files_info[_fileitm.path] = _fileitm.size
if _fileitm.type == "dir": if _fileitm.type == "dir":
for sub_file in self.list(_fileitm): for sub_file in self.list(_fileitm):
__snapshot_file(sub_file) __snapshot_file(sub_file)
else:
files_info[_fileitm.path] = _fileitm.size
__snapshot_file(fileitm) fileitem = self.get_item(path)
if not fileitem:
return {}
__snapshot_file(fileitem)
return files_info return files_info

View File

@@ -1,4 +1,5 @@
import base64 import base64
import hashlib
import json import json
import time import time
import uuid import uuid
@@ -612,12 +613,27 @@ class AliPan(StorageBase):
def upload(self, fileitem: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]: def upload(self, fileitem: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
""" """
上传文件,并标记完成 上传文件,并标记完成
TODO 上传文件分片、秒传
""" """
def __sha1(_path: Path):
"""
计算文件sha1用于快传
"""
_sha1 = hashlib.sha1()
with open(_path, 'rb') as f:
while True:
data = f.read(8192)
if not data:
break
_sha1.update(data)
return _sha1.hexdigest()
params = self.__access_params params = self.__access_params
if not params: if not params:
return None return None
headers = self.__get_headers(params) headers = self.__get_headers(params)
# 计算sha1
sha1 = __sha1(path)
res = RequestUtils(headers=headers, timeout=10).post_res(self.create_folder_file_url, json={ res = RequestUtils(headers=headers, timeout=10).post_res(self.create_folder_file_url, json={
"drive_id": fileitem.drive_id, "drive_id": fileitem.drive_id,
"parent_file_id": fileitem.parent_fileid, "parent_file_id": fileitem.parent_fileid,
@@ -625,6 +641,8 @@ class AliPan(StorageBase):
"check_name_mode": "refuse", "check_name_mode": "refuse",
"create_scene": "file_upload", "create_scene": "file_upload",
"type": "file", "type": "file",
"content_hash": sha1,
"content_hash_name": "sha1",
"part_info_list": [ "part_info_list": [
{ {
"part_number": 1 "part_number": 1
@@ -635,10 +653,11 @@ class AliPan(StorageBase):
if not res: if not res:
self.__handle_error(res, "创建文件") self.__handle_error(res, "创建文件")
return None return None
# 获取上传参数 # 获取上传请求结果
result = res.json() result = res.json()
if result.get("exist"): if result.get("exist") or result.get("rapid_upload"):
logger.info(f"文件 {result.get('file_name')} 已存在,无需上传") # 已存在
logger.info(f"文件 {result.get('file_name')} 已存在或已秒传完成,无需上传")
return schemas.FileItem( return schemas.FileItem(
storage=self.schema.value, storage=self.schema.value,
drive_id=result.get("drive_id"), drive_id=result.get("drive_id"),
@@ -648,6 +667,7 @@ class AliPan(StorageBase):
name=result.get("file_name"), name=result.get("file_name"),
path=f"{fileitem.path}{result.get('file_name')}" path=f"{fileitem.path}{result.get('file_name')}"
) )
# 上传文件
file_id = result.get("file_id") file_id = result.get("file_id")
upload_id = result.get("upload_id") upload_id = result.get("upload_id")
part_info_list = result.get("part_info_list") part_info_list = result.get("part_info_list")

462
app/monitor.py Normal file
View File

@@ -0,0 +1,462 @@
import platform
import re
import threading
import traceback
from pathlib import Path
from queue import Empty, Queue
from threading import Lock
from typing import Any

from apscheduler.schedulers.background import BackgroundScheduler
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver

from app.chain import ChainBase
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.chain.tmdb import TmdbChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.event import EventManager
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.systemconfig_oper import SystemConfigOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import FileItem, TransferInfo, Notification
from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType
from app.utils.singleton import Singleton
lock = Lock()
snapshot_lock = Lock()
class MonitorChain(ChainBase):
pass
class FileMonitorHandler(FileSystemEventHandler):
"""
目录监控响应类
"""
def __init__(self, mon_path: Path, callback: Any, **kwargs):
super(FileMonitorHandler, self).__init__(**kwargs)
self._watch_path = mon_path
self.callback = callback
def on_created(self, event):
self.callback.event_handler(event=event, text="创建",
mon_path=self._watch_path, event_path=Path(event.src_path))
def on_moved(self, event):
self.callback.event_handler(event=event, text="移动",
mon_path=self._watch_path, event_path=Path(event.src_path))
class Monitor(metaclass=Singleton):
    """
    Directory-monitor processing chain (singleton).

    Watches configured download directories — locally via watchdog
    observers, remotely via periodic storage snapshots — queues newly
    appeared media files, and transfers them into the media library.
    """

    # Shutdown event shared with the transfer worker thread
    _event = threading.Event()
    # Running watchdog observers, one per locally monitored directory
    _observers = []
    # Background scheduler running the remote-storage polling jobs
    _scheduler = None
    # Last snapshot per (storage, monitored path): {file path: size}
    _storage_snapshot = {}
    # Remote-storage snapshot/polling interval (minutes)
    _snapshot_interval = 5
    # Queue of files awaiting transfer
    _queue = Queue()
    # Worker thread draining the transfer queue
    _transfer_thread = None
    # Max wait (seconds) for a queue item before re-checking the stop event
    _transfer_interval = 60

    def __init__(self):
        super().__init__()
        self.chain = MonitorChain()
        self.transferhis = TransferHistoryOper()
        self.downloadhis = DownloadHistoryOper()
        self.mediaChain = MediaChain()
        self.tmdbchain = TmdbChain()
        self.storagechain = StorageChain()
        self.directoryhelper = DirectoryHelper()
        self.systemmessage = MessageHelper()
        self.systemconfig = SystemConfigOper()
        # Start directory monitoring and the file-transfer worker
        self.start()

    def start(self):
        """
        Start monitoring all configured download directories.
        """
        # Stop any previously running observers/jobs first
        self.stop()
        # Scheduler for remote-storage polling jobs
        self._scheduler = BackgroundScheduler(timezone=settings.TZ)
        # Worker thread that drains the transfer queue
        self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
        # Read directory configuration
        monitor_dirs = self.directoryhelper.get_download_dirs()
        if not monitor_dirs:
            return
        for mon_dir in monitor_dirs:
            if not mon_dir.library_path:
                continue
            if mon_dir.monitor_type != "monitor":
                continue
            # Refuse to monitor when the library dir lives inside the download dir
            mon_path = Path(mon_dir.download_path)
            target_path = Path(mon_dir.library_path)
            if target_path.is_relative_to(mon_path):
                logger.warn(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控!")
                self.systemmessage.put(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控", title="目录监控")
                continue
            # Start monitoring
            if mon_dir.storage == "local":
                # Local directory: use a native watchdog observer
                try:
                    observer = self.__choose_observer()
                    self._observers.append(observer)
                    observer.schedule(FileMonitorHandler(mon_path=mon_path, callback=self),
                                      path=mon_path,
                                      recursive=True)
                    observer.daemon = True
                    observer.start()
                    logger.info(f"已启动 {mon_path} 的目录监控服务")
                except Exception as e:
                    err_msg = str(e)
                    if "inotify" in err_msg and "reached" in err_msg:
                        logger.warn(
                            f"目录监控服务启动出现异常:{err_msg}请在宿主机上不是docker容器内执行以下命令并重启"
                            + """
                                 echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf
                                 echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf
                                 sudo sysctl -p
                                 """)
                    else:
                        logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
                    self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控")
            else:
                # Remote storage: poll by snapshot at a fixed interval
                self._scheduler.add_job(self.polling_observer, 'interval', minutes=self._snapshot_interval,
                                        kwargs={
                                            'storage': mon_dir.storage,
                                            'mon_path': mon_path
                                        })
        # BUGFIX: the worker thread and the scheduler were created but never
        # started, so queued files and remote polling jobs would never run
        self._transfer_thread.start()
        if self._scheduler.get_jobs():
            self._scheduler.start()

    @staticmethod
    def __choose_observer() -> Any:
        """
        Pick the best native observer for the current OS, falling back to
        the portable (but slower) PollingObserver on import failure.
        """
        system = platform.system()
        try:
            if system == 'Linux':
                from watchdog.observers.inotify import InotifyObserver
                return InotifyObserver()
            elif system == 'Darwin':
                from watchdog.observers.fsevents import FSEventsObserver
                return FSEventsObserver()
            elif system == 'Windows':
                from watchdog.observers.read_directory_changes import WindowsApiObserver
                return WindowsApiObserver()
        except Exception as error:
            logger.warn(f"导入模块错误:{error},将使用 PollingObserver 监控目录")
        return PollingObserver()

    def put_to_queue(self, storage: str, filepath: Path, mon_path: Path):
        """
        Enqueue a file for transfer.
        :param storage: storage type the file lives on
        :param filepath: path of the file
        :param mon_path: monitored directory the file belongs to
        """
        self._queue.put({
            "storage": storage,
            "filepath": filepath,
            "mon_path": mon_path
        })

    def polling_observer(self, storage: str, mon_path: Path):
        """
        Poll a remote storage: snapshot it, diff against the previous
        snapshot, and queue any newly appeared files for transfer.
        """
        with snapshot_lock:
            # Snapshot the storage subtree
            new_snapshot = self.storagechain.snapshot_storage(storage=storage, path=mon_path)
            if new_snapshot:
                # BUGFIX: key by (storage, path) so several monitored paths on
                # the same storage do not overwrite each other's snapshot
                snapshot_key = (storage, str(mon_path))
                old_snapshot = self._storage_snapshot.get(snapshot_key)
                if old_snapshot:
                    # Files present now but absent from the previous snapshot
                    new_files = new_snapshot.keys() - old_snapshot.keys()
                    for new_file in new_files:
                        # Queue for transfer
                        self.put_to_queue(storage=storage, filepath=Path(new_file), mon_path=mon_path)
                # Remember the latest snapshot
                self._storage_snapshot[snapshot_key] = new_snapshot

    def event_handler(self, event, mon_path: Path, text: str, event_path: Path):
        """
        Handle a watchdog file-system event.
        :param event: the watchdog event
        :param mon_path: monitored directory
        :param text: event description (e.g. created/moved)
        :param event_path: path of the affected file
        """
        if not event.is_directory:
            # A file changed — queue it for transfer
            logger.debug(f"文件 {event_path} 发生了 {text}")
            self.put_to_queue(storage="local", filepath=event_path, mon_path=mon_path)

    def __start_transfer(self):
        """
        Worker loop: take queued files and transfer them until stop is
        requested.
        """
        while not self._event.is_set():
            try:
                item = self._queue.get(timeout=self._transfer_interval)
            except Empty:
                # BUGFIX: an empty queue is normal — previously the broad
                # except logged queue.Empty as an error every idle minute
                continue
            try:
                if item:
                    self.__handle_file(storage=item.get("storage"),
                                       event_path=item.get("filepath"),
                                       mon_path=item.get("mon_path"))
            except Exception as e:
                logger.error(f"整理队列处理出现错误:{e}")

    def __handle_file(self, storage: str, event_path: Path, mon_path: Path):
        """
        Transfer a single file into the library.
        :param storage: storage type the file lives on
        :param event_path: path of the file
        :param mon_path: monitored directory the file belongs to
        """

        def __get_bluray_dir(_path: Path):
            """
            Return the parent of the BDMV directory, or None when the path
            is not inside a Blu-ray disc layout.
            """
            for parent in _path.parents:
                if parent.name == "BDMV":
                    return parent.parent
            return None

        # The whole handling is serialized across threads
        with lock:
            try:
                # Skip recycle-bin and hidden files
                if str(event_path).find('/@Recycle/') != -1 \
                        or str(event_path).find('/#recycle/') != -1 \
                        or str(event_path).find('/.') != -1 \
                        or str(event_path).find('/@eaDir') != -1:
                    logger.debug(f"{event_path} 是回收站或隐藏的文件")
                    return
                # Skip non-media files
                if event_path.suffix not in settings.RMT_MEDIAEXT:
                    logger.debug(f"{event_path} 不是媒体文件")
                    return
                # Skip paths matching configured exclude words
                transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
                if transfer_exclude_words:
                    for keyword in transfer_exclude_words:
                        if not keyword:
                            continue
                        # BUGFIX: re.search requires a string, not a Path
                        if re.search(keyword, str(event_path), re.IGNORECASE):
                            logger.info(f"{event_path} 命中整理屏蔽词 {keyword},不处理")
                            return
                # Blu-ray disc: transfer the whole disc directory instead
                bluray_flag = False
                if re.search(r"BDMV[/\\]STREAM", str(event_path), re.IGNORECASE):
                    bluray_flag = True
                    # Use the directory above BDMV as the transfer source
                    bluray_dir = __get_bluray_dir(event_path)
                    if not bluray_dir:
                        # Defensive: BDMV matched in the string but not in parents
                        logger.debug(f"{event_path} 未找到 BDMV 的上级目录")
                        return
                    # BUGFIX: log both the original and the corrected path
                    # (previously the corrected path was shown twice)
                    logger.info(f"{event_path} 是蓝光原盘目录,更正文件路径为:{bluray_dir}")
                    event_path = bluray_dir
                # Already transferred — skip
                if self.transferhis.get_by_src(str(event_path)):
                    logger.info(f"{event_path} 已经整理过了")
                    return
                # Meta info recognized from the path
                file_meta = MetaInfoPath(event_path)
                if not file_meta.name:
                    logger.error(f"{event_path.name} 无法识别有效信息")
                    return
                # Look up the download history for the file's parent
                download_history = None
                if bluray_flag:
                    # Blu-ray disc: query by directory path
                    download_history = self.downloadhis.get_by_path(str(event_path))
                else:
                    # Regular file: query by full file path
                    download_file = self.downloadhis.get_file_by_fullpath(str(event_path))
                    if download_file:
                        download_history = self.downloadhis.get_by_hash(download_file.download_hash)
                # Download hash (when known)
                download_hash = None
                if download_history:
                    download_hash = download_history.download_hash
                # Recognize media info, preferring ids from the download history
                if download_history and download_history.tmdbid:
                    mediainfo: MediaInfo = self.mediaChain.recognize_media(mtype=MediaType(download_history.type),
                                                                           tmdbid=download_history.tmdbid,
                                                                           doubanid=download_history.doubanid)
                else:
                    mediainfo: MediaInfo = self.mediaChain.recognize_by_meta(file_meta)
                if not mediainfo:
                    logger.warn(f'未识别到媒体信息,标题:{file_meta.name}')
                    # Record a failed-transfer history entry so it can be redone
                    his = self.transferhis.add_fail(
                        fileitem=FileItem(
                            storage=storage,
                            type="file",
                            path=str(event_path),
                            name=event_path.name,
                            basename=event_path.stem,
                            extension=event_path.suffix[1:],
                        ),
                        mode='',
                        meta=file_meta,
                        download_hash=download_hash
                    )
                    self.chain.post_message(Notification(
                        mtype=NotificationType.Manual,
                        title=f"{event_path.name} 未识别到媒体信息,无法入库!",
                        text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别转移。",
                        link=settings.MP_DOMAIN('#/history')
                    ))
                    return
                # Resolve the target library directory
                dir_info = self.directoryhelper.get_dir(mediainfo, src_path=Path(mon_path))
                if not dir_info:
                    logger.warn(f"{event_path.name} 未找到对应的目标目录")
                    return
                # Resolve the file item on its storage
                file_item = self.storagechain.get_file_item(storage=storage, path=event_path)
                if not file_item:
                    logger.warn(f"{event_path.name} 未找到对应的文件")
                    return
                # When not following TMDB changes, keep the previously used title
                if not settings.SCRAP_FOLLOW_TMDB:
                    transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
                                                                           mtype=mediainfo.type.value)
                    if transfer_history:
                        mediainfo.title = transfer_history.title
                logger.info(f"{event_path.name} 识别为:{mediainfo.type.value} {mediainfo.title_year}")
                # Refresh media artwork
                self.chain.obtain_images(mediainfo=mediainfo)
                # Episode data for TV shows
                if mediainfo.type == MediaType.TV:
                    episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id,
                                                                 season=file_meta.begin_season or 1)
                else:
                    episodes_info = None
                # Transfer the file
                transferinfo: TransferInfo = self.chain.transfer(fileitem=file_item,
                                                                 meta=file_meta,
                                                                 mediainfo=mediainfo,
                                                                 transfer_type=dir_info.transfer_type,
                                                                 target_storage=dir_info.library_storage,
                                                                 target_path=Path(dir_info.library_path),
                                                                 episodes_info=episodes_info,
                                                                 scrape=dir_info.scraping)
                # BUGFIX: this None check was duplicated — once is enough
                if not transferinfo:
                    logger.error("文件转移模块运行失败")
                    return
                if not transferinfo.success:
                    # Transfer failed — record the failure and notify
                    logger.warn(f"{event_path.name} 入库失败:{transferinfo.message}")
                    self.transferhis.add_fail(
                        fileitem=file_item,
                        mode=dir_info.transfer_type,
                        download_hash=download_hash,
                        meta=file_meta,
                        mediainfo=mediainfo,
                        transferinfo=transferinfo
                    )
                    # TODO aggregate failure notifications into one message
                    self.chain.post_message(Notification(
                        mtype=NotificationType.Manual,
                        title=f"{mediainfo.title_year} {file_meta.season_episode} 入库失败!",
                        text=f"原因:{transferinfo.message or '未知'}",
                        image=mediainfo.get_message_image(),
                        link=settings.MP_DOMAIN('#/history')
                    ))
                    return
                # Scrape metadata for the transferred file
                if dir_info.scraping:
                    self.mediaChain.scrape_metadata(fileitem=transferinfo.target_item,
                                                    meta=file_meta,
                                                    mediainfo=mediainfo)
                # Broadcast the completion event
                EventManager().send_event(EventType.TransferComplete, {
                    'fileitem': file_item,
                    'meta': file_meta,
                    'mediainfo': mediainfo,
                    'transferinfo': transferinfo
                })
                # TODO remove empty directories in move mode
            except Exception as e:
                logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))

    def stop(self):
        """
        Stop all observers, polling jobs and the transfer worker.
        """
        # Signal the worker loop to exit its wait
        self._event.set()
        if self._observers:
            for observer in self._observers:
                try:
                    observer.stop()
                    observer.join()
                except Exception as e:
                    logger.error(f"停止目录监控服务出现了错误:{e}")
            self._observers = []
        if self._scheduler:
            self._scheduler.remove_all_jobs()
            if self._scheduler.running:
                try:
                    self._scheduler.shutdown()
                except Exception as e:
                    logger.error(f"停止定时服务出现了错误:{e}")
            self._scheduler = None
        # Reset so start() can run the worker loop again
        self._event.clear()