From c35faf5356bb21c1f0ca640c52e65c3d3fd2fd85 Mon Sep 17 00:00:00 2001 From: DDSRem <1448139087@qq.com> Date: Sun, 22 Feb 2026 13:12:22 +0800 Subject: [PATCH 1/3] feat(downloader): add rTorrent downloader support Implement rTorrent downloader module via XML-RPC protocol, supporting both HTTP (nginx/ruTorrent proxy) and SCGI connection modes. Add RtorrentModule implementing _ModuleBase and _DownloaderBase interfaces with no extra dependencies. Co-Authored-By: Claude Opus 4.6 --- .gitignore | 5 +- app/helper/downloader.py | 2 +- app/modules/rtorrent/__init__.py | 422 ++++++++++++++++++++++++++ app/modules/rtorrent/rtorrent.py | 499 +++++++++++++++++++++++++++++++ app/schemas/system.py | 13 +- app/schemas/types.py | 2 + 6 files changed, 938 insertions(+), 5 deletions(-) create mode 100644 app/modules/rtorrent/__init__.py create mode 100644 app/modules/rtorrent/rtorrent.py diff --git a/.gitignore b/.gitignore index 21197911..bc4842fc 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,7 @@ venv # Pylint pylint-report.json -.pylint.d/ \ No newline at end of file +.pylint.d/ + +# AI +.claude/ diff --git a/app/helper/downloader.py b/app/helper/downloader.py index e0e449ba..425d1db4 100644 --- a/app/helper/downloader.py +++ b/app/helper/downloader.py @@ -25,7 +25,7 @@ class DownloaderHelper(ServiceBaseHelper[DownloaderConf]): ) -> bool: """ 通用的下载器类型判断方法 - :param service_type: 下载器的类型名称(如 'qbittorrent', 'transmission') + :param service_type: 下载器的类型名称(如 'qbittorrent', 'transmission', 'rtorrent') :param service: 要判断的服务信息 :param name: 服务的名称 :return: 如果服务类型或实例为指定类型,返回 True;否则返回 False diff --git a/app/modules/rtorrent/__init__.py b/app/modules/rtorrent/__init__.py new file mode 100644 index 00000000..89d9823b --- /dev/null +++ b/app/modules/rtorrent/__init__.py @@ -0,0 +1,422 @@ +from pathlib import Path +from typing import Set, Tuple, Optional, Union, List, Dict + +from torrentool.torrent import Torrent + +from app import schemas +from app.core.cache import FileCache +from app.core.config import settings +from app.core.metainfo import MetaInfo +from app.log import logger +from app.modules import _ModuleBase, _DownloaderBase +from app.modules.rtorrent.rtorrent import Rtorrent +from app.schemas import TransferTorrent, DownloadingTorrent +from app.schemas.types import TorrentStatus, ModuleType, DownloaderType +from app.utils.string import StringUtils + + +class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): + + def init_module(self) -> None: + """ + 初始化模块 + """ + super().init_service(service_name=Rtorrent.__name__.lower(), + service_type=Rtorrent) + + @staticmethod + def get_name() -> str: + return "Rtorrent" + + @staticmethod + def get_type() -> ModuleType: + """ + 获取模块类型 + """ + return ModuleType.Downloader + + @staticmethod + def get_subtype() -> DownloaderType: + """ + 获取模块子类型 + """ + return DownloaderType.Rtorrent + + @staticmethod + def get_priority() -> int: + """ + 获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效 + """ + return 3 + + def stop(self): + pass + + def test(self) -> Optional[Tuple[bool, str]]: + """ + 测试模块连接性 + """ + if not self.get_instances(): + return None + for name, server in self.get_instances().items(): + if server.is_inactive(): + server.reconnect() + if not server.transfer_info(): + return False, f"无法连接rTorrent下载器:{name}" + return True, "" + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def scheduler_job(self) -> None: + """ + 定时任务,每10分钟调用一次 + """ + for name, server in self.get_instances().items(): + if server.is_inactive(): + logger.info(f"rTorrent下载器 {name} 连接断开,尝试重连 ...") + server.reconnect() + + def download(self, content: Union[Path, str, bytes], download_dir: Path, cookie: str, + episodes: Set[int] = None, category: Optional[str] = None, label: Optional[str] = None, + downloader: Optional[str] = None) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]: + """ + 根据种子文件,选择并添加下载任务 + :param content: 种子文件地址或者磁力链接或种子内容 + :param download_dir: 下载目录 + :param cookie: cookie + :param episodes: 需要下载的集数 + :param category: 分类,rTorrent中未使用 + :param label: 标签 + :param downloader: 下载器 + :return: 下载器名称、种子Hash、种子文件布局、错误原因 + """ + + def __get_torrent_info() -> Tuple[Optional[Torrent], Optional[bytes]]: + """ + 获取种子名称 + """ + torrent_info, torrent_content = None, None + try: + if isinstance(content, Path): + if content.exists(): + torrent_content = content.read_bytes() + else: + torrent_content = FileCache().get(content.as_posix(), region="torrents") + else: + torrent_content = content + + if torrent_content: + if StringUtils.is_magnet_link(torrent_content): + return None, torrent_content + else: + torrent_info = Torrent.from_string(torrent_content) + + return torrent_info, torrent_content + except Exception as e: + logger.error(f"获取种子名称失败:{e}") + return None, None + + if not content: + return None, None, None, "下载内容为空" + + # 读取种子的名称 + torrent_from_file, content = __get_torrent_info() + # 检查是否为磁力链接 + is_magnet = isinstance(content, str) and content.startswith("magnet:") or isinstance(content, + bytes) and content.startswith( + b"magnet:") + if not torrent_from_file and not is_magnet: + return None, None, None, f"添加种子任务失败:无法读取种子文件" + + # 获取下载器 + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + + # 生成随机Tag + tag = StringUtils.generate_random_str(10) + if label: + tags = label.split(',') + [tag] + elif settings.TORRENT_TAG: + tags = [tag, settings.TORRENT_TAG] + else: + tags = [tag] + # 如果要选择文件则先暂停 + is_paused = True if episodes else False + # 添加任务 + state = server.add_torrent( + content=content, + download_dir=self.normalize_path(download_dir, downloader), + is_paused=is_paused, + tags=tags, + cookie=cookie, + ) + + # rTorrent 始终使用原始种子布局 + torrent_layout = "Original" + + if not state: + # 查询所有下载器的种子 + torrents, error = server.get_torrents() + if error: + return None, None, None, "无法连接rTorrent下载器" + if torrents: + try: + for torrent in torrents: + # 名称与大小相等则认为是同一个种子 + if torrent.get("name") == getattr(torrent_from_file, 'name', '') \ + and torrent.get("total_size") == getattr(torrent_from_file, 'total_size', 0): + torrent_hash = torrent.get("hash") + torrent_tags = [str(t).strip() for t in torrent.get("tags", "").split(',') if t.strip()] + logger.warn(f"下载器中已存在该种子任务:{torrent_hash} - {torrent.get('name')}") + # 给种子打上标签 + if "已整理" in torrent_tags: + server.remove_torrents_tag(ids=torrent_hash, tag=['已整理']) + if settings.TORRENT_TAG and settings.TORRENT_TAG not in torrent_tags: + logger.info(f"给种子 {torrent_hash} 打上标签:{settings.TORRENT_TAG}") + server.set_torrents_tag(ids=torrent_hash, tags=[settings.TORRENT_TAG]) + return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, f"下载任务已存在" + finally: + torrents.clear() + del torrents + return None, None, None, f"添加种子任务失败:{content}" + else: + # 获取种子Hash + torrent_hash = server.get_torrent_id_by_tag(tags=tag) + if not torrent_hash: + return None, None, None, f"下载任务添加成功,但获取rTorrent任务信息失败:{content}" + else: + if is_paused: + # 种子文件 + torrent_files = server.get_files(torrent_hash) + if not torrent_files: + return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, "获取种子文件失败,下载任务可能在暂停状态" + + # 不需要的文件ID + file_ids = [] + # 需要的集清单 + sucess_epidised = [] + try: + for torrent_file in torrent_files: + file_id = torrent_file.get("id") + file_name = torrent_file.get("name") + meta_info = MetaInfo(file_name) + if not meta_info.episode_list \ + or not set(meta_info.episode_list).issubset(episodes): + file_ids.append(file_id) + else: + sucess_epidised = list(set(sucess_epidised).union(set(meta_info.episode_list))) + finally: + torrent_files.clear() + del torrent_files + if sucess_epidised and file_ids: + # 设置不需要的文件优先级为0(不下载) + server.set_files(torrent_hash=torrent_hash, file_ids=file_ids, priority=0) + # 开始任务 + server.start_torrents(torrent_hash) + return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, f"添加下载成功,已选择集数:{sucess_epidised}" + else: + return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, "添加下载成功" + + def list_torrents(self, status: TorrentStatus = None, + hashs: Union[list, str] = None, + downloader: Optional[str] = None + ) -> Optional[List[Union[TransferTorrent, DownloadingTorrent]]]: + """ + 获取下载器种子列表 + :param status: 种子状态 + :param hashs: 种子Hash + :param downloader: 下载器 + :return: 下载器中符合状态的种子列表 + """ + # 获取下载器 + if downloader: + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + servers = {downloader: server} + else: + servers: Dict[str, Rtorrent] = self.get_instances() + ret_torrents = [] + if hashs: + # 按Hash获取 + for name, server in servers.items(): + torrents, _ = server.get_torrents(ids=hashs, tags=settings.TORRENT_TAG) or [] + try: + for torrent in torrents: + content_path = torrent.get("content_path") + if content_path: + torrent_path = Path(content_path) + else: + torrent_path = Path(torrent.get('save_path')) / torrent.get('name') + ret_torrents.append(TransferTorrent( + downloader=name, + title=torrent.get('name'), + path=torrent_path, + hash=torrent.get('hash'), + size=torrent.get('total_size'), + tags=torrent.get('tags'), + progress=torrent.get('progress', 0), + state="paused" if torrent.get('state') == 0 else "downloading", + )) + finally: + torrents.clear() + del torrents + elif status == TorrentStatus.TRANSFER: + # 获取已完成且未整理的 + for name, server in servers.items(): + torrents = server.get_completed_torrents(tags=settings.TORRENT_TAG) or [] + try: + for torrent in torrents: + tags = torrent.get("tags") or "" + tag_list = [t.strip() for t in tags.split(",") if t.strip()] + if "已整理" in tag_list: + continue + content_path = torrent.get("content_path") + if content_path: + torrent_path = Path(content_path) + else: + torrent_path = Path(torrent.get('save_path')) / torrent.get('name') + ret_torrents.append(TransferTorrent( + downloader=name, + title=torrent.get('name'), + path=torrent_path, + hash=torrent.get('hash'), + tags=torrent.get('tags') + )) + finally: + torrents.clear() + del torrents + elif status == TorrentStatus.DOWNLOADING: + # 获取正在下载的任务 + for name, server in servers.items(): + torrents = server.get_downloading_torrents(tags=settings.TORRENT_TAG) or [] + try: + for torrent in torrents: + meta = MetaInfo(torrent.get('name')) + dlspeed = torrent.get('dlspeed', 0) + upspeed = torrent.get('upspeed', 0) + total_size = torrent.get('total_size', 0) + completed = torrent.get('completed', 0) + ret_torrents.append(DownloadingTorrent( + downloader=name, + hash=torrent.get('hash'), + title=torrent.get('name'), + name=meta.name, + year=meta.year, + season_episode=meta.season_episode, + progress=torrent.get('progress', 0), + size=total_size, + state="paused" if torrent.get('state') == 0 else "downloading", + dlspeed=StringUtils.str_filesize(dlspeed), + upspeed=StringUtils.str_filesize(upspeed), + left_time=StringUtils.str_secends( + (total_size - completed) / dlspeed) if dlspeed > 0 else '' + )) + finally: + torrents.clear() + del torrents + else: + return None + return ret_torrents # noqa + + def transfer_completed(self, hashs: str, downloader: Optional[str] = None) -> None: + """ + 转移完成后的处理 + :param hashs: 种子Hash + :param downloader: 下载器 + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + # 获取原标签 + org_tags = server.get_torrent_tags(ids=hashs) + # 种子打上已整理标签 + if org_tags: + tags = org_tags + ['已整理'] + else: + tags = ['已整理'] + # 直接设置完整标签(覆盖) + if isinstance(hashs, str): + hashs_list = [hashs] + else: + hashs_list = hashs + for tid in hashs_list: + try: + server._proxy.d.custom1.set(tid, ",".join(tags)) + except Exception: + pass + return None + + def remove_torrents(self, hashs: Union[str, list], delete_file: Optional[bool] = True, + downloader: Optional[str] = None) -> Optional[bool]: + """ + 删除下载器种子 + :param hashs: 种子Hash + :param delete_file: 是否删除文件 + :param downloader: 下载器 + :return: bool + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.delete_torrents(delete_file=delete_file, ids=hashs) + + def start_torrents(self, hashs: Union[list, str], + downloader: Optional[str] = None) -> Optional[bool]: + """ + 开始下载 + :param hashs: 种子Hash + :param downloader: 下载器 + :return: bool + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.start_torrents(ids=hashs) + + def stop_torrents(self, hashs: Union[list, str], downloader: Optional[str] = None) -> Optional[bool]: + """ + 停止下载 + :param hashs: 种子Hash + :param downloader: 下载器 + :return: bool + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.stop_torrents(ids=hashs) + + def torrent_files(self, tid: str, downloader: Optional[str] = None) -> Optional[List[Dict]]: + """ + 获取种子文件列表 + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.get_files(tid=tid) + + def downloader_info(self, downloader: Optional[str] = None) -> Optional[List[schemas.DownloaderInfo]]: + """ + 下载器信息 + """ + if downloader: + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + servers = [server] + else: + servers = self.get_instances().values() + ret_info = [] + for server in servers: + info = server.transfer_info() + if not info: + continue + ret_info.append(schemas.DownloaderInfo( + download_speed=info.get("dl_info_speed"), + upload_speed=info.get("up_info_speed"), + download_size=info.get("dl_info_data"), + upload_size=info.get("up_info_data") + )) + return ret_info diff --git a/app/modules/rtorrent/rtorrent.py b/app/modules/rtorrent/rtorrent.py new file mode 100644 index 00000000..87b17d09 --- /dev/null +++ b/app/modules/rtorrent/rtorrent.py @@ -0,0 +1,499 @@ +import socket +import traceback +import xmlrpc.client +from pathlib import Path +from typing import Optional, Union, Tuple, List, Dict +from urllib.parse import urlparse + +from app.log import logger + + +class SCGITransport(xmlrpc.client.Transport): + """ + 通过SCGI协议与rTorrent通信的Transport + """ + + def single_request(self, host, handler, request_body, verbose=False): + # 建立socket连接 + parsed = urlparse(f"scgi://{host}") + sock = socket.create_connection((parsed.hostname, parsed.port or 5000), timeout=60) + try: + # 构造SCGI请求头 + headers = ( + f"CONTENT_LENGTH\x00{len(request_body)}\x00" + f"SCGI\x001\x00" + f"REQUEST_METHOD\x00POST\x00" + f"REQUEST_URI\x00/RPC2\x00" + ) + # netstring格式: "len:headers," + netstring = f"{len(headers)}:{headers},".encode() + # 发送请求 + sock.sendall(netstring + request_body) + # 读取响应 + response = b"" + while True: + chunk = sock.recv(4096) + if not chunk: + break + response += chunk + finally: + sock.close() + + # 跳过HTTP响应头 + header_end = response.find(b"\r\n\r\n") + if header_end != -1: + response = response[header_end + 4:] + + # 解析XML-RPC响应 + return self.parse_response(self._build_response(response)) + + @staticmethod + def _build_response(data: bytes): + """ + 构造类文件对象用于parse_response + """ + import io + import http.client + + class _FakeSocket(io.BytesIO): + def makefile(self, *args, **kwargs): + return self + + raw = b"HTTP/1.0 200 OK\r\nContent-Type: text/xml\r\n\r\n" + data + response = http.client.HTTPResponse(_FakeSocket(raw)) # noqa + response.begin() + return response + + +class Rtorrent: + """ + rTorrent下载器 + """ + + def __init__(self, host: Optional[str] = None, port: Optional[int] = None, + username: Optional[str] = None, password: Optional[str] = None, + **kwargs): + self._proxy = None + if host and port: + self._host = f"{host}:{port}" + elif host: + self._host = host + else: + logger.error("rTorrent配置不完整!") + return + self._username = username + self._password = password + self._proxy = self.__login_rtorrent() + + def __login_rtorrent(self) -> Optional[xmlrpc.client.ServerProxy]: + """ + 连接rTorrent + """ + if not self._host: + return None + try: + url = self._host + if url.startswith("scgi://"): + # SCGI直连模式 + parsed = urlparse(url) + logger.info(f"正在通过SCGI连接 rTorrent:{url}") + proxy = xmlrpc.client.ServerProxy( + url, + transport=SCGITransport() + ) + else: + # HTTP模式 (通过nginx/ruTorrent代理) + if not url.startswith("http"): + url = f"http://{url}" + # 注入认证信息到URL + if self._username and self._password: + parsed = urlparse(url) + url = f"{parsed.scheme}://{self._username}:{self._password}@{parsed.hostname}" + if parsed.port: + url += f":{parsed.port}" + url += parsed.path or "/RPC2" + logger.info(f"正在通过HTTP连接 rTorrent:{url.split('@')[-1] if '@' in url else url}") + proxy = xmlrpc.client.ServerProxy(url) + + # 测试连接 + proxy.system.client_version() + return proxy + except Exception as err: + stack_trace = "".join(traceback.format_exception(None, err, err.__traceback__))[:2000] + logger.error(f"rTorrent 连接出错:{str(err)}\n{stack_trace}") + return None + + def is_inactive(self) -> bool: + """ + 判断是否需要重连 + """ + if not self._host: + return False + return True if not self._proxy else False + + def reconnect(self): + """ + 重连 + """ + self._proxy = self.__login_rtorrent() + + def get_torrents(self, ids: Optional[Union[str, list]] = None, + status: Optional[str] = None, + tags: Optional[Union[str, list]] = None) -> Tuple[List[Dict], bool]: + """ + 获取种子列表 + :return: 种子列表, 是否发生异常 + """ + if not self._proxy: + return [], True + try: + # 使用d.multicall2获取种子列表 + fields = [ + "d.hash=", + "d.name=", + "d.size_bytes=", + "d.completed_bytes=", + "d.down.rate=", + "d.up.rate=", + "d.state=", + "d.complete=", + "d.directory=", + "d.custom1=", + "d.is_active=", + "d.is_open=", + "d.ratio=", + "d.base_path=", + ] + # 获取所有种子 + results = self._proxy.d.multicall2("", "main", *fields) + torrents = [] + for r in results: + torrent = { + "hash": r[0], + "name": r[1], + "total_size": r[2], + "completed": r[3], + "dlspeed": r[4], + "upspeed": r[5], + "state": r[6], # 0=stopped, 1=started + "complete": r[7], # 0=incomplete, 1=complete + "save_path": r[8], + "tags": r[9], # d.custom1 用于标签 + "is_active": r[10], + "is_open": r[11], + "ratio": int(r[12]) / 1000.0 if r[12] else 0, + "content_path": r[13], # base_path 即完整内容路径 + } + # 计算进度 + if torrent["total_size"] > 0: + torrent["progress"] = torrent["completed"] / torrent["total_size"] * 100 + else: + torrent["progress"] = 0 + + # ID过滤 + if ids: + if isinstance(ids, str): + ids_list = [ids.upper()] + else: + ids_list = [i.upper() for i in ids] + if torrent["hash"].upper() not in ids_list: + continue + + # 标签过滤 + if tags: + torrent_tags = [t.strip() for t in torrent["tags"].split(",") if t.strip()] + if isinstance(tags, str): + tags_list = [t.strip() for t in tags.split(",")] + else: + tags_list = tags + if not set(tags_list).issubset(set(torrent_tags)): + continue + + torrents.append(torrent) + return torrents, False + except Exception as err: + logger.error(f"获取种子列表出错:{str(err)}") + return [], True + + def get_completed_torrents(self, ids: Union[str, list] = None, + tags: Union[str, list] = None) -> Optional[List[Dict]]: + """ + 获取已完成的种子 + """ + if not self._proxy: + return None + torrents, error = self.get_torrents(ids=ids, tags=tags) + if error: + return None + return [t for t in torrents if t.get("complete") == 1] + + def get_downloading_torrents(self, ids: Union[str, list] = None, + tags: Union[str, list] = None) -> Optional[List[Dict]]: + """ + 获取正在下载的种子 + """ + if not self._proxy: + return None + torrents, error = self.get_torrents(ids=ids, tags=tags) + if error: + return None + return [t for t in torrents if t.get("complete") == 0] + + def add_torrent(self, + content: Union[str, bytes], + is_paused: Optional[bool] = False, + download_dir: Optional[str] = None, + tags: Optional[List[str]] = None, + cookie: Optional[str] = None, + **kwargs) -> bool: + """ + 添加种子 + :param content: 种子内容(bytes)或磁力链接/URL(str) + :param is_paused: 添加后暂停 + :param download_dir: 下载路径 + :param tags: 标签列表 + :param cookie: Cookie + :return: bool + """ + if not self._proxy or not content: + return False + try: + # 构造命令参数 + commands = [] + if download_dir: + commands.append(f'd.directory.set="{download_dir}"') + if tags: + tag_str = ",".join(tags) + commands.append(f'd.custom1.set="{tag_str}"') + + if isinstance(content, bytes): + # 检查是否为磁力链接(bytes形式) + if content.startswith(b"magnet:"): + content = content.decode("utf-8", errors="ignore") + else: + # 种子文件内容,使用load.raw + raw = xmlrpc.client.Binary(content) + if is_paused: + self._proxy.load.raw("", raw, *commands) + else: + self._proxy.load.raw_start("", raw, *commands) + return True + + # URL或磁力链接 + if is_paused: + self._proxy.load.normal("", content, *commands) + else: + self._proxy.load.start("", content, *commands) + return True + except Exception as err: + logger.error(f"添加种子出错:{str(err)}") + return False + + def start_torrents(self, ids: Union[str, list]) -> bool: + """ + 启动种子 + """ + if not self._proxy: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + self._proxy.d.start(tid) + return True + except Exception as err: + logger.error(f"启动种子出错:{str(err)}") + return False + + def stop_torrents(self, ids: Union[str, list]) -> bool: + """ + 停止种子 + """ + if not self._proxy: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + self._proxy.d.stop(tid) + return True + except Exception as err: + logger.error(f"停止种子出错:{str(err)}") + return False + + def delete_torrents(self, delete_file: bool, ids: Union[str, list]) -> bool: + """ + 删除种子 + """ + if not self._proxy: + return False + if not ids: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + if delete_file: + # 先获取base_path用于删除文件 + try: + base_path = self._proxy.d.base_path(tid) + self._proxy.d.erase(tid) + if base_path: + import shutil + path = Path(base_path) + if path.is_dir(): + shutil.rmtree(str(path), ignore_errors=True) + elif path.is_file(): + path.unlink(missing_ok=True) + except Exception as e: + logger.warning(f"删除种子文件出错:{str(e)}") + self._proxy.d.erase(tid) + else: + self._proxy.d.erase(tid) + return True + except Exception as err: + logger.error(f"删除种子出错:{str(err)}") + return False + + def get_files(self, tid: str) -> Optional[List[Dict]]: + """ + 获取种子文件列表 + """ + if not self._proxy: + return None + if not tid: + return None + try: + files = self._proxy.f.multicall(tid, "", + "f.path=", + "f.size_bytes=", + "f.priority=", + "f.completed_chunks=", + "f.size_chunks=") + result = [] + for idx, f in enumerate(files): + result.append({ + "id": idx, + "name": f[0], + "size": f[1], + "priority": f[2], + "progress": int(f[3]) / int(f[4]) * 100 if int(f[4]) > 0 else 0 + }) + return result + except Exception as err: + logger.error(f"获取种子文件列表出错:{str(err)}") + return None + + def set_files(self, torrent_hash: str = None, file_ids: list = None, priority: int = 0) -> bool: + """ + 设置下载文件的优先级,priority为0为不下载,priority为1为普通 + """ + if not self._proxy: + return False + if not torrent_hash or not file_ids: + return False + try: + for file_id in file_ids: + self._proxy.f.priority.set(f"{torrent_hash}:f{file_id}", priority) + # 更新种子优先级 + self._proxy.d.update_priorities(torrent_hash) + return True + except Exception as err: + logger.error(f"设置种子文件状态出错:{str(err)}") + return False + + def set_torrents_tag(self, ids: Union[str, list], tags: List[str]) -> bool: + """ + 设置种子标签(使用d.custom1) + """ + if not self._proxy: + return False + if not ids: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + # 获取现有标签 + existing = self._proxy.d.custom1(tid) + existing_tags = [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + # 合并标签 + merged = list(set(existing_tags + tags)) + self._proxy.d.custom1.set(tid, ",".join(merged)) + return True + except Exception as err: + logger.error(f"设置种子Tag出错:{str(err)}") + return False + + def remove_torrents_tag(self, ids: Union[str, list], tag: Union[str, list]) -> bool: + """ + 移除种子标签 + """ + if not self._proxy: + return False + if not ids: + return False + try: + if isinstance(ids, str): + ids = [ids] + if isinstance(tag, str): + tag = [tag] + for tid in ids: + existing = self._proxy.d.custom1(tid) + existing_tags = [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + new_tags = [t for t in existing_tags if t not in tag] + self._proxy.d.custom1.set(tid, ",".join(new_tags)) + return True + except Exception as err: + logger.error(f"移除种子Tag出错:{str(err)}") + return False + + def get_torrent_tags(self, ids: str) -> List[str]: + """ + 获取种子标签 + """ + if not self._proxy: + return [] + try: + existing = self._proxy.d.custom1(ids) + return [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + except Exception as err: + logger.error(f"获取种子标签出错:{str(err)}") + return [] + + def get_torrent_id_by_tag(self, tags: Union[str, list], + status: Optional[str] = None) -> Optional[str]: + """ + 通过标签多次尝试获取刚添加的种子ID,并移除标签 + """ + import time + if isinstance(tags, str): + tags = [tags] + torrent_id = None + for i in range(1, 10): + time.sleep(3) + torrents, error = self.get_torrents(tags=tags) + if not error and torrents: + torrent_id = torrents[0].get("hash") + # 移除查找标签 + for tag in tags: + self.remove_torrents_tag(ids=torrent_id, tag=[tag]) + break + return torrent_id + + def transfer_info(self) -> Optional[Dict]: + """ + 获取传输信息 + """ + if not self._proxy: + return None + try: + return { + "dl_info_speed": self._proxy.throttle.global_down.rate(), + "up_info_speed": self._proxy.throttle.global_up.rate(), + "dl_info_data": self._proxy.throttle.global_down.total(), + "up_info_data": self._proxy.throttle.global_up.total(), + } + except Exception as err: + logger.error(f"获取传输信息出错:{str(err)}") + return None diff --git a/app/schemas/system.py b/app/schemas/system.py index 2b8e9222..6097d441 100644 --- a/app/schemas/system.py +++ b/app/schemas/system.py @@ -9,6 +9,7 @@ class ServiceInfo: """ 封装服务相关信息的数据类 """ + # 名称 name: Optional[str] = None # 实例 @@ -25,6 +26,7 @@ class MediaServerConf(BaseModel): """ 媒体服务器配置 """ + # 名称 name: Optional[str] = None # 类型 emby/jellyfin/plex @@ -41,9 +43,10 @@ class DownloaderConf(BaseModel): """ 下载器配置 """ + # 名称 name: Optional[str] = None - # 类型 qbittorrent/transmission + # 类型 qbittorrent/transmission/rtorrent type: Optional[str] = None # 是否默认 default: Optional[bool] = False @@ -59,6 +62,7 @@ class NotificationConf(BaseModel): """ 通知配置 """ + # 名称 name: Optional[str] = None # 类型 telegram/wechat/vocechat/synologychat/slack/webpush @@ -75,16 +79,18 @@ class NotificationSwitchConf(BaseModel): """ 通知场景开关配置 """ + # 场景名称 type: str = None # 通知范围 all/user/admin - action: Optional[str] = 'all' + action: Optional[str] = "all" class StorageConf(BaseModel): """ 存储配置 """ + # 类型 local/alipan/u115/rclone/alist type: Optional[str] = None # 名称 @@ -97,6 +103,7 @@ class TransferDirectoryConf(BaseModel): """ 文件整理目录配置 """ + # 名称 name: Optional[str] = None # 优先级 @@ -116,7 +123,7 @@ class TransferDirectoryConf(BaseModel): # 监控方式 downloader/monitor,None为不监控 monitor_type: Optional[str] = None # 监控模式 fast / compatibility - monitor_mode: Optional[str] = 'fast' + monitor_mode: Optional[str] = "fast" # 整理方式 move/copy/link/softlink transfer_type: Optional[str] = None # 文件覆盖模式 always/size/never/latest diff --git a/app/schemas/types.py b/app/schemas/types.py index eb19100d..c9073b0f 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -293,6 +293,8 @@ class DownloaderType(Enum): Qbittorrent = "Qbittorrent" # Transmission Transmission = "Transmission" + # Rtorrent + Rtorrent = "Rtorrent" # Aria2 # Aria2 = "Aria2" From def652c76834db025db45a7758c35dcf591e3251 Mon Sep 17 00:00:00 2001 From: DDSRem <1448139087@qq.com> Date: Sun, 22 Feb 2026 13:40:15 +0800 Subject: [PATCH 2/3] fix(rtorrent): address code review feedback - Replace direct _proxy access in transfer_completed with set_torrents_tag(overwrite=True) for proper encapsulation and error logging - Optimize episode collection by using set accumulation instead of repeated list-set conversions in loop - Fix type hint for hashs parameter in transfer_completed (str -> Union[str, list]) - Add overwrite parameter to set_torrents_tag to support tag replacement Co-Authored-By: Claude Opus 4.6 --- app/modules/rtorrent/__init__.py | 17 +++++------------ app/modules/rtorrent/rtorrent.py | 21 ++++++++++++++------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/app/modules/rtorrent/__init__.py b/app/modules/rtorrent/__init__.py index 89d9823b..61af67a9 100644 --- a/app/modules/rtorrent/__init__.py +++ b/app/modules/rtorrent/__init__.py @@ -196,7 +196,7 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): # 不需要的文件ID file_ids = [] # 需要的集清单 - sucess_epidised = [] + sucess_epidised = set() try: for torrent_file in torrent_files: file_id = torrent_file.get("id") @@ -206,10 +206,11 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): or not set(meta_info.episode_list).issubset(episodes): file_ids.append(file_id) else: - sucess_epidised = list(set(sucess_epidised).union(set(meta_info.episode_list))) + sucess_epidised.update(meta_info.episode_list) finally: torrent_files.clear() del torrent_files + sucess_epidised = list(sucess_epidised) if sucess_epidised and file_ids: # 设置不需要的文件优先级为0(不下载) server.set_files(torrent_hash=torrent_hash, file_ids=file_ids, priority=0) @@ -321,7 +322,7 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): return None return ret_torrents # noqa - def transfer_completed(self, hashs: str, downloader: Optional[str] = None) -> None: + def transfer_completed(self, hashs: Union[str, list], downloader: Optional[str] = None) -> None: """ 转移完成后的处理 :param hashs: 种子Hash @@ -338,15 +339,7 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): else: tags = ['已整理'] # 直接设置完整标签(覆盖) - if isinstance(hashs, str): - hashs_list = [hashs] - else: - hashs_list = hashs - for tid in hashs_list: - try: - server._proxy.d.custom1.set(tid, ",".join(tags)) - except Exception: - pass + server.set_torrents_tag(ids=hashs, tags=tags, overwrite=True) return None def remove_torrents(self, hashs: Union[str, list], delete_file: Optional[bool] = True, diff --git a/app/modules/rtorrent/rtorrent.py b/app/modules/rtorrent/rtorrent.py index 87b17d09..9767e1f4 100644 --- a/app/modules/rtorrent/rtorrent.py +++ b/app/modules/rtorrent/rtorrent.py @@ -402,9 +402,12 @@ class Rtorrent: logger.error(f"设置种子文件状态出错:{str(err)}") return False - def set_torrents_tag(self, ids: Union[str, list], tags: List[str]) -> bool: + def set_torrents_tag(self, ids: Union[str, list], tags: List[str], overwrite: bool = False) -> bool: """ 设置种子标签(使用d.custom1) + :param ids: 种子Hash + :param tags: 标签列表 + :param overwrite: 是否覆盖现有标签,默认为合并 """ if not self._proxy: return False @@ -414,12 +417,16 @@ class Rtorrent: if isinstance(ids, str): ids = [ids] for tid in ids: - # 获取现有标签 - existing = self._proxy.d.custom1(tid) - existing_tags = [t.strip() for t in existing.split(",") if t.strip()] if existing else [] - # 合并标签 - merged = list(set(existing_tags + tags)) - self._proxy.d.custom1.set(tid, ",".join(merged)) + if overwrite: + # 直接覆盖标签 + self._proxy.d.custom1.set(tid, ",".join(tags)) + else: + # 获取现有标签 + existing = self._proxy.d.custom1(tid) + existing_tags = [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + # 合并标签 + merged = list(set(existing_tags + tags)) + self._proxy.d.custom1.set(tid, ",".join(merged)) return True except Exception as err: logger.error(f"设置种子Tag出错:{str(err)}") From 69a120710216cebde29f3375f5ed3a14c9caed50 Mon Sep 17 00:00:00 2001 From: DDSRem <1448139087@qq.com> Date: Sun, 22 Feb 2026 13:42:27 +0800 Subject: [PATCH 3/3] chore(rtorrent): formatting code --- app/modules/rtorrent/__init__.py | 276 +++++++++++++++++++++---------- app/modules/rtorrent/rtorrent.py | 143 ++++++++++------ 2 files changed, 280 insertions(+), 139 deletions(-) diff --git a/app/modules/rtorrent/__init__.py b/app/modules/rtorrent/__init__.py index 61af67a9..e55f76f4 100644 --- a/app/modules/rtorrent/__init__.py +++ b/app/modules/rtorrent/__init__.py @@ -16,13 +16,13 @@ from app.utils.string import StringUtils class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): - def init_module(self) -> None: """ 初始化模块 """ - super().init_service(service_name=Rtorrent.__name__.lower(), - service_type=Rtorrent) + super().init_service( + service_name=Rtorrent.__name__.lower(), service_type=Rtorrent + ) @staticmethod def get_name() -> str: @@ -77,9 +77,16 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): logger.info(f"rTorrent下载器 {name} 连接断开,尝试重连 ...") server.reconnect() - def download(self, content: Union[Path, str, bytes], download_dir: Path, cookie: str, - episodes: Set[int] = None, category: Optional[str] = None, label: Optional[str] = None, - downloader: Optional[str] = None) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]: + def download( + self, + content: Union[Path, str, bytes], + download_dir: Path, + cookie: str, + episodes: Set[int] = None, + category: Optional[str] = None, + label: Optional[str] = None, + downloader: Optional[str] = None, + ) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]: """ 根据种子文件,选择并添加下载任务 :param content: 种子文件地址或者磁力链接或种子内容 @@ -102,7 +109,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): if content.exists(): torrent_content = content.read_bytes() else: - torrent_content = FileCache().get(content.as_posix(), region="torrents") + torrent_content = FileCache().get( + content.as_posix(), region="torrents" + ) else: torrent_content = content @@ -123,9 +132,12 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): # 读取种子的名称 torrent_from_file, content = __get_torrent_info() # 检查是否为磁力链接 - is_magnet = isinstance(content, str) and content.startswith("magnet:") or isinstance(content, - bytes) and content.startswith( - b"magnet:") + is_magnet = ( + isinstance(content, str) + and content.startswith("magnet:") + or isinstance(content, bytes) + and content.startswith(b"magnet:") + ) if not torrent_from_file and not is_magnet: return None, None, None, f"添加种子任务失败:无法读取种子文件" @@ -137,7 +149,7 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): # 生成随机Tag tag = StringUtils.generate_random_str(10) if label: - tags = label.split(',') + [tag] + tags = label.split(",") + [tag] elif settings.TORRENT_TAG: tags = [tag, settings.TORRENT_TAG] else: @@ -165,18 +177,41 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): try: for torrent in torrents: # 名称与大小相等则认为是同一个种子 - if torrent.get("name") == getattr(torrent_from_file, 'name', '') \ - and torrent.get("total_size") == getattr(torrent_from_file, 'total_size', 0): + if torrent.get("name") == getattr( + torrent_from_file, "name", "" + ) and torrent.get("total_size") == getattr( + torrent_from_file, "total_size", 0 + ): torrent_hash = torrent.get("hash") - torrent_tags = [str(t).strip() for t in torrent.get("tags", "").split(',') if t.strip()] - logger.warn(f"下载器中已存在该种子任务:{torrent_hash} - {torrent.get('name')}") + torrent_tags = [ + str(t).strip() + for t in torrent.get("tags", "").split(",") + if t.strip() + ] + logger.warn( + f"下载器中已存在该种子任务:{torrent_hash} - {torrent.get('name')}" + ) # 给种子打上标签 if "已整理" in torrent_tags: - server.remove_torrents_tag(ids=torrent_hash, tag=['已整理']) - if settings.TORRENT_TAG and settings.TORRENT_TAG not in torrent_tags: - logger.info(f"给种子 {torrent_hash} 打上标签:{settings.TORRENT_TAG}") - server.set_torrents_tag(ids=torrent_hash, tags=[settings.TORRENT_TAG]) - return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, f"下载任务已存在" + server.remove_torrents_tag( + ids=torrent_hash, tag=["已整理"] + ) + if ( + settings.TORRENT_TAG + and settings.TORRENT_TAG not in torrent_tags + ): + logger.info( + f"给种子 {torrent_hash} 打上标签:{settings.TORRENT_TAG}" + ) + server.set_torrents_tag( + ids=torrent_hash, tags=[settings.TORRENT_TAG] + ) + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + f"下载任务已存在", + ) finally: torrents.clear() del torrents @@ -185,13 +220,23 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): # 获取种子Hash torrent_hash = server.get_torrent_id_by_tag(tags=tag) if not torrent_hash: - return None, None, None, f"下载任务添加成功,但获取rTorrent任务信息失败:{content}" + return ( + None, + None, + None, + f"下载任务添加成功,但获取rTorrent任务信息失败:{content}", + ) else: if is_paused: # 种子文件 torrent_files = server.get_files(torrent_hash) if not torrent_files: - return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, "获取种子文件失败,下载任务可能在暂停状态" + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + "获取种子文件失败,下载任务可能在暂停状态", + ) # 不需要的文件ID file_ids = [] @@ -202,8 +247,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): file_id = torrent_file.get("id") file_name = torrent_file.get("name") meta_info = MetaInfo(file_name) - if not meta_info.episode_list \ - or not set(meta_info.episode_list).issubset(episodes): + if not meta_info.episode_list or not set( + meta_info.episode_list + ).issubset(episodes): file_ids.append(file_id) else: sucess_epidised.update(meta_info.episode_list) @@ -213,17 +259,31 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): sucess_epidised = list(sucess_epidised) if sucess_epidised and file_ids: # 设置不需要的文件优先级为0(不下载) - server.set_files(torrent_hash=torrent_hash, file_ids=file_ids, priority=0) + server.set_files( + torrent_hash=torrent_hash, file_ids=file_ids, priority=0 + ) # 开始任务 server.start_torrents(torrent_hash) - return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, f"添加下载成功,已选择集数:{sucess_epidised}" + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + f"添加下载成功,已选择集数:{sucess_epidised}", + ) else: - return downloader or self.get_default_config_name(), torrent_hash, torrent_layout, "添加下载成功" + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + "添加下载成功", + ) - def list_torrents(self, status: TorrentStatus = None, - hashs: Union[list, str] = None, - downloader: Optional[str] = None - ) -> Optional[List[Union[TransferTorrent, DownloadingTorrent]]]: + def list_torrents( + self, + status: TorrentStatus = None, + hashs: Union[list, str] = None, + downloader: Optional[str] = None, + ) -> Optional[List[Union[TransferTorrent, DownloadingTorrent]]]: """ 获取下载器种子列表 :param status: 种子状态 @@ -243,31 +303,41 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): if hashs: # 按Hash获取 for name, server in servers.items(): - torrents, _ = server.get_torrents(ids=hashs, tags=settings.TORRENT_TAG) or [] + torrents, _ = ( + server.get_torrents(ids=hashs, tags=settings.TORRENT_TAG) or [] + ) try: for torrent in torrents: content_path = torrent.get("content_path") if content_path: torrent_path = Path(content_path) else: - torrent_path = Path(torrent.get('save_path')) / torrent.get('name') - ret_torrents.append(TransferTorrent( - downloader=name, - title=torrent.get('name'), - path=torrent_path, - hash=torrent.get('hash'), - size=torrent.get('total_size'), - tags=torrent.get('tags'), - progress=torrent.get('progress', 0), - state="paused" if torrent.get('state') == 0 else "downloading", - )) + torrent_path = Path(torrent.get("save_path")) / torrent.get( + "name" + ) + ret_torrents.append( + TransferTorrent( + downloader=name, + title=torrent.get("name"), + path=torrent_path, + hash=torrent.get("hash"), + size=torrent.get("total_size"), + tags=torrent.get("tags"), + progress=torrent.get("progress", 0), + state="paused" + if torrent.get("state") == 0 + else "downloading", + ) + ) finally: torrents.clear() del torrents elif status == TorrentStatus.TRANSFER: # 获取已完成且未整理的 for name, server in servers.items(): - torrents = server.get_completed_torrents(tags=settings.TORRENT_TAG) or [] + torrents = ( + server.get_completed_torrents(tags=settings.TORRENT_TAG) or [] + ) try: for torrent in torrents: tags = torrent.get("tags") or "" @@ -278,43 +348,56 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): if content_path: torrent_path = Path(content_path) else: - torrent_path = Path(torrent.get('save_path')) / torrent.get('name') - ret_torrents.append(TransferTorrent( - downloader=name, - title=torrent.get('name'), - path=torrent_path, - hash=torrent.get('hash'), - tags=torrent.get('tags') - )) + torrent_path = Path(torrent.get("save_path")) / torrent.get( + "name" + ) + ret_torrents.append( + TransferTorrent( + downloader=name, + title=torrent.get("name"), + path=torrent_path, + hash=torrent.get("hash"), + tags=torrent.get("tags"), + ) + ) finally: torrents.clear() del torrents elif status == TorrentStatus.DOWNLOADING: # 获取正在下载的任务 for name, server in servers.items(): - torrents = server.get_downloading_torrents(tags=settings.TORRENT_TAG) or [] + torrents = ( + server.get_downloading_torrents(tags=settings.TORRENT_TAG) or [] + ) try: for torrent in torrents: - meta = MetaInfo(torrent.get('name')) - dlspeed = torrent.get('dlspeed', 0) - upspeed = torrent.get('upspeed', 0) - total_size = torrent.get('total_size', 0) - completed = torrent.get('completed', 0) - ret_torrents.append(DownloadingTorrent( - downloader=name, - hash=torrent.get('hash'), - title=torrent.get('name'), - name=meta.name, - year=meta.year, - season_episode=meta.season_episode, - progress=torrent.get('progress', 0), - size=total_size, - state="paused" if torrent.get('state') == 0 else "downloading", - dlspeed=StringUtils.str_filesize(dlspeed), - upspeed=StringUtils.str_filesize(upspeed), - left_time=StringUtils.str_secends( - (total_size - completed) / dlspeed) if dlspeed > 0 else '' - )) + meta = MetaInfo(torrent.get("name")) + dlspeed = torrent.get("dlspeed", 0) + upspeed = torrent.get("upspeed", 0) + total_size = torrent.get("total_size", 0) + completed = torrent.get("completed", 0) + ret_torrents.append( + DownloadingTorrent( + downloader=name, + hash=torrent.get("hash"), + title=torrent.get("name"), + name=meta.name, + year=meta.year, + season_episode=meta.season_episode, + progress=torrent.get("progress", 0), + size=total_size, + state="paused" + if torrent.get("state") == 0 + else "downloading", + dlspeed=StringUtils.str_filesize(dlspeed), + upspeed=StringUtils.str_filesize(upspeed), + left_time=StringUtils.str_secends( + (total_size - completed) / dlspeed + ) + if dlspeed > 0 + else "", + ) + ) finally: torrents.clear() del torrents @@ -322,7 +405,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): return None return ret_torrents # noqa - def transfer_completed(self, hashs: Union[str, list], downloader: Optional[str] = None) -> None: + def transfer_completed( + self, hashs: Union[str, list], downloader: Optional[str] = None + ) -> None: """ 转移完成后的处理 :param hashs: 种子Hash @@ -335,15 +420,19 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): org_tags = server.get_torrent_tags(ids=hashs) # 种子打上已整理标签 if org_tags: - tags = org_tags + ['已整理'] + tags = org_tags + ["已整理"] else: - tags = ['已整理'] + tags = ["已整理"] # 直接设置完整标签(覆盖) server.set_torrents_tag(ids=hashs, tags=tags, overwrite=True) return None - def remove_torrents(self, hashs: Union[str, list], delete_file: Optional[bool] = True, - downloader: Optional[str] = None) -> Optional[bool]: + def remove_torrents( + self, + hashs: Union[str, list], + delete_file: Optional[bool] = True, + downloader: Optional[str] = None, + ) -> Optional[bool]: """ 删除下载器种子 :param hashs: 种子Hash @@ -356,8 +445,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): return None return server.delete_torrents(delete_file=delete_file, ids=hashs) - def start_torrents(self, hashs: Union[list, str], - downloader: Optional[str] = None) -> Optional[bool]: + def start_torrents( + self, hashs: Union[list, str], downloader: Optional[str] = None + ) -> Optional[bool]: """ 开始下载 :param hashs: 种子Hash @@ -369,7 +459,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): return None return server.start_torrents(ids=hashs) - def stop_torrents(self, hashs: Union[list, str], downloader: Optional[str] = None) -> Optional[bool]: + def stop_torrents( + self, hashs: Union[list, str], downloader: Optional[str] = None + ) -> Optional[bool]: """ 停止下载 :param hashs: 种子Hash @@ -381,7 +473,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): return None return server.stop_torrents(ids=hashs) - def torrent_files(self, tid: str, downloader: Optional[str] = None) -> Optional[List[Dict]]: + def torrent_files( + self, tid: str, downloader: Optional[str] = None + ) -> Optional[List[Dict]]: """ 获取种子文件列表 """ @@ -390,7 +484,9 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): return None return server.get_files(tid=tid) - def downloader_info(self, downloader: Optional[str] = None) -> Optional[List[schemas.DownloaderInfo]]: + def downloader_info( + self, downloader: Optional[str] = None + ) -> Optional[List[schemas.DownloaderInfo]]: """ 下载器信息 """ @@ -406,10 +502,12 @@ class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): info = server.transfer_info() if not info: continue - ret_info.append(schemas.DownloaderInfo( - download_speed=info.get("dl_info_speed"), - upload_speed=info.get("up_info_speed"), - download_size=info.get("dl_info_data"), - upload_size=info.get("up_info_data") - )) + ret_info.append( + schemas.DownloaderInfo( + download_speed=info.get("dl_info_speed"), + upload_speed=info.get("up_info_speed"), + download_size=info.get("dl_info_data"), + upload_size=info.get("up_info_data"), + ) + ) return ret_info diff --git a/app/modules/rtorrent/rtorrent.py b/app/modules/rtorrent/rtorrent.py index 9767e1f4..6166c484 100644 --- a/app/modules/rtorrent/rtorrent.py +++ b/app/modules/rtorrent/rtorrent.py @@ -16,7 +16,9 @@ class SCGITransport(xmlrpc.client.Transport): def single_request(self, host, handler, request_body, verbose=False): # 建立socket连接 parsed = urlparse(f"scgi://{host}") - sock = socket.create_connection((parsed.hostname, parsed.port or 5000), timeout=60) + sock = socket.create_connection( + (parsed.hostname, parsed.port or 5000), timeout=60 + ) try: # 构造SCGI请求头 headers = ( @@ -42,7 +44,7 @@ class SCGITransport(xmlrpc.client.Transport): # 跳过HTTP响应头 header_end = response.find(b"\r\n\r\n") if header_end != -1: - response = response[header_end + 4:] + response = response[header_end + 4 :] # 解析XML-RPC响应 return self.parse_response(self._build_response(response)) @@ -70,9 +72,14 @@ class Rtorrent: rTorrent下载器 """ - def __init__(self, host: Optional[str] = None, port: Optional[int] = None, - username: Optional[str] = None, password: Optional[str] = None, - **kwargs): + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + username: Optional[str] = None, + password: Optional[str] = None, + **kwargs, + ): self._proxy = None if host and port: self._host = f"{host}:{port}" @@ -97,10 +104,7 @@ class Rtorrent: # SCGI直连模式 parsed = urlparse(url) logger.info(f"正在通过SCGI连接 rTorrent:{url}") - proxy = xmlrpc.client.ServerProxy( - url, - transport=SCGITransport() - ) + proxy = xmlrpc.client.ServerProxy(url, transport=SCGITransport()) else: # HTTP模式 (通过nginx/ruTorrent代理) if not url.startswith("http"): @@ -112,14 +116,18 @@ class Rtorrent: if parsed.port: url += f":{parsed.port}" url += parsed.path or "/RPC2" - logger.info(f"正在通过HTTP连接 rTorrent:{url.split('@')[-1] if '@' in url else url}") + logger.info( + f"正在通过HTTP连接 rTorrent:{url.split('@')[-1] if '@' in url else url}" + ) proxy = xmlrpc.client.ServerProxy(url) # 测试连接 proxy.system.client_version() return proxy except Exception as err: - stack_trace = "".join(traceback.format_exception(None, err, err.__traceback__))[:2000] + stack_trace = "".join( + traceback.format_exception(None, err, err.__traceback__) + )[:2000] logger.error(f"rTorrent 连接出错:{str(err)}\n{stack_trace}") return None @@ -137,9 +145,12 @@ class Rtorrent: """ self._proxy = self.__login_rtorrent() - def get_torrents(self, ids: Optional[Union[str, list]] = None, - status: Optional[str] = None, - tags: Optional[Union[str, list]] = None) -> Tuple[List[Dict], bool]: + def get_torrents( + self, + ids: Optional[Union[str, list]] = None, + status: Optional[str] = None, + tags: Optional[Union[str, list]] = None, + ) -> Tuple[List[Dict], bool]: """ 获取种子列表 :return: 种子列表, 是否发生异常 @@ -175,10 +186,10 @@ class Rtorrent: "completed": r[3], "dlspeed": r[4], "upspeed": r[5], - "state": r[6], # 0=stopped, 1=started - "complete": r[7], # 0=incomplete, 1=complete + "state": r[6], # 0=stopped, 1=started + "complete": r[7], # 0=incomplete, 1=complete "save_path": r[8], - "tags": r[9], # d.custom1 用于标签 + "tags": r[9], # d.custom1 用于标签 "is_active": r[10], "is_open": r[11], "ratio": int(r[12]) / 1000.0 if r[12] else 0, @@ -186,7 +197,9 @@ class Rtorrent: } # 计算进度 if torrent["total_size"] > 0: - torrent["progress"] = torrent["completed"] / torrent["total_size"] * 100 + torrent["progress"] = ( + torrent["completed"] / torrent["total_size"] * 100 + ) else: torrent["progress"] = 0 @@ -201,7 +214,9 @@ class Rtorrent: # 标签过滤 if tags: - torrent_tags = [t.strip() for t in torrent["tags"].split(",") if t.strip()] + torrent_tags = [ + t.strip() for t in torrent["tags"].split(",") if t.strip() + ] if isinstance(tags, str): tags_list = [t.strip() for t in tags.split(",")] else: @@ -215,8 +230,9 @@ class Rtorrent: logger.error(f"获取种子列表出错:{str(err)}") return [], True - def get_completed_torrents(self, ids: Union[str, list] = None, - tags: Union[str, list] = None) -> Optional[List[Dict]]: + def get_completed_torrents( + self, ids: Union[str, list] = None, tags: Union[str, list] = None + ) -> Optional[List[Dict]]: """ 获取已完成的种子 """ @@ -227,8 +243,9 @@ class Rtorrent: return None return [t for t in torrents if t.get("complete") == 1] - def get_downloading_torrents(self, ids: Union[str, list] = None, - tags: Union[str, list] = None) -> Optional[List[Dict]]: + def get_downloading_torrents( + self, ids: Union[str, list] = None, tags: Union[str, list] = None + ) -> Optional[List[Dict]]: """ 获取正在下载的种子 """ @@ -239,13 +256,15 @@ class Rtorrent: return None return [t for t in torrents if t.get("complete") == 0] - def add_torrent(self, - content: Union[str, bytes], - is_paused: Optional[bool] = False, - download_dir: Optional[str] = None, - tags: Optional[List[str]] = None, - cookie: Optional[str] = None, - **kwargs) -> bool: + def add_torrent( + self, + content: Union[str, bytes], + is_paused: Optional[bool] = False, + download_dir: Optional[str] = None, + tags: Optional[List[str]] = None, + cookie: Optional[str] = None, + **kwargs, + ) -> bool: """ 添加种子 :param content: 种子内容(bytes)或磁力链接/URL(str) @@ -340,6 +359,7 @@ class Rtorrent: self._proxy.d.erase(tid) if base_path: import shutil + path = Path(base_path) if path.is_dir(): shutil.rmtree(str(path), ignore_errors=True) @@ -364,27 +384,34 @@ class Rtorrent: if not tid: return None try: - files = self._proxy.f.multicall(tid, "", - "f.path=", - "f.size_bytes=", - "f.priority=", - "f.completed_chunks=", - "f.size_chunks=") + files = self._proxy.f.multicall( + tid, + "", + "f.path=", + "f.size_bytes=", + "f.priority=", + "f.completed_chunks=", + "f.size_chunks=", + ) result = [] for idx, f in enumerate(files): - result.append({ - "id": idx, - "name": f[0], - "size": f[1], - "priority": f[2], - "progress": int(f[3]) / int(f[4]) * 100 if int(f[4]) > 0 else 0 - }) + result.append( + { + "id": idx, + "name": f[0], + "size": f[1], + "priority": f[2], + "progress": int(f[3]) / int(f[4]) * 100 if int(f[4]) > 0 else 0, + } + ) return result except Exception as err: logger.error(f"获取种子文件列表出错:{str(err)}") return None - def set_files(self, torrent_hash: str = None, file_ids: list = None, priority: int = 0) -> bool: + def set_files( + self, torrent_hash: str = None, file_ids: list = None, priority: int = 0 + ) -> bool: """ 设置下载文件的优先级,priority为0为不下载,priority为1为普通 """ @@ -402,7 +429,9 @@ class Rtorrent: logger.error(f"设置种子文件状态出错:{str(err)}") return False - def set_torrents_tag(self, ids: Union[str, list], tags: List[str], overwrite: bool = False) -> bool: + def set_torrents_tag( + self, ids: Union[str, list], tags: List[str], overwrite: bool = False + ) -> bool: """ 设置种子标签(使用d.custom1) :param ids: 种子Hash @@ -423,7 +452,11 @@ class Rtorrent: else: # 获取现有标签 existing = self._proxy.d.custom1(tid) - existing_tags = [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + existing_tags = ( + [t.strip() for t in existing.split(",") if t.strip()] + if existing + else [] + ) # 合并标签 merged = list(set(existing_tags + tags)) self._proxy.d.custom1.set(tid, ",".join(merged)) @@ -447,7 +480,11 @@ class Rtorrent: tag = [tag] for tid in ids: existing = self._proxy.d.custom1(tid) - existing_tags = [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + existing_tags = ( + [t.strip() for t in existing.split(",") if t.strip()] + if existing + else [] + ) new_tags = [t for t in existing_tags if t not in tag] self._proxy.d.custom1.set(tid, ",".join(new_tags)) return True @@ -463,17 +500,23 @@ class Rtorrent: return [] try: existing = self._proxy.d.custom1(ids) - return [t.strip() for t in existing.split(",") if t.strip()] if existing else [] + return ( + [t.strip() for t in existing.split(",") if t.strip()] + if existing + else [] + ) except Exception as err: logger.error(f"获取种子标签出错:{str(err)}") return [] - def get_torrent_id_by_tag(self, tags: Union[str, list], - status: Optional[str] = None) -> Optional[str]: + def get_torrent_id_by_tag( + self, tags: Union[str, list], status: Optional[str] = None + ) -> Optional[str]: """ 通过标签多次尝试获取刚添加的种子ID,并移除标签 """ import time + if isinstance(tags, str): tags = [tags] torrent_id = None