fix scraper

This commit is contained in:
jxxghp
2024-07-01 15:25:35 +08:00
parent 964e212831
commit 778b562cab
6 changed files with 18 additions and 482 deletions

View File

@@ -507,21 +507,6 @@ class ChainBase(metaclass=ABCMeta):
note=note_list)
return self.run_module("post_torrents_message", message=message, torrents=torrents)
def scrape_metadata(self, path: Path, mediainfo: MediaInfo, transfer_type: str,
metainfo: MetaBase = None, force_nfo: bool = False, force_img: bool = False) -> None:
"""
刮削元数据
:param path: 媒体文件路径
:param mediainfo: 识别的媒体信息
:param metainfo: 源文件的识别元数据
:param transfer_type: 转移模式
:param force_nfo: 强制刮削nfo
:param force_img: 强制刮削图片
:return: 成功或失败
"""
self.run_module("scrape_metadata", path=path, mediainfo=mediainfo, metainfo=metainfo,
transfer_type=transfer_type, force_nfo=force_nfo, force_img=force_img)
def metadata_img(self, mediainfo: MediaInfo, season: int = None) -> Optional[dict]:
"""
获取图片名称和url

View File

@@ -350,6 +350,8 @@ class MediaChain(ChainBase, metaclass=Singleton):
tmp_file = settings.TEMP_PATH / _path.name
tmp_file.write_bytes(_content)
StorageChain().upload_file(fileitem=_fileitem, path=tmp_file)
if tmp_file.exists():
tmp_file.unlink()
def __save_image(_url: str) -> Optional[bytes]:
"""

View File

