fix:优化下载器整理

This commit is contained in:
jxxghp
2026-01-21 21:31:55 +08:00
parent a1408ee18f
commit 0ba8d51b2a

View File

@@ -35,8 +35,11 @@ from app.utils.singleton import Singleton
from app.utils.string import StringUtils from app.utils.string import StringUtils
from app.utils.system import SystemUtils from app.utils.system import SystemUtils
# 下载器锁
downloader_lock = threading.Lock() downloader_lock = threading.Lock()
# 作业锁
job_lock = threading.Lock() job_lock = threading.Lock()
# 任务锁
task_lock = threading.Lock() task_lock = threading.Lock()
@@ -405,7 +408,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
""" """
self._queue_active = True self._queue_active = True
for i in range(settings.TRANSFER_THREADS): for i in range(settings.TRANSFER_THREADS):
logger.info(f"启动文件整理线程 {i+1} ...") logger.info(f"启动文件整理线程 {i + 1} ...")
thread = threading.Thread(target=self.__start_transfer, thread = threading.Thread(target=self.__start_transfer,
name=f"transfer-{i}", name=f"transfer-{i}",
daemon=True) daemon=True)
@@ -463,7 +466,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
整理完成后处理 整理完成后处理
""" """
def __do_finished(): def __all_finished():
""" """
完成时发送消息、刮削事件、移除任务等 完成时发送消息、刮削事件、移除任务等
""" """
@@ -473,7 +476,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transferinfo.total_size = self.jobview.size(task.mediainfo, transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size task.meta.begin_season) or task.fileitem.size
# 更新文件清单 # 更新文件清单
transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, []) with job_lock:
transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, [])
# 发送通知,实时手动整理时不发 # 发送通知,实时手动整理时不发
if transferinfo.need_notify and (task.background or not task.manual): if transferinfo.need_notify and (task.background or not task.manual):
se_str = None se_str = None
@@ -488,6 +493,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transferinfo=transferinfo, transferinfo=transferinfo,
season_episode=se_str, season_episode=se_str,
username=task.username) username=task.username)
# 刮削事件 # 刮削事件
if transferinfo.need_scrape and self.__is_media_file(task.fileitem): if transferinfo.need_scrape and self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.MetadataScrape, { self.eventmanager.send_event(EventType.MetadataScrape, {
@@ -497,13 +503,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
'file_list': transferinfo.file_list_new, 'file_list': transferinfo.file_list_new,
'overwrite': False 'overwrite': False
}) })
# 移除已完成的任务 # 移除已完成的任务
self.jobview.remove_job(task) with job_lock:
self.jobview.remove_job(task)
transferhis = TransferHistoryOper() transferhis = TransferHistoryOper()
if not transferinfo.success: if not transferinfo.success:
# 转移失败 # 转移失败
logger.warn(f"{task.fileitem.name} 入库失败:{transferinfo.message}") logger.warn(f"{task.fileitem.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录 # 新增转移失败历史记录
transferhis.add_fail( transferhis.add_fail(
fileitem=task.fileitem, fileitem=task.fileitem,
@@ -514,6 +523,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
mediainfo=task.mediainfo, mediainfo=task.mediainfo,
transferinfo=transferinfo transferinfo=transferinfo
) )
# 整理失败事件 # 整理失败事件
if self.__is_media_file(task.fileitem): if self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.TransferFailed, { self.eventmanager.send_event(EventType.TransferFailed, {
@@ -524,6 +534,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
'downloader': task.downloader, 'downloader': task.downloader,
'download_hash': task.download_hash, 'download_hash': task.download_hash,
}) })
# 发送失败消息 # 发送失败消息
self.post_message(Notification( self.post_message(Notification(
mtype=NotificationType.Manual, mtype=NotificationType.Manual,
@@ -533,19 +544,25 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
username=task.username, username=task.username,
link=settings.MP_DOMAIN('#/history') link=settings.MP_DOMAIN('#/history')
)) ))
# 整理失败 # 整理失败
self.jobview.fail_task(task) with job_lock:
with task_lock: self.jobview.fail_task(task)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task): # 全部整理完成且有成功的任务时
__do_finished() if self.jobview.is_finished(task):
# 发送消息、刮削事件、移除任务
__all_finished()
return False, transferinfo.message return False, transferinfo.message
# 转移成功 # task转移成功
self.jobview.finish_task(task) with job_lock:
self.jobview.finish_task(task)
logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}") logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}")
# 新增转移成功历史记录 # 新增task转移成功历史记录
transferhis.add_success( transferhis.add_success(
fileitem=task.fileitem, fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '', mode=transferinfo.transfer_type if transferinfo else '',
@@ -556,7 +573,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transferinfo=transferinfo transferinfo=transferinfo
) )
# 整理完成事件 # task整理完成事件
if self.__is_media_file(task.fileitem): if self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.TransferComplete, { self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem, 'fileitem': task.fileitem,
@@ -566,34 +583,40 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
'downloader': task.downloader, 'downloader': task.downloader,
'download_hash': task.download_hash, 'download_hash': task.download_hash,
}) })
# 下载器整理任务更新任务标签
if task.download_hash:
self.transfer_completed(hashs=task.download_hash, downloader=task.downloader)
with task_lock: # task登记转移成功文件清单
# 登记转移成功文件清单 target_dir_path = transferinfo.target_diritem.path
target_dir_path = transferinfo.target_diritem.path target_files = transferinfo.file_list_new
target_files = transferinfo.file_list_new with job_lock:
if self._success_target_files.get(target_dir_path): if self._success_target_files.get(target_dir_path):
self._success_target_files[target_dir_path].extend(target_files) self._success_target_files[target_dir_path].extend(target_files)
else: else:
self._success_target_files[target_dir_path] = target_files self._success_target_files[target_dir_path] = target_files
# 全部整理成功时
if self.jobview.is_success(task): # 全部整理成功时
# 移动模式删除空目录 if self.jobview.is_success(task):
if transferinfo.transfer_type in ["move"]: # 移动模式删除空目录
# 所有成功的业务 if transferinfo.transfer_type in ["move"]:
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season) # 所有成功的业务
storagechain = StorageChain() tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
# 获取整理屏蔽词 # 获取整理屏蔽词
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
for t in tasks: for t in tasks:
if t.download_hash and self._can_delete_torrent(t.download_hash, t.downloader, if t.download_hash and self._can_delete_torrent(t.download_hash, t.downloader,
transfer_exclude_words): transfer_exclude_words):
if self.remove_torrents(t.download_hash, downloader=t.downloader): if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash}") logger.info(f"移动模式删除种子成功:{t.download_hash}")
if t.fileitem: if t.fileitem:
storagechain.delete_media_file(t.fileitem, delete_self=False) StorageChain().delete_media_file(t.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task): # 全部整理完成且有成功的任务时
__do_finished() if self.jobview.is_finished(task):
# 发送消息、刮削事件、移除任务
__all_finished()
return True, "" return True, ""
@@ -616,7 +639,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
""" """
添加到作业视图 添加到作业视图
""" """
with task_lock: with job_lock:
self.jobview.add_task(task) self.jobview.add_task(task)
def remove_from_queue(self, fileitem: FileItem): def remove_from_queue(self, fileitem: FileItem):
@@ -625,7 +648,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
""" """
if not fileitem: if not fileitem:
return return
self.jobview.remove_task(fileitem) with job_lock:
self.jobview.remove_task(fileitem)
def __start_transfer(self): def __start_transfer(self):
""" """
@@ -868,7 +892,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
finally: finally:
# 移除已完成的任务 # 移除已完成的任务
with task_lock: with job_lock:
if self.jobview.is_done(task): if self.jobview.is_done(task):
self.jobview.remove_job(task) self.jobview.remove_job(task)
@@ -955,8 +979,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 非MoviePilot下载的任务按文件识别 # 非MoviePilot下载的任务按文件识别
mediainfo = None mediainfo = None
# 执行实时整理,匹配源目录 # 执行异步整理,匹配源目录
state, errmsg = self.do_transfer( self.do_transfer(
fileitem=FileItem( fileitem=FileItem(
storage="local", storage="local",
path=file_path.as_posix() + ("/" if file_path.is_dir() else ""), path=file_path.as_posix() + ("/" if file_path.is_dir() else ""),
@@ -967,20 +991,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
), ),
mediainfo=mediainfo, mediainfo=mediainfo,
downloader=torrent.downloader, downloader=torrent.downloader,
download_hash=torrent.hash, download_hash=torrent.hash
background=False,
) )
# 设置下载任务状态
if not state:
logger.warn(f"整理下载器任务失败:{torrent.hash} - {errmsg}")
self.transfer_completed(hashs=torrent.hash, downloader=torrent.downloader)
finally: finally:
torrents.clear() torrents.clear()
del torrents del torrents
# 结束
logger.info("所有下载器中下载完成的文件已整理完成")
return True return True
def __get_trans_fileitems( def __get_trans_fileitems(