fix 115 api

This commit is contained in:
jxxghp
2025-03-21 21:08:14 +08:00
parent 2b9f7bca51
commit 59ed08b92d

View File

@@ -146,6 +146,110 @@ class U115Pan(StorageBase, metaclass=Singleton):
raise Exception(result.get("message"))
return result.get("data")
def _request_api(self, method: str, endpoint: str,
result_key: str = None, **kwargs) -> Optional[Union[dict, list]]:
"""
带错误处理和速率限制的API请求
"""
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
if resp is None:
logger.error(f"请求 115 API 失败: {method} {endpoint}")
return {}
# 处理速率限制
if resp.status_code == 429:
reset_time = int(resp.headers.get("X-RateLimit-Reset", 60))
time.sleep(reset_time + 5)
return self._request_api(method, endpoint, result_key, **kwargs)
resp.raise_for_status()
if result_key:
result = resp.json().get(result_key)
if not result:
raise FileNotFoundError(f"请求 115 API 失败: {method} {endpoint}")
return result
return resp.json()
def _path_to_id(self, path: str) -> int:
"""
路径转FID带缓存机制
"""
# 命中缓存
if path in self._id_cache:
return self._id_cache[path]
# 逐级查找缓存
current_id = 0
parent_path = "/"
for p in Path(path).parents:
if str(p) in self._id_cache:
parent_path = str(p)
current_id = self._id_cache[parent_path]
break
# 计算相对路径
rel_path = Path(path).relative_to(parent_path)
for part in Path(rel_path).parts:
resp = self._request_api(
"GET",
"/open/ufile/files",
"data",
params={
"cid": current_id
}
)
for item in resp:
if item["name"] == part:
current_id = item["fid"]
break
else:
raise FileNotFoundError(f"路径不存在: {path}")
self._id_cache[path] = current_id
return current_id
def _id_to_path(self, fid: int) -> str:
"""
CID转路径带双向缓存
"""
# 根目录特殊处理
if fid == 0:
return "/"
# 优先从缓存读取
if fid in self._id_cache.values():
return next(k for k, v in self._id_cache.items() if v == fid)
# 从API获取当前节点信息
detail = self._request_api(
"GET",
"/open/folder/get_info",
"data",
params={
"file_id": fid
}
)
# 处理可能的空数据(如已删除文件)
if not detail:
raise FileNotFoundError(f"{fid} 不存在")
paths = detail["paths"]
path_parts = [item["file_name"] for item in paths]
# 构建完整路径
full_path = "/" + "/".join(reversed(path_parts))
# 缓存新路径
self._id_cache[full_path] = fid
return full_path
@staticmethod
def _calc_sha1(filepath: Path) -> str:
"""
计算文件SHA1符合115规范
"""
sha1 = hashlib.sha1()
with open(filepath, 'rb') as f:
while chunk := f.read(8192):
sha1.update(chunk)
return sha1.hexdigest()
def check_login(self) -> Optional[Dict]:
"""
改进的带PKCE校验的登录状态检查
@@ -385,110 +489,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
f.write(chunk)
return local_path
def _request_api(self, method: str, endpoint: str,
result_key: str = None, **kwargs) -> Optional[Union[dict, list]]:
"""
带错误处理和速率限制的API请求
"""
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
if resp is None:
logger.error(f"请求 115 API 失败: {method} {endpoint}")
return {}
# 处理速率限制
if resp.status_code == 429:
reset_time = int(resp.headers.get("X-RateLimit-Reset", 60))
time.sleep(reset_time + 5)
return self._request_api(method, endpoint, result_key, **kwargs)
resp.raise_for_status()
if result_key:
result = resp.json().get(result_key)
if not result:
raise FileNotFoundError(f"请求 115 API 失败: {method} {endpoint}")
return result
return resp.json()
def _path_to_id(self, path: str) -> int:
"""
路径转FID带缓存机制
"""
# 命中缓存
if path in self._id_cache:
return self._id_cache[path]
# 逐级查找缓存
current_id = 0
parent_path = "/"
for p in Path(path).parents:
if str(p) in self._id_cache:
parent_path = str(p)
current_id = self._id_cache[parent_path]
break
# 计算相对路径
rel_path = Path(path).relative_to(parent_path)
for part in Path(rel_path).parts:
resp = self._request_api(
"GET",
"/open/ufile/files",
"data",
params={
"cid": current_id
}
)
for item in resp:
if item["name"] == part:
current_id = item["fid"]
break
else:
raise FileNotFoundError(f"路径不存在: {path}")
self._id_cache[path] = current_id
return current_id
def _id_to_path(self, fid: int) -> str:
"""
CID转路径带双向缓存
"""
# 根目录特殊处理
if fid == 0:
return "/"
# 优先从缓存读取
if fid in self._id_cache.values():
return next(k for k, v in self._id_cache.items() if v == fid)
# 从API获取当前节点信息
detail = self._request_api(
"GET",
"/open/folder/get_info",
"data",
params={
"file_id": fid
}
)
# 处理可能的空数据(如已删除文件)
if not detail:
raise FileNotFoundError(f"{fid} 不存在")
paths = detail["paths"]
path_parts = [item["file_name"] for item in paths]
# 构建完整路径
full_path = "/" + "/".join(reversed(path_parts))
# 缓存新路径
self._id_cache[full_path] = fid
return full_path
@staticmethod
def _calc_sha1(filepath: Path) -> str:
"""
计算文件SHA1符合115规范
"""
sha1 = hashlib.sha1()
with open(filepath, 'rb') as f:
while chunk := f.read(8192):
sha1.update(chunk)
return sha1.hexdigest()
def check(self) -> bool:
return self.access_token is not None
@@ -524,6 +524,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
if resp["state"]:
if fileitem.path in self._id_cache:
del self._id_cache[fileitem.path]
for key in list(self._id_cache.keys()):
if key.startswith(fileitem.path):
del self._id_cache[key]
new_path = Path(fileitem.path).parent / name
self._id_cache[str(new_path)] = file_id
return True