feat:切换使用python-115

This commit is contained in:
jxxghp
2024-11-17 02:10:45 +08:00
parent 510c20dc70
commit 293e417865
5 changed files with 151 additions and 189 deletions

View File

@@ -347,7 +347,7 @@ class AliPan(StorageBase):
"""
if not self.aligo:
return None
local_path = self.aligo.download_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id,
local_path = self.aligo.download_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id, # noqa
local_folder=str(path or settings.TEMP_PATH))
if local_path:
return Path(local_path)

View File

@@ -1,18 +1,13 @@
import base64
from pathlib import Path
from typing import Optional, Tuple, List
import oss2
import py115
from py115 import Cloud
from py115.types import LoginTarget, QrcodeSession, QrcodeStatus, Credential
from p115 import P115Client, P115FileSystem, P115Path
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.schemas.types import StorageSchema
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
@@ -29,8 +24,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
"move": "移动"
}
cloud: Optional[Cloud] = None
_session: QrcodeSession = None
client: P115Client = None
fs: P115FileSystem = None
session_info: dict = None
def __init_cloud(self) -> bool:
"""
@@ -41,8 +37,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
logger.warn("115未登录请先登录")
return False
try:
if not self.cloud:
self.cloud = py115.connect(credential)
if not self.client or not self.client.cookies:
self.client = P115Client(credential)
self.fs = P115FileSystem(self.client)
except Exception as err:
logger.error(f"115连接失败请重新扫码登录{str(err)}")
self.__clear_credential()
@@ -50,20 +47,23 @@ class U115Pan(StorageBase, metaclass=Singleton):
return True
@property
def __credential(self) -> Optional[Credential]:
def __credential(self) -> Optional[str]:
"""
获取已保存的115认证参数
获取已保存的115 Cookie
"""
cookie_dict = self.get_config()
if not cookie_dict:
conf = self.get_config()
if not conf:
return None
return Credential.from_dict(cookie_dict.dict().get("config"))
if not conf.config:
return None
# 将dict转换为cookie字符串格式
return "; ".join([f"{k}={v}" for k, v in conf.config.items()])
def __save_credential(self, credential: Credential):
def __save_credential(self, credential: dict):
"""
设置115认证参数
"""
self.set_config(credential.to_dict())
self.set_config(credential)
def __clear_credential(self):
"""
@@ -76,16 +76,14 @@ class U115Pan(StorageBase, metaclass=Singleton):
生成二维码
"""
try:
self.cloud = py115.connect()
self._session = self.cloud.qrcode_login(LoginTarget.Web)
image_bin = self._session.image_data
if not image_bin:
resp = self.client.login_qrcode_token()
self.session_info = resp["data"]
qrcode_content = self.session_info.pop("qrcode")
if not qrcode_content:
logger.warn("115生成二维码失败未获取到二维码数据")
return None
# 转换为base64图片格式
image_base64 = base64.b64encode(image_bin).decode()
return {
"codeContent": f"data:image/jpeg;base64,{image_base64}"
"codeContent": qrcode_content
}, ""
except Exception as e:
logger.warn(f"115生成二维码失败{str(e)}")
@@ -95,42 +93,46 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
二维码登录确认
"""
if not self._session:
return {}, "请先生成二维码!"
try:
if not self.cloud:
if not self.session_info:
return {}, "请先生成二维码!"
status = self.cloud.qrcode_poll(self._session)
if status == QrcodeStatus.Done:
# 确认完成,保存认证信息
self.__save_credential(self.cloud.export_credentail())
result = {
"status": 1,
"tip": "登录成功!"
}
elif status == QrcodeStatus.Waiting:
result = {
"status": 0,
"tip": "请使用微信或115客户端扫码"
}
elif status == QrcodeStatus.Expired:
result = {
"status": -1,
"tip": "二维码已过期,请重新刷新!"
}
self.cloud = None
elif status == QrcodeStatus.Failed:
result = {
"status": -2,
"tip": "登录失败,请重试!"
}
self.cloud = None
else:
result = {
"status": -3,
"tip": "未知错误,请重试!"
}
self.cloud = None
resp = self.client.login_qrcode_scan_status(self.session_info)
match resp["data"].get("status"):
case 0:
result = {
"status": 0,
"tip": "请使用微信或115客户端扫码"
}
case 1:
result = {
"status": 1,
"tip": "已扫码"
}
case 2:
# 确认完成,保存认证信息
resp = self.client.login_qrcode_scan_result(uid=self.session_info.get("uid"),
app="alipaymini")
if resp:
self.__save_credential(resp["data"]["cookie"])
result = {
"status": 2,
"tip": "登录成功!"
}
case -1:
result = {
"status": -1,
"tip": "二维码已过期,请重新刷新!"
}
case -2:
result = {
"status": -2,
"tip": "登录失败,请重试!"
}
case _:
result = {
"status": -3,
"tip": "未知错误,请重试!"
}
return result, ""
except Exception as e:
return {}, f"115登录确认失败{str(e)}"
@@ -142,7 +144,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not self.__init_cloud():
return None
try:
return self.cloud.storage().space()
usage = self.fs.space_summury()
if usage:
return usage['rt_space_info']['all_total']['size'], usage['rt_space_info']['all_remain']['size']
except Exception as e:
logger.error(f"115获取存储空间失败{str(e)}")
return None
@@ -151,9 +155,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
检查存储是否可用
"""
return True if self.list(schemas.FileItem(
fileid="0"
)) else False
return True if self.list(schemas.FileItem()) else False
def list(self, fileitem: schemas.FileItem) -> Optional[List[schemas.FileItem]]:
"""
@@ -164,18 +166,16 @@ class U115Pan(StorageBase, metaclass=Singleton):
try:
if fileitem.type == "file":
return [fileitem]
items = self.cloud.storage().list(dir_id=fileitem.fileid)
items: List[P115Path] = self.fs.list(fileitem.path)
return [schemas.FileItem(
storage=self.schema.value,
fileid=item.file_id,
parent_fileid=item.parent_id,
type="dir" if item.is_dir else "file",
path=f"{fileitem.path}{item.name}" + ("/" if item.is_dir else ""),
type="dir" if item.is_dir() else "file",
path=item.path + ("/" if item.is_dir() else ""),
name=item.name,
size=item.size,
extension=Path(item.name).suffix[1:],
modify_time=item.modified_time.timestamp() if item.modified_time else 0,
pickcode=item.pickcode
basename=item.stem,
size=item.stat().st_size,
extension=item.suffix[1:],
modify_time=item.stat().st_mtime
) for item in items if item]
except Exception as e:
logger.error(f"115浏览文件失败{str(e)}")
@@ -188,17 +188,14 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not self.__init_cloud():
return None
try:
result = self.cloud.storage().make_dir(fileitem.fileid, name)
result = self.fs.makedirs(path=(Path(fileitem.path) / name), exist_ok=True) # noqa
if result:
return schemas.FileItem(
storage=self.schema.value,
fileid=result.file_id,
parent_fileid=result.parent_id,
type="dir",
path=f"{fileitem.path}{name}/",
name=name,
modify_time=result.modified_time.timestamp() if result.modified_time else 0,
pickcode=result.pickcode
modify_time=result.get("utime")
)
except Exception as e:
logger.error(f"115创建目录失败{str(e)}")
@@ -208,64 +205,65 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
根据文件路程获取目录,不存在则创建
"""
def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]:
"""
查找下级目录中匹配名称的目录
"""
for sub_file in self.list(_fileitem):
if sub_file.type != "dir":
continue
if sub_file.name == _name:
return sub_file
if not self.__init_cloud():
return None
# 逐级查找和创建目录
fileitem = schemas.FileItem(fileid="0")
for part in path.parts:
if part == "/":
continue
dir_file = __find_dir(fileitem, part)
if dir_file:
fileitem = dir_file
else:
dir_file = self.create_folder(fileitem, part)
if not dir_file:
logger.warn(f"115创建目录 {fileitem.path}{part} 失败!")
return None
fileitem = dir_file
return fileitem if fileitem.fileid != "0" else None
try:
result = self.fs.makedirs(path=path, exist_ok=True) # noqa
if result:
return schemas.FileItem(
storage=self.schema.value,
type="dir",
path=str(path) + "/",
name=path.name,
modify_time=result.get("utime")
)
except Exception as e:
logger.error(f"115获取目录失败{str(e)}")
return None
def get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
获取文件或目录不存在返回None
"""
def __find_item(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]:
"""
查找下级目录中匹配名称的目录或文件
"""
for sub_file in self.list(_fileitem):
if sub_file.name == _name:
return sub_file
if not self.__init_cloud():
return None
# 逐级查找
fileitem = schemas.FileItem(fileid="0")
for part in path.parts:
if part == "/":
continue
item = __find_item(fileitem, part)
if not item:
return None
fileitem = item
return fileitem
try:
item = self.fs.attr(path)
if item:
return schemas.FileItem(
storage=self.schema.value,
type="dir" if item.is_dir() else "file",
path=str(path) + ("/" if item.is_dir() else ""),
name=item.name,
size=item.stat().st_size,
extension=item.file_extension[1:],
modify_time=item.stat().st_mtime
)
except Exception as e:
logger.error(f"115获取文件失败{str(e)}")
return None
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
"""
pass
if not self.__init_cloud():
return None
try:
item = self.fs.attr(fileitem.path)
if item:
return schemas.FileItem(
storage=self.schema.value,
type="dir" if item.is_dir() else "file",
path=fileitem.path + ("/" if item.is_dir() else ""),
name=item.name,
size=item.stat().st_size,
extension=item.file_extension[1:],
modify_time=item.stat().st_mtime
)
except Exception as e:
logger.error(f"115获取文件详情失败{str(e)}")
return None
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
@@ -274,7 +272,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not self.__init_cloud():
return False
try:
self.cloud.storage().delete(fileitem.fileid)
self.fs.remove(fileitem.path)
return True
except Exception as e:
logger.error(f"115删除文件失败{str(e)}")
@@ -287,7 +285,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not self.__init_cloud():
return False
try:
self.cloud.storage().rename(fileitem.fileid, name)
self.fs.rename(fileitem.path, Path(fileitem.path).with_name(name))
return True
except Exception as e:
logger.error(f"115重命名文件失败{str(e)}")
@@ -299,19 +297,13 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
if not self.__init_cloud():
return None
local_file = (path or settings.TEMP_PATH) / fileitem.name
try:
ticket = self.cloud.storage().request_download(fileitem.pickcode)
if ticket:
path = (path or settings.TEMP_PATH) / fileitem.name
res = RequestUtils(headers=ticket.headers).get_res(ticket.url)
if res:
with open(path, "wb") as f:
f.write(res.content)
return path
else:
logger.warn(f"{fileitem.path} 未获取到下载链接")
task = self.fs.download(fileitem.path, file=local_file)
if task:
return local_file
except Exception as e:
logger.error(f"115下载失败{str(e)}")
logger.error(f"115下载文件失败:{str(e)}")
return None
def upload(self, fileitem: schemas.FileItem, path: Path, new_name: str = None) -> Optional[schemas.FileItem]:
@@ -321,52 +313,18 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not self.__init_cloud():
return None
try:
ticket = self.cloud.storage().request_upload(dir_id=fileitem.fileid, file_path=str(path))
if ticket is None:
logger.warn(f"115请求上传出错")
return None
elif ticket.is_done:
file_path = Path(fileitem.path) / path.name
logger.warn(f"115上传{file_path} 文件已存在")
return self.get_item(file_path)
else:
auth = oss2.StsAuth(**ticket.oss_token)
bucket = oss2.Bucket(
auth=auth,
endpoint=ticket.oss_endpoint,
bucket_name=ticket.bucket_name,
)
por = bucket.put_object_from_file(
key=ticket.object_key,
filename=str(path),
headers=ticket.headers,
)
result = por.resp.response.json()
with open(path, "rb") as f:
result = self.fs.upload(f, Path(fileitem.path) / (new_name or path.name))
if result:
result_data = result.get('data')
logger.info(f"115上传文件成功{result_data.get('file_name')}")
item = schemas.FileItem(
return schemas.FileItem(
storage=self.schema.value,
fileid=result_data.get('file_id'),
parent_fileid=fileitem.fileid,
type="file",
name=result_data.get('file_name'),
basename=Path(result_data.get('file_name')).stem,
path=f"{fileitem.path}{result_data.get('file_name')}",
size=result_data.get('file_size'),
extension=Path(result_data.get('file_name')).suffix[1:],
pickcode=result_data.get('pickcode')
path=fileitem.path,
name=new_name or path.name,
size=result.get("size"),
extension=result.get("extension"),
modify_time=result.get("utime")
)
if new_name and new_name != item.name:
if self.rename(item, new_name):
item.name = new_name
item.basename = Path(new_name).stem
item.path = f"{fileitem.path}{new_name}"
item.extension = Path(new_name).suffix[1:]
return item
else:
logger.warn(f"115上传文件失败{por.resp.response.text}")
return None
except Exception as e:
logger.error(f"115上传文件失败{str(e)}")
return None
@@ -378,14 +336,24 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not self.__init_cloud():
return False
try:
self.cloud.storage().move(fileitem.fileid, target.fileid)
self.fs.move(fileitem.path, target.path)
return True
except Exception as e:
logger.error(f"115移动文件失败{str(e)}")
return False
def copy(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass
"""
复制文件
"""
if not self.__init_cloud():
return False
try:
self.fs.copy(fileitem.path, target_file)
return True
except Exception as e:
logger.error(f"115复制文件失败{str(e)}")
return False
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass
@@ -399,9 +367,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
info = self.storage()
if info:
total, used = info
total, free = info
return schemas.StorageUsage(
total=total,
available=total - used
available=free
)
return schemas.StorageUsage()