TODO 后台整理队列

This commit is contained in:
jxxghp
2024-12-23 18:59:36 +08:00
parent 1b48eb8959
commit 10f8efc457
8 changed files with 247 additions and 126 deletions

View File

@@ -343,6 +343,7 @@ class DownloadChain(ChainBase):
seasons=_meta.season,
episodes=download_episodes or _meta.episode,
image=_media.get_backdrop_image(),
downloader=_downloader,
download_hash=_hash,
torrent_name=_torrent.title,
torrent_description=_torrent.description,

View File

@@ -1,7 +1,9 @@
import queue
import re
import threading
from pathlib import Path
from typing import List, Optional, Tuple, Union, Dict
from queue import Queue
from typing import List, Optional, Tuple, Union, Dict, Callable
from app.chain import ChainBase
from app.chain.media import MediaChain
@@ -20,20 +22,34 @@ from app.helper.directory import DirectoryHelper
from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
lock = threading.Lock()
class TransferChain(ChainBase):
class TransferChain(ChainBase, metaclass=Singleton):
"""
文件整理处理链
"""
# 可处理的文件后缀
all_exts = settings.RMT_MEDIAEXT
# 待整理任务队列
_queue = Queue()
# 文件整理线程
_transfer_thread = None
# 文件整理检查间隔(秒)
_transfer_interval = 5
def __init__(self):
super().__init__()
self.downloadhis = DownloadHistoryOper()
@@ -44,7 +60,69 @@ class TransferChain(ChainBase):
self.storagechain = StorageChain()
self.systemconfig = SystemConfigOper()
self.directoryhelper = DirectoryHelper()
self.all_exts = settings.RMT_MEDIAEXT
# 启动整理任务
self.__init()
def __init(self):
"""
初始化
"""
# 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()
def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None):
"""
添加到待整理队列
:param task: 任务信息
:param callback: 回调函数
"""
if not task:
return
self._queue.put(TransferQueue(
task=task,
callback=callback
))
def __start_transfer(self):
"""
处理队列
"""
while not global_vars.is_system_stopped:
try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval)
if item:
self.__handle_transfer(task=item.task, callback=item.callback)
except queue.Empty:
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e}")
def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None):
"""
处理整理任务
"""
if not task:
return
# 执行整理
transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem,
meta=task.meta,
mediainfo=task.mediainfo,
target_directory=task.target_directory,
target_storage=task.target_storage,
target_path=task.target_path,
transfer_type=task.transfer_type,
episodes_info=task.episodes_info,
scrape=task.scrape,
library_type_folder=task.library_type_folder,
library_category_folder=task.library_category_folder)
if not transferinfo:
logger.error("文件整理模块运行失败")
return
# 回调,位置传参:任务、整理结果
if callback:
callback(task, transferinfo)
def recommend_name(self, meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]:
"""
@@ -152,7 +230,8 @@ class TransferChain(ChainBase):
library_type_folder: bool = None, library_category_folder: bool = None,
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, src_match: bool = False) -> Tuple[bool, str]:
force: bool = False, src_match: bool = False,
background: bool = False) -> Tuple[bool, str]:
"""
执行一个复杂目录的整理操作
:param fileitem: 文件项
@@ -172,9 +251,16 @@ class TransferChain(ChainBase):
:param download_hash: 下载记录hash
:param force: 是否强制整理
:param src_match: 是否源目录匹配
:param background: 是否后台整理
返回:成功标识,错误信息
"""
def __callback(_task: TransferTask, _transferinfo: TransferInfo):
"""
TODO 整理完成回调
"""
pass
# 自定义格式
formaterHandler = FormatParser(eformat=epformat.format,
details=epformat.detail,
@@ -411,7 +497,26 @@ class TransferChain(ChainBase):
storage=file_item.storage,
target_storage=target_storage)
# 执行整理
# 后台整理
if background:
self.__handle_transfer(task=TransferTask(
fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
), callback=__callback)
continue
# 实时整理
transferinfo: TransferInfo = self.transfer(fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,

View File

@@ -29,6 +29,8 @@ class DownloadHistory(Base):
episodes = Column(String)
# 海报
image = Column(String)
# 下载器
downloader = Column(String)
# 下载任务Hash
download_hash = Column(String, index=True)
# 种子名称
@@ -168,10 +170,10 @@ class DownloadFiles(Base):
下载文件记录
"""
id = Column(Integer, Sequence('id'), primary_key=True, index=True)
# 下载任务Hash
download_hash = Column(String, index=True)
# 下载器
downloader = Column(String)
# 下载任务Hash
download_hash = Column(String, index=True)
# 完整路径
fullpath = Column(String, index=True)
# 保存路径

