diff --git a/app/chain/media.py b/app/chain/media.py index a6fb1af6..e0a61722 100644 --- a/app/chain/media.py +++ b/app/chain/media.py @@ -1,4 +1,5 @@ from pathlib import Path +from tempfile import NamedTemporaryFile from threading import Lock from typing import Optional, List, Tuple, Union @@ -456,36 +457,53 @@ class MediaChain(ChainBase): """ if not _fileitem or not _content or not _path: return - # 保存文件到临时目录 - tmp_dir = settings.TEMP_PATH / StringUtils.generate_random_str(10) - tmp_dir.mkdir(parents=True, exist_ok=True) - tmp_file = tmp_dir / _path.name - tmp_file.write_bytes(_content) - # 获取文件的父目录 - try: - item = storagechain.upload_file(fileitem=_fileitem, path=tmp_file, new_name=_path.name) + # 使用tempfile创建临时文件,自动删除 + with NamedTemporaryFile(delete=True, suffix=_path.suffix) as tmp_file: + # 写入内容 + if isinstance(_content, bytes): + tmp_file.write(_content) + else: + tmp_file.write(_content.encode('utf-8')) + tmp_file.flush() + # 上传文件 + item = storagechain.upload_file(fileitem=_fileitem, path=Path(tmp_file.name), new_name=_path.name) if item: logger.info(f"已保存文件:{item.path}") else: logger.warn(f"文件保存失败:{_path}") - finally: - if tmp_file.exists(): - tmp_file.unlink() - def __download_image(_url: str) -> Optional[bytes]: + def __download_and_save_image(_fileitem: schemas.FileItem, _path: Path, _url: str): """ - 下载图片并保存 + 流式下载图片并直接保存到文件(减少内存占用) + :param _fileitem: 关联的媒体文件项 + :param _path: 图片文件路径 + :param _url: 图片下载URL """ + if not _fileitem or not _url or not _path: + return try: logger.info(f"正在下载图片:{_url} ...") - r = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT).get_res(url=_url) - if r: - return r.content - else: - logger.info(f"{_url} 图片下载失败,请检查网络连通性!") + request_utils = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT) + with request_utils.get_stream(url=_url) as r: + if r and r.status_code == 200: + # 使用tempfile创建临时文件,自动删除 + with NamedTemporaryFile(delete=True, suffix=_path.suffix) as tmp_file: + # 流式写入文件 + for chunk in r.iter_content(chunk_size=8192): + if chunk: + tmp_file.write(chunk) + tmp_file.flush() + # 上传文件 + item = storagechain.upload_file(fileitem=_fileitem, path=Path(tmp_file.name), + new_name=_path.name) + if item: + logger.info(f"已保存图片:{item.path}") + else: + logger.warn(f"图片保存失败:{_path}") + else: + logger.info(f"{_url} 图片下载失败") except Exception as err: logger.error(f"{_url} 图片下载失败:{str(err)}!") - return None if not fileitem: return @@ -587,11 +605,8 @@ class MediaChain(ChainBase): image_path = filepath.with_name(image_name) if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path): - # 下载图片 - content = __download_image(image_url) - # 写入图片到当前目录 - if content: - __save_file(_fileitem=fileitem, _path=image_path, _content=content) + # 流式下载图片并直接保存 + __download_and_save_image(_fileitem=fileitem, _path=image_path, _url=image_url) else: logger.info(f"已存在图片文件:{image_path}") else: @@ -637,13 +652,10 @@ class MediaChain(ChainBase): for episode, image_url in image_dict.items(): image_path = filepath.with_suffix(Path(image_url).suffix) if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path): - # 下载图片 - content = __download_image(image_url) - # 保存图片文件到当前目录 - if content: - if not parent: - parent = storagechain.get_parent_item(fileitem) - __save_file(_fileitem=parent, _path=image_path, _content=content) + # 流式下载图片并直接保存 + if not parent: + parent = storagechain.get_parent_item(fileitem) + __download_and_save_image(_fileitem=parent, _path=image_path, _url=image_url) else: logger.info(f"已存在图片文件:{image_path}") else: @@ -694,13 +706,10 @@ class MediaChain(ChainBase): image_path = filepath.with_name(image_name) if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path): - # 下载图片 - content = __download_image(image_url) - # 保存图片文件到剧集目录 - if content: - if not parent: - parent = storagechain.get_parent_item(fileitem) - __save_file(_fileitem=parent, _path=image_path, _content=content) + # 流式下载图片并直接保存 + if not parent: + parent = storagechain.get_parent_item(fileitem) + __download_and_save_image(_fileitem=parent, _path=image_path, _url=image_url) else: logger.info(f"已存在图片文件:{image_path}") else: @@ -730,13 +739,11 @@ class MediaChain(ChainBase): continue if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path): - # 下载图片 - content = __download_image(image_url) - # 保存图片文件到当前目录 - if content: - if not parent: - parent = storagechain.get_parent_item(fileitem) - __save_file(_fileitem=parent, _path=image_path, _content=content) + # 流式下载图片并直接保存 + if not parent: + parent = storagechain.get_parent_item(fileitem) + __download_and_save_image(_fileitem=parent, _path=image_path, + _url=image_url) else: logger.info(f"已存在图片文件:{image_path}") else: @@ -786,11 +793,8 @@ class MediaChain(ChainBase): image_path = filepath / image_name if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path): - # 下载图片 - content = __download_image(image_url) - # 保存图片文件到当前目录 - if content: - __save_file(_fileitem=fileitem, _path=image_path, _content=content) + # 流式下载图片并直接保存 + __download_and_save_image(_fileitem=fileitem, _path=image_path, _url=image_url) else: logger.info(f"已存在图片文件:{image_path}") else: diff --git a/app/modules/filemanager/storages/__init__.py b/app/modules/filemanager/storages/__init__.py index 9cd6ddbb..7c6332ce 100644 --- a/app/modules/filemanager/storages/__init__.py +++ b/app/modules/filemanager/storages/__init__.py @@ -15,7 +15,7 @@ def transfer_process(path: str) -> Callable[[int | float], None]: """ 传输进度回调 """ - pbar = tqdm(total=100, desc="整理进度", unit="%") + pbar = tqdm(total=100, desc="进度", unit="%") progress = ProgressHelper(HashUtils.md5(path)) progress.start() diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 6ebea644..cad4654b 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -14,6 +14,7 @@ from app.log import logger from app.modules.filemanager import StorageBase from app.modules.filemanager.storages import transfer_process from app.schemas.types import StorageSchema +from app.utils.http import RequestUtils from app.utils.singleton import WeakSingleton from app.utils.string import StringUtils @@ -729,7 +730,25 @@ class AliPan(StorageBase, metaclass=WeakSingleton): progress_callback = transfer_process(Path(fileitem.path).as_posix()) try: - with requests.get(download_url, stream=True) as r: + # 构建请求头,包含必要的认证信息 + headers = { + "User-Agent": settings.NORMAL_USER_AGENT, + "Referer": "https://www.aliyundrive.com/", + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "Connection": "keep-alive", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site" + } + + # 如果有access_token,添加到请求头 + if self.access_token: + headers["Authorization"] = f"Bearer {self.access_token}" + + request_utils = RequestUtils(headers=headers) + with request_utils.get_stream(download_url, raise_exception=True) as r: r.raise_for_status() downloaded_size = 0 with open(local_path, "wb") as f: @@ -748,22 +767,13 @@ class AliPan(StorageBase, metaclass=WeakSingleton): # 完成下载 progress_callback(100) logger.info(f"【阿里云盘】下载完成: {fileitem.name}") - - except requests.exceptions.RequestException as e: - logger.error(f"【阿里云盘】下载网络错误: {fileitem.name} - {str(e)}") - # 删除可能部分下载的文件 - if local_path.exists(): - local_path.unlink() - return None + return local_path except Exception as e: logger.error(f"【阿里云盘】下载失败: {fileitem.name} - {str(e)}") - # 删除可能部分下载的文件 if local_path.exists(): local_path.unlink() return None - return local_path - def check(self) -> bool: return self.access_token is not None diff --git a/app/modules/filemanager/storages/alist.py b/app/modules/filemanager/storages/alist.py index 250da720..b9fae8dd 100644 --- a/app/modules/filemanager/storages/alist.py +++ b/app/modules/filemanager/storages/alist.py @@ -4,8 +4,6 @@ from datetime import datetime from pathlib import Path from typing import Optional, List -import requests - from app import schemas from app.core.cache import cached from app.core.config import settings, global_vars @@ -569,18 +567,22 @@ class Alist(StorageBase, metaclass=WeakSingleton): else: local_path = path / fileitem.name - with requests.get(download_url, headers=self.__get_header_with_token(), stream=True) as r: - r.raise_for_status() - with open(local_path, "wb") as f: - for chunk in r.iter_content(chunk_size=8192): - if global_vars.is_transfer_stopped(fileitem.path): - logger.info(f"【OpenList】{fileitem.path} 下载已取消!") - return None - f.write(chunk) + request_utils = RequestUtils(headers=self.__get_header_with_token()) + try: + with request_utils.get_stream(download_url, raise_exception=True) as r: + r.raise_for_status() + with open(local_path, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + if global_vars.is_transfer_stopped(fileitem.path): + logger.info(f"【OpenList】{fileitem.path} 下载已取消!") + return None + f.write(chunk) + except Exception as e: + logger.error(f"【OpenList】下载文件 {fileitem.path} 失败:{e}") + if local_path.exists(): + return local_path - if local_path.exists(): - return local_path - return None + return local_path def upload( self, fileitem: schemas.FileItem, path: Path, new_name: Optional[str] = None, task: bool = False