Files
MoviePilot/app/modules/filemanager/storages/alipan.py
2024-11-23 21:43:53 +08:00

477 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import json
import logging
import subprocess
import time
from pathlib import Path
from typing import Optional, Tuple, List
from aligo.response import CreateFileResponse
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.schemas.types import StorageSchema
from app.utils.http import RequestUtils
from aligo import Aligo, BaseFile
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
class AliPan(StorageBase, metaclass=Singleton):
    """
    Aliyun Drive storage backend built on top of the ``aligo`` client.

    Credentials and drive ids are persisted through ``get_config`` /
    ``set_config`` (inherited from ``StorageBase``). Every file operation
    first checks ``self.aligo`` and returns an empty/falsy result while no
    client has been initialised.
    """

    # Storage type
    schema = StorageSchema.Alipan

    # Supported transfer modes
    transtype = {
        "move": "移动",
    }

    # Whether the aria2c executable could be launched
    _has_aria2c: bool = False

    # aligo client; stays None until a refresh token is available
    aligo: Optional[Aligo] = None

    # Endpoint that generates the login QR code
    qrcode_url = ("https://passport.aliyundrive.com/newlogin/qrcode/generate.do?"
                  "appName=aliyun_drive&fromSite=52&appEntrance=web&isMobile=false"
                  "&lang=zh_CN&returnUrl=&bizParams=&_bx-v=2.0.31")

    # Endpoint polled to confirm the QR code login
    check_url = "https://passport.aliyundrive.com/newlogin/qrcode/query.do?appName=aliyun_drive&fromSite=52&_bx-v=2.0.31"

    def __init__(self):
        """
        Probe for aria2c and initialise the aligo client from stored credentials.
        """
        super().__init__()
        try:
            subprocess.run(['aria2c', '-h'], capture_output=True)
        except OSError:
            # FileNotFoundError is the usual case; any other launch failure
            # (e.g. PermissionError) also means aria2c cannot be used.
            logger.debug('未发现 aria2c')
            self._has_aria2c = False
        else:
            self._has_aria2c = True
            logger.debug('发现 aria2c, 将使用 aria2c 下载文件')
        self.init_storage()

    def init_storage(self):
        """
        Create the aligo client from the stored refresh token.

        Does nothing when no refresh token is stored. If constructing the
        client raises, the error is logged and the stored parameters are
        cleared.
        """

        def show_qrcode(qr_link: str):
            """
            Log the QR code link passed in by aligo.
            """
            logger.info(f"请用阿里云盘 App 扫码登录:{qr_link}")

        refresh_token = self.__auth_params.get("refreshToken")
        if not refresh_token:
            return
        try:
            self.aligo = Aligo(refresh_token=refresh_token, show=show_qrcode, use_aria2=self._has_aria2c,
                               name="MoviePilot V2", level=logging.ERROR, re_login=False)
        except Exception as err:
            logger.error(f"初始化阿里云盘失败:{err}")
            self.__clear_params()

    @property
    def __auth_params(self) -> dict:
        """
        Return the stored authentication parameters, or an empty dict when
        nothing is configured.
        """
        conf = self.get_config()
        return (conf.config or {}) if conf else {}

    def __update_params(self, params: dict):
        """
        Merge ``params`` into the stored authentication parameters and save them.
        """
        # Build a new dict instead of mutating the object returned by get_config()
        self.set_config({**self.__auth_params, **params})

    def __clear_params(self):
        """
        Clear the stored authentication parameters.
        """
        self.set_config({})

    @staticmethod
    def __dir_prefix(path: Optional[str]) -> str:
        """
        Return ``path`` as a directory prefix that always ends with "/".
        An empty or missing path yields "/".
        """
        if not path:
            return "/"
        return path if path.endswith("/") else f"{path}/"

    @staticmethod
    def __parent_prefix(path) -> str:
        """
        Return the parent directory of ``path`` (str or Path) as a prefix
        ending with "/". An empty or missing path yields "/".
        """
        if not path:
            return "/"
        parent = Path(path).parent.as_posix()
        return parent if parent.endswith("/") else f"{parent}/"

    def generate_qrcode(self) -> Optional[Tuple[dict, str]]:
        """
        Request a login QR code.

        :return: ``(data, error)``; ``data`` holds ``codeContent``, ``ck`` and
                 ``t`` on success and is empty on failure, in which case
                 ``error`` carries the message.
        """
        res = RequestUtils(timeout=10).get_res(self.qrcode_url)
        if res:
            # "content"/"data" may be missing or null in the payload
            data = (res.json().get("content") or {}).get("data") or {}
            if not data:
                return {}, "请求阿里云盘二维码失败:响应数据为空"
            return {
                "codeContent": data.get("codeContent"),
                "ck": data.get("ck"),
                "t": data.get("t")
            }, ""
        elif res is not None:
            # A response object exists but is falsy
            # (presumably a non-2xx requests.Response; verify against RequestUtils)
            return {}, f"请求阿里云盘二维码失败:{res.status_code} - {res.reason}"
        return {}, "请求阿里云盘二维码失败:无法连接!"

    def check_login(self, ck: str, t: str) -> Optional[Tuple[dict, str]]:
        """
        Poll the QR code login status and, once confirmed, store the credentials.

        :param ck: ``ck`` value returned by ``generate_qrcode``
        :param t: ``t`` value returned by ``generate_qrcode``
        :return: ``(data, error)``; ``data`` always carries a ``tip`` entry on
                 success and additionally the token fields once the login
                 result is present.
        """
        params = {
            "t": t,
            "ck": ck,
            "appName": "aliyun_drive",
            "appEntrance": "web",
            "isMobile": "false",
            "lang": "zh_CN",
            "returnUrl": "",
            "fromSite": "52",
            "bizParams": "",
            "navlanguage": "zh-CN",
            "navPlatform": "MacIntel",
        }
        body = "&".join(f"{key}={value}" for key, value in params.items())
        status = {
            "NEW": "请用阿里云盘 App 扫码",
            "SCANED": "请在手机上确认",
            "EXPIRED": "二维码已过期",
            "CANCELED": "已取消",
            "CONFIRMED": "已确认",
        }
        headers = {
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        }
        res = RequestUtils(headers=headers, timeout=5).post_res(self.check_url, data=body)
        if res:
            data = (res.json().get("content") or {}).get("data") or {}
            data["tip"] = status.get(data.get("qrCodeStatus")) or "未知"
            if data.get("bizExt"):
                try:
                    # bizExt is base64-encoded JSON, decoded here as GBK
                    biz_ext = json.loads(base64.b64decode(data["bizExt"]).decode('GBK'))
                    pds_login_result = biz_ext.get("pds_login_result")
                    if pds_login_result:
                        data.pop('bizExt')
                        data.update({
                            'userId': pds_login_result.get('userId'),
                            'expiresIn': pds_login_result.get('expiresIn'),
                            'nickName': pds_login_result.get('nickName'),
                            'avatar': pds_login_result.get('avatar'),
                            'tokenType': pds_login_result.get('tokenType'),
                            "refreshToken": pds_login_result.get('refreshToken'),
                            "accessToken": pds_login_result.get('accessToken'),
                            "defaultDriveId": pds_login_result.get('defaultDriveId'),
                            "updateTime": time.time(),
                        })
                        self.__update_params(data)
                        # The client must exist before drives can be queried:
                        # on a first login self.aligo is still None, so the
                        # client is created first and the drive ids afterwards.
                        self.init_storage()
                        # init_storage() clears the parameters when it fails;
                        # do not write drive ids back into a cleared config.
                        if self.__auth_params.get("refreshToken"):
                            self.__update_drives()
                except Exception as e:
                    # NOTE(review): failures of the storage initialisation above
                    # are also reported with this decode-failure message.
                    return {}, f"bizExt 解码失败:{str(e)}"
            return data, ""
        elif res is not None:
            return {}, f"阿里云盘登录确认失败:{res.status_code} - {res.reason}"
        return {}, "阿里云盘登录确认失败:无法连接!"

    def check(self) -> bool:
        """
        Report whether the storage is usable: a client exists and
        ``get_user()`` returns a truthy value.
        """
        if not self.aligo:
            return False
        return bool(self.aligo.get_user())

    def user_info(self) -> dict:
        """
        Return the result of ``aligo.get_user()``, or an empty dict without a client.
        """
        if not self.aligo:
            return {}
        return self.aligo.get_user()

    def __update_drives(self):
        """
        Store the drive ids of the "resource" and "backup" drives as
        ``resourceDriveId`` / ``backDriveId``.
        """
        if not self.aligo:
            return
        drive_params = {}
        for drive in self.aligo.list_my_drives() or []:
            if drive.category == "resource":
                drive_params["resourceDriveId"] = drive.drive_id
            elif drive.category == "backup":
                drive_params["backDriveId"] = drive.drive_id
        # Persist once instead of once per drive
        if drive_params:
            self.__update_params(drive_params)

    def __get_fileitem(self, fileinfo: BaseFile, parent: str = "/") -> schemas.FileItem:
        """
        Convert an aligo file object into a ``schemas.FileItem``.

        :param fileinfo: aligo file object; a falsy value yields an empty ``FileItem``
        :param parent: path prefix of the containing directory, expected to end with "/"
        """
        if not fileinfo:
            return schemas.FileItem()
        # Fields shared by directories and files
        common = dict(
            storage=self.schema.value,
            fileid=fileinfo.file_id,
            parent_fileid=fileinfo.parent_file_id,
            name=fileinfo.name,
            size=fileinfo.size,
            modify_time=StringUtils.str_to_timestamp(fileinfo.updated_at),
            drive_id=fileinfo.drive_id,
        )
        if fileinfo.type == "folder":
            # Directory paths carry a trailing slash
            return schemas.FileItem(
                type="dir",
                path=f"{parent}{fileinfo.name}/",
                basename=fileinfo.name,
                **common,
            )
        return schemas.FileItem(
            type="file",
            path=f"{parent}{fileinfo.name}",
            basename=Path(fileinfo.name).stem,
            extension=fileinfo.file_extension,
            thumbnail=fileinfo.thumbnail,
            **common,
        )

    def list(self, fileitem: Optional[schemas.FileItem] = None) -> List[schemas.FileItem]:
        """
        List the entries below ``fileitem``.

        Without a ``fileitem`` (or without a ``drive_id`` on it) the two drive
        roots are returned. For a file the file itself is returned; for a
        directory its children are returned.
        """
        if not self.aligo:
            return []
        # Root level: expose the resource drive and the backup drive
        if not fileitem or not fileitem.drive_id:
            return [
                schemas.FileItem(
                    storage=self.schema.value,
                    fileid="root",
                    drive_id=self.__auth_params.get("resourceDriveId"),
                    parent_fileid="root",
                    type="dir",
                    path="/资源库/",
                    name="资源库",
                    basename="资源库"
                ),
                schemas.FileItem(
                    storage=self.schema.value,
                    fileid="root",
                    drive_id=self.__auth_params.get("backDriveId"),
                    parent_fileid="root",
                    type="dir",
                    path="/备份盘/",
                    name="备份盘",
                    basename="备份盘"
                )
            ]
        if fileitem.type == "file":
            file = self.detail(fileitem)
            return [file] if file else []
        items = self.aligo.get_file_list(parent_file_id=fileitem.fileid, drive_id=fileitem.drive_id)
        if not items:
            return []
        parent = self.__dir_prefix(fileitem.path)
        return [self.__get_fileitem(item, parent=parent) for item in items]

    def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
        """
        Create a directory.

        :param fileitem: parent directory
        :param name: name of the new directory
        :return: the created directory, or None on failure
        """
        if not self.aligo:
            return None
        item = self.aligo.create_folder(name=name, parent_file_id=fileitem.fileid, drive_id=fileitem.drive_id)
        if item and isinstance(item, CreateFileResponse):
            # The create response is resolved into a full file object
            item = self.aligo.get_file(file_id=item.file_id, drive_id=item.drive_id)
        if not item:
            return None
        # Build the path below the parent directory instead of below "/"
        return self.__get_fileitem(item, parent=self.__dir_prefix(fileitem.path))

    def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
        """
        Return the directory at ``path``, creating missing levels on the way.

        :return: the directory item, or None when a level could not be created
        """

        def _find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]:
            """
            Find the sub-directory named ``_name`` directly below ``_fileitem``.
            """
            for sub_folder in self.list(_fileitem):
                if sub_folder.type == "dir" and sub_folder.name == _name:
                    return sub_folder
            return None

        # Already exists
        folder = self.get_item(path)
        if folder:
            return folder
        # Walk the path level by level, creating what is missing
        fileitem = schemas.FileItem(path="/")
        for part in path.parts:
            if part == "/":
                continue
            dir_file = _find_dir(fileitem, part) or self.create_folder(fileitem, part)
            if not dir_file:
                return None
            fileitem = dir_file
        return fileitem

    def get_item(self, path: Path) -> Optional[schemas.FileItem]:
        """
        Return the file or directory at ``path``, or None if the lookup finds nothing.
        """
        if not self.aligo:
            return None
        item = self.aligo.get_file_by_path(path=str(path))
        if not item:
            return None
        # The item lives below the parent of the requested path
        return self.__get_fileitem(item, parent=self.__parent_prefix(path))

    def delete(self, fileitem: schemas.FileItem) -> bool:
        """
        Move the file to the trash.
        """
        if not self.aligo:
            return False
        return bool(self.aligo.move_file_to_trash(file_id=fileitem.fileid, drive_id=fileitem.drive_id))

    def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
        """
        Fetch the current details of ``fileitem`` by its file id and drive id.
        """
        if not self.aligo:
            return None
        item = self.aligo.get_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id)
        if not item:
            return None
        # Keep the item below its original parent directory
        return self.__get_fileitem(item, parent=self.__parent_prefix(fileitem.path))

    def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
        """
        Rename the file to ``name``.
        """
        if not self.aligo:
            return False
        return bool(self.aligo.rename_file(file_id=fileitem.fileid, name=name, drive_id=fileitem.drive_id))

    def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
        """
        Download the file into a local folder.

        :param fileitem: file to download
        :param path: local folder; ``settings.TEMP_PATH`` is used when omitted
        :return: local path of the downloaded file, or None on failure
        """
        if not self.aligo:
            return None
        local_path = self.aligo.download_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id,  # noqa
                                              local_folder=str(path or settings.TEMP_PATH))
        return Path(local_path) if local_path else None

    def upload(self, fileitem: schemas.FileItem, path: Path, new_name: str = None) -> Optional[schemas.FileItem]:
        """
        Upload a local file.

        :param fileitem: remote directory to upload into
        :param path: local file to upload
        :param new_name: remote file name; defaults to the local file name
        :return: the uploaded file, or None on failure
        """
        if not self.aligo:
            return None
        result = self.aligo.upload_file(file_path=str(path), parent_file_id=fileitem.fileid,
                                        drive_id=fileitem.drive_id, name=new_name or path.name,
                                        check_name_mode="refuse")
        if not result:
            return None
        item = self.aligo.get_file(file_id=result.file_id, drive_id=result.drive_id)
        if not item:
            return None
        # The uploaded file lives below the target directory
        return self.__get_fileitem(item, parent=self.__dir_prefix(fileitem.path))

    def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
        """
        Move a file.

        :param fileitem: file to move
        :param path: target directory (created if missing)
        :param new_name: new file name
        """
        if not self.aligo:
            return False
        target = self.get_folder(path)
        if not target:
            return False
        return bool(self.aligo.move_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id,
                                         to_parent_file_id=target.fileid, to_drive_id=target.drive_id,
                                         new_name=new_name))

    def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
        """
        Copy a file.

        :param fileitem: file to copy
        :param path: target directory (created if missing)
        :param new_name: new file name
        """
        if not self.aligo:
            return False
        target = self.get_folder(path)
        if not target:
            return False
        return bool(self.aligo.copy_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id,
                                         to_parent_file_id=target.fileid, to_drive_id=target.drive_id,
                                         new_name=new_name))

    def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
        """
        Hard-link a file. Not implemented for this storage; always returns False.
        """
        return False

    def softlink(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
        """
        Soft-link a file. Not implemented for this storage; always returns False.
        """
        return False

    def usage(self) -> Optional[schemas.StorageUsage]:
        """
        Return total and available space taken from the user's drive capacity
        details, or None when they are unavailable.
        """
        if not self.aligo:
            return None
        user_capacity = self.aligo.get_user_capacity_info()
        if not user_capacity:
            return None
        drive_capacity = user_capacity.drive_capacity_details
        if not drive_capacity:
            return None
        return schemas.StorageUsage(
            total=drive_capacity.drive_total_size,
            available=drive_capacity.drive_total_size - drive_capacity.drive_used_size
        )