mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-08 21:15:58 +08:00
627 lines
26 KiB
Python
627 lines
26 KiB
Python
from pathlib import Path
|
||
from typing import Optional, List, Tuple, Union, Dict, Callable
|
||
|
||
from app.chain.tmdb import TmdbChain
|
||
from app.core.config import settings
|
||
from app.core.context import MediaInfo
|
||
from app.core.meta import MetaBase
|
||
from app.core.metainfo import MetaInfo
|
||
from app.helper.directory import DirectoryHelper
|
||
from app.helper.message import MessageHelper
|
||
from app.helper.module import ModuleHelper
|
||
from app.log import logger
|
||
from app.modules import _ModuleBase
|
||
from app.modules.filemanager.storages import StorageBase
|
||
from app.modules.filemanager.transhandler import TransHandler
|
||
from app.schemas import TransferInfo, ExistMediaInfo, TmdbEpisode, TransferDirectoryConf, FileItem, StorageUsage
|
||
from app.schemas.types import MediaType, ModuleType, OtherModulesType
|
||
from app.utils.system import SystemUtils
|
||
|
||
|
||
class FileManagerModule(_ModuleBase):
|
||
"""
|
||
文件整理模块
|
||
"""
|
||
|
||
_storage_schemas = []
|
||
_support_storages = []
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.directoryhelper = DirectoryHelper()
|
||
self.messagehelper = MessageHelper()
|
||
|
||
def init_module(self) -> None:
|
||
# 加载模块
|
||
self._storage_schemas = ModuleHelper.load('app.modules.filemanager.storages',
|
||
filter_func=lambda _, obj: hasattr(obj, 'schema') and obj.schema)
|
||
# 获取存储类型
|
||
self._support_storages = [storage.schema.value for storage in self._storage_schemas if storage.schema]
|
||
|
||
@staticmethod
|
||
def get_name() -> str:
|
||
return "文件整理"
|
||
|
||
@staticmethod
|
||
def get_type() -> ModuleType:
|
||
"""
|
||
获取模块类型
|
||
"""
|
||
return ModuleType.Other
|
||
|
||
@staticmethod
|
||
def get_subtype() -> OtherModulesType:
|
||
"""
|
||
获取模块子类型
|
||
"""
|
||
return OtherModulesType.FileManager
|
||
|
||
@staticmethod
|
||
def get_priority() -> int:
|
||
"""
|
||
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
|
||
"""
|
||
return 4
|
||
|
||
def stop(self):
|
||
pass
|
||
|
||
def test(self) -> Tuple[bool, str]:
|
||
"""
|
||
测试模块连接性
|
||
"""
|
||
# 检查目录
|
||
dirs = self.directoryhelper.get_dirs()
|
||
if not dirs:
|
||
return False, "未设置任何目录"
|
||
for d in dirs:
|
||
# 下载目录
|
||
download_path = d.download_path
|
||
if not download_path:
|
||
return False, f"{d.name} 的下载目录未设置"
|
||
if d.storage == "local" and not Path(download_path).exists():
|
||
return False, f"{d.name} 的下载目录 {download_path} 不存在"
|
||
# 媒体库目录
|
||
library_path = d.library_path
|
||
if not library_path:
|
||
return False, f"{d.name} 的媒体库目录未设置"
|
||
if d.library_storage == "local" and not Path(library_path).exists():
|
||
return False, f"{d.name} 的媒体库目录 {library_path} 不存在"
|
||
# 硬链接
|
||
if d.transfer_type == "link" \
|
||
and d.storage == "local" \
|
||
and d.library_storage == "local" \
|
||
and not SystemUtils.is_same_disk(Path(download_path), Path(library_path)):
|
||
return False, f"{d.name} 的下载目录 {download_path} 与媒体库目录 {library_path} 不在同一磁盘,无法硬链接"
|
||
# 存储
|
||
storage_oper = self.__get_storage_oper(d.storage)
|
||
if storage_oper:
|
||
if not storage_oper.check():
|
||
return False, f"{d.name} 的存储测试不通过"
|
||
if d.transfer_type and d.transfer_type not in storage_oper.support_transtype():
|
||
return False, f"{d.name} 的存储不支持 {d.transfer_type} 整理方式"
|
||
|
||
return True, ""
|
||
|
||
def __get_storage_oper(self, _storage: str, _func: Optional[str] = None) -> Optional[StorageBase]:
|
||
"""
|
||
获取存储操作对象
|
||
"""
|
||
for storage_schema in self._storage_schemas:
|
||
if storage_schema.schema \
|
||
and storage_schema.schema.value == _storage \
|
||
and (not _func or hasattr(storage_schema, _func)):
|
||
return storage_schema()
|
||
return None
|
||
|
||
def init_setting(self) -> Tuple[str, Union[str, bool]]:
|
||
pass
|
||
|
||
def support_transtype(self, storage: str) -> Optional[dict]:
|
||
"""
|
||
支持的整理方式
|
||
"""
|
||
if storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的整理方式获取")
|
||
return None
|
||
return storage_oper.support_transtype()
|
||
|
||
@staticmethod
|
||
def recommend_name(meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]:
|
||
"""
|
||
获取重命名后的名称
|
||
:param meta: 元数据
|
||
:param mediainfo: 媒体信息
|
||
:return: 重命名后的名称(含目录)
|
||
"""
|
||
handler = TransHandler()
|
||
# 重命名格式
|
||
rename_format = settings.RENAME_FORMAT(mediainfo.type)
|
||
# 获取集信息
|
||
episodes_info: Optional[List[TmdbEpisode]] = None
|
||
if mediainfo.type == MediaType.TV:
|
||
# 判断注意season为0的情况
|
||
season_num = mediainfo.season
|
||
if season_num is None and meta.season_seq:
|
||
if meta.season_seq.isdigit():
|
||
season_num = int(meta.season_seq)
|
||
# 默认值1
|
||
if season_num is None:
|
||
season_num = 1
|
||
episodes_info = TmdbChain().tmdb_episodes(
|
||
tmdbid=mediainfo.tmdb_id,
|
||
season=season_num,
|
||
episode_group=mediainfo.episode_group,
|
||
)
|
||
# 获取重命名后的名称
|
||
path = handler.get_rename_path(
|
||
template_string=rename_format,
|
||
rename_dict=handler.get_naming_dict(meta=meta,
|
||
mediainfo=mediainfo,
|
||
episodes_info=episodes_info,
|
||
file_ext=Path(meta.title).suffix)
|
||
)
|
||
return path.as_posix() if path else ""
|
||
|
||
def save_config(self, storage: str, conf: Dict) -> None:
|
||
"""
|
||
保存存储配置
|
||
"""
|
||
storage_oper = self.__get_storage_oper(storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的配置保存")
|
||
return
|
||
storage_oper.set_config(conf)
|
||
|
||
def reset_config(self, storage: str) -> None:
|
||
"""
|
||
重置存储配置
|
||
"""
|
||
storage_oper = self.__get_storage_oper(storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的重置存储配置")
|
||
return
|
||
storage_oper.reset_config()
|
||
|
||
def generate_qrcode(self, storage: str) -> Optional[Tuple[dict, str]]:
|
||
"""
|
||
生成二维码
|
||
"""
|
||
storage_oper = self.__get_storage_oper(storage, "generate_qrcode")
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的二维码生成")
|
||
return None
|
||
return storage_oper.generate_qrcode()
|
||
|
||
def generate_auth_url(self, storage: str) -> Optional[Tuple[dict, str]]:
|
||
"""
|
||
生成 OAuth2 授权 URL
|
||
"""
|
||
storage_oper = self.__get_storage_oper(storage, "generate_auth_url")
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的 OAuth2 授权")
|
||
return {}, f"不支持 {storage} 的 OAuth2 授权"
|
||
return storage_oper.generate_auth_url()
|
||
|
||
def check_login(self, storage: str, **kwargs) -> Optional[Dict[str, str]]:
|
||
"""
|
||
登录确认
|
||
"""
|
||
storage_oper = self.__get_storage_oper(storage, "check_login")
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的登录确认")
|
||
return None
|
||
return storage_oper.check_login(**kwargs)
|
||
|
||
def list_files(self, fileitem: FileItem, recursion: Optional[bool] = False) -> Optional[List[FileItem]]:
|
||
"""
|
||
浏览文件
|
||
:param fileitem: 源文件
|
||
:param recursion: 是否递归,此时只浏览文件
|
||
:return: 文件项列表
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的文件浏览")
|
||
return None
|
||
|
||
def __get_files(_item: FileItem, _r: Optional[bool] = False):
|
||
"""
|
||
递归处理
|
||
"""
|
||
_items = storage_oper.list(_item)
|
||
if _items:
|
||
if _r:
|
||
for t in _items:
|
||
if t.type == "dir":
|
||
__get_files(t, _r)
|
||
else:
|
||
result.append(t)
|
||
else:
|
||
result.extend(_items)
|
||
|
||
# 返回结果
|
||
result = []
|
||
__get_files(fileitem, recursion)
|
||
|
||
return result
|
||
|
||
def any_files(self, fileitem: FileItem, extensions: list = None) -> Optional[bool]:
|
||
"""
|
||
查询当前目录下是否存在指定扩展名任意文件
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的文件浏览")
|
||
return None
|
||
|
||
def __any_file(_item: FileItem):
|
||
"""
|
||
递归处理
|
||
"""
|
||
_items = storage_oper.list(_item)
|
||
if _items:
|
||
if not extensions:
|
||
return True
|
||
for t in _items:
|
||
if (t.type == "file"
|
||
and t.extension
|
||
and f".{t.extension.lower()}" in extensions):
|
||
return True
|
||
elif t.type == "dir":
|
||
if __any_file(t):
|
||
return True
|
||
return False
|
||
|
||
# 返回结果
|
||
return __any_file(fileitem)
|
||
|
||
def create_folder(self, fileitem: FileItem, name: str) -> Optional[FileItem]:
|
||
"""
|
||
创建目录
|
||
:param fileitem: 源文件
|
||
:param name: 目录名
|
||
:return: 创建的目录
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的目录创建")
|
||
return None
|
||
return storage_oper.create_folder(fileitem, name)
|
||
|
||
def delete_file(self, fileitem: FileItem) -> Optional[bool]:
|
||
"""
|
||
删除文件或目录
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的删除处理")
|
||
return False
|
||
return storage_oper.delete(fileitem)
|
||
|
||
def rename_file(self, fileitem: FileItem, name: str) -> Optional[bool]:
|
||
"""
|
||
重命名文件或目录
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的重命名处理")
|
||
return False
|
||
return storage_oper.rename(fileitem, name)
|
||
|
||
def download_file(self, fileitem: FileItem, path: Path = None) -> Optional[Path]:
|
||
"""
|
||
下载文件
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的下载处理")
|
||
return None
|
||
return storage_oper.download(fileitem, path=path)
|
||
|
||
def upload_file(self, fileitem: FileItem, path: Path, new_name: Optional[str] = None) -> Optional[FileItem]:
|
||
"""
|
||
上传文件
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的上传处理")
|
||
return None
|
||
return storage_oper.upload(fileitem, path, new_name)
|
||
|
||
def get_file_item(self, storage: str, path: Path) -> Optional[FileItem]:
|
||
"""
|
||
根据路径获取文件项
|
||
"""
|
||
if storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的文件获取")
|
||
return None
|
||
return storage_oper.get_item(path)
|
||
|
||
def get_parent_item(self, fileitem: FileItem) -> Optional[FileItem]:
|
||
"""
|
||
获取上级目录项
|
||
"""
|
||
if fileitem.storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {fileitem.storage} 的文件获取")
|
||
return None
|
||
return storage_oper.get_parent(fileitem)
|
||
|
||
def snapshot_storage(self, storage: str, path: Path,
|
||
last_snapshot_time: float = None, max_depth: int = 5) -> Optional[Dict[str, Dict]]:
|
||
"""
|
||
快照存储
|
||
:param storage: 存储类型
|
||
:param path: 路径
|
||
:param last_snapshot_time: 上次快照时间,用于增量快照
|
||
:param max_depth: 最大递归深度,避免过深遍历
|
||
"""
|
||
if storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的快照处理")
|
||
return None
|
||
return storage_oper.snapshot(path, last_snapshot_time=last_snapshot_time, max_depth=max_depth)
|
||
|
||
def storage_usage(self, storage: str) -> Optional[StorageUsage]:
|
||
"""
|
||
存储使用情况
|
||
"""
|
||
if storage not in self._support_storages:
|
||
return None
|
||
storage_oper = self.__get_storage_oper(storage)
|
||
if not storage_oper:
|
||
logger.error(f"不支持 {storage} 的存储使用情况")
|
||
return None
|
||
return storage_oper.usage()
|
||
|
||
def transfer(self, fileitem: FileItem, meta: MetaBase, mediainfo: MediaInfo,
|
||
target_directory: TransferDirectoryConf = None,
|
||
target_storage: Optional[str] = None, target_path: Path = None,
|
||
transfer_type: Optional[str] = None, scrape: Optional[bool] = None,
|
||
library_type_folder: Optional[bool] = None, library_category_folder: Optional[bool] = None,
|
||
episodes_info: List[TmdbEpisode] = None,
|
||
source_oper: Callable = None, target_oper: Callable = None) -> TransferInfo:
|
||
"""
|
||
文件整理
|
||
:param fileitem: 文件信息
|
||
:param meta: 预识别的元数据
|
||
:param mediainfo: 识别的媒体信息
|
||
:param target_directory: 目标目录配置
|
||
:param target_storage: 目标存储
|
||
:param target_path: 目标路径
|
||
:param transfer_type: 转移模式
|
||
:param scrape: 是否刮削元数据
|
||
:param library_type_folder: 是否按媒体类型创建目录
|
||
:param library_category_folder: 是否按媒体类别创建目录
|
||
:param episodes_info: 当前季的全部集信息
|
||
:param source_oper: 源存储操作对象
|
||
:param target_oper: 目标存储操作对象
|
||
:return: {path, target_path, message}
|
||
"""
|
||
handler = TransHandler()
|
||
# 检查目录路径
|
||
if fileitem.storage == "local" and not Path(fileitem.path).exists():
|
||
return TransferInfo(success=False,
|
||
fileitem=fileitem,
|
||
message=f"{fileitem.path} 不存在")
|
||
# 目标路径不能是文件
|
||
if target_path and target_path.is_file():
|
||
logger.error(f"整理目标路径 {target_path} 是一个文件")
|
||
return TransferInfo(success=False,
|
||
fileitem=fileitem,
|
||
message=f"{target_path} 不是有效目录")
|
||
# 获取目标路径
|
||
if target_directory:
|
||
# 目标媒体库目录未设置
|
||
if not target_directory.library_path:
|
||
logger.error(f"目标媒体库目录未设置,无法整理文件,源路径:{fileitem.path}")
|
||
return TransferInfo(success=False,
|
||
fileitem=fileitem,
|
||
message="目标媒体库目录未设置")
|
||
# 整理方式
|
||
if not transfer_type:
|
||
transfer_type = target_directory.transfer_type
|
||
# 目标存储
|
||
if not target_storage:
|
||
target_storage = target_directory.library_storage
|
||
# 是否需要重命名
|
||
need_rename = target_directory.renaming
|
||
# 是否需要通知
|
||
need_notify = target_directory.notify
|
||
# 覆盖模式
|
||
overwrite_mode = target_directory.overwrite_mode
|
||
# 是否需要刮削
|
||
need_scrape = target_directory.scraping if scrape is None else scrape
|
||
# 拼装媒体库一、二级子目录
|
||
target_path = handler.get_dest_dir(mediainfo=mediainfo, target_dir=target_directory,
|
||
need_type_folder=library_type_folder,
|
||
need_category_folder=library_category_folder)
|
||
elif target_path:
|
||
need_scrape = scrape or False
|
||
need_rename = True
|
||
need_notify = False
|
||
overwrite_mode = "never"
|
||
# 手动整理的场景,有自定义目标路径
|
||
target_path = handler.get_dest_path(mediainfo=mediainfo, target_path=target_path,
|
||
need_type_folder=library_type_folder,
|
||
need_category_folder=library_category_folder)
|
||
else:
|
||
# 未找到有效的媒体库目录
|
||
logger.error(
|
||
f"{mediainfo.type.value if mediainfo.type else '未知类型'} {mediainfo.title_year} 未找到有效的媒体库目录,无法整理文件,源路径:{fileitem.path}")
|
||
return TransferInfo(success=False,
|
||
fileitem=fileitem,
|
||
message="未找到有效的媒体库目录")
|
||
# 整理方式
|
||
if not transfer_type:
|
||
logger.error(f"{target_directory.name} 未设置整理方式")
|
||
return TransferInfo(success=False,
|
||
fileitem=fileitem,
|
||
message=f"{target_directory.name} 未设置整理方式")
|
||
|
||
# 源操作对象
|
||
if not source_oper:
|
||
source_oper = self.__get_storage_oper(fileitem.storage)
|
||
if not source_oper:
|
||
return TransferInfo(success=False,
|
||
message=f"不支持的存储类型:{fileitem.storage}",
|
||
fileitem=fileitem,
|
||
fail_list=[fileitem.path],
|
||
transfer_type=transfer_type,
|
||
need_notify=need_notify
|
||
)
|
||
# 目的操作对象
|
||
if not target_oper:
|
||
if not target_storage:
|
||
target_storage = fileitem.storage
|
||
target_oper = self.__get_storage_oper(target_storage)
|
||
if not target_oper:
|
||
return TransferInfo(success=False,
|
||
message=f"不支持的存储类型:{target_storage}",
|
||
fileitem=fileitem,
|
||
fail_list=[fileitem.path],
|
||
transfer_type=transfer_type,
|
||
need_notify=need_notify)
|
||
|
||
# 整理
|
||
logger.info(f"获取整理目标路径:【{target_storage}】{target_path}")
|
||
return handler.transfer_media(fileitem=fileitem,
|
||
in_meta=meta,
|
||
mediainfo=mediainfo,
|
||
target_storage=target_storage,
|
||
target_path=target_path,
|
||
transfer_type=transfer_type,
|
||
need_scrape=need_scrape,
|
||
need_rename=need_rename,
|
||
need_notify=need_notify,
|
||
overwrite_mode=overwrite_mode,
|
||
episodes_info=episodes_info,
|
||
source_oper=source_oper,
|
||
target_oper=target_oper)
|
||
|
||
def media_files(self, mediainfo: MediaInfo) -> List[FileItem]:
|
||
"""
|
||
获取对应媒体的媒体库文件列表
|
||
:param mediainfo: 媒体信息
|
||
"""
|
||
handler = TransHandler()
|
||
ret_fileitems = []
|
||
# 检查本地媒体库
|
||
dest_dirs = DirectoryHelper().get_library_dirs()
|
||
# 检查每一个媒体库目录
|
||
for dest_dir in dest_dirs:
|
||
# 存储
|
||
storage_oper = self.__get_storage_oper(dest_dir.library_storage)
|
||
if not storage_oper:
|
||
continue
|
||
# 媒体分类路径
|
||
dir_path = handler.get_dest_dir(mediainfo=mediainfo, target_dir=dest_dir)
|
||
# 重命名格式
|
||
rename_format = settings.RENAME_FORMAT(mediainfo.type)
|
||
# 元数据补上常用属性,尽可能确保重命名后的路径不出现空白
|
||
meta = MetaInfo(mediainfo.title)
|
||
if meta.type == MediaType.UNKNOWN and mediainfo.type is not None:
|
||
meta.type = mediainfo.type
|
||
if meta.year is None:
|
||
meta.year = mediainfo.year
|
||
if meta.begin_season is None:
|
||
meta.begin_season = 1
|
||
if meta.begin_episode is None:
|
||
meta.begin_episode = 1
|
||
# 获取路径(重命名路径)
|
||
target_path = handler.get_rename_path(
|
||
path=dir_path,
|
||
template_string=rename_format,
|
||
rename_dict=handler.get_naming_dict(meta=meta,
|
||
mediainfo=mediainfo)
|
||
)
|
||
# 获取重命名后的媒体文件根路径
|
||
media_path = DirectoryHelper.get_media_root_path(
|
||
rename_format, rename_path=target_path
|
||
)
|
||
if not media_path:
|
||
# 忽略
|
||
continue
|
||
if dir_path != media_path and dir_path.is_relative_to(media_path):
|
||
# 兜底检查,避免不必要的扫盘
|
||
logger.warn(f"{media_path} 是媒体库目录 {dir_path} 的父目录,忽略获取媒体文件列表,请检查重命名格式!")
|
||
continue
|
||
# 检索媒体文件
|
||
fileitem = storage_oper.get_item(media_path)
|
||
if not fileitem:
|
||
continue
|
||
try:
|
||
media_files = self.list_files(fileitem, True)
|
||
except Exception as e:
|
||
logger.debug(f"获取媒体文件列表失败:{str(e)}")
|
||
continue
|
||
if media_files:
|
||
for media_file in media_files:
|
||
if f".{media_file.extension.lower()}" in settings.RMT_MEDIAEXT:
|
||
if media_file not in ret_fileitems:
|
||
ret_fileitems.append(media_file)
|
||
return ret_fileitems
|
||
|
||
def media_exists(self, mediainfo: MediaInfo, **kwargs) -> Optional[ExistMediaInfo]:
|
||
"""
|
||
判断媒体文件是否存在于文件系统(网盘或本地文件),只支持标准媒体库结构
|
||
:param mediainfo: 识别的媒体信息
|
||
:return: 如不存在返回None,存在时返回信息,包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
|
||
"""
|
||
if not settings.LOCAL_EXISTS_SEARCH:
|
||
return None
|
||
|
||
logger.debug(f"正在本地媒体库中查找 {mediainfo.title_year}...")
|
||
|
||
# 检查媒体库
|
||
fileitems = self.media_files(mediainfo)
|
||
if not fileitems:
|
||
logger.debug(f"{mediainfo.title_year} 不在本地媒体库中")
|
||
return None
|
||
|
||
if mediainfo.type == MediaType.MOVIE:
|
||
# 电影存在任何文件为存在
|
||
logger.info(f"{mediainfo.title_year} 在本地文件系统中找到了")
|
||
return ExistMediaInfo(type=MediaType.MOVIE)
|
||
else:
|
||
# 电视剧检索集数
|
||
seasons: Dict[int, list] = {}
|
||
for fileitem in fileitems:
|
||
file_meta = MetaInfo(fileitem.basename)
|
||
season_index = file_meta.begin_season or 1
|
||
episode_index = file_meta.begin_episode
|
||
if not episode_index:
|
||
continue
|
||
if season_index not in seasons:
|
||
seasons[season_index] = []
|
||
if episode_index not in seasons[season_index]:
|
||
seasons[season_index].append(episode_index)
|
||
# 返回剧集情况
|
||
logger.info(f"{mediainfo.title_year} 在本地文件系统中找到了这些季集:{seasons}")
|
||
return ExistMediaInfo(type=MediaType.TV, seasons=seasons)
|