@@ -1,5 +1,4 @@
import re
from pathlib import Path
from typing import List, Optional, Tuple, Union
import cn2an
@@ -8,7 +7,7 @@ from app import schemas
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo, MetaInfoPath
from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.douban.apiv2 import DoubanApi
@@ -18,7 +17,6 @@ from app.schemas import MediaPerson
from app.schemas.types import MediaType
from app.utils.common import retry
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
class DoubanModule(_ModuleBase):
@@ -664,107 +662,6 @@ class DoubanModule(_ModuleBase):
return [MediaInfo(douban_info=info) for info in infos.get("subject_collection_items")]
return []
def scrape_metadata(self, path: Path, mediainfo: MediaInfo, transfer_type: str,
metainfo: MetaBase = None, force_nfo: bool = False, force_img: bool = False) -> None:
"""
刮削元数据
:param path: 媒体文件路径
:param mediainfo: 识别的媒体信息
:param transfer_type: 传输类型
:param metainfo: 源文件的识别元数据
:param force_nfo: 是否强制刮削nfo
:param force_img: 是否强制刮削图片
:return: 成功或失败
"""
def __get_mediainfo(_meta: MetaBase, _mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
获取豆瓣媒体信息
"""
if not _meta.name:
return None
# 查询豆瓣详情
if not _mediainfo.douban_id:
# 根据TMDB名称查询豆瓣数据
_doubaninfo = self.match_doubaninfo(name=_mediainfo.title,
imdbid=_mediainfo.imdb_id,
mtype=_mediainfo.type,
year=_mediainfo.year)
if not _doubaninfo:
logger.warn(f"未找到 {_mediainfo.title} 的豆瓣信息")
return None
_doubaninfo = self.douban_info(doubanid=_doubaninfo.get("id"), mtype=_mediainfo.type)
else:
_doubaninfo = self.douban_info(doubanid=_mediainfo.douban_id,
mtype=_mediainfo.type)
if not _doubaninfo:
logger(f"未获取到 {_mediainfo.douban_id} 的豆瓣媒体信息,无法刮削!")
return None
# 豆瓣媒体信息
_doubanmedia = MediaInfo(douban_info=_doubaninfo)
# 补充图片
self.obtain_images(_doubanmedia)
return _doubanmedia
if settings.SCRAP_SOURCE != "douban":
return None
if SystemUtils.is_bluray_dir(path):
# 蓝光原盘
logger.info(f"开始刮削蓝光原盘:{path} ...")
# 优先使用传入metainfo
meta = metainfo or MetaInfo(path.name)
# 刮削路径
scrape_path = path / path.name
# 媒体信息
doubanmedia = __get_mediainfo(_meta=meta, _mediainfo=mediainfo)
if not doubanmedia:
return
# 刮削
self.scraper.gen_scraper_files(meta=meta,
mediainfo=doubanmedia,
file_path=scrape_path,
transfer_type=transfer_type,
force_nfo=force_nfo,
force_img=force_img)
elif path.is_file():
# 刮削单个文件
logger.info(f"开始刮削媒体库文件:{path} ...")
# 优先使用传入metainfo
meta = metainfo or MetaInfoPath(path)
# 媒体信息
doubanmedia = __get_mediainfo(_meta=meta, _mediainfo=mediainfo)
if not doubanmedia:
return
# 刮削
self.scraper.gen_scraper_files(meta=meta,
mediainfo=doubanmedia,
file_path=path,
transfer_type=transfer_type,
force_nfo=force_nfo,
force_img=force_img)
else:
# 目录下的所有文件
for file in SystemUtils.list_files(path, settings.RMT_MEDIAEXT):
if not file:
continue
logger.info(f"开始刮削媒体库文件:{file} ...")
try:
meta = MetaInfoPath(file)
# 豆瓣媒体信息
doubanmedia = __get_mediainfo(_meta=meta, _mediainfo=mediainfo)
if not doubanmedia:
return
# 刮削
self.scraper.gen_scraper_files(meta=meta,
mediainfo=doubanmedia,
file_path=file,
transfer_type=transfer_type,
force_nfo=force_nfo,
force_img=force_img)
except Exception as e:
logger.error(f"刮削文件 {file} 失败,原因:{str(e)}")
logger.info(f"{path} 刮削完成")
def metadata_nfo(self, mediainfo: MediaInfo, season: int = None, **kwargs) -> Optional[str]:
"""
获取NFO文件内容文本

View File

@@ -1,15 +1,11 @@
from pathlib import Path
from typing import Union, Optional
from typing import Optional
from xml.dom import minidom
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.log import logger
from app.schemas.types import MediaType
from app.utils.dom import DomUtils
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
class DoubanScraper:
@@ -55,64 +51,6 @@ class DoubanScraper:
ret_dict[f"backdrop{Path(mediainfo.backdrop_path).suffix}"] = mediainfo.backdrop_path
return ret_dict
def gen_scraper_files(self, meta: MetaBase, mediainfo: MediaInfo,
file_path: Path, transfer_type: str,
force_nfo: bool = False, force_img: bool = False):
"""
生成刮削文件
:param meta: 元数据
:param mediainfo: 媒体信息
:param file_path: 文件路径或者目录路径
:param transfer_type: 转输类型
:param force_nfo: 强制生成NFO
:param force_img: 强制生成图片
"""
if not mediainfo or not file_path:
return
self._transfer_type = transfer_type
self._force_nfo = force_nfo
self._force_img = force_img
try:
# 电影
if mediainfo.type == MediaType.MOVIE:
# 强制或者不已存在时才处理
if self._force_nfo or (not file_path.with_name("movie.nfo").exists()
and not file_path.with_suffix(".nfo").exists()):
# 生成电影描述文件
self.__gen_movie_nfo_file(mediainfo=mediainfo,
file_path=file_path)
# 生成电影图片
image_dict = self.get_metadata_img(mediainfo)
for img_name, img_url in image_dict.items():
image_path = file_path.with_name(img_name)
if self._force_img or not image_path.exists():
self.__save_image(url=img_url,
file_path=image_path)
# 电视剧
else:
# 不存在时才处理
if self._force_nfo or not file_path.parent.with_name("tvshow.nfo").exists():
# 根目录描述文件
self.__gen_tv_nfo_file(mediainfo=mediainfo,
dir_path=file_path.parents[1])
# 生成根目录图片
image_dict = self.get_metadata_img(mediainfo)
for img_name, img_url in image_dict.items():
image_path = file_path.with_name(img_name)
if self._force_img or not image_path.exists():
self.__save_image(url=img_url,
file_path=image_path)
# 季目录NFO
if self._force_nfo or not file_path.with_name("season.nfo").exists():
self.__gen_tv_season_nfo_file(mediainfo=mediainfo,
season=meta.begin_season,
season_path=file_path.parent)
except Exception as e:
logger.error(f"{file_path} 刮削失败:{str(e)}")
@staticmethod
def __gen_common_nfo(mediainfo: MediaInfo, doc: minidom.Document, root: minidom.Node):
# 简介
@@ -136,17 +74,12 @@ class DoubanScraper:
return doc
def __gen_movie_nfo_file(self,
mediainfo: MediaInfo,
file_path: Path = None) -> minidom.Document:
def __gen_movie_nfo_file(self, mediainfo: MediaInfo) -> minidom.Document:
"""
生成电影的NFO描述文件
:param mediainfo: 豆瓣信息
:param file_path: 电影文件路径
"""
# 开始生成XML
if file_path:
logger.info(f"正在生成电影NFO文件{file_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "movie")
# 公共部分
@@ -157,22 +90,15 @@ class DoubanScraper:
DomUtils.add_node(doc, root, "title", mediainfo.title or "")
# 年份
DomUtils.add_node(doc, root, "year", mediainfo.year or "")
# 保存
if file_path:
self.__save_nfo(doc, file_path.with_suffix(".nfo"))
return doc
def __gen_tv_nfo_file(self,
mediainfo: MediaInfo,
dir_path: Path = None) -> minidom.Document:
def __gen_tv_nfo_file(self, mediainfo: MediaInfo) -> minidom.Document:
"""
生成电视剧的NFO描述文件
:param mediainfo: 媒体信息
:param dir_path: 电视剧根目录
"""
# 开始生成XML
logger.info(f"正在生成电视剧NFO文件{dir_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "tvshow")
# 公共部分
@@ -185,21 +111,17 @@ class DoubanScraper:
DomUtils.add_node(doc, root, "year", mediainfo.year or "")
DomUtils.add_node(doc, root, "season", "-1")
DomUtils.add_node(doc, root, "episode", "-1")
# 保存
if dir_path:
self.__save_nfo(doc, dir_path.joinpath("tvshow.nfo"))
return doc
def __gen_tv_season_nfo_file(self, mediainfo: MediaInfo,
season: int, season_path: Path = None) -> minidom.Document:
@staticmethod
def __gen_tv_season_nfo_file(mediainfo: MediaInfo,
season: int) -> minidom.Document:
"""
生成电视剧季的NFO描述文件
:param mediainfo: 媒体信息
:param season: 季号
:param season_path: 电视剧季的目录
"""
logger.info(f"正在生成季NFO文件{season_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "season")
# 简介
@@ -216,56 +138,5 @@ class DoubanScraper:
DomUtils.add_node(doc, root, "year", mediainfo.release_date[:4] if mediainfo.release_date else "")
# seasonnumber
DomUtils.add_node(doc, root, "seasonnumber", str(season))
# 保存
if season_path:
self.__save_nfo(doc, season_path.joinpath("season.nfo"))
return doc
def __save_image(self, url: str, file_path: Path):
"""
下载图片并保存
"""
if not url:
return
try:
# 没有后缀时处理URL转化为jpg格式
if not file_path.suffix:
url = url.replace("/format/webp", "/format/jpg")
file_path.with_suffix(".jpg")
logger.info(f"正在下载{file_path.stem}图片:{url} ...")
r = RequestUtils().get_res(url=url)
if r:
if self._transfer_type in ['rclone_move', 'rclone_copy']:
self.__save_remove_file(file_path, r.content)
else:
file_path.write_bytes(r.content)
logger.info(f"图片已保存:{file_path}")
else:
logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性")
except Exception as err:
logger.error(f"{file_path.stem}图片下载失败:{str(err)}")
def __save_nfo(self, doc, file_path: Path):
"""
保存NFO
"""
xml_str = doc.toprettyxml(indent=" ", encoding="utf-8")
if self._transfer_type in ['rclone_move', 'rclone_copy']:
self.__save_remove_file(file_path, xml_str)
else:
file_path.write_bytes(xml_str)
logger.info(f"NFO文件已保存{file_path}")
def __save_remove_file(self, out_file: Path, content: Union[str, bytes]):
"""
保存文件到远端
"""
temp_file = settings.TEMP_PATH / str(out_file)[1:]
temp_file_dir = temp_file.parent
if not temp_file_dir.exists():
temp_file_dir.mkdir(parents=True, exist_ok=True)
temp_file.write_bytes(content)
if self._transfer_type == 'rclone_move':
SystemUtils.rclone_move(temp_file, out_file)
elif self._transfer_type == 'rclone_copy':
SystemUtils.rclone_copy(temp_file, out_file)

View File

@@ -1,4 +1,3 @@
from pathlib import Path
from typing import Optional, List, Tuple, Union, Dict
import cn2an
@@ -16,7 +15,6 @@ from app.modules.themoviedb.tmdbapi import TmdbApi
from app.schemas import MediaPerson
from app.schemas.types import MediaType, MediaImageType
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
class TheMovieDbModule(_ModuleBase):
@@ -289,53 +287,6 @@ class TheMovieDbModule(_ModuleBase):
return [MediaPerson(source='themoviedb', **person) for person in results]
return []
def scrape_metadata(self, path: Path, mediainfo: MediaInfo, transfer_type: str,
metainfo: MetaBase = None, force_nfo: bool = False, force_img: bool = False) -> None:
"""
刮削元数据
:param path: 媒体文件路径
:param mediainfo: 识别的媒体信息
:param metainfo: 源文件的识别元数据
:param transfer_type: 转移类型
:param force_nfo: 强制刮削nfo
:param force_img: 强制刮削图片
:return: 成功或失败
"""
if settings.SCRAP_SOURCE != "themoviedb":
return None
if SystemUtils.is_bluray_dir(path):
# 蓝光原盘
logger.info(f"开始刮削蓝光原盘:{path} ...")
scrape_path = path / path.name
self.scraper.gen_scraper_files(mediainfo=mediainfo,
file_path=scrape_path,
transfer_type=transfer_type,
metainfo=metainfo,
force_nfo=force_nfo,
force_img=force_img)
elif path.is_file():
# 单个文件
logger.info(f"开始刮削媒体库文件:{path} ...")
self.scraper.gen_scraper_files(mediainfo=mediainfo,
file_path=path,
transfer_type=transfer_type,
metainfo=metainfo,
force_nfo=force_nfo,
force_img=force_img)
else:
# 目录下的所有文件
logger.info(f"开始刮削目录:{path} ...")
for file in SystemUtils.list_files(path, settings.RMT_MEDIAEXT):
if not file:
continue
self.scraper.gen_scraper_files(mediainfo=mediainfo,
file_path=file,
transfer_type=transfer_type,
force_nfo=force_nfo,
force_img=force_img)
logger.info(f"{path} 刮削完成")
def metadata_nfo(self, meta: MetaBase, mediainfo: MediaInfo,
season: int = None, episode: int = None) -> Optional[str]:
"""

View File

@@ -1,20 +1,12 @@
import traceback
from pathlib import Path
from typing import Union, Optional, Tuple
from typing import Optional, Tuple
from xml.dom import minidom
from requests import RequestException
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo
from app.log import logger
from app.schemas.types import MediaType
from app.utils.common import retry
from app.utils.dom import DomUtils
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
class TmdbScraper:
@@ -110,93 +102,6 @@ class TmdbScraper:
return _episode_info
return {}
def gen_scraper_files(self, mediainfo: MediaInfo, file_path: Path, transfer_type: str,
metainfo: MetaBase = None, force_nfo: bool = False, force_img: bool = False):
"""
生成刮削文件包括NFO和图片传入路径为文件路径
:param mediainfo: 媒体信息
:param metainfo: 源文件的识别元数据
:param file_path: 文件路径或者目录路径
:param transfer_type: 传输类型
:param force_nfo: 是否强制生成NFO
:param force_img: 是否强制生成图片
"""
if not mediainfo or not file_path:
return
self._transfer_type = transfer_type
self._force_nfo = force_nfo
self._force_img = force_img
try:
# 电影,路径为文件名 名称/名称.xxx 或者蓝光原盘目录 名称/名称
if mediainfo.type == MediaType.MOVIE:
# 不已存在时才处理
if self._force_nfo or (not file_path.with_name("movie.nfo").exists()
and not file_path.with_suffix(".nfo").exists()):
# 生成电影描述文件
self.__gen_movie_nfo_file(mediainfo=mediainfo,
file_path=file_path)
# 生成电影图片
image_dict = self.get_metadata_img(mediainfo=mediainfo)
for image_name, image_url in image_dict.items():
image_path = file_path.with_name(image_name)
if self._force_img or not image_path.exists():
self.__save_image(url=image_url, file_path=image_path)
# 电视剧,路径为每一季的文件名 名称/Season xx/名称 SxxExx.xxx
else:
# 如果有上游传入的元信息则使用,否则使用文件名识别
meta = metainfo or MetaInfo(file_path.name)
if meta.begin_season is None:
meta.begin_season = mediainfo.season if mediainfo.season is not None else 1
# 根目录不存在时才处理
if self._force_nfo or not file_path.parent.with_name("tvshow.nfo").exists():
# 根目录描述文件
self.__gen_tv_nfo_file(mediainfo=mediainfo,
dir_path=file_path.parents[1])
# 生成根目录图片
image_dict = self.get_metadata_img(mediainfo=mediainfo)
for image_name, image_url in image_dict.items():
image_path = file_path.parent.with_name(image_name)
if self._force_img or not image_path.exists():
self.__save_image(url=image_url, file_path=image_path)
# 查询季信息
seasoninfo = self.tmdb.get_tv_season_detail(mediainfo.tmdb_id, meta.begin_season)
if seasoninfo:
# 季目录NFO
if self._force_nfo or not file_path.with_name("season.nfo").exists():
self.__gen_tv_season_nfo_file(seasoninfo=seasoninfo,
season=meta.begin_season,
season_path=file_path.parent)
# TMDB季图片
poster_name, poster_url = self.get_season_poster(seasoninfo, meta.begin_season)
if poster_name and poster_url:
image_path = file_path.parent.with_name(poster_name)
if self._force_img or not image_path.exists():
self.__save_image(url=poster_url, file_path=image_path)
# 查询集详情
episodeinfo = self.__get_episode_detail(seasoninfo, meta.begin_episode)
if episodeinfo:
# 集NFO
if self._force_nfo or not file_path.with_suffix(".nfo").exists():
self.__gen_tv_episode_nfo_file(episodeinfo=episodeinfo,
tmdbid=mediainfo.tmdb_id,
season=meta.begin_season,
episode=meta.begin_episode,
file_path=file_path)
# 集的图片
episode_image = episodeinfo.get("still_path")
if episode_image:
image_path = file_path.with_name(file_path.stem + "-thumb.jpg").with_suffix(
Path(episode_image).suffix)
if self._force_img or not image_path.exists():
self.__save_image(
f"https://{settings.TMDB_IMAGE_DOMAIN}/t/p/original{episode_image}",
image_path)
except Exception as e:
logger.error(f"{file_path} 刮削失败:{str(e)} - {traceback.format_exc()}")
@staticmethod
def __gen_common_nfo(mediainfo: MediaInfo, doc: minidom.Document, root: minidom.Element):
"""
@@ -250,17 +155,12 @@ class TmdbScraper:
return doc
def __gen_movie_nfo_file(self,
mediainfo: MediaInfo,
file_path: Path = None) -> minidom.Document:
def __gen_movie_nfo_file(self, mediainfo: MediaInfo) -> minidom.Document:
"""
生成电影的NFO描述文件
:param mediainfo: 识别后的媒体信息
:param file_path: 电影文件路径
"""
# 开始生成XML
if file_path:
logger.info(f"正在生成电影NFO文件{file_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "movie")
# 公共部分
@@ -274,22 +174,14 @@ class TmdbScraper:
DomUtils.add_node(doc, root, "premiered", mediainfo.release_date or "")
# 年份
DomUtils.add_node(doc, root, "year", mediainfo.year or "")
# 保存
if file_path:
self.__save_nfo(doc, file_path.with_suffix(".nfo"))
return doc
def __gen_tv_nfo_file(self,
mediainfo: MediaInfo,
dir_path: Path = None) -> minidom.Document:
def __gen_tv_nfo_file(self, mediainfo: MediaInfo) -> minidom.Document:
"""
生成电视剧的NFO描述文件
:param mediainfo: 媒体信息
:param dir_path: 电视剧根目录
"""
# 开始生成XML
if dir_path:
logger.info(f"正在生成电视剧NFO文件{dir_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "tvshow")
# 公共部分
@@ -305,22 +197,16 @@ class TmdbScraper:
DomUtils.add_node(doc, root, "year", mediainfo.year or "")
DomUtils.add_node(doc, root, "season", "-1")
DomUtils.add_node(doc, root, "episode", "-1")
# 保存
if dir_path:
self.__save_nfo(doc, dir_path.joinpath("tvshow.nfo"))
return doc
def __gen_tv_season_nfo_file(self, seasoninfo: dict,
season: int, season_path: Path = None) -> minidom.Document:
@staticmethod
def __gen_tv_season_nfo_file(seasoninfo: dict, season: int) -> minidom.Document:
"""
生成电视剧季的NFO描述文件
:param seasoninfo: TMDB季媒体信息
:param season: 季号
:param season_path: 电视剧季的目录
"""
if season_path:
logger.info(f"正在生成季NFO文件{season_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "season")
# 简介
@@ -338,28 +224,21 @@ class TmdbScraper:
seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "")
# seasonnumber
DomUtils.add_node(doc, root, "seasonnumber", str(season))
# 保存
if season_path:
self.__save_nfo(doc, season_path.joinpath("season.nfo"))
return doc
def __gen_tv_episode_nfo_file(self,
tmdbid: int,
@staticmethod
def __gen_tv_episode_nfo_file(tmdbid: int,
episodeinfo: dict,
season: int,
episode: int,
file_path: Path = None) -> minidom.Document:
episode: int) -> minidom.Document:
"""
生成电视剧集的NFO描述文件
:param tmdbid: TMDBID
:param episodeinfo: 集TMDB元数据
:param season: 季号
:param episode: 集号
:param file_path: 集文件的路径
"""
# 开始生成集的信息
if file_path:
logger.info(f"正在生成剧集NFO文件{file_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "episodedetails")
# TMDBID
@@ -404,53 +283,4 @@ class TmdbScraper:
f"https://{settings.TMDB_IMAGE_DOMAIN}/t/p/original{actor.get('profile_path')}")
DomUtils.add_node(doc, xactor, "profile",
f"https://www.themoviedb.org/person/{actor.get('id')}")
# 保存文件
if file_path:
self.__save_nfo(doc, file_path.with_suffix(".nfo"))
return doc
@retry(RequestException, logger=logger)
def __save_image(self, url: str, file_path: Path):
"""
下载图片并保存
"""
try:
logger.info(f"正在下载{file_path.stem}图片:{url} ...")
r = RequestUtils(proxies=settings.PROXY).get_res(url=url, raise_exception=True)
if r:
if self._transfer_type in ['rclone_move', 'rclone_copy']:
self.__save_remove_file(file_path, r.content)
else:
file_path.write_bytes(r.content)
logger.info(f"图片已保存:{file_path}")
else:
logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性")
except RequestException as err:
raise err
except Exception as err:
logger.error(f"{file_path.stem}图片下载失败:{str(err)}")
def __save_nfo(self, doc: minidom.Document, file_path: Path):
"""
保存NFO
"""
xml_str = doc.toprettyxml(indent=" ", encoding="utf-8")
if self._transfer_type in ['rclone_move', 'rclone_copy']:
self.__save_remove_file(file_path, xml_str)
else:
file_path.write_bytes(xml_str)
logger.info(f"NFO文件已保存{file_path}")
def __save_remove_file(self, out_file: Path, content: Union[str, bytes]):
"""
保存文件到远端
"""
temp_file = settings.TEMP_PATH / str(out_file)[1:]
temp_file_dir = temp_file.parent
if not temp_file_dir.exists():
temp_file_dir.mkdir(parents=True, exist_ok=True)
temp_file.write_bytes(content)
if self._transfer_type == 'rclone_move':
SystemUtils.rclone_move(temp_file, out_file)
elif self._transfer_type == 'rclone_copy':
SystemUtils.rclone_copy(temp_file, out_file)