mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-13 17:52:28 +08:00
Merge branch 'jxxghp:v2' into v2
This commit is contained in:
@@ -339,7 +339,8 @@ class DownloadChain(ChainBase):
|
||||
meta=_meta,
|
||||
mediainfo=_media,
|
||||
torrentinfo=_torrent,
|
||||
download_episodes=download_episodes
|
||||
download_episodes=download_episodes,
|
||||
username=username,
|
||||
)
|
||||
# 下载成功后处理
|
||||
self.download_added(context=context, download_dir=download_dir, torrent_path=torrent_file)
|
||||
|
||||
@@ -241,6 +241,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
|
||||
link=link,
|
||||
username=username
|
||||
),
|
||||
meta=metainfo,
|
||||
mediainfo=mediainfo,
|
||||
username=username
|
||||
)
|
||||
@@ -1023,7 +1024,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
|
||||
),
|
||||
meta=meta,
|
||||
mediainfo=mediainfo,
|
||||
msgstr=msgstr
|
||||
msgstr=msgstr,
|
||||
username=subscribe.username
|
||||
)
|
||||
# 发送事件
|
||||
EventManager().send_event(EventType.SubscribeComplete, {
|
||||
|
||||
@@ -1385,5 +1385,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
|
||||
meta=meta,
|
||||
mediainfo=mediainfo,
|
||||
transferinfo=transferinfo,
|
||||
season_episode=season_episode
|
||||
season_episode=season_episode,
|
||||
username=username
|
||||
)
|
||||
|
||||
@@ -465,13 +465,15 @@ class PluginManager(metaclass=Singleton):
|
||||
}]
|
||||
"""
|
||||
ret_apis = []
|
||||
for plugin_id, plugin in self._running_plugins.items():
|
||||
if pid:
|
||||
plugins = {pid: self._running_plugins.get(pid)}
|
||||
else:
|
||||
plugins = self._running_plugins
|
||||
for plugin_id, plugin in plugins.items():
|
||||
if pid and pid != plugin_id:
|
||||
continue
|
||||
if hasattr(plugin, "get_api") and ObjectUtils.check_method(plugin.get_api):
|
||||
try:
|
||||
if not plugin.get_state():
|
||||
continue
|
||||
apis = plugin.get_api() or []
|
||||
for api in apis:
|
||||
api["path"] = f"/{plugin_id}{api['path']}"
|
||||
@@ -831,7 +833,8 @@ class PluginManager(metaclass=Singleton):
|
||||
logger.debug(f"获取插件是否在本地包中存在失败,{e}")
|
||||
return False
|
||||
|
||||
def get_plugins_from_market(self, market: str, package_version: Optional[str] = None) -> Optional[List[schemas.Plugin]]:
|
||||
def get_plugins_from_market(self, market: str, package_version: Optional[str] = None) -> Optional[
|
||||
List[schemas.Plugin]]:
|
||||
"""
|
||||
从指定的市场获取插件信息
|
||||
:param market: 市场的 URL 或标识
|
||||
@@ -845,7 +848,8 @@ class PluginManager(metaclass=Singleton):
|
||||
# 获取在线插件
|
||||
online_plugins = self.pluginhelper.get_plugins(market, package_version)
|
||||
if online_plugins is None:
|
||||
logger.warning(f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
|
||||
logger.warning(
|
||||
f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
|
||||
return []
|
||||
ret_plugins = []
|
||||
add_time = len(online_plugins)
|
||||
|
||||
@@ -42,7 +42,7 @@ class TemplateContextBuilder:
|
||||
transferinfo: Optional[TransferInfo] = None,
|
||||
file_extension: Optional[str] = None,
|
||||
episodes_info: Optional[List[TmdbEpisode]] = None,
|
||||
include_raw_objects: bool = False,
|
||||
include_raw_objects: bool = True,
|
||||
**kwargs
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -80,8 +80,11 @@ class TemplateContextBuilder:
|
||||
"en_title": self.__convert_invalid_characters(mediainfo.en_title),
|
||||
# 原语种标题
|
||||
"original_title": self.__convert_invalid_characters(mediainfo.original_title),
|
||||
# 季号
|
||||
"season": self._context.get("season") or mediainfo.season,
|
||||
# 年份
|
||||
"year": mediainfo.year or self._context.get("year"),
|
||||
# 媒体标题 + 年份
|
||||
"title_year": mediainfo.title_year or self._context.get("title_year"),
|
||||
}
|
||||
|
||||
|
||||
@@ -905,8 +905,7 @@ class FileManagerModule(_ModuleBase):
|
||||
def __transfer_file(self, fileitem: FileItem, mediainfo: MediaInfo,
|
||||
source_oper: StorageBase, target_oper: StorageBase,
|
||||
target_storage: str, target_file: Path,
|
||||
transfer_type: str, over_flag: Optional[bool] = False) -> Tuple[
|
||||
Optional[FileItem], str]:
|
||||
transfer_type: str, over_flag: Optional[bool] = False) -> Tuple[Optional[FileItem], str]:
|
||||
"""
|
||||
整理一个文件,同时处理其他相关文件
|
||||
:param fileitem: 原文件
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import io
|
||||
import secrets
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional, Tuple, Union
|
||||
|
||||
@@ -24,6 +26,10 @@ class NoCheckInException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SessionInvalidException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AliPan(StorageBase, metaclass=Singleton):
|
||||
"""
|
||||
阿里云盘相关操作
|
||||
@@ -50,6 +56,13 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
# CID和路径缓存
|
||||
_id_cache: Dict[str, Tuple[str, str]] = {}
|
||||
|
||||
# 最大线程数
|
||||
MAX_WORKERS = 10
|
||||
# 最大分片大小(1GB)
|
||||
MAX_PART_SIZE = 1024 * 1024 * 1024
|
||||
# 最小分片大小(100MB)
|
||||
MIN_PART_SIZE = 100 * 1024 * 1024
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.session = requests.Session()
|
||||
@@ -177,7 +190,7 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
确认登录后,获取相关token
|
||||
"""
|
||||
if not self._auth_state:
|
||||
raise Exception("【阿里云盘】请先生成二维码")
|
||||
raise SessionInvalidException("【阿里云盘】请先生成二维码")
|
||||
resp = self.session.post(
|
||||
f"{self.base_url}/oauth/access_token",
|
||||
json={
|
||||
@@ -188,7 +201,7 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
}
|
||||
)
|
||||
if resp is None:
|
||||
raise Exception("【阿里云盘】获取 access_token 失败")
|
||||
raise SessionInvalidException("【阿里云盘】获取 access_token 失败")
|
||||
result = resp.json()
|
||||
if result.get("code"):
|
||||
raise Exception(f"【阿里云盘】{result.get('code')} - {result.get('message')}!")
|
||||
@@ -199,7 +212,7 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
刷新access_token
|
||||
"""
|
||||
if not refresh_token:
|
||||
raise Exception("【阿里云盘】会话失效,请重新扫码登录!")
|
||||
raise SessionInvalidException("【阿里云盘】会话失效,请重新扫码登录!")
|
||||
resp = self.session.post(
|
||||
f"{self.base_url}/oauth/access_token",
|
||||
json={
|
||||
@@ -335,6 +348,8 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
"""
|
||||
if not fileinfo:
|
||||
return schemas.FileItem()
|
||||
if not parent.endswith("/"):
|
||||
parent += "/"
|
||||
if fileinfo.get("type") == "folder":
|
||||
return schemas.FileItem(
|
||||
storage=self.schema.value,
|
||||
@@ -437,7 +452,7 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
"/adrive/v1.0/openFile/create",
|
||||
json={
|
||||
"drive_id": parent_item.drive_id,
|
||||
"parent_file_id": parent_item.fileid,
|
||||
"parent_file_id": parent_item.fileid or "root",
|
||||
"name": name,
|
||||
"type": "folder"
|
||||
}
|
||||
@@ -583,13 +598,6 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
raise Exception(resp.get("message"))
|
||||
return resp.get('part_info_list', [])
|
||||
|
||||
@staticmethod
|
||||
def _upload_part(upload_url: str, data: bytes):
|
||||
"""
|
||||
上传单个分片
|
||||
"""
|
||||
return requests.put(upload_url, data=data)
|
||||
|
||||
def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict:
|
||||
"""
|
||||
获取已上传分片列表
|
||||
@@ -628,22 +636,77 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
raise Exception(resp.get("message"))
|
||||
return resp
|
||||
|
||||
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
|
||||
"""
|
||||
计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值
|
||||
:param file_size: 文件大小
|
||||
:return: 分片大小,线程数
|
||||
"""
|
||||
if file_size <= self.MIN_PART_SIZE:
|
||||
return file_size, 1
|
||||
if file_size >= self.MAX_PART_SIZE:
|
||||
part_size = self.MAX_PART_SIZE
|
||||
else:
|
||||
part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS)
|
||||
return part_size, (file_size + part_size - 1) // part_size
|
||||
|
||||
@staticmethod
|
||||
def _log_progress(desc: str, total: int) -> tqdm:
|
||||
"""
|
||||
创建一个可以输出到日志的进度条
|
||||
"""
|
||||
|
||||
class TqdmToLogger(io.StringIO):
|
||||
def write(s, buf): # noqa
|
||||
buf = buf.strip('\r\n\t ')
|
||||
if buf:
|
||||
logger.info(buf)
|
||||
|
||||
return tqdm(
|
||||
total=total,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc=desc,
|
||||
file=TqdmToLogger(),
|
||||
mininterval=1.0,
|
||||
maxinterval=5.0,
|
||||
miniters=1
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _upload_part(upload_url: str, data: bytes, part_num: int) -> Tuple[int, str, int]:
|
||||
"""
|
||||
上传单个分片
|
||||
"""
|
||||
try:
|
||||
response = requests.put(upload_url, data=data)
|
||||
if response and response.status_code == 200:
|
||||
logger.info(f"【阿里云盘】分片 {part_num} 上传完成")
|
||||
return part_num, response.headers.get('ETag', ''), len(data)
|
||||
else:
|
||||
raise Exception(f"上传失败: {response.status_code if response else 'No Response'}")
|
||||
except Exception as e:
|
||||
logger.error(f"【阿里云盘】分片 {part_num} 上传失败: {str(e)}")
|
||||
raise
|
||||
|
||||
def upload(self, target_dir: schemas.FileItem, local_path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
文件上传:分片、支持秒传
|
||||
文件上传:多线程分片、支持秒传
|
||||
"""
|
||||
target_name = new_name or local_path.name
|
||||
target_path = Path(target_dir.path) / target_name
|
||||
file_size = local_path.stat().st_size
|
||||
|
||||
# 1. 创建文件并检查秒传
|
||||
chunk_size = 100 * 1024 * 1024 # 分片大小 100M
|
||||
# 1. 计算分片大小和线程数
|
||||
part_size, workers = self._calc_parts(file_size)
|
||||
|
||||
# 2. 创建文件并检查秒传
|
||||
create_res = self._create_file(drive_id=target_dir.drive_id,
|
||||
parent_file_id=target_dir.fileid,
|
||||
file_name=target_name,
|
||||
file_path=local_path,
|
||||
chunk_size=chunk_size)
|
||||
chunk_size=part_size)
|
||||
if create_res.get('rapid_upload', False):
|
||||
logger.info(f"【阿里云盘】{target_name} 秒传完成!")
|
||||
return self.get_item(target_path)
|
||||
@@ -652,93 +715,78 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
logger.info(f"【阿里云盘】{target_name} 已存在")
|
||||
return self.get_item(target_path)
|
||||
|
||||
# 2. 准备分片上传参数
|
||||
# 3. 准备分片上传参数
|
||||
file_id = create_res.get('file_id')
|
||||
if not file_id:
|
||||
logger.warn(f"【阿里云盘】创建 {target_name} 文件失败!")
|
||||
return None
|
||||
upload_id = create_res.get('upload_id')
|
||||
part_info_list = create_res.get('part_info_list')
|
||||
uploaded_parts = set()
|
||||
uploaded_parts = {}
|
||||
|
||||
# 3. 获取已上传分片
|
||||
# 4. 获取已上传分片
|
||||
uploaded_info = self._list_uploaded_parts(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
|
||||
for part in uploaded_info.get('uploaded_parts', []):
|
||||
uploaded_parts.add(part['part_number'])
|
||||
uploaded_parts[part['part_number']] = part.get('etag', '')
|
||||
|
||||
# 4. 初始化进度条
|
||||
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}")
|
||||
progress_bar = tqdm(
|
||||
total=file_size,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc="上传进度",
|
||||
ascii=True
|
||||
)
|
||||
# 5. 初始化进度条
|
||||
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},"
|
||||
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
|
||||
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
|
||||
|
||||
# 5. 分片上传循环
|
||||
with open(local_path, 'rb') as f:
|
||||
for part_info in part_info_list:
|
||||
part_num = part_info['part_number']
|
||||
# 7. 创建线程池
|
||||
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||
futures = []
|
||||
|
||||
# 计算分片参数
|
||||
start = (part_num - 1) * chunk_size
|
||||
end = min(start + chunk_size, file_size)
|
||||
current_chunk_size = end - start
|
||||
# 提交上传任务
|
||||
with open(local_path, 'rb') as f:
|
||||
for part_info in part_info_list:
|
||||
part_num = part_info['part_number']
|
||||
|
||||
# 更新进度条(已存在的分片)
|
||||
if part_num in uploaded_parts:
|
||||
progress_bar.update(current_chunk_size)
|
||||
continue
|
||||
# 跳过已上传的分片
|
||||
if part_num in uploaded_parts:
|
||||
start = (part_num - 1) * part_size
|
||||
end = min(start + part_size, file_size)
|
||||
progress_bar.update(end - start)
|
||||
continue
|
||||
|
||||
# 准备分片数据
|
||||
f.seek(start)
|
||||
data = f.read(current_chunk_size)
|
||||
# 准备分片数据
|
||||
start = (part_num - 1) * part_size
|
||||
end = min(start + part_size, file_size)
|
||||
f.seek(start)
|
||||
data = f.read(end - start)
|
||||
|
||||
# 上传分片(带重试逻辑)
|
||||
success = False
|
||||
for attempt in range(3): # 最大重试次数
|
||||
try:
|
||||
# 获取当前上传地址(可能刷新)
|
||||
if attempt > 0:
|
||||
new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id,
|
||||
upload_id=upload_id, part_numbers=[part_num])
|
||||
upload_url = new_urls[0]['upload_url']
|
||||
else:
|
||||
upload_url = part_info['upload_url']
|
||||
# 提交上传任务
|
||||
future = pool.submit(
|
||||
self._upload_part,
|
||||
part_info['upload_url'],
|
||||
data,
|
||||
part_num
|
||||
)
|
||||
futures.append((part_num, future))
|
||||
|
||||
# 执行上传
|
||||
logger.info(
|
||||
f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...")
|
||||
response = self._upload_part(upload_url=upload_url, data=data)
|
||||
if response is None:
|
||||
continue
|
||||
if response.status_code == 200:
|
||||
success = True
|
||||
break
|
||||
else:
|
||||
logger.warn(
|
||||
f"【阿里云盘】{target_name} 分片 {part_num} 第 {attempt + 1} 次上传失败:{response.text}!")
|
||||
except Exception as e:
|
||||
logger.warn(f"【阿里云盘】{target_name} 分片 {part_num} 上传异常: {str(e)}!")
|
||||
# 等待所有任务完成
|
||||
for part_num, future in futures:
|
||||
try:
|
||||
num, etag, uploaded = future.result()
|
||||
uploaded_parts[num] = etag
|
||||
progress_bar.update(uploaded)
|
||||
except Exception as e:
|
||||
logger.error(f"【阿里云盘】分片上传失败: {str(e)}")
|
||||
progress_bar.close()
|
||||
return None
|
||||
|
||||
# 处理上传结果
|
||||
if success:
|
||||
uploaded_parts.add(part_num)
|
||||
progress_bar.update(current_chunk_size)
|
||||
else:
|
||||
raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!")
|
||||
# 8. 关闭进度条
|
||||
progress_bar.close()
|
||||
|
||||
# 6. 关闭进度条
|
||||
if progress_bar:
|
||||
progress_bar.close()
|
||||
|
||||
# 7. 完成上传
|
||||
# 9. 完成上传
|
||||
result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
|
||||
if not result:
|
||||
raise Exception("【阿里云盘】完成上传失败!")
|
||||
if result.get("code"):
|
||||
logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}!")
|
||||
return None
|
||||
|
||||
return self.__get_fileitem(result, parent=target_dir.path)
|
||||
|
||||
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
|
||||
@@ -828,7 +876,7 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
if resp.get("code"):
|
||||
logger.debug(f"【阿里云盘】获取文件信息失败: {resp.get('message')}")
|
||||
return None
|
||||
return self.__get_fileitem(resp, parent=f"{str(path.parent)}/")
|
||||
return self.__get_fileitem(resp, parent=str(path.parent))
|
||||
except Exception as e:
|
||||
logger.debug(f"【阿里云盘】获取文件信息失败: {str(e)}")
|
||||
return None
|
||||
@@ -854,7 +902,7 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
if folder:
|
||||
return folder
|
||||
# 逐级查找和创建目录
|
||||
fileitem = schemas.FileItem(storage=self.schema.value, path="/")
|
||||
fileitem = schemas.FileItem(storage=self.schema.value, path="/", drive_id=self._default_drive_id)
|
||||
for part in path.parts[1:]:
|
||||
dir_file = __find_dir(fileitem, part)
|
||||
if dir_file:
|
||||
@@ -957,3 +1005,5 @@ class AliPan(StorageBase, metaclass=Singleton):
|
||||
)
|
||||
except NoCheckInException:
|
||||
return None
|
||||
except SessionInvalidException:
|
||||
return None
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import io
|
||||
import secrets
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional, Tuple, Union
|
||||
|
||||
import oss2
|
||||
import requests
|
||||
from oss2 import SizedFileAdapter, determine_part_size
|
||||
from oss2.models import PartInfo
|
||||
from tqdm import tqdm
|
||||
|
||||
@@ -54,6 +54,13 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
# CID和路径缓存
|
||||
_id_cache: Dict[str, str] = {}
|
||||
|
||||
# 最大线程数
|
||||
MAX_WORKERS = 10
|
||||
# 最大分片大小(1GB)
|
||||
MAX_PART_SIZE = 1024 * 1024 * 1024
|
||||
# 最小分片大小(100MB)
|
||||
MIN_PART_SIZE = 100 * 1024 * 1024
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.session = requests.Session()
|
||||
@@ -375,7 +382,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
"POST",
|
||||
"/open/folder/add",
|
||||
data={
|
||||
"pid": int(parent_item.fileid),
|
||||
"pid": int(parent_item.fileid or "0"),
|
||||
"file_name": name
|
||||
}
|
||||
)
|
||||
@@ -399,17 +406,66 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
modify_time=int(time.time())
|
||||
)
|
||||
|
||||
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
|
||||
"""
|
||||
计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值
|
||||
:param file_size: 文件大小
|
||||
:return: 分片大小,线程数
|
||||
"""
|
||||
if file_size <= self.MIN_PART_SIZE:
|
||||
return file_size, 1
|
||||
if file_size >= self.MAX_PART_SIZE:
|
||||
part_size = self.MAX_PART_SIZE
|
||||
else:
|
||||
part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS)
|
||||
return part_size, (file_size + part_size - 1) // part_size
|
||||
|
||||
@staticmethod
|
||||
def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str,
|
||||
part_number: int, part_data: bytes) -> Tuple[PartInfo, int]:
|
||||
"""
|
||||
上传单个分片
|
||||
"""
|
||||
try:
|
||||
result = bucket.upload_part(object_name, upload_id, part_number, part_data)
|
||||
part_info = PartInfo(part_number, result.etag)
|
||||
logger.info(f"【115】分片 {part_number} 上传完成")
|
||||
return part_info, len(part_data)
|
||||
except Exception as e:
|
||||
logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def _log_progress(desc: str, total: int) -> tqdm:
|
||||
"""
|
||||
创建一个可以输出到日志的进度条
|
||||
"""
|
||||
|
||||
class TqdmToLogger(io.StringIO):
|
||||
def write(s, buf): # noqa
|
||||
buf = buf.strip('\r\n\t ')
|
||||
if buf:
|
||||
logger.info(buf)
|
||||
|
||||
return tqdm(
|
||||
total=total,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc=desc,
|
||||
file=TqdmToLogger(),
|
||||
mininterval=1.0,
|
||||
maxinterval=5.0,
|
||||
miniters=1
|
||||
)
|
||||
|
||||
def upload(self, target_dir: schemas.FileItem, local_path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
实现带秒传、断点续传和二次认证的文件上传
|
||||
实现带秒传、断点续传和多线程并发上传
|
||||
"""
|
||||
|
||||
def encode_callback(cb: dict):
|
||||
"""
|
||||
回调参数Base64编码函数
|
||||
"""
|
||||
return oss2.utils.b64encode_as_string(json.dumps(cb).strip())
|
||||
def encode_callback(cb: str):
|
||||
return oss2.utils.b64encode_as_string(cb)
|
||||
|
||||
target_name = new_name or local_path.name
|
||||
target_path = Path(target_dir.path) / target_name
|
||||
@@ -440,6 +496,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
if not init_resp.get("state"):
|
||||
logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}")
|
||||
return None
|
||||
|
||||
# 结果
|
||||
init_result = init_resp.get("data")
|
||||
logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}")
|
||||
@@ -457,15 +514,10 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
sign_checks = sign_check.split("-")
|
||||
start = int(sign_checks[0])
|
||||
end = int(sign_checks[1])
|
||||
# 计算指定区间的SHA1
|
||||
# sign_check (用下划线隔开,截取上传文内容的sha1)(单位是byte): "2392148-2392298"
|
||||
with open(local_path, "rb") as f:
|
||||
# 取2392148-2392298之间的内容(包含2392148、2392298)的sha1
|
||||
f.seek(start)
|
||||
chunk = f.read(end - start + 1)
|
||||
sign_val = hashlib.sha1(chunk).hexdigest().upper()
|
||||
# 重新初始化请求
|
||||
# sign_key,sign_val(根据sign_check计算的值大写的sha1值)
|
||||
init_data.update({
|
||||
"pick_code": pick_code,
|
||||
"sign_key": sign_key,
|
||||
@@ -478,7 +530,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
)
|
||||
if not init_resp:
|
||||
return None
|
||||
# 二次认证结果
|
||||
init_result = init_resp.get("data")
|
||||
logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}")
|
||||
if not pick_code:
|
||||
@@ -505,7 +556,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
logger.warn("【115】获取上传凭证失败")
|
||||
return None
|
||||
logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}")
|
||||
# 上传凭证
|
||||
|
||||
endpoint = token_resp.get("endpoint")
|
||||
AccessKeyId = token_resp.get("AccessKeyId")
|
||||
AccessKeySecret = token_resp.get("AccessKeySecret")
|
||||
@@ -528,66 +579,75 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
if resume_resp.get("callback"):
|
||||
callback = resume_resp["callback"]
|
||||
|
||||
# Step 6: 对象存储上传
|
||||
# Step 6: 多线程分片上传
|
||||
auth = oss2.StsAuth(
|
||||
access_key_id=AccessKeyId,
|
||||
access_key_secret=AccessKeySecret,
|
||||
security_token=SecurityToken
|
||||
)
|
||||
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
|
||||
# 处理oss请求回调
|
||||
callback_dict = json.loads(callback.get("callback"))
|
||||
callback_var_dict = json.loads(callback.get("callback_var"))
|
||||
# 补充参数
|
||||
logger.debug(f"【115】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}")
|
||||
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
|
||||
# determine_part_size方法用于确定分片大小,设置分片大小为 100M
|
||||
part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024)
|
||||
|
||||
# 计算分片大小和线程数
|
||||
part_size, workers = self._calc_parts(file_size)
|
||||
logger.info(f"【115】开始上传: {local_path} -> {target_path},"
|
||||
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
|
||||
progress_bar = tqdm(
|
||||
total=file_size,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc="上传进度",
|
||||
ascii=True
|
||||
)
|
||||
progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size)
|
||||
|
||||
# 初始化分片
|
||||
# 初始化分片上传
|
||||
upload_id = bucket.init_multipart_upload(object_name,
|
||||
params={
|
||||
"encoding-type": "url",
|
||||
"sequential": ""
|
||||
}).upload_id
|
||||
parts = []
|
||||
# 逐个上传分片
|
||||
with open(local_path, 'rb') as fileobj:
|
||||
part_number = 1
|
||||
offset = 0
|
||||
while offset < file_size:
|
||||
num_to_upload = min(part_size, file_size - offset)
|
||||
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
|
||||
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
|
||||
result = bucket.upload_part(object_name, upload_id, part_number,
|
||||
data=SizedFileAdapter(fileobj, num_to_upload))
|
||||
parts.append(PartInfo(part_number, result.etag))
|
||||
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
|
||||
offset += num_to_upload
|
||||
part_number += 1
|
||||
# 更新进度
|
||||
progress_bar.update(num_to_upload)
|
||||
|
||||
# 关闭进度条
|
||||
if progress_bar:
|
||||
progress_bar.close()
|
||||
# 创建线程池
|
||||
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||
futures = []
|
||||
parts = []
|
||||
|
||||
# 请求头
|
||||
# 提交上传任务
|
||||
with open(local_path, 'rb') as fileobj:
|
||||
part_number = 1
|
||||
offset = 0
|
||||
while offset < file_size:
|
||||
size = min(part_size, file_size - offset)
|
||||
fileobj.seek(offset)
|
||||
part_data = fileobj.read(size)
|
||||
future = pool.submit(
|
||||
self._upload_part,
|
||||
bucket,
|
||||
object_name,
|
||||
upload_id,
|
||||
part_number,
|
||||
part_data
|
||||
)
|
||||
futures.append(future)
|
||||
offset += size
|
||||
part_number += 1
|
||||
|
||||
# 等待所有任务完成
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
part_info, uploaded = future.result()
|
||||
parts.append(part_info)
|
||||
progress_bar.update(uploaded)
|
||||
except Exception as e:
|
||||
logger.error(f"【115】分片上传失败: {str(e)}")
|
||||
progress_bar.close()
|
||||
return None
|
||||
|
||||
# 按分片号排序
|
||||
parts.sort(key=lambda x: x.part_number)
|
||||
|
||||
# 完成上传
|
||||
headers = {
|
||||
'X-oss-callback': encode_callback(callback_dict),
|
||||
'x-oss-callback-var': encode_callback(callback_var_dict),
|
||||
'X-oss-callback': encode_callback(callback["callback"]),
|
||||
'x-oss-callback-var': encode_callback(callback["callback_var"]),
|
||||
'x-oss-forbid-overwrite': 'false'
|
||||
}
|
||||
|
||||
try:
|
||||
result = bucket.complete_multipart_upload(object_name, upload_id, parts,
|
||||
headers=headers)
|
||||
@@ -603,6 +663,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
else:
|
||||
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
|
||||
return None
|
||||
finally:
|
||||
progress_bar.close()
|
||||
|
||||
# 返回结果
|
||||
return self.get_item(target_path)
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ class TrimeMedia:
|
||||
self._playhost = play_api.host
|
||||
elif play_host:
|
||||
logger.warning(f"请检查外网播放地址 {play_host}")
|
||||
self._playhost = UrlUtils.standardize_base_url(play_host).rstrip("/")
|
||||
|
||||
self.reconnect()
|
||||
|
||||
|
||||
@@ -52,21 +52,28 @@ class ObjectUtils:
|
||||
# 跳过空行
|
||||
if not line:
|
||||
continue
|
||||
# 处理多行注释
|
||||
# 处理"""单行注释
|
||||
if line.startswith(('"""', "'''")) and line.endswith(('"""', "'''")):
|
||||
continue
|
||||
# 处理"""多行注释
|
||||
if line.startswith(('"""', "'''")):
|
||||
in_comment = not in_comment
|
||||
continue
|
||||
# 在注释中则跳过
|
||||
if in_comment:
|
||||
continue
|
||||
# 跳过注释、pass语句、装饰器、函数定义行
|
||||
if line.startswith('#') or line == "pass" or line.startswith('@') or line.startswith('def '):
|
||||
# 跳过#注释、pass语句、装饰器、函数定义行
|
||||
if (line.startswith('#')
|
||||
or line == "pass"
|
||||
or line.startswith('@')
|
||||
or line.startswith('def ')):
|
||||
continue
|
||||
# 发现有效代码行
|
||||
return True
|
||||
# 没有有效代码行
|
||||
return False
|
||||
except Exception:
|
||||
except Exception as err:
|
||||
print(err)
|
||||
# 源代码分析失败时,进行字节码分析
|
||||
code_obj = func.__code__
|
||||
instructions = list(dis.get_instructions(code_obj))
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
APP_VERSION = 'v2.4.4-1'
|
||||
FRONTEND_VERSION = 'v2.4.4-1'
|
||||
APP_VERSION = 'v2.4.5'
|
||||
FRONTEND_VERSION = 'v2.4.5'
|
||||
|
||||
Reference in New Issue
Block a user