mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-13 17:52:28 +08:00
feat:整理进度登记
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import io
|
||||
import secrets
|
||||
import threading
|
||||
import time
|
||||
@@ -8,12 +7,12 @@ from pathlib import Path
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
|
||||
from app import schemas
|
||||
from app.core.config import settings
|
||||
from app.log import logger
|
||||
from app.modules.filemanager import StorageBase
|
||||
from app.modules.filemanager.storages import transfer_process
|
||||
from app.schemas.types import StorageSchema
|
||||
from app.utils.singleton import WeakSingleton
|
||||
from app.utils.string import StringUtils
|
||||
@@ -580,29 +579,6 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
raise Exception(resp.get("message"))
|
||||
return resp
|
||||
|
||||
@staticmethod
|
||||
def _log_progress(desc: str, total: int) -> tqdm:
|
||||
"""
|
||||
创建一个可以输出到日志的进度条
|
||||
"""
|
||||
|
||||
class TqdmToLogger(io.StringIO):
|
||||
def write(s, buf): # noqa
|
||||
buf = buf.strip('\r\n\t ')
|
||||
if buf:
|
||||
logger.info(buf)
|
||||
|
||||
return tqdm(
|
||||
total=total,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc=desc,
|
||||
file=TqdmToLogger(),
|
||||
mininterval=1.0,
|
||||
maxinterval=5.0,
|
||||
miniters=1
|
||||
)
|
||||
|
||||
def upload(self, target_dir: schemas.FileItem, local_path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
@@ -643,9 +619,10 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
# 4. 初始化进度条
|
||||
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}")
|
||||
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
|
||||
progress_callback = transfer_process(local_path.as_posix())
|
||||
|
||||
# 5. 分片上传循环
|
||||
uploaded_size = 0
|
||||
with open(local_path, 'rb') as f:
|
||||
for part_info in part_info_list:
|
||||
part_num = part_info['part_number']
|
||||
@@ -657,7 +634,8 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
# 更新进度条(已存在的分片)
|
||||
if part_num in uploaded_parts:
|
||||
progress_bar.update(current_chunk_size)
|
||||
uploaded_size += current_chunk_size
|
||||
progress_callback((uploaded_size * 100) / file_size)
|
||||
continue
|
||||
|
||||
# 准备分片数据
|
||||
@@ -694,13 +672,13 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
# 处理上传结果
|
||||
if success:
|
||||
uploaded_parts.add(part_num)
|
||||
progress_bar.update(current_chunk_size)
|
||||
uploaded_size += current_chunk_size
|
||||
progress_callback((uploaded_size * 100) / file_size)
|
||||
else:
|
||||
raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!")
|
||||
|
||||
# 6. 关闭进度条
|
||||
if progress_bar:
|
||||
progress_bar.close()
|
||||
progress_callback(100)
|
||||
|
||||
# 7. 完成上传
|
||||
result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
|
||||
@@ -712,7 +690,7 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
|
||||
"""
|
||||
带限速处理的下载
|
||||
带实时进度显示的下载
|
||||
"""
|
||||
download_info = self._request_api(
|
||||
"POST",
|
||||
@@ -723,14 +701,56 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
}
|
||||
)
|
||||
if not download_info:
|
||||
logger.error(f"【阿里云盘】获取下载链接失败: {fileitem.name}")
|
||||
return None
|
||||
|
||||
download_url = download_info.get("url")
|
||||
if not download_url:
|
||||
logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}")
|
||||
return None
|
||||
|
||||
local_path = path or settings.TEMP_PATH / fileitem.name
|
||||
with requests.get(download_url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
with open(local_path, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
# 获取文件大小
|
||||
file_size = fileitem.size
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【阿里云盘】开始下载: {fileitem.name} -> {local_path}")
|
||||
progress_callback = transfer_process(Path(fileitem.path).as_posix())
|
||||
|
||||
try:
|
||||
with requests.get(download_url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
downloaded_size = 0
|
||||
|
||||
with open(local_path, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
|
||||
# 更新进度
|
||||
if file_size:
|
||||
progress = (downloaded_size * 100) / file_size
|
||||
progress_callback(progress)
|
||||
|
||||
# 完成下载
|
||||
progress_callback(100)
|
||||
logger.info(f"【阿里云盘】下载完成: {fileitem.name}")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"【阿里云盘】下载网络错误: {fileitem.name} - {str(e)}")
|
||||
# 删除可能部分下载的文件
|
||||
if local_path.exists():
|
||||
local_path.unlink()
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"【阿里云盘】下载失败: {fileitem.name} - {str(e)}")
|
||||
# 删除可能部分下载的文件
|
||||
if local_path.exists():
|
||||
local_path.unlink()
|
||||
return None
|
||||
|
||||
return local_path
|
||||
|
||||
def check(self) -> bool:
|
||||
|
||||
@@ -9,7 +9,7 @@ from app import schemas
|
||||
from app.core.cache import cached
|
||||
from app.core.config import settings
|
||||
from app.log import logger
|
||||
from app.modules.filemanager.storages import StorageBase
|
||||
from app.modules.filemanager.storages import StorageBase, transfer_process
|
||||
from app.schemas.types import StorageSchema
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.singleton import WeakSingleton
|
||||
@@ -31,6 +31,9 @@ class Alist(StorageBase, metaclass=WeakSingleton):
|
||||
"move": "移动",
|
||||
}
|
||||
|
||||
# 文件块大小,默认1MB
|
||||
chunk_size = 1024 * 1024
|
||||
|
||||
snapshot_check_folder_modtime = settings.OPENLIST_SNAPSHOT_CHECK_FOLDER_MODTIME
|
||||
|
||||
def __init__(self):
|
||||
@@ -570,36 +573,78 @@ class Alist(StorageBase, metaclass=WeakSingleton):
|
||||
self, fileitem: schemas.FileItem, path: Path, new_name: Optional[str] = None, task: bool = False
|
||||
) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
上传文件
|
||||
上传文件(带进度)
|
||||
:param fileitem: 上传目录项
|
||||
:param path: 本地文件路径
|
||||
:param new_name: 上传后文件名
|
||||
:param task: 是否为任务,默认为False避免未完成上传时对文件进行操作
|
||||
"""
|
||||
encoded_path = UrlUtils.quote((Path(fileitem.path) / path.name).as_posix())
|
||||
headers = self.__get_header_with_token()
|
||||
headers.setdefault("Content-Type", "application/octet-stream")
|
||||
headers.setdefault("As-Task", str(task).lower())
|
||||
headers.setdefault("File-Path", encoded_path)
|
||||
with open(path, "rb") as f:
|
||||
resp = RequestUtils(headers=headers).put_res(
|
||||
self.__get_api_url("/api/fs/put"),
|
||||
data=f,
|
||||
)
|
||||
try:
|
||||
# 获取文件大小
|
||||
target_name = new_name or path.name
|
||||
target_path = Path(fileitem.path) / target_name
|
||||
|
||||
if resp is None:
|
||||
logger.warn(f"【OpenList】请求上传文件 {path} 失败")
|
||||
# 初始化进度回调
|
||||
progress_callback = transfer_process(path.as_posix())
|
||||
|
||||
# 准备上传请求
|
||||
encoded_path = UrlUtils.quote(target_path.as_posix())
|
||||
headers = self.__get_header_with_token()
|
||||
headers.setdefault("Content-Type", "application/octet-stream")
|
||||
headers.setdefault("As-Task", str(task).lower())
|
||||
headers.setdefault("File-Path", encoded_path)
|
||||
|
||||
# 创建自定义的文件流,支持进度回调
|
||||
class ProgressFileReader:
|
||||
def __init__(self, file_path: Path, callback):
|
||||
self.file = open(file_path, 'rb')
|
||||
self.callback = callback
|
||||
self.uploaded_size = 0
|
||||
self.file_size = file_path.stat().st_size
|
||||
|
||||
def read(self, size=-1):
|
||||
chunk = self.file.read(size)
|
||||
if chunk:
|
||||
self.uploaded_size += len(chunk)
|
||||
if self.callback:
|
||||
percent = (self.uploaded_size* 100) / self.file_size
|
||||
self.callback(percent)
|
||||
return chunk
|
||||
|
||||
def close(self):
|
||||
self.file.close()
|
||||
|
||||
# 使用自定义文件流上传
|
||||
progress_reader = ProgressFileReader(path, progress_callback)
|
||||
try:
|
||||
resp = RequestUtils(headers=headers).put_res(
|
||||
self.__get_api_url("/api/fs/put"),
|
||||
data=progress_reader,
|
||||
)
|
||||
finally:
|
||||
progress_reader.close()
|
||||
|
||||
if resp is None:
|
||||
logger.warn(f"【OpenList】请求上传文件 {path} 失败")
|
||||
return None
|
||||
if resp.status_code != 200:
|
||||
logger.warn(f"【OpenList】请求上传文件 {path} 失败,状态码:{resp.status_code}")
|
||||
return None
|
||||
|
||||
# 完成上传
|
||||
progress_callback(100)
|
||||
|
||||
# 获取上传后的文件项
|
||||
new_item = self.get_item(target_path)
|
||||
if new_item and new_name and new_name != path.name:
|
||||
if self.rename(new_item, new_name):
|
||||
return self.get_item(Path(new_item.path).with_name(new_name))
|
||||
|
||||
return new_item
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"【OpenList】上传文件 {path} 失败:{e}")
|
||||
return None
|
||||
if resp.status_code != 200:
|
||||
logger.warn(f"【OpenList】请求上传文件 {path} 失败,状态码:{resp.status_code}")
|
||||
return None
|
||||
|
||||
new_item = self.get_item(Path(fileitem.path) / path.name)
|
||||
if new_item and new_name and new_name != path.name:
|
||||
if self.rename(new_item, new_name):
|
||||
return self.get_item(Path(new_item.path).with_name(new_name))
|
||||
|
||||
return new_item
|
||||
|
||||
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
|
||||
@@ -200,8 +200,8 @@ class LocalStorage(StorageBase):
|
||||
"""
|
||||
total_size = src.stat().st_size
|
||||
copied_size = 0
|
||||
progress_callback = transfer_process(src.as_posix())
|
||||
try:
|
||||
progress_callback = transfer_process(src.as_posix())
|
||||
with open(src, "rb") as fsrc, open(dest, "wb") as fdst:
|
||||
while True:
|
||||
buf = fsrc.read(self.chunk_size)
|
||||
@@ -219,6 +219,8 @@ class LocalStorage(StorageBase):
|
||||
except Exception as e:
|
||||
logger.error(f"【local】复制文件 {src} 失败:{e}")
|
||||
return False
|
||||
finally:
|
||||
progress_callback(100)
|
||||
|
||||
def upload(
|
||||
self,
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import Optional, List
|
||||
from app import schemas
|
||||
from app.core.config import settings
|
||||
from app.log import logger
|
||||
from app.modules.filemanager.storages import StorageBase
|
||||
from app.modules.filemanager.storages import StorageBase, transfer_process
|
||||
from app.schemas.types import StorageSchema
|
||||
from app.utils.string import StringUtils
|
||||
from app.utils.system import SystemUtils
|
||||
@@ -58,6 +58,41 @@ class Rclone(StorageBase):
|
||||
else:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def __parse_rclone_progress(line: str) -> Optional[float]:
|
||||
"""
|
||||
解析rclone进度输出
|
||||
"""
|
||||
if not line:
|
||||
return None
|
||||
|
||||
line = line.strip()
|
||||
|
||||
# 检查是否包含百分比
|
||||
if '%' not in line:
|
||||
return None
|
||||
|
||||
try:
|
||||
# 尝试多种进度输出格式
|
||||
if 'ETA' in line:
|
||||
# 格式: "Transferred: 1.234M / 5.678M, 22%, 1.234MB/s, ETA 2m3s"
|
||||
percent_str = line.split('%')[0].split()[-1]
|
||||
return float(percent_str)
|
||||
elif 'Transferred:' in line and '100%' in line:
|
||||
# 传输完成
|
||||
return 100.0
|
||||
else:
|
||||
# 其他包含百分比的格式
|
||||
parts = line.split()
|
||||
for part in parts:
|
||||
if '%' in part:
|
||||
percent_str = part.replace('%', '')
|
||||
return float(percent_str)
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def __get_rcloneitem(self, item: dict, parent: Optional[str] = "/") -> schemas.FileItem:
|
||||
"""
|
||||
获取rclone文件项
|
||||
@@ -238,47 +273,115 @@ class Rclone(StorageBase):
|
||||
|
||||
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
|
||||
"""
|
||||
下载文件
|
||||
带实时进度显示的下载
|
||||
"""
|
||||
path = (path or settings.TEMP_PATH) / fileitem.name
|
||||
local_path = (path or settings.TEMP_PATH) / fileitem.name
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}")
|
||||
progress_callback = transfer_process(Path(fileitem.path).as_posix())
|
||||
|
||||
try:
|
||||
retcode = subprocess.run(
|
||||
# 使用rclone的进度显示功能
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
'rclone', 'copyto',
|
||||
'--progress', # 启用进度显示
|
||||
'--stats', '1s', # 每秒更新一次统计信息
|
||||
f'MP:{fileitem.path}',
|
||||
f'{path}'
|
||||
f'{local_path}'
|
||||
],
|
||||
startupinfo=self.__get_hidden_shell()
|
||||
).returncode
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
startupinfo=self.__get_hidden_shell(),
|
||||
universal_newlines=True,
|
||||
bufsize=1
|
||||
)
|
||||
|
||||
# 监控进度输出
|
||||
last_progress = 0
|
||||
for line in process.stdout:
|
||||
if line:
|
||||
# 解析rclone的进度输出
|
||||
progress = self.__parse_rclone_progress(line)
|
||||
if progress is not None and progress > last_progress:
|
||||
progress_callback(progress)
|
||||
last_progress = progress
|
||||
if progress >= 100:
|
||||
break
|
||||
|
||||
# 等待进程完成
|
||||
retcode = process.wait()
|
||||
if retcode == 0:
|
||||
return path
|
||||
logger.info(f"【rclone】下载完成: {fileitem.name}")
|
||||
return local_path
|
||||
else:
|
||||
logger.error(f"【rclone】下载失败: {fileitem.name}")
|
||||
return None
|
||||
|
||||
except Exception as err:
|
||||
logger.error(f"【rclone】复制文件失败:{err}")
|
||||
return None
|
||||
logger.error(f"【rclone】下载失败: {fileitem.name} - {err}")
|
||||
# 删除可能部分下载的文件
|
||||
if local_path.exists():
|
||||
local_path.unlink()
|
||||
return None
|
||||
|
||||
def upload(self, fileitem: schemas.FileItem, path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
上传文件
|
||||
带实时进度显示的上传
|
||||
:param fileitem: 上传目录项
|
||||
:param path: 本地文件路径
|
||||
:param new_name: 上传后文件名
|
||||
"""
|
||||
target_name = new_name or path.name
|
||||
new_path = Path(fileitem.path) / target_name
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【rclone】开始上传: {path} -> {new_path}")
|
||||
progress_callback = transfer_process(path.as_posix())
|
||||
|
||||
try:
|
||||
new_path = Path(fileitem.path) / (new_name or path.name)
|
||||
retcode = subprocess.run(
|
||||
# 使用rclone的进度显示功能
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
'rclone', 'copyto',
|
||||
'--progress', # 启用进度显示
|
||||
'--stats', '1s', # 每秒更新一次统计信息
|
||||
path.as_posix(),
|
||||
f'MP:{new_path}'
|
||||
],
|
||||
startupinfo=self.__get_hidden_shell()
|
||||
).returncode
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
startupinfo=self.__get_hidden_shell(),
|
||||
universal_newlines=True,
|
||||
bufsize=1
|
||||
)
|
||||
|
||||
# 监控进度输出
|
||||
last_progress = 0
|
||||
for line in process.stdout:
|
||||
if line:
|
||||
# 解析rclone的进度输出
|
||||
progress = self.__parse_rclone_progress(line)
|
||||
if progress is not None and progress > last_progress:
|
||||
progress_callback(progress)
|
||||
last_progress = progress
|
||||
if progress >= 100:
|
||||
break
|
||||
|
||||
# 等待进程完成
|
||||
retcode = process.wait()
|
||||
if retcode == 0:
|
||||
logger.info(f"【rclone】上传完成: {target_name}")
|
||||
return self.get_item(new_path)
|
||||
else:
|
||||
logger.error(f"【rclone】上传失败: {target_name}")
|
||||
return None
|
||||
|
||||
except Exception as err:
|
||||
logger.error(f"【rclone】上传文件失败:{err}")
|
||||
return None
|
||||
logger.error(f"【rclone】上传失败: {target_name} - {err}")
|
||||
return None
|
||||
|
||||
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
@@ -307,20 +410,53 @@ class Rclone(StorageBase):
|
||||
:param path: 目标目录
|
||||
:param new_name: 新文件名
|
||||
"""
|
||||
target_path = path / new_name
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【rclone】开始移动: {fileitem.path} -> {target_path}")
|
||||
progress_callback = transfer_process(Path(fileitem.path).as_posix())
|
||||
|
||||
try:
|
||||
retcode = subprocess.run(
|
||||
# 使用rclone的进度显示功能
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
'rclone', 'moveto',
|
||||
'--progress', # 启用进度显示
|
||||
'--stats', '1s', # 每秒更新一次统计信息
|
||||
f'MP:{fileitem.path}',
|
||||
f'MP:{path / new_name}'
|
||||
f'MP:{target_path}'
|
||||
],
|
||||
startupinfo=self.__get_hidden_shell()
|
||||
).returncode
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
startupinfo=self.__get_hidden_shell(),
|
||||
universal_newlines=True,
|
||||
bufsize=1
|
||||
)
|
||||
|
||||
# 监控进度输出
|
||||
last_progress = 0
|
||||
for line in process.stdout:
|
||||
if line:
|
||||
# 解析rclone的进度输出
|
||||
progress = self.__parse_rclone_progress(line)
|
||||
if progress is not None and progress > last_progress:
|
||||
progress_callback(progress)
|
||||
last_progress = progress
|
||||
if progress >= 100:
|
||||
break
|
||||
|
||||
# 等待进程完成
|
||||
retcode = process.wait()
|
||||
if retcode == 0:
|
||||
logger.info(f"【rclone】移动完成: {fileitem.name}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"【rclone】移动失败: {fileitem.name}")
|
||||
return False
|
||||
|
||||
except Exception as err:
|
||||
logger.error(f"【rclone】移动文件失败:{err}")
|
||||
return False
|
||||
logger.error(f"【rclone】移动失败: {fileitem.name} - {err}")
|
||||
return False
|
||||
|
||||
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
|
||||
"""
|
||||
@@ -329,20 +465,53 @@ class Rclone(StorageBase):
|
||||
:param path: 目标目录
|
||||
:param new_name: 新文件名
|
||||
"""
|
||||
target_path = path / new_name
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【rclone】开始复制: {fileitem.path} -> {target_path}")
|
||||
progress_callback = transfer_process(Path(fileitem.path).as_posix())
|
||||
|
||||
try:
|
||||
retcode = subprocess.run(
|
||||
# 使用rclone的进度显示功能
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
'rclone', 'copyto',
|
||||
'--progress', # 启用进度显示
|
||||
'--stats', '1s', # 每秒更新一次统计信息
|
||||
f'MP:{fileitem.path}',
|
||||
f'MP:{path / new_name}'
|
||||
f'MP:{target_path}'
|
||||
],
|
||||
startupinfo=self.__get_hidden_shell()
|
||||
).returncode
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
startupinfo=self.__get_hidden_shell(),
|
||||
universal_newlines=True,
|
||||
bufsize=1
|
||||
)
|
||||
|
||||
# 监控进度输出
|
||||
last_progress = 0
|
||||
for line in process.stdout:
|
||||
if line:
|
||||
# 解析rclone的进度输出
|
||||
progress = self.__parse_rclone_progress(line)
|
||||
if progress is not None and progress > last_progress:
|
||||
progress_callback(progress)
|
||||
last_progress = progress
|
||||
if progress >= 100:
|
||||
break
|
||||
|
||||
# 等待进程完成
|
||||
retcode = process.wait()
|
||||
if retcode == 0:
|
||||
logger.info(f"【rclone】复制完成: {fileitem.name}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"【rclone】复制失败: {fileitem.name}")
|
||||
return False
|
||||
|
||||
except Exception as err:
|
||||
logger.error(f"【rclone】复制文件失败:{err}")
|
||||
return False
|
||||
logger.error(f"【rclone】复制失败: {fileitem.name} - {err}")
|
||||
return False
|
||||
|
||||
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
|
||||
pass
|
||||
|
||||
@@ -11,6 +11,7 @@ from app import schemas
|
||||
from app.core.config import settings
|
||||
from app.log import logger
|
||||
from app.modules.filemanager import StorageBase
|
||||
from app.modules.filemanager.storages import transfer_process
|
||||
from app.schemas.types import StorageSchema
|
||||
from app.utils.singleton import WeakSingleton
|
||||
|
||||
@@ -412,63 +413,101 @@ class SMB(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
|
||||
"""
|
||||
下载文件
|
||||
带实时进度显示的下载
|
||||
"""
|
||||
local_path = path or settings.TEMP_PATH / fileitem.name
|
||||
smb_path = self._normalize_path(fileitem.path)
|
||||
try:
|
||||
self._check_connection()
|
||||
|
||||
smb_path = self._normalize_path(fileitem.path)
|
||||
local_path = path or settings.TEMP_PATH / fileitem.name
|
||||
|
||||
# 确保本地目录存在
|
||||
local_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 获取文件大小
|
||||
file_size = fileitem.size
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【SMB】开始下载: {fileitem.name} -> {local_path}")
|
||||
progress_callback = transfer_process(Path(fileitem.path).as_posix())
|
||||
|
||||
# 使用更高效的文件传输方式
|
||||
with smbclient.open_file(smb_path, mode="rb") as src_file:
|
||||
with open(local_path, "wb") as dst_file:
|
||||
# 使用更大的缓冲区提高性能
|
||||
buffer_size = 1024 * 1024 # 1MB
|
||||
downloaded_size = 0
|
||||
|
||||
while True:
|
||||
chunk = src_file.read(buffer_size)
|
||||
if not chunk:
|
||||
break
|
||||
dst_file.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
|
||||
logger.info(f"【SMB】下载成功: {fileitem.path} -> {local_path}")
|
||||
# 更新进度
|
||||
if file_size:
|
||||
progress = (downloaded_size * 100) / file_size
|
||||
progress_callback(progress)
|
||||
|
||||
# 完成下载
|
||||
progress_callback(100)
|
||||
logger.info(f"【SMB】下载完成: {fileitem.name}")
|
||||
return local_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"【SMB】下载失败: {e}")
|
||||
logger.error(f"【SMB】下载失败: {fileitem.name} - {e}")
|
||||
# 删除可能部分下载的文件
|
||||
if local_path.exists():
|
||||
local_path.unlink()
|
||||
return None
|
||||
|
||||
def upload(self, fileitem: schemas.FileItem, path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
上传文件
|
||||
带实时进度显示的上传
|
||||
"""
|
||||
target_name = new_name or path.name
|
||||
target_path = Path(fileitem.path) / target_name
|
||||
smb_path = self._normalize_path(str(target_path))
|
||||
|
||||
try:
|
||||
self._check_connection()
|
||||
|
||||
target_name = new_name or path.name
|
||||
target_path = Path(fileitem.path) / target_name
|
||||
smb_path = self._normalize_path(str(target_path))
|
||||
# 获取文件大小
|
||||
file_size = path.stat().st_size
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【SMB】开始上传: {path} -> {target_path}")
|
||||
progress_callback = transfer_process(path.as_posix())
|
||||
|
||||
# 使用更高效的文件传输方式
|
||||
with open(path, "rb") as src_file:
|
||||
with smbclient.open_file(smb_path, mode="wb") as dst_file:
|
||||
# 使用更大的缓冲区提高性能
|
||||
buffer_size = 1024 * 1024 # 1MB
|
||||
uploaded_size = 0
|
||||
|
||||
while True:
|
||||
chunk = src_file.read(buffer_size)
|
||||
if not chunk:
|
||||
break
|
||||
dst_file.write(chunk)
|
||||
uploaded_size += len(chunk)
|
||||
|
||||
logger.info(f"【SMB】上传成功: {path} -> {target_path}")
|
||||
# 更新进度
|
||||
if file_size:
|
||||
progress = (uploaded_size * 100) / file_size
|
||||
progress_callback(progress)
|
||||
|
||||
# 完成上传
|
||||
progress_callback(100)
|
||||
logger.info(f"【SMB】上传完成: {target_name}")
|
||||
|
||||
# 返回上传后的文件信息
|
||||
return self.get_item(target_path)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"【SMB】上传失败: {e}")
|
||||
logger.error(f"【SMB】上传失败: {target_name} - {e}")
|
||||
return None
|
||||
|
||||
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import io
|
||||
import secrets
|
||||
import threading
|
||||
import time
|
||||
@@ -11,12 +10,12 @@ import oss2
|
||||
import requests
|
||||
from oss2 import SizedFileAdapter, determine_part_size
|
||||
from oss2.models import PartInfo
|
||||
from tqdm import tqdm
|
||||
|
||||
from app import schemas
|
||||
from app.core.config import settings
|
||||
from app.log import logger
|
||||
from app.modules.filemanager import StorageBase
|
||||
from app.modules.filemanager.storages import transfer_process
|
||||
from app.schemas.types import StorageSchema
|
||||
from app.utils.singleton import WeakSingleton
|
||||
from app.utils.string import StringUtils
|
||||
@@ -352,29 +351,6 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
modify_time=int(time.time())
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _log_progress(desc: str, total: int) -> tqdm:
|
||||
"""
|
||||
创建一个可以输出到日志的进度条
|
||||
"""
|
||||
|
||||
class TqdmToLogger(io.StringIO):
|
||||
def write(s, buf): # noqa
|
||||
buf = buf.strip('\r\n\t ')
|
||||
if buf:
|
||||
logger.info(buf)
|
||||
|
||||
return tqdm(
|
||||
total=total,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc=desc,
|
||||
file=TqdmToLogger(),
|
||||
mininterval=1.0,
|
||||
maxinterval=5.0,
|
||||
miniters=1
|
||||
)
|
||||
|
||||
def upload(self, target_dir: schemas.FileItem, local_path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
@@ -539,13 +515,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
|
||||
progress_bar = tqdm(
|
||||
total=file_size,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc="上传进度",
|
||||
ascii=True
|
||||
)
|
||||
progress_callback = transfer_process(local_path.as_posix())
|
||||
|
||||
# 初始化分片
|
||||
upload_id = bucket.init_multipart_upload(object_name,
|
||||
@@ -569,11 +539,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
offset += num_to_upload
|
||||
part_number += 1
|
||||
# 更新进度
|
||||
progress_bar.update(num_to_upload)
|
||||
progress = (offset * 100) / file_size
|
||||
progress_callback(progress)
|
||||
|
||||
# 关闭进度条
|
||||
if progress_bar:
|
||||
progress_bar.close()
|
||||
# 完成上传
|
||||
progress_callback(100)
|
||||
|
||||
# 请求头
|
||||
headers = {
|
||||
@@ -601,11 +571,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
|
||||
"""
|
||||
带限速处理的下载
|
||||
带实时进度显示的下载
|
||||
"""
|
||||
detail = self.get_item(Path(fileitem.path))
|
||||
if not detail:
|
||||
logger.error(f"【115】获取文件详情失败: {fileitem.name}")
|
||||
return None
|
||||
|
||||
download_info = self._request_api(
|
||||
"POST",
|
||||
"/open/ufile/downurl",
|
||||
@@ -615,14 +587,56 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
}
|
||||
)
|
||||
if not download_info:
|
||||
logger.error(f"【115】获取下载链接失败: {fileitem.name}")
|
||||
return None
|
||||
|
||||
download_url = list(download_info.values())[0].get("url", {}).get("url")
|
||||
if not download_url:
|
||||
logger.error(f"【115】下载链接为空: {fileitem.name}")
|
||||
return None
|
||||
|
||||
local_path = path or settings.TEMP_PATH / fileitem.name
|
||||
with self.session.get(download_url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
with open(local_path, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
# 获取文件大小
|
||||
file_size = detail.size
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【115】开始下载: {fileitem.name} -> {local_path}")
|
||||
progress_callback = transfer_process(Path(fileitem.path).as_posix())
|
||||
|
||||
try:
|
||||
with self.session.get(download_url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
downloaded_size = 0
|
||||
|
||||
with open(local_path, "wb") as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
|
||||
# 更新进度
|
||||
if file_size:
|
||||
progress = (downloaded_size * 100) / file_size
|
||||
progress_callback(progress)
|
||||
|
||||
# 完成下载
|
||||
progress_callback(100)
|
||||
logger.info(f"【115】下载完成: {fileitem.name}")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}")
|
||||
# 删除可能部分下载的文件
|
||||
if local_path.exists():
|
||||
local_path.unlink()
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"【115】下载失败: {fileitem.name} - {str(e)}")
|
||||
# 删除可能部分下载的文件
|
||||
if local_path.exists():
|
||||
local_path.unlink()
|
||||
return None
|
||||
|
||||
return local_path
|
||||
|
||||
def check(self) -> bool:
|
||||
|
||||
Reference in New Issue
Block a user