Merge pull request #5505 from DDSRem-Dev/rtorrent

This commit is contained in:
jxxghp
2026-02-22 16:10:39 +08:00
committed by GitHub
6 changed files with 1079 additions and 5 deletions

3
.gitignore vendored
View File

@@ -28,3 +28,6 @@ venv
# Pylint
pylint-report.json
.pylint.d/
# AI
.claude/

View File

@@ -25,7 +25,7 @@ class DownloaderHelper(ServiceBaseHelper[DownloaderConf]):
) -> bool:
"""
通用的下载器类型判断方法
:param service_type: 下载器的类型名称(如 'qbittorrent', 'transmission'
:param service_type: 下载器的类型名称(如 'qbittorrent', 'transmission', 'rtorrent'
:param service: 要判断的服务信息
:param name: 服务的名称
:return: 如果服务类型或实例为指定类型,返回 True否则返回 False

View File

@@ -0,0 +1,513 @@
from pathlib import Path
from typing import Set, Tuple, Optional, Union, List, Dict
from torrentool.torrent import Torrent
from app import schemas
from app.core.cache import FileCache
from app.core.config import settings
from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules import _ModuleBase, _DownloaderBase
from app.modules.rtorrent.rtorrent import Rtorrent
from app.schemas import TransferTorrent, DownloadingTorrent
from app.schemas.types import TorrentStatus, ModuleType, DownloaderType
from app.utils.string import StringUtils
class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]):
def init_module(self) -> None:
"""
初始化模块
"""
super().init_service(
service_name=Rtorrent.__name__.lower(), service_type=Rtorrent
)
@staticmethod
def get_name() -> str:
return "Rtorrent"
@staticmethod
def get_type() -> ModuleType:
"""
获取模块类型
"""
return ModuleType.Downloader
@staticmethod
def get_subtype() -> DownloaderType:
"""
获取模块子类型
"""
return DownloaderType.Rtorrent
@staticmethod
def get_priority() -> int:
"""
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
"""
return 3
def stop(self):
pass
def test(self) -> Optional[Tuple[bool, str]]:
"""
测试模块连接性
"""
if not self.get_instances():
return None
for name, server in self.get_instances().items():
if server.is_inactive():
server.reconnect()
if not server.transfer_info():
return False, f"无法连接rTorrent下载器{name}"
return True, ""
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def scheduler_job(self) -> None:
"""
定时任务每10分钟调用一次
"""
for name, server in self.get_instances().items():
if server.is_inactive():
logger.info(f"rTorrent下载器 {name} 连接断开,尝试重连 ...")
server.reconnect()
def download(
self,
content: Union[Path, str, bytes],
download_dir: Path,
cookie: str,
episodes: Set[int] = None,
category: Optional[str] = None,
label: Optional[str] = None,
downloader: Optional[str] = None,
) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
根据种子文件,选择并添加下载任务
:param content: 种子文件地址或者磁力链接或种子内容
:param download_dir: 下载目录
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 分类rTorrent中未使用
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
def __get_torrent_info() -> Tuple[Optional[Torrent], Optional[bytes]]:
"""
获取种子名称
"""
torrent_info, torrent_content = None, None
try:
if isinstance(content, Path):
if content.exists():
torrent_content = content.read_bytes()
else:
torrent_content = FileCache().get(
content.as_posix(), region="torrents"
)
else:
torrent_content = content
if torrent_content:
if StringUtils.is_magnet_link(torrent_content):
return None, torrent_content
else:
torrent_info = Torrent.from_string(torrent_content)
return torrent_info, torrent_content
except Exception as e:
logger.error(f"获取种子名称失败:{e}")
return None, None
if not content:
return None, None, None, "下载内容为空"
# 读取种子的名称
torrent_from_file, content = __get_torrent_info()
# 检查是否为磁力链接
is_magnet = (
isinstance(content, str)
and content.startswith("magnet:")
or isinstance(content, bytes)
and content.startswith(b"magnet:")
)
if not torrent_from_file and not is_magnet:
return None, None, None, f"添加种子任务失败:无法读取种子文件"
# 获取下载器
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
# 生成随机Tag
tag = StringUtils.generate_random_str(10)
if label:
tags = label.split(",") + [tag]
elif settings.TORRENT_TAG:
tags = [tag, settings.TORRENT_TAG]
else:
tags = [tag]
# 如果要选择文件则先暂停
is_paused = True if episodes else False
# 添加任务
state = server.add_torrent(
content=content,
download_dir=self.normalize_path(download_dir, downloader),
is_paused=is_paused,
tags=tags,
cookie=cookie,
)
# rTorrent 始终使用原始种子布局
torrent_layout = "Original"
if not state:
# 查询所有下载器的种子
torrents, error = server.get_torrents()
if error:
return None, None, None, "无法连接rTorrent下载器"
if torrents:
try:
for torrent in torrents:
# 名称与大小相等则认为是同一个种子
if torrent.get("name") == getattr(
torrent_from_file, "name", ""
) and torrent.get("total_size") == getattr(
torrent_from_file, "total_size", 0
):
torrent_hash = torrent.get("hash")
torrent_tags = [
str(t).strip()
for t in torrent.get("tags", "").split(",")
if t.strip()
]
logger.warn(
f"下载器中已存在该种子任务:{torrent_hash} - {torrent.get('name')}"
)
# 给种子打上标签
if "已整理" in torrent_tags:
server.remove_torrents_tag(
ids=torrent_hash, tag=["已整理"]
)
if (
settings.TORRENT_TAG
and settings.TORRENT_TAG not in torrent_tags
):
logger.info(
f"给种子 {torrent_hash} 打上标签:{settings.TORRENT_TAG}"
)
server.set_torrents_tag(
ids=torrent_hash, tags=[settings.TORRENT_TAG]
)
return (
downloader or self.get_default_config_name(),
torrent_hash,
torrent_layout,
f"下载任务已存在",
)
finally:
torrents.clear()
del torrents
return None, None, None, f"添加种子任务失败:{content}"
else:
# 获取种子Hash
torrent_hash = server.get_torrent_id_by_tag(tags=tag)
if not torrent_hash:
return (
None,
None,
None,
f"下载任务添加成功但获取rTorrent任务信息失败{content}",
)
else:
if is_paused:
# 种子文件
torrent_files = server.get_files(torrent_hash)
if not torrent_files:
return (
downloader or self.get_default_config_name(),
torrent_hash,
torrent_layout,
"获取种子文件失败,下载任务可能在暂停状态",
)
# 不需要的文件ID
file_ids = []
# 需要的集清单
sucess_epidised = set()
try:
for torrent_file in torrent_files:
file_id = torrent_file.get("id")
file_name = torrent_file.get("name")
meta_info = MetaInfo(file_name)
if not meta_info.episode_list or not set(
meta_info.episode_list
).issubset(episodes):
file_ids.append(file_id)
else:
sucess_epidised.update(meta_info.episode_list)
finally:
torrent_files.clear()
del torrent_files
sucess_epidised = list(sucess_epidised)
if sucess_epidised and file_ids:
# 设置不需要的文件优先级为0不下载
server.set_files(
torrent_hash=torrent_hash, file_ids=file_ids, priority=0
)
# 开始任务
server.start_torrents(torrent_hash)
return (
downloader or self.get_default_config_name(),
torrent_hash,
torrent_layout,
f"添加下载成功,已选择集数:{sucess_epidised}",
)
else:
return (
downloader or self.get_default_config_name(),
torrent_hash,
torrent_layout,
"添加下载成功",
)
def list_torrents(
self,
status: TorrentStatus = None,
hashs: Union[list, str] = None,
downloader: Optional[str] = None,
) -> Optional[List[Union[TransferTorrent, DownloadingTorrent]]]:
"""
获取下载器种子列表
:param status: 种子状态
:param hashs: 种子Hash
:param downloader: 下载器
:return: 下载器中符合状态的种子列表
"""
# 获取下载器
if downloader:
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
servers = {downloader: server}
else:
servers: Dict[str, Rtorrent] = self.get_instances()
ret_torrents = []
if hashs:
# 按Hash获取
for name, server in servers.items():
torrents, _ = (
server.get_torrents(ids=hashs, tags=settings.TORRENT_TAG) or []
)
try:
for torrent in torrents:
content_path = torrent.get("content_path")
if content_path:
torrent_path = Path(content_path)
else:
torrent_path = Path(torrent.get("save_path")) / torrent.get(
"name"
)
ret_torrents.append(
TransferTorrent(
downloader=name,
title=torrent.get("name"),
path=torrent_path,
hash=torrent.get("hash"),
size=torrent.get("total_size"),
tags=torrent.get("tags"),
progress=torrent.get("progress", 0),
state="paused"
if torrent.get("state") == 0
else "downloading",
)
)
finally:
torrents.clear()
del torrents
elif status == TorrentStatus.TRANSFER:
# 获取已完成且未整理的
for name, server in servers.items():
torrents = (
server.get_completed_torrents(tags=settings.TORRENT_TAG) or []
)
try:
for torrent in torrents:
tags = torrent.get("tags") or ""
tag_list = [t.strip() for t in tags.split(",") if t.strip()]
if "已整理" in tag_list:
continue
content_path = torrent.get("content_path")
if content_path:
torrent_path = Path(content_path)
else:
torrent_path = Path(torrent.get("save_path")) / torrent.get(
"name"
)
ret_torrents.append(
TransferTorrent(
downloader=name,
title=torrent.get("name"),
path=torrent_path,
hash=torrent.get("hash"),
tags=torrent.get("tags"),
)
)
finally:
torrents.clear()
del torrents
elif status == TorrentStatus.DOWNLOADING:
# 获取正在下载的任务
for name, server in servers.items():
torrents = (
server.get_downloading_torrents(tags=settings.TORRENT_TAG) or []
)
try:
for torrent in torrents:
meta = MetaInfo(torrent.get("name"))
dlspeed = torrent.get("dlspeed", 0)
upspeed = torrent.get("upspeed", 0)
total_size = torrent.get("total_size", 0)
completed = torrent.get("completed", 0)
ret_torrents.append(
DownloadingTorrent(
downloader=name,
hash=torrent.get("hash"),
title=torrent.get("name"),
name=meta.name,
year=meta.year,
season_episode=meta.season_episode,
progress=torrent.get("progress", 0),
size=total_size,
state="paused"
if torrent.get("state") == 0
else "downloading",
dlspeed=StringUtils.str_filesize(dlspeed),
upspeed=StringUtils.str_filesize(upspeed),
left_time=StringUtils.str_secends(
(total_size - completed) / dlspeed
)
if dlspeed > 0
else "",
)
)
finally:
torrents.clear()
del torrents
else:
return None
return ret_torrents # noqa
def transfer_completed(
self, hashs: Union[str, list], downloader: Optional[str] = None
) -> None:
"""
转移完成后的处理
:param hashs: 种子Hash
:param downloader: 下载器
"""
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
# 获取原标签
org_tags = server.get_torrent_tags(ids=hashs)
# 种子打上已整理标签
if org_tags:
tags = org_tags + ["已整理"]
else:
tags = ["已整理"]
# 直接设置完整标签(覆盖)
server.set_torrents_tag(ids=hashs, tags=tags, overwrite=True)
return None
def remove_torrents(
self,
hashs: Union[str, list],
delete_file: Optional[bool] = True,
downloader: Optional[str] = None,
) -> Optional[bool]:
"""
删除下载器种子
:param hashs: 种子Hash
:param delete_file: 是否删除文件
:param downloader: 下载器
:return: bool
"""
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
return server.delete_torrents(delete_file=delete_file, ids=hashs)
def start_torrents(
self, hashs: Union[list, str], downloader: Optional[str] = None
) -> Optional[bool]:
"""
开始下载
:param hashs: 种子Hash
:param downloader: 下载器
:return: bool
"""
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
return server.start_torrents(ids=hashs)
def stop_torrents(
self, hashs: Union[list, str], downloader: Optional[str] = None
) -> Optional[bool]:
"""
停止下载
:param hashs: 种子Hash
:param downloader: 下载器
:return: bool
"""
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
return server.stop_torrents(ids=hashs)
def torrent_files(
self, tid: str, downloader: Optional[str] = None
) -> Optional[List[Dict]]:
"""
获取种子文件列表
"""
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
return server.get_files(tid=tid)
def downloader_info(
self, downloader: Optional[str] = None
) -> Optional[List[schemas.DownloaderInfo]]:
"""
下载器信息
"""
if downloader:
server: Rtorrent = self.get_instance(downloader)
if not server:
return None
servers = [server]
else:
servers = self.get_instances().values()
ret_info = []
for server in servers:
info = server.transfer_info()
if not info:
continue
ret_info.append(
schemas.DownloaderInfo(
download_speed=info.get("dl_info_speed"),
upload_speed=info.get("up_info_speed"),
download_size=info.get("dl_info_data"),
upload_size=info.get("up_info_data"),
)
)
return ret_info

View File

@@ -0,0 +1,549 @@
import socket
import traceback
import xmlrpc.client
from pathlib import Path
from typing import Optional, Union, Tuple, List, Dict
from urllib.parse import urlparse
from app.log import logger
class SCGITransport(xmlrpc.client.Transport):
"""
通过SCGI协议与rTorrent通信的Transport
"""
def single_request(self, host, handler, request_body, verbose=False):
# 建立socket连接
parsed = urlparse(f"scgi://{host}")
sock = socket.create_connection(
(parsed.hostname, parsed.port or 5000), timeout=60
)
try:
# 构造SCGI请求头
headers = (
f"CONTENT_LENGTH\x00{len(request_body)}\x00"
f"SCGI\x001\x00"
f"REQUEST_METHOD\x00POST\x00"
f"REQUEST_URI\x00/RPC2\x00"
)
# netstring格式: "len:headers,"
netstring = f"{len(headers)}:{headers},".encode()
# 发送请求
sock.sendall(netstring + request_body)
# 读取响应
response = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
finally:
sock.close()
# 跳过HTTP响应头
header_end = response.find(b"\r\n\r\n")
if header_end != -1:
response = response[header_end + 4 :]
# 解析XML-RPC响应
return self.parse_response(self._build_response(response))
@staticmethod
def _build_response(data: bytes):
"""
构造类文件对象用于parse_response
"""
import io
import http.client
class _FakeSocket(io.BytesIO):
def makefile(self, *args, **kwargs):
return self
raw = b"HTTP/1.0 200 OK\r\nContent-Type: text/xml\r\n\r\n" + data
response = http.client.HTTPResponse(_FakeSocket(raw)) # noqa
response.begin()
return response
class Rtorrent:
"""
rTorrent下载器
"""
def __init__(
self,
host: Optional[str] = None,
port: Optional[int] = None,
username: Optional[str] = None,
password: Optional[str] = None,
**kwargs,
):
self._proxy = None
if host and port:
self._host = f"{host}:{port}"
elif host:
self._host = host
else:
logger.error("rTorrent配置不完整")
return
self._username = username
self._password = password
self._proxy = self.__login_rtorrent()
def __login_rtorrent(self) -> Optional[xmlrpc.client.ServerProxy]:
"""
连接rTorrent
"""
if not self._host:
return None
try:
url = self._host
if url.startswith("scgi://"):
# SCGI直连模式
parsed = urlparse(url)
logger.info(f"正在通过SCGI连接 rTorrent{url}")
proxy = xmlrpc.client.ServerProxy(url, transport=SCGITransport())
else:
# HTTP模式 (通过nginx/ruTorrent代理)
if not url.startswith("http"):
url = f"http://{url}"
# 注入认证信息到URL
if self._username and self._password:
parsed = urlparse(url)
url = f"{parsed.scheme}://{self._username}:{self._password}@{parsed.hostname}"
if parsed.port:
url += f":{parsed.port}"
url += parsed.path or "/RPC2"
logger.info(
f"正在通过HTTP连接 rTorrent{url.split('@')[-1] if '@' in url else url}"
)
proxy = xmlrpc.client.ServerProxy(url)
# 测试连接
proxy.system.client_version()
return proxy
except Exception as err:
stack_trace = "".join(
traceback.format_exception(None, err, err.__traceback__)
)[:2000]
logger.error(f"rTorrent 连接出错:{str(err)}\n{stack_trace}")
return None
def is_inactive(self) -> bool:
"""
判断是否需要重连
"""
if not self._host:
return False
return True if not self._proxy else False
def reconnect(self):
"""
重连
"""
self._proxy = self.__login_rtorrent()
def get_torrents(
self,
ids: Optional[Union[str, list]] = None,
status: Optional[str] = None,
tags: Optional[Union[str, list]] = None,
) -> Tuple[List[Dict], bool]:
"""
获取种子列表
:return: 种子列表, 是否发生异常
"""
if not self._proxy:
return [], True
try:
# 使用d.multicall2获取种子列表
fields = [
"d.hash=",
"d.name=",
"d.size_bytes=",
"d.completed_bytes=",
"d.down.rate=",
"d.up.rate=",
"d.state=",
"d.complete=",
"d.directory=",
"d.custom1=",
"d.is_active=",
"d.is_open=",
"d.ratio=",
"d.base_path=",
]
# 获取所有种子
results = self._proxy.d.multicall2("", "main", *fields)
torrents = []
for r in results:
torrent = {
"hash": r[0],
"name": r[1],
"total_size": r[2],
"completed": r[3],
"dlspeed": r[4],
"upspeed": r[5],
"state": r[6], # 0=stopped, 1=started
"complete": r[7], # 0=incomplete, 1=complete
"save_path": r[8],
"tags": r[9], # d.custom1 用于标签
"is_active": r[10],
"is_open": r[11],
"ratio": int(r[12]) / 1000.0 if r[12] else 0,
"content_path": r[13], # base_path 即完整内容路径
}
# 计算进度
if torrent["total_size"] > 0:
torrent["progress"] = (
torrent["completed"] / torrent["total_size"] * 100
)
else:
torrent["progress"] = 0
# ID过滤
if ids:
if isinstance(ids, str):
ids_list = [ids.upper()]
else:
ids_list = [i.upper() for i in ids]
if torrent["hash"].upper() not in ids_list:
continue
# 标签过滤
if tags:
torrent_tags = [
t.strip() for t in torrent["tags"].split(",") if t.strip()
]
if isinstance(tags, str):
tags_list = [t.strip() for t in tags.split(",")]
else:
tags_list = tags
if not set(tags_list).issubset(set(torrent_tags)):
continue
torrents.append(torrent)
return torrents, False
except Exception as err:
logger.error(f"获取种子列表出错:{str(err)}")
return [], True
def get_completed_torrents(
self, ids: Union[str, list] = None, tags: Union[str, list] = None
) -> Optional[List[Dict]]:
"""
获取已完成的种子
"""
if not self._proxy:
return None
torrents, error = self.get_torrents(ids=ids, tags=tags)
if error:
return None
return [t for t in torrents if t.get("complete") == 1]
def get_downloading_torrents(
self, ids: Union[str, list] = None, tags: Union[str, list] = None
) -> Optional[List[Dict]]:
"""
获取正在下载的种子
"""
if not self._proxy:
return None
torrents, error = self.get_torrents(ids=ids, tags=tags)
if error:
return None
return [t for t in torrents if t.get("complete") == 0]
def add_torrent(
self,
content: Union[str, bytes],
is_paused: Optional[bool] = False,
download_dir: Optional[str] = None,
tags: Optional[List[str]] = None,
cookie: Optional[str] = None,
**kwargs,
) -> bool:
"""
添加种子
:param content: 种子内容bytes或磁力链接/URLstr
:param is_paused: 添加后暂停
:param download_dir: 下载路径
:param tags: 标签列表
:param cookie: Cookie
:return: bool
"""
if not self._proxy or not content:
return False
try:
# 构造命令参数
commands = []
if download_dir:
commands.append(f'd.directory.set="{download_dir}"')
if tags:
tag_str = ",".join(tags)
commands.append(f'd.custom1.set="{tag_str}"')
if isinstance(content, bytes):
# 检查是否为磁力链接bytes形式
if content.startswith(b"magnet:"):
content = content.decode("utf-8", errors="ignore")
else:
# 种子文件内容使用load.raw
raw = xmlrpc.client.Binary(content)
if is_paused:
self._proxy.load.raw("", raw, *commands)
else:
self._proxy.load.raw_start("", raw, *commands)
return True
# URL或磁力链接
if is_paused:
self._proxy.load.normal("", content, *commands)
else:
self._proxy.load.start("", content, *commands)
return True
except Exception as err:
logger.error(f"添加种子出错:{str(err)}")
return False
def start_torrents(self, ids: Union[str, list]) -> bool:
"""
启动种子
"""
if not self._proxy:
return False
try:
if isinstance(ids, str):
ids = [ids]
for tid in ids:
self._proxy.d.start(tid)
return True
except Exception as err:
logger.error(f"启动种子出错:{str(err)}")
return False
def stop_torrents(self, ids: Union[str, list]) -> bool:
"""
停止种子
"""
if not self._proxy:
return False
try:
if isinstance(ids, str):
ids = [ids]
for tid in ids:
self._proxy.d.stop(tid)
return True
except Exception as err:
logger.error(f"停止种子出错:{str(err)}")
return False
def delete_torrents(self, delete_file: bool, ids: Union[str, list]) -> bool:
"""
删除种子
"""
if not self._proxy:
return False
if not ids:
return False
try:
if isinstance(ids, str):
ids = [ids]
for tid in ids:
if delete_file:
# 先获取base_path用于删除文件
try:
base_path = self._proxy.d.base_path(tid)
self._proxy.d.erase(tid)
if base_path:
import shutil
path = Path(base_path)
if path.is_dir():
shutil.rmtree(str(path), ignore_errors=True)
elif path.is_file():
path.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"删除种子文件出错:{str(e)}")
self._proxy.d.erase(tid)
else:
self._proxy.d.erase(tid)
return True
except Exception as err:
logger.error(f"删除种子出错:{str(err)}")
return False
def get_files(self, tid: str) -> Optional[List[Dict]]:
"""
获取种子文件列表
"""
if not self._proxy:
return None
if not tid:
return None
try:
files = self._proxy.f.multicall(
tid,
"",
"f.path=",
"f.size_bytes=",
"f.priority=",
"f.completed_chunks=",
"f.size_chunks=",
)
result = []
for idx, f in enumerate(files):
result.append(
{
"id": idx,
"name": f[0],
"size": f[1],
"priority": f[2],
"progress": int(f[3]) / int(f[4]) * 100 if int(f[4]) > 0 else 0,
}
)
return result
except Exception as err:
logger.error(f"获取种子文件列表出错:{str(err)}")
return None
def set_files(
self, torrent_hash: str = None, file_ids: list = None, priority: int = 0
) -> bool:
"""
设置下载文件的优先级priority为0为不下载priority为1为普通
"""
if not self._proxy:
return False
if not torrent_hash or not file_ids:
return False
try:
for file_id in file_ids:
self._proxy.f.priority.set(f"{torrent_hash}:f{file_id}", priority)
# 更新种子优先级
self._proxy.d.update_priorities(torrent_hash)
return True
except Exception as err:
logger.error(f"设置种子文件状态出错:{str(err)}")
return False
def set_torrents_tag(
self, ids: Union[str, list], tags: List[str], overwrite: bool = False
) -> bool:
"""
设置种子标签使用d.custom1
:param ids: 种子Hash
:param tags: 标签列表
:param overwrite: 是否覆盖现有标签,默认为合并
"""
if not self._proxy:
return False
if not ids:
return False
try:
if isinstance(ids, str):
ids = [ids]
for tid in ids:
if overwrite:
# 直接覆盖标签
self._proxy.d.custom1.set(tid, ",".join(tags))
else:
# 获取现有标签
existing = self._proxy.d.custom1(tid)
existing_tags = (
[t.strip() for t in existing.split(",") if t.strip()]
if existing
else []
)
# 合并标签
merged = list(set(existing_tags + tags))
self._proxy.d.custom1.set(tid, ",".join(merged))
return True
except Exception as err:
logger.error(f"设置种子Tag出错{str(err)}")
return False
def remove_torrents_tag(self, ids: Union[str, list], tag: Union[str, list]) -> bool:
"""
移除种子标签
"""
if not self._proxy:
return False
if not ids:
return False
try:
if isinstance(ids, str):
ids = [ids]
if isinstance(tag, str):
tag = [tag]
for tid in ids:
existing = self._proxy.d.custom1(tid)
existing_tags = (
[t.strip() for t in existing.split(",") if t.strip()]
if existing
else []
)
new_tags = [t for t in existing_tags if t not in tag]
self._proxy.d.custom1.set(tid, ",".join(new_tags))
return True
except Exception as err:
logger.error(f"移除种子Tag出错{str(err)}")
return False
def get_torrent_tags(self, ids: str) -> List[str]:
"""
获取种子标签
"""
if not self._proxy:
return []
try:
existing = self._proxy.d.custom1(ids)
return (
[t.strip() for t in existing.split(",") if t.strip()]
if existing
else []
)
except Exception as err:
logger.error(f"获取种子标签出错:{str(err)}")
return []
def get_torrent_id_by_tag(
self, tags: Union[str, list], status: Optional[str] = None
) -> Optional[str]:
"""
通过标签多次尝试获取刚添加的种子ID并移除标签
"""
import time
if isinstance(tags, str):
tags = [tags]
torrent_id = None
for i in range(1, 10):
time.sleep(3)
torrents, error = self.get_torrents(tags=tags)
if not error and torrents:
torrent_id = torrents[0].get("hash")
# 移除查找标签
for tag in tags:
self.remove_torrents_tag(ids=torrent_id, tag=[tag])
break
return torrent_id
def transfer_info(self) -> Optional[Dict]:
"""
获取传输信息
"""
if not self._proxy:
return None
try:
return {
"dl_info_speed": self._proxy.throttle.global_down.rate(),
"up_info_speed": self._proxy.throttle.global_up.rate(),
"dl_info_data": self._proxy.throttle.global_down.total(),
"up_info_data": self._proxy.throttle.global_up.total(),
}
except Exception as err:
logger.error(f"获取传输信息出错:{str(err)}")
return None

View File

@@ -9,6 +9,7 @@ class ServiceInfo:
"""
封装服务相关信息的数据类
"""
# 名称
name: Optional[str] = None
# 实例
@@ -25,6 +26,7 @@ class MediaServerConf(BaseModel):
"""
媒体服务器配置
"""
# 名称
name: Optional[str] = None
# 类型 emby/jellyfin/plex
@@ -41,9 +43,10 @@ class DownloaderConf(BaseModel):
"""
下载器配置
"""
# 名称
name: Optional[str] = None
# 类型 qbittorrent/transmission
# 类型 qbittorrent/transmission/rtorrent
type: Optional[str] = None
# 是否默认
default: Optional[bool] = False
@@ -59,6 +62,7 @@ class NotificationConf(BaseModel):
"""
通知配置
"""
# 名称
name: Optional[str] = None
# 类型 telegram/wechat/vocechat/synologychat/slack/webpush
@@ -75,16 +79,18 @@ class NotificationSwitchConf(BaseModel):
"""
通知场景开关配置
"""
# 场景名称
type: str = None
# 通知范围 all/user/admin
action: Optional[str] = 'all'
action: Optional[str] = "all"
class StorageConf(BaseModel):
"""
存储配置
"""
# 类型 local/alipan/u115/rclone/alist
type: Optional[str] = None
# 名称
@@ -97,6 +103,7 @@ class TransferDirectoryConf(BaseModel):
"""
文件整理目录配置
"""
# 名称
name: Optional[str] = None
# 优先级
@@ -116,7 +123,7 @@ class TransferDirectoryConf(BaseModel):
# 监控方式 downloader/monitorNone为不监控
monitor_type: Optional[str] = None
# 监控模式 fast / compatibility
monitor_mode: Optional[str] = 'fast'
monitor_mode: Optional[str] = "fast"
# 整理方式 move/copy/link/softlink
transfer_type: Optional[str] = None
# 文件覆盖模式 always/size/never/latest

View File

@@ -293,6 +293,8 @@ class DownloaderType(Enum):
Qbittorrent = "Qbittorrent"
# Transmission
Transmission = "Transmission"
# Rtorrent
Rtorrent = "Rtorrent"
# Aria2
# Aria2 = "Aria2"