Files
MoviePilot/app/modules/filemanager/storage/rclone.py
2024-07-03 08:49:59 +08:00

299 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import copy
import json
import subprocess
from pathlib import Path
from typing import Optional, List
from app import schemas
from app.log import logger
from app.modules.filemanager.storage import StorageBase
from app.schemas.types import StorageSchema
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
class Rclone(StorageBase):
    """
    Storage backend that drives the ``rclone`` command-line tool.

    Every operation shells out to ``rclone`` against the remote named ``MP``.
    Remote paths are always written with forward slashes, and directory
    items produced by this class carry a trailing ``/`` in their ``path``.
    """

    # Storage type
    schema = StorageSchema.Rclone

    # Supported transfer modes
    transtype = {
        "move": "移动",
        "copy": "复制"
    }

    @staticmethod
    def __get_hidden_shell():
        """
        Return a ``STARTUPINFO`` requesting a hidden window (``SW_HIDE``)
        on Windows, or ``None`` on every other platform.
        """
        if not SystemUtils.is_windows():
            return None
        st = subprocess.STARTUPINFO()
        st.dwFlags = subprocess.STARTF_USESHOWWINDOW
        st.wShowWindow = subprocess.SW_HIDE
        return st

    @staticmethod
    def __remote(path) -> str:
        """
        Build the rclone remote spec ``MP:<path>``.

        ``Path`` objects are rendered with ``as_posix()`` because rclone
        requires forward slashes in remote paths, while ``str(Path)`` yields
        backslashes on Windows. Plain strings are passed through unchanged.
        """
        posix = path.as_posix() if isinstance(path, Path) else str(path)
        return f"MP:{posix}"

    def __run(self, args: List[str], capture: bool = False) -> subprocess.CompletedProcess:
        """
        Run ``rclone`` with the given arguments.

        :param args: arguments following the ``rclone`` executable name
        :param capture: capture stdout/stderr instead of inheriting them
        :return: the completed process
        """
        return subprocess.run(
            ['rclone', *args],
            capture_output=capture,
            startupinfo=self.__get_hidden_shell()
        )

    def __get_fileitem(self, path: Path):
        """
        Build a file item from a path on the local filesystem.
        """
        stat = path.stat()
        return schemas.FileItem(
            storage=self.schema.value,
            type="file",
            path=str(path).replace("\\", "/"),
            name=path.name,
            basename=path.stem,
            extension=path.suffix[1:],
            size=stat.st_size,
            modify_time=stat.st_mtime,
        )

    def __get_rcloneitem(self, item: dict, parent: Optional[str] = None):
        """
        Build a file item from one entry of ``rclone lsjson`` output.

        :param item: a single decoded JSON entry
        :param parent: directory that was listed; when given, the item path
                       is rebuilt as ``<parent>/<Name>`` (plus a trailing
                       ``/`` for directories) because ``lsjson`` reports
                       ``Path`` relative to the listed directory. When
                       omitted, the raw ``Path`` value is used.
        """
        name = item.get("Name")
        is_dir = bool(item.get("IsDir"))
        if parent is None:
            item_path = item.get("Path")
        else:
            item_path = f"{parent.rstrip('/')}/{name}"
            if is_dir:
                item_path += "/"
        return schemas.FileItem(
            storage=self.schema.value,
            type="dir" if is_dir else "file",
            path=item_path,
            name=name,
            basename=Path(name).stem,
            extension=Path(name).suffix[1:],
            size=item.get("Size"),
            modify_time=StringUtils.str_to_timestamp(item.get("ModTime"))
        )

    def __lookup(self, path: Path) -> Optional[schemas.FileItem]:
        """
        Find the entry for ``path`` by listing its parent directory.

        Listing the path itself would return the *children* of a directory
        rather than the directory, so the parent is listed and the entry is
        matched by name. Returns ``None`` when the listing fails or no entry
        matches.
        """
        name = path.name
        if not name:
            # The remote root has no parent entry to look up.
            return schemas.FileItem(storage=self.schema.value, type="dir", path="/")
        parent = path.parent.as_posix()
        if parent in ("", "."):
            # Relative single-segment path: treat it as living in the root.
            parent = "/"
        ret = self.__run(['lsjson', f'MP:{parent}'], capture=True)
        if ret.returncode != 0:
            return None
        for item in json.loads(ret.stdout):
            if item.get("Name") == name:
                return self.__get_rcloneitem(item, parent=parent)
        return None

    def check(self) -> bool:
        """
        Check whether the storage is usable (``rclone lsf MP:`` succeeds).
        """
        try:
            if self.__run(['lsf', 'MP:']).returncode == 0:
                return True
        except Exception as err:
            logger.error(f"rclone存储检查失败{err}")
        return False

    def list(self, fileitm: schemas.FileItem) -> Optional[List[schemas.FileItem]]:
        """
        List the entries of a directory.

        :param fileitm: directory to browse
        :return: the entries with paths rooted at ``fileitm.path``, or
                 ``None`` when rclone fails
        """
        try:
            base = fileitm.path or "/"
            ret = self.__run(['lsjson', self.__remote(base)], capture=True)
            if ret.returncode == 0:
                return [
                    self.__get_rcloneitem(item, parent=base)
                    for item in json.loads(ret.stdout)
                ]
        except Exception as err:
            logger.error(f"rclone浏览文件失败{err}")
        return None

    def create_folder(self, fileitm: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
        """
        Create a directory.

        :param fileitm: parent directory
        :param name: name of the directory to create
        :return: an item describing the new directory, or ``None`` on failure
        """
        try:
            # Strip trailing slashes so "/" or "/a/" never yields "//name".
            parent = (fileitm.path or "/").rstrip("/")
            new_path = f"{parent}/{name}"
            if self.__run(['mkdir', f'MP:{new_path}']).returncode == 0:
                ret_fileitem = copy.deepcopy(fileitm)
                ret_fileitem.type = "dir"
                ret_fileitem.path = f"{new_path}/"
                ret_fileitem.name = name
                return ret_fileitem
        except Exception as err:
            logger.error(f"rclone创建目录失败{err}")
        return None

    def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
        """
        Resolve a directory by path, creating any missing levels.

        :param path: absolute directory path on the remote
        :return: the item of the deepest directory, or ``None`` when a level
                 could not be created
        """

        def _find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]:
            """
            Find a direct sub-directory with the given name.
            """
            # list() returns None on failure; treat that as "nothing found".
            for sub_file in self.list(_fileitem) or []:
                if sub_file.type == "dir" and sub_file.name == _name:
                    return sub_file
            return None

        # Walk the path level by level, creating what is missing
        fileitem = schemas.FileItem(storage=self.schema.value, path="/")
        for part in path.parts:
            # Skip the root anchor ("/" on POSIX, "\\" on Windows)
            if part in ("/", "\\"):
                continue
            dir_file = _find_dir(fileitem, part)
            if not dir_file:
                # Create under the current level (not under the failed lookup)
                dir_file = self.create_folder(fileitem, part)
                if not dir_file:
                    logger.warn(f"rclone创建目录 {fileitem.path}{part} 失败!")
                    return None
            fileitem = dir_file
        return fileitem

    def get_item(self, path: Path) -> Optional[schemas.FileItem]:
        """
        Get a file or directory; returns ``None`` when it does not exist.
        """
        try:
            return self.__lookup(Path(path))
        except Exception as err:
            logger.error(f"rclone获取文件失败{err}")
        return None

    def delete(self, fileitm: schemas.FileItem) -> bool:
        """
        Delete a file.

        NOTE(review): ``rclone deletefile`` is used, which targets a single
        file; directories are presumably not removable through this call.
        """
        try:
            if self.__run(['deletefile', self.__remote(fileitm.path)]).returncode == 0:
                return True
        except Exception as err:
            logger.error(f"rclone删除文件失败{err}")
        return False

    def rename(self, fileitm: schemas.FileItem, name: str) -> bool:
        """
        Rename a file, keeping it in the same directory.

        :param fileitm: item to rename
        :param name: new name
        """
        try:
            # Joining through Path avoids "//name" for items under the root.
            target = Path(fileitm.path).parent / name
            retcode = self.__run([
                'moveto',
                self.__remote(fileitm.path),
                self.__remote(target)
            ]).returncode
            if retcode == 0:
                return True
        except Exception as err:
            logger.error(f"rclone重命名文件失败{err}")
        return False

    def download(self, fileitm: schemas.FileItem, path: Path) -> bool:
        """
        Download a remote file to a local path.

        :param fileitm: remote file
        :param path: local destination
        """
        try:
            retcode = self.__run([
                'copyto',
                self.__remote(fileitm.path),
                f'{path}'
            ]).returncode
            if retcode == 0:
                return True
        except Exception as err:
            logger.error(f"rclone复制文件失败{err}")
        return False

    def upload(self, fileitm: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
        """
        Upload a local file to the remote.

        :param fileitm: item whose ``path`` is the local source file
        :param path: destination path on the remote
        :return: the uploaded item as seen on the remote, or ``None``
        """
        try:
            retcode = self.__run([
                'copyto',
                fileitm.path,
                self.__remote(path)
            ]).returncode
            if retcode == 0:
                # ``path`` lives on the remote, so it must be queried through
                # rclone rather than stat()-ed on the local filesystem.
                return self.get_item(Path(path))
        except Exception as err:
            logger.error(f"rclone上传文件失败{err}")
        return None

    def detail(self, fileitm: schemas.FileItem) -> Optional[schemas.FileItem]:
        """
        Get the details of a file or directory; ``None`` when not found.
        """
        try:
            return self.__lookup(Path(fileitm.path))
        except Exception as err:
            logger.error(f"rclone获取文件详情失败{err}")
        return None

    def move(self, fileitm: schemas.FileItem, target: Path) -> bool:
        """
        Move a file to ``target``, a path on the ``MP`` remote.
        """
        try:
            retcode = self.__run([
                'moveto',
                self.__remote(fileitm.path),
                self.__remote(target)
            ]).returncode
            if retcode == 0:
                return True
        except Exception as err:
            logger.error(f"rclone移动文件失败{err}")
        return False

    def copy(self, fileitm: schemas.FileItem, target_file: Path) -> bool:
        """
        Copy a file to ``target_file``, a path on the ``MP`` remote.
        """
        try:
            retcode = self.__run([
                'copyto',
                self.__remote(fileitm.path),
                self.__remote(target_file)
            ]).returncode
            if retcode == 0:
                return True
        except Exception as err:
            logger.error(f"rclone复制文件失败{err}")
        return False

    def link(self, fileitm: schemas.FileItem, target_file: Path) -> bool:
        """
        Hard links are not implemented for this storage; always ``False``.
        """
        return False

    def softlink(self, fileitm: schemas.FileItem, target_file: Path) -> bool:
        """
        Soft links are not implemented for this storage; always ``False``.
        """
        return False