feat(u115): improve stability of the u115 module

1. 优化API请求错误时到处理逻辑
2. 提升hash计算速度
3. 接口级QPS速率限制
4. 使用httpx替换request
5. 优化路径拼接稳定性
6. 代码格式化
This commit is contained in:
DDSRem
2025-11-19 19:39:02 +08:00
parent edd44a0993
commit f589fcc2d0
2 changed files with 222 additions and 188 deletions

View File

@@ -1,15 +1,16 @@
import base64 import base64
import hashlib
import secrets import secrets
import threading
import time import time
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple, Union from threading import Lock
from typing import List, Optional, Tuple, Union, Dict
from hashlib import sha256
import oss2 import oss2
import requests import httpx
from oss2 import SizedFileAdapter, determine_part_size from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo from oss2.models import PartInfo
from cryptography.hazmat.primitives import hashes
from app import schemas from app import schemas
from app.core.config import settings, global_vars from app.core.config import settings, global_vars
@@ -19,8 +20,10 @@ from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils from app.utils.string import StringUtils
from app.utils.limit import QpsRateLimiter
lock = threading.Lock()
lock = Lock()
class NoCheckInException(Exception): class NoCheckInException(Exception):
@@ -36,10 +39,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
schema = StorageSchema.U115 schema = StorageSchema.U115
# 支持的整理方式 # 支持的整理方式
transtype = { transtype = {"move": "移动", "copy": "复制"}
"move": "移动",
"copy": "复制"
}
# 基础url # 基础url
base_url = "https://proapi.115.com" base_url = "https://proapi.115.com"
@@ -52,18 +52,28 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._auth_state = {} self._auth_state = {}
self.session = requests.Session() self.session = httpx.Client(follow_redirects=True, timeout=20.0)
self._init_session() self._init_session()
self.qps_limiter: Dict[str, QpsRateLimiter] = {
"/open/ufile/files": QpsRateLimiter(4),
"/open/folder/get_info": QpsRateLimiter(3),
"/open/ufile/move": QpsRateLimiter(2),
"/open/ufile/copy": QpsRateLimiter(2),
"/open/ufile/update": QpsRateLimiter(2),
"/open/ufile/delete": QpsRateLimiter(2),
}
def _init_session(self): def _init_session(self):
""" """
初始化带速率限制的会话 初始化带速率限制的会话
""" """
self.session.headers.update({ self.session.headers.update(
"User-Agent": "W115Storage/2.0", {
"Accept-Encoding": "gzip, deflate", "User-Agent": "W115Storage/2.0",
"Content-Type": "application/x-www-form-urlencoded" "Accept-Encoding": "gzip, deflate",
}) "Content-Type": "application/x-www-form-urlencoded",
}
)
def _check_session(self): def _check_session(self):
""" """
@@ -87,10 +97,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
if expires_in and refresh_time + expires_in < int(time.time()): if expires_in and refresh_time + expires_in < int(time.time()):
tokens = self.__refresh_access_token(refresh_token) tokens = self.__refresh_access_token(refresh_token)
if tokens: if tokens:
self.set_config({ self.set_config({"refresh_time": int(time.time()), **tokens})
"refresh_time": int(time.time()),
**tokens
})
else: else:
return None return None
access_token = tokens.get("access_token") access_token = tokens.get("access_token")
@@ -105,7 +112,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 生成PKCE参数 # 生成PKCE参数
code_verifier = secrets.token_urlsafe(96)[:128] code_verifier = secrets.token_urlsafe(96)[:128]
code_challenge = base64.b64encode( code_challenge = base64.b64encode(
hashlib.sha256(code_verifier.encode("utf-8")).digest() sha256(code_verifier.encode("utf-8")).digest()
).decode("utf-8") ).decode("utf-8")
# 请求设备码 # 请求设备码
resp = self.session.post( resp = self.session.post(
@@ -113,8 +120,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
data={ data={
"client_id": settings.U115_APP_ID, "client_id": settings.U115_APP_ID,
"code_challenge": code_challenge, "code_challenge": code_challenge,
"code_challenge_method": "sha256" "code_challenge_method": "sha256",
} },
) )
if resp is None: if resp is None:
return {}, "网络错误" return {}, "网络错误"
@@ -126,13 +133,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"code_verifier": code_verifier, "code_verifier": code_verifier,
"uid": result["data"]["uid"], "uid": result["data"]["uid"],
"time": result["data"]["time"], "time": result["data"]["time"],
"sign": result["data"]["sign"] "sign": result["data"]["sign"],
} }
# 生成二维码内容 # 生成二维码内容
return { return {"codeContent": result["data"]["qrcode"]}, ""
"codeContent": result['data']['qrcode']
}, ""
def check_login(self) -> Optional[Tuple[dict, str]]: def check_login(self) -> Optional[Tuple[dict, str]]:
""" """
@@ -146,8 +151,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
params={ params={
"uid": self._auth_state["uid"], "uid": self._auth_state["uid"],
"time": self._auth_state["time"], "time": self._auth_state["time"],
"sign": self._auth_state["sign"] "sign": self._auth_state["sign"],
} },
) )
if resp is None: if resp is None:
return {}, "网络错误" return {}, "网络错误"
@@ -156,11 +161,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return {}, result.get("message") return {}, result.get("message")
if result["data"]["status"] == 2: if result["data"]["status"] == 2:
tokens = self.__get_access_token() tokens = self.__get_access_token()
self.set_config({ self.set_config({"refresh_time": int(time.time()), **tokens})
"refresh_time": int(time.time()), return {
**tokens "status": result["data"]["status"],
}) "tip": result["data"]["msg"],
return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, "" }, ""
except Exception as e: except Exception as e:
return {}, str(e) return {}, str(e)
@@ -174,8 +179,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"https://passportapi.115.com/open/deviceCodeToToken", "https://passportapi.115.com/open/deviceCodeToToken",
data={ data={
"uid": self._auth_state["uid"], "uid": self._auth_state["uid"],
"code_verifier": self._auth_state["code_verifier"] "code_verifier": self._auth_state["code_verifier"],
} },
) )
if resp is None: if resp is None:
raise Exception("获取 access_token 失败") raise Exception("获取 access_token 失败")
@@ -190,21 +195,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
""" """
resp = self.session.post( resp = self.session.post(
"https://passportapi.115.com/open/refreshToken", "https://passportapi.115.com/open/refreshToken",
data={ data={"refresh_token": refresh_token},
"refresh_token": refresh_token
}
) )
if resp is None: if resp is None:
logger.error(f"【115】刷新 access_token 失败refresh_token={refresh_token}") logger.error(
f"【115】刷新 access_token 失败refresh_token={refresh_token}"
)
return None return None
result = resp.json() result = resp.json()
if result.get("code") != 0: if result.get("code") != 0:
logger.warn(f"【115】刷新 access_token 失败:{result.get('code')} - {result.get('message')}") logger.warn(
f"【115】刷新 access_token 失败:{result.get('code')} - {result.get('message')}"
)
return None return None
return result.get("data") return result.get("data")
def _request_api(self, method: str, endpoint: str, def _request_api(
result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]: self, method: str, endpoint: str, result_key: Optional[str] = None, **kwargs
) -> Optional[Union[dict, list]]:
""" """
带错误处理和速率限制的API请求 带错误处理和速率限制的API请求
""" """
@@ -216,12 +224,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 重试次数 # 重试次数
retry_times = kwargs.pop("retry_limit", 5) retry_times = kwargs.pop("retry_limit", 5)
# qps 速率限制
if endpoint in self.qps_limiter.keys():
self.qps_limiter[endpoint].acquire()
try: try:
resp = self.session.request( resp = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
method, f"{self.base_url}{endpoint}", except httpx.RequestError as e:
**kwargs
)
except requests.exceptions.RequestException as e:
logger.error(f"【115】{method} 请求 {endpoint} 网络错误: {str(e)}") logger.error(f"【115】{method} 请求 {endpoint} 网络错误: {str(e)}")
return None return None
@@ -241,7 +250,20 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return self._request_api(method, endpoint, result_key, **kwargs) return self._request_api(method, endpoint, result_key, **kwargs)
# 处理请求错误 # 处理请求错误
resp.raise_for_status() try:
resp.raise_for_status()
except httpx.HTTPStatusError as e:
if retry_times <= 0:
logger.error(
f"【115】{method} 请求 {endpoint} 错误 {e},重试次数用尽!"
)
return None
kwargs["retry_limit"] = retry_times - 1
logger.info(
f"【115】{method} 请求 {endpoint} 错误 {e},等待 {self.retry_delay} 秒后重试..."
)
time.sleep(2 ** (5 - retry_times + 1))
return self._request_api(method, endpoint, result_key, **kwargs)
# 返回数据 # 返回数据
ret_data = resp.json() ret_data = resp.json()
@@ -251,10 +273,14 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}") logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}")
if "已达到当前访问上限" in error_msg: if "已达到当前访问上限" in error_msg:
if retry_times <= 0: if retry_times <= 0:
logger.error(f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!") logger.error(
f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!"
)
return None return None
kwargs["retry_limit"] = retry_times - 1 kwargs["retry_limit"] = retry_times - 1
logger.info(f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试...") logger.info(
f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试..."
)
time.sleep(self.retry_delay) time.sleep(self.retry_delay)
return self._request_api(method, endpoint, result_key, **kwargs) return self._request_api(method, endpoint, result_key, **kwargs)
return None return None
@@ -269,26 +295,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
计算文件SHA1符合115规范 计算文件SHA1符合115规范
size: 前多少字节 size: 前多少字节
""" """
sha1 = hashlib.sha1() sha1 = hashes.Hash(hashes.SHA1())
with open(filepath, 'rb') as f: with open(filepath, "rb") as f:
if size: if size:
chunk = f.read(size) chunk = f.read(size)
sha1.update(chunk) sha1.update(chunk)
else: else:
while chunk := f.read(8192): while chunk := f.read(8192):
sha1.update(chunk) sha1.update(chunk)
return sha1.hexdigest() return sha1.finalize().hex()
def _delay_get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
自动延迟重试 get_item 模块
"""
for i in range(1, 4):
time.sleep(2 ** i)
fileitem = self.get_item(path)
if fileitem:
return fileitem
return None
def init_storage(self): def init_storage(self):
pass pass
@@ -304,7 +319,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return [item] return [item]
return [] return []
if fileitem.path == "/": if fileitem.path == "/":
cid = '0' cid = "0"
else: else:
cid = fileitem.fileid cid = fileitem.fileid
if not cid: if not cid:
@@ -322,29 +337,37 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"GET", "GET",
"/open/ufile/files", "/open/ufile/files",
"data", "data",
params={"cid": int(cid), "limit": 1000, "offset": offset, "cur": True, "show_dir": 1} params={
"cid": int(cid),
"limit": 1000,
"offset": offset,
"cur": True,
"show_dir": 1,
},
) )
if resp is None: if resp is None:
raise FileNotFoundError(f"【115】{fileitem.path} 检索出错!") raise FileNotFoundError(f"【115】{fileitem.path} 检索出错!")
if not resp: if not resp:
break break
for item in resp: for item in resp:
# 更新缓存 parent_path = Path(fileitem.path) # noqa
path = f"{fileitem.path}{item['fn']}" item_name = item["fn"]
file_path = path + ("/" if item["fc"] == "0" else "") full_path = parent_path / item_name
items.append(schemas.FileItem( items.append(
storage=self.schema.value, schemas.FileItem(
fileid=str(item["fid"]), storage=self.schema.value,
parent_fileid=cid, fileid=str(item["fid"]),
name=item["fn"], parent_fileid=cid,
basename=Path(item["fn"]).stem, name=item["fn"],
extension=item["ico"] if item["fc"] == "1" else None, basename=Path(item["fn"]).stem,
type="dir" if item["fc"] == "0" else "file", extension=item["ico"] if item["fc"] == "1" else None,
path=file_path, type="dir" if item["fc"] == "0" else "file",
size=item["fs"] if item["fc"] == "1" else None, path=full_path.as_posix() + ("/" if item["fc"] == "0" else ""),
modify_time=item["upt"], size=item["fs"] if item["fc"] == "1" else None,
pickcode=item["pc"] modify_time=item["upt"],
)) pickcode=item["pc"],
)
)
if len(resp) < 1000: if len(resp) < 1000:
break break
@@ -352,7 +375,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return items return items
def create_folder(self, parent_item: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: def create_folder(
self, parent_item: schemas.FileItem, name: str
) -> Optional[schemas.FileItem]:
""" """
创建目录 创建目录
""" """
@@ -360,10 +385,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
resp = self._request_api( resp = self._request_api(
"POST", "POST",
"/open/folder/add", "/open/folder/add",
data={ data={"pid": int(parent_item.fileid or "0"), "file_name": name},
"pid": int(parent_item.fileid or "0"),
"file_name": name
}
) )
if not resp: if not resp:
return None return None
@@ -376,15 +398,19 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return schemas.FileItem( return schemas.FileItem(
storage=self.schema.value, storage=self.schema.value,
fileid=str(resp["data"]["file_id"]), fileid=str(resp["data"]["file_id"]),
path=str(new_path) + "/", path=new_path.as_posix() + "/",
name=name, name=name,
basename=name, basename=name,
type="dir", type="dir",
modify_time=int(time.time()) modify_time=int(time.time()),
) )
def upload(self, target_dir: schemas.FileItem, local_path: Path, def upload(
new_name: Optional[str] = None) -> Optional[schemas.FileItem]: self,
target_dir: schemas.FileItem,
local_path: Path,
new_name: Optional[str] = None,
) -> Optional[schemas.FileItem]:
""" """
实现带秒传、断点续传和二次认证的文件上传 实现带秒传、断点续传和二次认证的文件上传
""" """
@@ -409,13 +435,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"file_size": file_size, "file_size": file_size,
"target": target_param, "target": target_param,
"fileid": file_sha1, "fileid": file_sha1,
"preid": file_preid "preid": file_preid,
} }
init_resp = self._request_api( init_resp = self._request_api("POST", "/open/upload/init", data=init_data)
"POST",
"/open/upload/init",
data=init_data
)
if not init_resp: if not init_resp:
return None return None
if not init_resp.get("state"): if not init_resp.get("state"):
@@ -444,19 +466,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 取2392148-2392298之间的内容(包含2392148、2392298)的sha1 # 取2392148-2392298之间的内容(包含2392148、2392298)的sha1
f.seek(start) f.seek(start)
chunk = f.read(end - start + 1) chunk = f.read(end - start + 1)
sign_val = hashlib.sha1(chunk).hexdigest().upper() sha1 = hashes.Hash(hashes.SHA1())
sha1.update(chunk)
sign_val = sha1.finalize().hex().upper()
# 重新初始化请求 # 重新初始化请求
# sign_keysign_val(根据sign_check计算的值大写的sha1值) # sign_keysign_val(根据sign_check计算的值大写的sha1值)
init_data.update({ init_data.update(
"pick_code": pick_code, {"pick_code": pick_code, "sign_key": sign_key, "sign_val": sign_val}
"sign_key": sign_key,
"sign_val": sign_val
})
init_resp = self._request_api(
"POST",
"/open/upload/init",
data=init_data
) )
init_resp = self._request_api("POST", "/open/upload/init", data=init_data)
if not init_resp: if not init_resp:
return None return None
if not init_resp.get("state"): if not init_resp.get("state"):
@@ -485,32 +503,30 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"GET", "GET",
"/open/folder/get_info", "/open/folder/get_info",
"data", "data",
params={ params={"file_id": int(file_id)},
"file_id": int(file_id)
}
) )
if info_resp: if info_resp:
return schemas.FileItem( return schemas.FileItem(
storage=self.schema.value, storage=self.schema.value,
fileid=str(info_resp["file_id"]), fileid=str(info_resp["file_id"]),
path=str(target_path) + ("/" if info_resp["file_category"] == "0" else ""), path=str(target_path)
+ ("/" if info_resp["file_category"] == "0" else ""),
type="file" if info_resp["file_category"] == "1" else "dir", type="file" if info_resp["file_category"] == "1" else "dir",
name=info_resp["file_name"], name=info_resp["file_name"],
basename=Path(info_resp["file_name"]).stem, basename=Path(info_resp["file_name"]).stem,
extension=Path(info_resp["file_name"]).suffix[1:] if info_resp[ extension=Path(info_resp["file_name"]).suffix[1:]
"file_category"] == "1" else None, if info_resp["file_category"] == "1"
else None,
pickcode=info_resp["pick_code"], pickcode=info_resp["pick_code"],
size=StringUtils.num_filesize(info_resp['size']) if info_resp["file_category"] == "1" else None, size=StringUtils.num_filesize(info_resp["size"])
modify_time=info_resp["utime"] if info_resp["file_category"] == "1"
else None,
modify_time=info_resp["utime"],
) )
return self._delay_get_item(target_path) return self.get_item(target_path)
# Step 4: 获取上传凭证 # Step 4: 获取上传凭证
token_resp = self._request_api( token_resp = self._request_api("GET", "/open/upload/get_token", "data")
"GET",
"/open/upload/get_token",
"data"
)
if not token_resp: if not token_resp:
logger.warn("【115】获取上传凭证失败") logger.warn("【115】获取上传凭证失败")
return None return None
@@ -530,8 +546,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"file_size": file_size, "file_size": file_size,
"target": target_param, "target": target_param,
"fileid": file_sha1, "fileid": file_sha1,
"pick_code": pick_code "pick_code": pick_code,
} },
) )
if resume_resp: if resume_resp:
logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}") logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}")
@@ -542,25 +558,25 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
auth = oss2.StsAuth( auth = oss2.StsAuth(
access_key_id=AccessKeyId, access_key_id=AccessKeyId,
access_key_secret=AccessKeySecret, access_key_secret=AccessKeySecret,
security_token=SecurityToken security_token=SecurityToken,
) )
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
# determine_part_size方法用于确定分片大小设置分片大小为 10M # determine_part_size方法用于确定分片大小设置分片大小为 10M
part_size = determine_part_size(file_size, preferred_size=10 * 1024 * 1024) part_size = determine_part_size(file_size, preferred_size=10 * 1024 * 1024)
# 初始化进度条 # 初始化进度条
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") logger.info(
f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}"
)
progress_callback = transfer_process(local_path.as_posix()) progress_callback = transfer_process(local_path.as_posix())
# 初始化分片 # 初始化分片
upload_id = bucket.init_multipart_upload(object_name, upload_id = bucket.init_multipart_upload(
params={ object_name, params={"encoding-type": "url", "sequential": ""}
"encoding-type": "url", ).upload_id
"sequential": ""
}).upload_id
parts = [] parts = []
# 逐个上传分片 # 逐个上传分片
with open(local_path, 'rb') as fileobj: with open(local_path, "rb") as fileobj:
part_number = 1 part_number = 1
offset = 0 offset = 0
while offset < file_size: while offset < file_size:
@@ -569,9 +585,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return None return None
num_to_upload = min(part_size, file_size - offset) num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") logger.info(
result = bucket.upload_part(object_name, upload_id, part_number, f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}"
data=SizedFileAdapter(fileobj, num_to_upload)) )
result = bucket.upload_part(
object_name,
upload_id,
part_number,
data=SizedFileAdapter(fileobj, num_to_upload),
)
parts.append(PartInfo(part_number, result.etag)) parts.append(PartInfo(part_number, result.etag))
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成") logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
offset += num_to_upload offset += num_to_upload
@@ -585,15 +607,18 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 请求头 # 请求头
headers = { headers = {
'X-oss-callback': encode_callback(callback["callback"]), "X-oss-callback": encode_callback(callback["callback"]),
'x-oss-callback-var': encode_callback(callback["callback_var"]), "x-oss-callback-var": encode_callback(callback["callback_var"]),
'x-oss-forbid-overwrite': 'false' "x-oss-forbid-overwrite": "false",
} }
try: try:
result = bucket.complete_multipart_upload(object_name, upload_id, parts, result = bucket.complete_multipart_upload(
headers=headers) object_name, upload_id, parts, headers=headers
)
if result.status == 200: if result.status == 200:
logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}") logger.debug(
f"【115】上传 Step 6 回调结果:{result.resp.response.json()}"
)
logger.info(f"【115】{target_name} 上传成功") logger.info(f"【115】{target_name} 上传成功")
else: else:
logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}") logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}")
@@ -602,10 +627,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
if e.code == "FileAlreadyExists": if e.code == "FileAlreadyExists":
logger.warn(f"【115】{target_name} 已存在") logger.warn(f"【115】{target_name} 已存在")
else: else:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") logger.error(
f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}"
)
return None return None
# 返回结果 # 返回结果
return self._delay_get_item(target_path) return self.get_item(target_path)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
""" """
@@ -617,12 +644,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return None return None
download_info = self._request_api( download_info = self._request_api(
"POST", "POST", "/open/ufile/downurl", "data", data={"pick_code": detail.pickcode}
"/open/ufile/downurl",
"data",
data={
"pick_code": detail.pickcode
}
) )
if not download_info: if not download_info:
logger.error(f"【115】获取下载链接失败: {fileitem.name}") logger.error(f"【115】获取下载链接失败: {fileitem.name}")
@@ -643,28 +665,26 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
progress_callback = transfer_process(Path(fileitem.path).as_posix()) progress_callback = transfer_process(Path(fileitem.path).as_posix())
try: try:
with self.session.get(download_url, stream=True) as r: with self.session.stream("GET", download_url) as r:
r.raise_for_status() r.raise_for_status()
downloaded_size = 0 downloaded_size = 0
with open(local_path, "wb") as f: with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=self.chunk_size): for chunk in r.iter_bytes(chunk_size=self.chunk_size):
if global_vars.is_transfer_stopped(fileitem.path): if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【115】{fileitem.path} 下载已取消!") logger.info(f"【115】{fileitem.path} 下载已取消!")
r.close()
return None return None
if chunk: f.write(chunk)
f.write(chunk) downloaded_size += len(chunk)
downloaded_size += len(chunk) if file_size:
# 更新进度 progress = (downloaded_size * 100) / file_size
if file_size: progress_callback(progress)
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
# 完成下载 # 完成下载
progress_callback(100) progress_callback(100)
logger.info(f"【115】下载完成: {fileitem.name}") logger.info(f"【115】下载完成: {fileitem.name}")
except httpx.RequestError as e:
except requests.exceptions.RequestException as e:
logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}") logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件 # 删除可能部分下载的文件
if local_path.exists(): if local_path.exists():
@@ -688,14 +708,10 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
""" """
try: try:
self._request_api( self._request_api(
"POST", "POST", "/open/ufile/delete", data={"file_ids": int(fileitem.fileid)}
"/open/ufile/delete",
data={
"file_ids": int(fileitem.fileid)
}
) )
return True return True
except requests.exceptions.HTTPError: except httpx.HTTPError:
return False return False
def rename(self, fileitem: schemas.FileItem, name: str) -> bool: def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
@@ -705,10 +721,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
resp = self._request_api( resp = self._request_api(
"POST", "POST",
"/open/ufile/update", "/open/ufile/update",
data={ data={"file_id": int(fileitem.fileid), "file_name": name},
"file_id": int(fileitem.fileid),
"file_name": name
}
) )
if not resp: if not resp:
return False return False
@@ -725,10 +738,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"POST", "POST",
"/open/folder/get_info", "/open/folder/get_info",
"data", "data",
data={ data={"path": path.as_posix()},
"path": path.as_posix() no_error_log=True,
},
no_error_log=True
) )
if not resp: if not resp:
return None return None
@@ -739,10 +750,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
type="file" if resp["file_category"] == "1" else "dir", type="file" if resp["file_category"] == "1" else "dir",
name=resp["file_name"], name=resp["file_name"],
basename=Path(resp["file_name"]).stem, basename=Path(resp["file_name"]).stem,
extension=Path(resp["file_name"]).suffix[1:] if resp["file_category"] == "1" else None, extension=Path(resp["file_name"]).suffix[1:]
if resp["file_category"] == "1"
else None,
pickcode=resp["pick_code"], pickcode=resp["pick_code"],
size=resp['size_byte'] if resp["file_category"] == "1" else None, size=resp["size_byte"] if resp["file_category"] == "1" else None,
modify_time=resp["utime"] modify_time=resp["utime"],
) )
except Exception as e: except Exception as e:
logger.debug(f"【115】获取文件信息失败: {str(e)}") logger.debug(f"【115】获取文件信息失败: {str(e)}")
@@ -753,7 +766,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
获取指定路径的文件夹,如不存在则创建 获取指定路径的文件夹,如不存在则创建
""" """
def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]: def __find_dir(
_fileitem: schemas.FileItem, _name: str
) -> Optional[schemas.FileItem]:
""" """
查找下级目录中匹配名称的目录 查找下级目录中匹配名称的目录
""" """
@@ -808,13 +823,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
data={ data={
"file_id": int(fileitem.fileid), "file_id": int(fileitem.fileid),
"pid": int(dest_fileitem.fileid), "pid": int(dest_fileitem.fileid),
} },
) )
if not resp: if not resp:
return False return False
if resp["state"]: if resp["state"]:
new_path = Path(path) / fileitem.name new_path = Path(path) / fileitem.name
new_item = self._delay_get_item(new_path) new_item = self.get_item(new_path)
if not new_item: if not new_item:
return False return False
if self.rename(new_item, new_name): if self.rename(new_item, new_name):
@@ -840,13 +855,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
data={ data={
"file_ids": int(fileitem.fileid), "file_ids": int(fileitem.fileid),
"to_cid": int(dest_fileitem.fileid), "to_cid": int(dest_fileitem.fileid),
} },
) )
if not resp: if not resp:
return False return False
if resp["state"]: if resp["state"]:
new_path = Path(path) / fileitem.name new_path = Path(path) / fileitem.name
new_file = self._delay_get_item(new_path) new_file = self.get_item(new_path)
if not new_file: if not new_file:
return False return False
if self.rename(new_file, new_name): if self.rename(new_file, new_name):
@@ -864,17 +879,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
获取带有企业级配额信息的存储使用情况 获取带有企业级配额信息的存储使用情况
""" """
try: try:
resp = self._request_api( resp = self._request_api("GET", "/open/user/info", "data")
"GET",
"/open/user/info",
"data"
)
if not resp: if not resp:
return None return None
space = resp["rt_space_info"] space = resp["rt_space_info"]
return schemas.StorageUsage( return schemas.StorageUsage(
total=space["all_total"]["size"], total=space["all_total"]["size"], available=space["all_remain"]["size"]
available=space["all_remain"]["size"]
) )
except NoCheckInException: except NoCheckInException:
return None return None

View File

@@ -382,3 +382,27 @@ def rate_limit_window(max_calls: int, window_seconds: float,
limiter = WindowRateLimiter(max_calls, window_seconds, source, enable_logging) limiter = WindowRateLimiter(max_calls, window_seconds, source, enable_logging)
# 使用通用装饰器逻辑包装该限流器 # 使用通用装饰器逻辑包装该限流器
return rate_limit_handler(limiter, raise_on_limit) return rate_limit_handler(limiter, raise_on_limit)
class QpsRateLimiter:
"""
速率控制器,精确控制 QPS
"""
def __init__(self, qps: float | int):
if qps <= 0:
qps = float("inf")
self.interval = 1.0 / qps
self.lock = threading.Lock()
self.next_call_time = time.monotonic()
def acquire(self) -> None:
"""
获取调用许可,阻塞直到满足速率限制
"""
with self.lock:
now = time.monotonic()
wait_time = self.next_call_time - now
if wait_time > 0:
time.sleep(wait_time)
self.next_call_time = max(now, self.next_call_time) + self.interval