diff --git a/app/chain/download.py b/app/chain/download.py index 5cbf4489..7e9687b9 100644 --- a/app/chain/download.py +++ b/app/chain/download.py @@ -343,6 +343,7 @@ class DownloadChain(ChainBase): seasons=_meta.season, episodes=download_episodes or _meta.episode, image=_media.get_backdrop_image(), + downloader=_downloader, download_hash=_hash, torrent_name=_torrent.title, torrent_description=_torrent.description, diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 646e2483..396bdbcf 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1,7 +1,9 @@ +import queue import re import threading from pathlib import Path -from typing import List, Optional, Tuple, Union, Dict +from queue import Queue +from typing import List, Optional, Tuple, Union, Dict, Callable from app.chain import ChainBase from app.chain.media import MediaChain @@ -20,20 +22,34 @@ from app.helper.directory import DirectoryHelper from app.helper.format import FormatParser from app.helper.progress import ProgressHelper from app.log import logger -from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf +from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \ + TransferTask, TransferQueue from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \ SystemConfigKey +from app.utils.singleton import Singleton from app.utils.string import StringUtils from app.utils.system import SystemUtils lock = threading.Lock() -class TransferChain(ChainBase): +class TransferChain(ChainBase, metaclass=Singleton): """ 文件整理处理链 """ + # 可处理的文件后缀 + all_exts = settings.RMT_MEDIAEXT + + # 待整理任务队列 + _queue = Queue() + + # 文件整理线程 + _transfer_thread = None + + # 文件整理检查间隔(秒) + _transfer_interval = 5 + def __init__(self): super().__init__() self.downloadhis = DownloadHistoryOper() @@ -44,7 +60,69 @@ class TransferChain(ChainBase): self.storagechain = StorageChain() self.systemconfig = SystemConfigOper() self.directoryhelper = DirectoryHelper() - self.all_exts = settings.RMT_MEDIAEXT + + # 启动整理任务 + self.__init() + + def __init(self): + """ + 初始化 + """ + # 启动文件整理线程 + self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) + self._transfer_thread.start() + + def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None): + """ + 添加到待整理队列 + :param task: 任务信息 + :param callback: 回调函数 + """ + if not task: + return + self._queue.put(TransferQueue( + task=task, + callback=callback + )) + + def __start_transfer(self): + """ + 处理队列 + """ + while not global_vars.is_system_stopped: + try: + item: TransferQueue = self._queue.get(timeout=self._transfer_interval) + if item: + self.__handle_transfer(task=item.task, callback=item.callback) + except queue.Empty: + continue + except Exception as e: + logger.error(f"整理队列处理出现错误:{e}") + + def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None): + """ + 处理整理任务 + """ + if not task: + return + # 执行整理 + transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem, + meta=task.meta, + mediainfo=task.mediainfo, + target_directory=task.target_directory, + target_storage=task.target_storage, + target_path=task.target_path, + transfer_type=task.transfer_type, + episodes_info=task.episodes_info, + scrape=task.scrape, + library_type_folder=task.library_type_folder, + library_category_folder=task.library_category_folder) + if not transferinfo: + logger.error("文件整理模块运行失败") + return + # 回调,位置传参:任务、整理结果 + if callback: + callback(task, transferinfo) def recommend_name(self, meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]: """ @@ -152,7 +230,8 @@ class TransferChain(ChainBase): library_type_folder: bool = None, library_category_folder: bool = None, season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0, downloader: str = None, download_hash: str = None, - force: bool = False, src_match: bool = False) -> Tuple[bool, str]: + force: bool = False, src_match: bool = False, + background: bool = False) -> Tuple[bool, str]: """ 执行一个复杂目录的整理操作 :param fileitem: 文件项 @@ -172,9 +251,16 @@ class TransferChain(ChainBase): :param download_hash: 下载记录hash :param force: 是否强制整理 :param src_match: 是否源目录匹配 + :param background: 是否后台整理 返回:成功标识,错误信息 """ + def __callback(_task: TransferTask, _transferinfo: TransferInfo): + """ + TODO 整理完成回调 + """ + pass + # 自定义格式 formaterHandler = FormatParser(eformat=epformat.format, details=epformat.detail, @@ -411,7 +497,26 @@ class TransferChain(ChainBase): storage=file_item.storage, target_storage=target_storage) - # 执行整理 + # 后台整理 + if background: + self.__handle_transfer(task=TransferTask( + fileitem=file_item, + meta=file_meta, + mediainfo=file_mediainfo, + target_directory=target_directory or dir_info, + target_storage=target_storage, + target_path=target_path, + transfer_type=transfer_type, + episodes_info=episodes_info, + scrape=scrape, + library_type_folder=library_type_folder, + library_category_folder=library_category_folder, + downloader=downloader, + download_hash=download_hash + ), callback=__callback) + continue + + # 实时整理 transferinfo: TransferInfo = self.transfer(fileitem=file_item, meta=file_meta, mediainfo=file_mediainfo, diff --git a/app/db/models/downloadhistory.py b/app/db/models/downloadhistory.py index cc58dfc7..bb7468a3 100644 --- a/app/db/models/downloadhistory.py +++ b/app/db/models/downloadhistory.py @@ -29,6 +29,8 @@ class DownloadHistory(Base): episodes = Column(String) # 海报 image = Column(String) + # 下载器 + downloader = Column(String) # 下载任务Hash download_hash = Column(String, index=True) # 种子名称 @@ -168,10 +170,10 @@ class DownloadFiles(Base): 下载文件记录 """ id = Column(Integer, Sequence('id'), primary_key=True, index=True) - # 下载任务Hash - download_hash = Column(String, index=True) # 下载器 downloader = Column(String) + # 下载任务Hash + download_hash = Column(String, index=True) # 完整路径 fullpath = Column(String, index=True) # 保存路径 diff --git a/app/monitor.py b/app/monitor.py index 9f1d1269..99ce636e 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -1,11 +1,9 @@ import datetime import platform -import queue import re import threading import traceback from pathlib import Path -from queue import Queue from threading import Lock from typing import Any @@ -28,7 +26,7 @@ from app.db.transferhistory_oper import TransferHistoryOper from app.helper.directory import DirectoryHelper from app.helper.message import MessageHelper from app.log import logger -from app.schemas import FileItem, TransferInfo, Notification +from app.schemas import FileItem, TransferInfo, Notification, TransferTask from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType from app.utils.singleton import Singleton from app.utils.string import StringUtils @@ -52,12 +50,10 @@ class FileMonitorHandler(FileSystemEventHandler): self.callback = callback def on_created(self, event: FileSystemEvent): - self.callback.event_handler(event=event, text="创建", - mon_path=self._watch_path, event_path=Path(event.src_path)) + self.callback.event_handler(event=event, text="创建", event_path=Path(event.src_path)) def on_moved(self, event: FileSystemMovedEvent): - self.callback.event_handler(event=event, text="移动", - mon_path=self._watch_path, event_path=Path(event.dest_path)) + self.callback.event_handler(event=event, text="移动", event_path=Path(event.dest_path)) class Monitor(metaclass=Singleton): @@ -80,15 +76,6 @@ class Monitor(metaclass=Singleton): # 存储过照间隔(分钟) _snapshot_interval = 5 - # 待整理任务队列 - _queue = Queue() - - # 文件整理线程 - _transfer_thread = None - - # 文件整理间隔(秒) - _transfer_interval = 60 - # 消息汇总 _msg_medias = {} @@ -120,10 +107,6 @@ class Monitor(metaclass=Singleton): # 停止现有任务 self.stop() - # 启动文件整理线程 - self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) - self._transfer_thread.start() - # 读取目录配置 monitor_dirs = self.directoryhelper.get_download_dirs() if not monitor_dirs: @@ -212,16 +195,6 @@ class Monitor(metaclass=Singleton): logger.warn(f"导入模块错误:{error},将使用 PollingObserver 监控目录") return PollingObserver() - def put_to_queue(self, storage: str, filepath: Path, mon_path: Path): - """ - 添加到待整理队列 - """ - self._queue.put({ - "storage": storage, - "filepath": filepath, - "mon_path": mon_path - }) - def polling_observer(self, storage: str, mon_path: Path): """ 轮询监控 @@ -237,37 +210,95 @@ class Monitor(metaclass=Singleton): new_files = new_snapshot.keys() - old_snapshot.keys() for new_file in new_files: # 添加到待整理队列 - self.put_to_queue(storage=storage, filepath=Path(new_file), mon_path=mon_path) + self.__handle_file(storage=storage, event_path=Path(new_file)) # 更新快照 self._storage_snapshot[storage] = new_snapshot - def event_handler(self, event, mon_path: Path, text: str, event_path: Path): + def event_handler(self, event, text: str, event_path: Path): """ 处理文件变化 :param event: 事件 - :param mon_path: 监控目录 :param text: 事件描述 :param event_path: 事件文件路径 """ if not event.is_directory: # 文件发生变化 logger.debug(f"文件 {event_path} 发生了 {text}") - # 添加到待整理队列 - self.put_to_queue(storage="local", filepath=event_path, mon_path=mon_path) + # 整理文件 + self.__handle_file(storage="local", event_path=event_path) - def __start_transfer(self): + def __transfer_queue(self, task: TransferTask): """ - 整理队列中的文件 + 添加到整理队列 """ - while not self._event.is_set(): - try: - item = self._queue.get(timeout=self._transfer_interval) - if item: - self.__handle_file(storage=item.get("storage"), event_path=item.get("filepath")) - except queue.Empty: - continue - except Exception as e: - logger.error(f"整理队列处理出现错误:{e}") + + def __callback(_task: TransferTask, _transferinfo: TransferInfo, /): + """ + 整理完成后处理 + """ + if not _transferinfo: + logger.error("文件转移模块运行失败") + return + + if not _transferinfo.success: + # 转移失败 + logger.warn(f"{_task.file_path.name} 入库失败:{_transferinfo.message}") + # 新增转移失败历史记录 + self.transferhis.add_fail( + fileitem=_task.fileitem, + mode=_transferinfo.transfer_type if _transferinfo else '', + download_hash=_task.download_hash, + meta=_task.meta, + mediainfo=_task.mediainfo, + transferinfo=_transferinfo + ) + # 发送失败消息 + self.chain.post_message(Notification( + mtype=NotificationType.Manual, + title=f"{_task.mediainfo.title_year} {_task.meta.season_episode} 入库失败!", + text=f"原因:{_transferinfo.message or '未知'}", + image=_task.mediainfo.get_message_image(), + link=settings.MP_DOMAIN('#/history') + )) + return + + # 转移成功 + logger.info(f"{_task.file_path.name} 入库成功:{_transferinfo.target_diritem.path}") + # 新增转移成功历史记录 + self.transferhis.add_success( + fileitem=_task.fileitem, + mode=_transferinfo.transfer_type if _transferinfo else '', + download_hash=_task.download_hash, + meta=_task.meta, + mediainfo=_task.mediainfo, + transferinfo=_transferinfo + ) + + # 汇总刮削 + if _transferinfo.need_scrape: + self.mediaChain.scrape_metadata(fileitem=_transferinfo.target_diritem, + meta=_task.meta, + mediainfo=_task.mediainfo) + + # 广播事件 + EventManager().send_event(EventType.TransferComplete, { + 'fileitem': _task.fileitem, + 'meta': _task.meta, + 'mediainfo': _task.mediainfo, + 'transferinfo': _transferinfo + }) + + # 发送消息汇总 + if _transferinfo.need_notify: + self.__collect_msg_medias(mediainfo=_task.mediainfo, file_meta=_task.meta, + transferinfo=_transferinfo) + + # 移动模式删除空目录 + if _transferinfo.transfer_type in ["move"]: + self.storagechain.delete_media_file(_task.fileitem, delete_self=False) + + # 加入整理队列 + self.transferchain.put_to_queue(task=task, callback=__callback) def __handle_file(self, storage: str, event_path: Path): """ @@ -342,8 +373,9 @@ class Monitor(metaclass=Singleton): download_history = self.downloadhis.get_by_hash(download_file.download_hash) # 获取下载Hash - download_hash = None + downloader, download_hash = None, None if download_history: + downloader = download_history.downloader download_hash = download_history.download_hash # 识别媒体信息 @@ -413,73 +445,20 @@ class Monitor(metaclass=Singleton): else: episodes_info = None - # 转移 - transferinfo: TransferInfo = self.chain.transfer(fileitem=file_item, - meta=file_meta, - mediainfo=mediainfo, - target_directory=dir_info, - episodes_info=episodes_info) - - if not transferinfo: - logger.error("文件转移模块运行失败") - return - - if not transferinfo.success: - # 转移失败 - logger.warn(f"{event_path.name} 入库失败:{transferinfo.message}") - # 新增转移失败历史记录 - self.transferhis.add_fail( + # 进入队列 + self.__transfer_queue( + TransferTask( fileitem=file_item, - mode=transferinfo.transfer_type if transferinfo else '', - download_hash=download_hash, + file_path=event_path, meta=file_meta, mediainfo=mediainfo, - transferinfo=transferinfo + target_directory=dir_info, + episodes_info=episodes_info, + downloader=downloader, + download_hash=download_hash ) - # 发送失败消息 - self.chain.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{mediainfo.title_year} {file_meta.season_episode} 入库失败!", - text=f"原因:{transferinfo.message or '未知'}", - image=mediainfo.get_message_image(), - link=settings.MP_DOMAIN('#/history') - )) - return - - # 转移成功 - logger.info(f"{event_path.name} 入库成功:{transferinfo.target_diritem.path}") - # 新增转移成功历史记录 - self.transferhis.add_success( - fileitem=file_item, - mode=transferinfo.transfer_type if transferinfo else '', - download_hash=download_hash, - meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo ) - # 汇总刮削 - if transferinfo.need_scrape: - self.mediaChain.scrape_metadata(fileitem=transferinfo.target_diritem, - meta=file_meta, - mediainfo=mediainfo) - - # 广播事件 - EventManager().send_event(EventType.TransferComplete, { - 'fileitem': file_item, - 'meta': file_meta, - 'mediainfo': mediainfo, - 'transferinfo': transferinfo - }) - - # 发送消息汇总 - if transferinfo.need_notify: - self.__collect_msg_medias(mediainfo=mediainfo, file_meta=file_meta, transferinfo=transferinfo) - - # 移动模式删除空目录 - if transferinfo.transfer_type in ["move"]: - self.storagechain.delete_media_file(file_item, delete_self=False) - except Exception as e: logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) diff --git a/app/schemas/event.py b/app/schemas/event.py index 16a1349c..060a98dc 100644 --- a/app/schemas/event.py +++ b/app/schemas/event.py @@ -3,7 +3,6 @@ from typing import Optional, Dict, Any, List, Set from pydantic import BaseModel, Field, root_validator -from app.core.context import Context from app.schemas import MessageChannel @@ -169,7 +168,7 @@ class ResourceSelectionEventData(BaseModel): # 输出参数 updated: bool = Field(default=False, description="是否已更新") - updated_contexts: Optional[List[Context]] = Field(default=None, description="已更新的资源上下文列表") + updated_contexts: Optional[List[Any]] = Field(default=None, description="已更新的资源上下文列表") source: Optional[str] = Field(default="未知拦截源", description="拦截源") diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py index 78338913..975aeb4b 100644 --- a/app/schemas/transfer.py +++ b/app/schemas/transfer.py @@ -1,11 +1,11 @@ from pathlib import Path -from typing import Optional, List +from typing import Optional, List, Any, Callable from pydantic import BaseModel, Field -from app.schemas import TmdbEpisode, MetaInfo, MediaInfo -from app.schemas.system import TransferDirectoryConf +from app.schemas import TmdbEpisode from app.schemas.file import FileItem +from app.schemas.system import TransferDirectoryConf class TransferTorrent(BaseModel): @@ -47,8 +47,9 @@ class TransferTask(BaseModel): 文件整理任务 """ fileitem: Optional[FileItem] = None - meta: Optional[MetaInfo] = None - mediainfo: Optional[MediaInfo] = None + file_path: Optional[Path] = None + meta: Optional[Any] = None + mediainfo: Optional[Any] = None target_directory: Optional[TransferDirectoryConf] = None target_storage: Optional[str] = None target_path: Optional[Path] = None @@ -57,6 +58,8 @@ class TransferTask(BaseModel): library_type_folder: Optional[bool] = None library_category_folder: Optional[bool] = None episodes_info: Optional[List[TmdbEpisode]] = None + downloader: Optional[str] = None + download_hash: Optional[str] = None def to_dict(self): """ @@ -111,13 +114,15 @@ class TransferInfo(BaseModel): return dicts -class AsyncTransferCallback(BaseModel): +class TransferQueue(BaseModel): """ - 异步整理回调信息 + 异步整理队列信息 """ # 任务信息 task: Optional[TransferTask] = None - # 结果信息 + # 回调函数 + callback: Optional[Callable] = None + # 整理结果 result: Optional[TransferInfo] = None diff --git a/app/startup/modules_initializer.py b/app/startup/modules_initializer.py index 7ed2c0c9..8e2abe60 100644 --- a/app/startup/modules_initializer.py +++ b/app/startup/modules_initializer.py @@ -11,6 +11,7 @@ from app.utils.system import SystemUtils try: from app.helper.sites import SitesHelper except ImportError as e: + SitesHelper = None error_message = f"错误: {str(e)}\n站点认证及索引相关资源导入失败,请尝试重建容器或手动拉取资源" print(error_message, file=sys.stderr) sys.exit(1) diff --git a/database/versions/bf28a012734c_2_0_8.py b/database/versions/bf28a012734c_2_0_8.py new file mode 100644 index 00000000..d9711d7a --- /dev/null +++ b/database/versions/bf28a012734c_2_0_8.py @@ -0,0 +1,29 @@ +"""2.0.8 + +Revision ID: bf28a012734c +Revises: eaf9cbc49027 +Create Date: 2024-12-23 18:29:31.202143 + +""" +import contextlib + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'bf28a012734c' +down_revision = 'eaf9cbc49027' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + # 下载历史记录 增加下载器字段 + with contextlib.suppress(Exception): + op.add_column('downloadhistory', sa.Column('downloader', sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + pass