View File

@@ -1,11 +1,9 @@
import datetime
import platform
import queue
import re
import threading
import traceback
from pathlib import Path
from queue import Queue
from threading import Lock
from typing import Any
@@ -28,7 +26,7 @@ from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import FileItem, TransferInfo, Notification
from app.schemas import FileItem, TransferInfo, Notification, TransferTask
from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
@@ -52,12 +50,10 @@ class FileMonitorHandler(FileSystemEventHandler):
self.callback = callback
def on_created(self, event: FileSystemEvent):
self.callback.event_handler(event=event, text="创建",
mon_path=self._watch_path, event_path=Path(event.src_path))
self.callback.event_handler(event=event, text="创建", event_path=Path(event.src_path))
def on_moved(self, event: FileSystemMovedEvent):
self.callback.event_handler(event=event, text="移动",
mon_path=self._watch_path, event_path=Path(event.dest_path))
self.callback.event_handler(event=event, text="移动", event_path=Path(event.dest_path))
class Monitor(metaclass=Singleton):
@@ -80,15 +76,6 @@ class Monitor(metaclass=Singleton):
# 存储过照间隔(分钟)
_snapshot_interval = 5
# 待整理任务队列
_queue = Queue()
# 文件整理线程
_transfer_thread = None
# 文件整理间隔(秒)
_transfer_interval = 60
# 消息汇总
_msg_medias = {}
@@ -120,10 +107,6 @@ class Monitor(metaclass=Singleton):
# 停止现有任务
self.stop()
# 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()
# 读取目录配置
monitor_dirs = self.directoryhelper.get_download_dirs()
if not monitor_dirs:
@@ -212,16 +195,6 @@ class Monitor(metaclass=Singleton):
logger.warn(f"导入模块错误:{error},将使用 PollingObserver 监控目录")
return PollingObserver()
def put_to_queue(self, storage: str, filepath: Path, mon_path: Path):
"""
添加到待整理队列
"""
self._queue.put({
"storage": storage,
"filepath": filepath,
"mon_path": mon_path
})
def polling_observer(self, storage: str, mon_path: Path):
"""
轮询监控
@@ -237,37 +210,95 @@ class Monitor(metaclass=Singleton):
new_files = new_snapshot.keys() - old_snapshot.keys()
for new_file in new_files:
# 添加到待整理队列
self.put_to_queue(storage=storage, filepath=Path(new_file), mon_path=mon_path)
self.__handle_file(storage=storage, event_path=Path(new_file))
# 更新快照
self._storage_snapshot[storage] = new_snapshot
def event_handler(self, event, mon_path: Path, text: str, event_path: Path):
def event_handler(self, event, text: str, event_path: Path):
"""
处理文件变化
:param event: 事件
:param mon_path: 监控目录
:param text: 事件描述
:param event_path: 事件文件路径
"""
if not event.is_directory:
# 文件发生变化
logger.debug(f"文件 {event_path} 发生了 {text}")
# 添加到待整理队列
self.put_to_queue(storage="local", filepath=event_path, mon_path=mon_path)
# 整理文件
self.__handle_file(storage="local", event_path=event_path)
def __start_transfer(self):
def __transfer_queue(self, task: TransferTask):
"""
整理队列中的文件
添加到整理队列
"""
while not self._event.is_set():
try:
item = self._queue.get(timeout=self._transfer_interval)
if item:
self.__handle_file(storage=item.get("storage"), event_path=item.get("filepath"))
except queue.Empty:
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e}")
def __callback(_task: TransferTask, _transferinfo: TransferInfo, /):
"""
整理完成后处理
"""
if not _transferinfo:
logger.error("文件转移模块运行失败")
return
if not _transferinfo.success:
# 转移失败
logger.warn(f"{_task.file_path.name} 入库失败:{_transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
fileitem=_task.fileitem,
mode=_transferinfo.transfer_type if _transferinfo else '',
download_hash=_task.download_hash,
meta=_task.meta,
mediainfo=_task.mediainfo,
transferinfo=_transferinfo
)
# 发送失败消息
self.chain.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{_task.mediainfo.title_year} {_task.meta.season_episode} 入库失败!",
text=f"原因:{_transferinfo.message or '未知'}",
image=_task.mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
return
# 转移成功
logger.info(f"{_task.file_path.name} 入库成功:{_transferinfo.target_diritem.path}")
# 新增转移成功历史记录
self.transferhis.add_success(
fileitem=_task.fileitem,
mode=_transferinfo.transfer_type if _transferinfo else '',
download_hash=_task.download_hash,
meta=_task.meta,
mediainfo=_task.mediainfo,
transferinfo=_transferinfo
)
# 汇总刮削
if _transferinfo.need_scrape:
self.mediaChain.scrape_metadata(fileitem=_transferinfo.target_diritem,
meta=_task.meta,
mediainfo=_task.mediainfo)
# 广播事件
EventManager().send_event(EventType.TransferComplete, {
'fileitem': _task.fileitem,
'meta': _task.meta,
'mediainfo': _task.mediainfo,
'transferinfo': _transferinfo
})
# 发送消息汇总
if _transferinfo.need_notify:
self.__collect_msg_medias(mediainfo=_task.mediainfo, file_meta=_task.meta,
transferinfo=_transferinfo)
# 移动模式删除空目录
if _transferinfo.transfer_type in ["move"]:
self.storagechain.delete_media_file(_task.fileitem, delete_self=False)
# 加入整理队列
self.transferchain.put_to_queue(task=task, callback=__callback)
def __handle_file(self, storage: str, event_path: Path):
"""
@@ -342,8 +373,9 @@ class Monitor(metaclass=Singleton):
download_history = self.downloadhis.get_by_hash(download_file.download_hash)
# 获取下载Hash
download_hash = None
downloader, download_hash = None, None
if download_history:
downloader = download_history.downloader
download_hash = download_history.download_hash
# 识别媒体信息
@@ -413,73 +445,20 @@ class Monitor(metaclass=Singleton):
else:
episodes_info = None
# 转移
transferinfo: TransferInfo = self.chain.transfer(fileitem=file_item,
meta=file_meta,
mediainfo=mediainfo,
target_directory=dir_info,
episodes_info=episodes_info)
if not transferinfo:
logger.error("文件转移模块运行失败")
return
if not transferinfo.success:
# 转移失败
logger.warn(f"{event_path.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
# 进入队列
self.__transfer_queue(
TransferTask(
fileitem=file_item,
mode=transferinfo.transfer_type if transferinfo else '',
download_hash=download_hash,
file_path=event_path,
meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo
target_directory=dir_info,
episodes_info=episodes_info,
downloader=downloader,
download_hash=download_hash
)
# 发送失败消息
self.chain.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{mediainfo.title_year} {file_meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
image=mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
return
# 转移成功
logger.info(f"{event_path.name} 入库成功:{transferinfo.target_diritem.path}")
# 新增转移成功历史记录
self.transferhis.add_success(
fileitem=file_item,
mode=transferinfo.transfer_type if transferinfo else '',
download_hash=download_hash,
meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo
)
# 汇总刮削
if transferinfo.need_scrape:
self.mediaChain.scrape_metadata(fileitem=transferinfo.target_diritem,
meta=file_meta,
mediainfo=mediainfo)
# 广播事件
EventManager().send_event(EventType.TransferComplete, {
'fileitem': file_item,
'meta': file_meta,
'mediainfo': mediainfo,
'transferinfo': transferinfo
})
# 发送消息汇总
if transferinfo.need_notify:
self.__collect_msg_medias(mediainfo=mediainfo, file_meta=file_meta, transferinfo=transferinfo)
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
self.storagechain.delete_media_file(file_item, delete_self=False)
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))

View File

@@ -3,7 +3,6 @@ from typing import Optional, Dict, Any, List, Set
from pydantic import BaseModel, Field, root_validator
from app.core.context import Context
from app.schemas import MessageChannel
@@ -169,7 +168,7 @@ class ResourceSelectionEventData(BaseModel):
# 输出参数
updated: bool = Field(default=False, description="是否已更新")
updated_contexts: Optional[List[Context]] = Field(default=None, description="已更新的资源上下文列表")
updated_contexts: Optional[List[Any]] = Field(default=None, description="已更新的资源上下文列表")
source: Optional[str] = Field(default="未知拦截源", description="拦截源")

View File

@@ -1,11 +1,11 @@
from pathlib import Path
from typing import Optional, List
from typing import Optional, List, Any, Callable
from pydantic import BaseModel, Field
from app.schemas import TmdbEpisode, MetaInfo, MediaInfo
from app.schemas.system import TransferDirectoryConf
from app.schemas import TmdbEpisode
from app.schemas.file import FileItem
from app.schemas.system import TransferDirectoryConf
class TransferTorrent(BaseModel):
@@ -47,8 +47,9 @@ class TransferTask(BaseModel):
文件整理任务
"""
fileitem: Optional[FileItem] = None
meta: Optional[MetaInfo] = None
mediainfo: Optional[MediaInfo] = None
file_path: Optional[Path] = None
meta: Optional[Any] = None
mediainfo: Optional[Any] = None
target_directory: Optional[TransferDirectoryConf] = None
target_storage: Optional[str] = None
target_path: Optional[Path] = None
@@ -57,6 +58,8 @@ class TransferTask(BaseModel):
library_type_folder: Optional[bool] = None
library_category_folder: Optional[bool] = None
episodes_info: Optional[List[TmdbEpisode]] = None
downloader: Optional[str] = None
download_hash: Optional[str] = None
def to_dict(self):
"""
@@ -111,13 +114,15 @@ class TransferInfo(BaseModel):
return dicts
class AsyncTransferCallback(BaseModel):
class TransferQueue(BaseModel):
"""
异步整理回调信息
异步整理队列信息
"""
# 任务信息
task: Optional[TransferTask] = None
# 结果信息
# 回调函数
callback: Optional[Callable] = None
# 整理结果
result: Optional[TransferInfo] = None

View File

@@ -11,6 +11,7 @@ from app.utils.system import SystemUtils
try:
from app.helper.sites import SitesHelper
except ImportError as e:
SitesHelper = None
error_message = f"错误: {str(e)}\n站点认证及索引相关资源导入失败,请尝试重建容器或手动拉取资源"
print(error_message, file=sys.stderr)
sys.exit(1)

View File

@@ -0,0 +1,29 @@
"""2.0.8
Revision ID: bf28a012734c
Revises: eaf9cbc49027
Create Date: 2024-12-23 18:29:31.202143
"""
import contextlib
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'bf28a012734c'
down_revision = 'eaf9cbc49027'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 下载历史记录 增加下载器字段
with contextlib.suppress(Exception):
op.add_column('downloadhistory', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
pass