diff --git a/.gitignore b/.gitignore index 21197911..bc4842fc 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,7 @@ venv # Pylint pylint-report.json -.pylint.d/ \ No newline at end of file +.pylint.d/ + +# AI +.claude/ diff --git a/app/helper/downloader.py b/app/helper/downloader.py index e0e449ba..425d1db4 100644 --- a/app/helper/downloader.py +++ b/app/helper/downloader.py @@ -25,7 +25,7 @@ class DownloaderHelper(ServiceBaseHelper[DownloaderConf]): ) -> bool: """ 通用的下载器类型判断方法 - :param service_type: 下载器的类型名称(如 'qbittorrent', 'transmission') + :param service_type: 下载器的类型名称(如 'qbittorrent', 'transmission', 'rtorrent') :param service: 要判断的服务信息 :param name: 服务的名称 :return: 如果服务类型或实例为指定类型,返回 True;否则返回 False diff --git a/app/modules/rtorrent/__init__.py b/app/modules/rtorrent/__init__.py new file mode 100644 index 00000000..e55f76f4 --- /dev/null +++ b/app/modules/rtorrent/__init__.py @@ -0,0 +1,513 @@ +from pathlib import Path +from typing import Set, Tuple, Optional, Union, List, Dict + +from torrentool.torrent import Torrent + +from app import schemas +from app.core.cache import FileCache +from app.core.config import settings +from app.core.metainfo import MetaInfo +from app.log import logger +from app.modules import _ModuleBase, _DownloaderBase +from app.modules.rtorrent.rtorrent import Rtorrent +from app.schemas import TransferTorrent, DownloadingTorrent +from app.schemas.types import TorrentStatus, ModuleType, DownloaderType +from app.utils.string import StringUtils + + +class RtorrentModule(_ModuleBase, _DownloaderBase[Rtorrent]): + def init_module(self) -> None: + """ + 初始化模块 + """ + super().init_service( + service_name=Rtorrent.__name__.lower(), service_type=Rtorrent + ) + + @staticmethod + def get_name() -> str: + return "Rtorrent" + + @staticmethod + def get_type() -> ModuleType: + """ + 获取模块类型 + """ + return ModuleType.Downloader + + @staticmethod + def get_subtype() -> DownloaderType: + """ + 获取模块子类型 + """ + return DownloaderType.Rtorrent + + @staticmethod + def get_priority() -> int: + """ + 获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效 + """ + return 3 + + def stop(self): + pass + + def test(self) -> Optional[Tuple[bool, str]]: + """ + 测试模块连接性 + """ + if not self.get_instances(): + return None + for name, server in self.get_instances().items(): + if server.is_inactive(): + server.reconnect() + if not server.transfer_info(): + return False, f"无法连接rTorrent下载器:{name}" + return True, "" + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def scheduler_job(self) -> None: + """ + 定时任务,每10分钟调用一次 + """ + for name, server in self.get_instances().items(): + if server.is_inactive(): + logger.info(f"rTorrent下载器 {name} 连接断开,尝试重连 ...") + server.reconnect() + + def download( + self, + content: Union[Path, str, bytes], + download_dir: Path, + cookie: str, + episodes: Set[int] = None, + category: Optional[str] = None, + label: Optional[str] = None, + downloader: Optional[str] = None, + ) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]: + """ + 根据种子文件,选择并添加下载任务 + :param content: 种子文件地址或者磁力链接或种子内容 + :param download_dir: 下载目录 + :param cookie: cookie + :param episodes: 需要下载的集数 + :param category: 分类,rTorrent中未使用 + :param label: 标签 + :param downloader: 下载器 + :return: 下载器名称、种子Hash、种子文件布局、错误原因 + """ + + def __get_torrent_info() -> Tuple[Optional[Torrent], Optional[bytes]]: + """ + 获取种子名称 + """ + torrent_info, torrent_content = None, None + try: + if isinstance(content, Path): + if content.exists(): + torrent_content = content.read_bytes() + else: + torrent_content = FileCache().get( + content.as_posix(), region="torrents" + ) + else: + torrent_content = content + + if torrent_content: + if StringUtils.is_magnet_link(torrent_content): + return None, torrent_content + else: + torrent_info = Torrent.from_string(torrent_content) + + return torrent_info, torrent_content + except Exception as e: + logger.error(f"获取种子名称失败:{e}") + return None, None + + if not content: + return None, None, None, "下载内容为空" + + # 读取种子的名称 + torrent_from_file, content = __get_torrent_info() + # 检查是否为磁力链接 + is_magnet = ( + isinstance(content, str) + and content.startswith("magnet:") + or isinstance(content, bytes) + and content.startswith(b"magnet:") + ) + if not torrent_from_file and not is_magnet: + return None, None, None, f"添加种子任务失败:无法读取种子文件" + + # 获取下载器 + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + + # 生成随机Tag + tag = StringUtils.generate_random_str(10) + if label: + tags = label.split(",") + [tag] + elif settings.TORRENT_TAG: + tags = [tag, settings.TORRENT_TAG] + else: + tags = [tag] + # 如果要选择文件则先暂停 + is_paused = True if episodes else False + # 添加任务 + state = server.add_torrent( + content=content, + download_dir=self.normalize_path(download_dir, downloader), + is_paused=is_paused, + tags=tags, + cookie=cookie, + ) + + # rTorrent 始终使用原始种子布局 + torrent_layout = "Original" + + if not state: + # 查询所有下载器的种子 + torrents, error = server.get_torrents() + if error: + return None, None, None, "无法连接rTorrent下载器" + if torrents: + try: + for torrent in torrents: + # 名称与大小相等则认为是同一个种子 + if torrent.get("name") == getattr( + torrent_from_file, "name", "" + ) and torrent.get("total_size") == getattr( + torrent_from_file, "total_size", 0 + ): + torrent_hash = torrent.get("hash") + torrent_tags = [ + str(t).strip() + for t in torrent.get("tags", "").split(",") + if t.strip() + ] + logger.warn( + f"下载器中已存在该种子任务:{torrent_hash} - {torrent.get('name')}" + ) + # 给种子打上标签 + if "已整理" in torrent_tags: + server.remove_torrents_tag( + ids=torrent_hash, tag=["已整理"] + ) + if ( + settings.TORRENT_TAG + and settings.TORRENT_TAG not in torrent_tags + ): + logger.info( + f"给种子 {torrent_hash} 打上标签:{settings.TORRENT_TAG}" + ) + server.set_torrents_tag( + ids=torrent_hash, tags=[settings.TORRENT_TAG] + ) + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + f"下载任务已存在", + ) + finally: + torrents.clear() + del torrents + return None, None, None, f"添加种子任务失败:{content}" + else: + # 获取种子Hash + torrent_hash = server.get_torrent_id_by_tag(tags=tag) + if not torrent_hash: + return ( + None, + None, + None, + f"下载任务添加成功,但获取rTorrent任务信息失败:{content}", + ) + else: + if is_paused: + # 种子文件 + torrent_files = server.get_files(torrent_hash) + if not torrent_files: + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + "获取种子文件失败,下载任务可能在暂停状态", + ) + + # 不需要的文件ID + file_ids = [] + # 需要的集清单 + sucess_epidised = set() + try: + for torrent_file in torrent_files: + file_id = torrent_file.get("id") + file_name = torrent_file.get("name") + meta_info = MetaInfo(file_name) + if not meta_info.episode_list or not set( + meta_info.episode_list + ).issubset(episodes): + file_ids.append(file_id) + else: + sucess_epidised.update(meta_info.episode_list) + finally: + torrent_files.clear() + del torrent_files + sucess_epidised = list(sucess_epidised) + if sucess_epidised and file_ids: + # 设置不需要的文件优先级为0(不下载) + server.set_files( + torrent_hash=torrent_hash, file_ids=file_ids, priority=0 + ) + # 开始任务 + server.start_torrents(torrent_hash) + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + f"添加下载成功,已选择集数:{sucess_epidised}", + ) + else: + return ( + downloader or self.get_default_config_name(), + torrent_hash, + torrent_layout, + "添加下载成功", + ) + + def list_torrents( + self, + status: TorrentStatus = None, + hashs: Union[list, str] = None, + downloader: Optional[str] = None, + ) -> Optional[List[Union[TransferTorrent, DownloadingTorrent]]]: + """ + 获取下载器种子列表 + :param status: 种子状态 + :param hashs: 种子Hash + :param downloader: 下载器 + :return: 下载器中符合状态的种子列表 + """ + # 获取下载器 + if downloader: + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + servers = {downloader: server} + else: + servers: Dict[str, Rtorrent] = self.get_instances() + ret_torrents = [] + if hashs: + # 按Hash获取 + for name, server in servers.items(): + torrents, _ = ( + server.get_torrents(ids=hashs, tags=settings.TORRENT_TAG) or [] + ) + try: + for torrent in torrents: + content_path = torrent.get("content_path") + if content_path: + torrent_path = Path(content_path) + else: + torrent_path = Path(torrent.get("save_path")) / torrent.get( + "name" + ) + ret_torrents.append( + TransferTorrent( + downloader=name, + title=torrent.get("name"), + path=torrent_path, + hash=torrent.get("hash"), + size=torrent.get("total_size"), + tags=torrent.get("tags"), + progress=torrent.get("progress", 0), + state="paused" + if torrent.get("state") == 0 + else "downloading", + ) + ) + finally: + torrents.clear() + del torrents + elif status == TorrentStatus.TRANSFER: + # 获取已完成且未整理的 + for name, server in servers.items(): + torrents = ( + server.get_completed_torrents(tags=settings.TORRENT_TAG) or [] + ) + try: + for torrent in torrents: + tags = torrent.get("tags") or "" + tag_list = [t.strip() for t in tags.split(",") if t.strip()] + if "已整理" in tag_list: + continue + content_path = torrent.get("content_path") + if content_path: + torrent_path = Path(content_path) + else: + torrent_path = Path(torrent.get("save_path")) / torrent.get( + "name" + ) + ret_torrents.append( + TransferTorrent( + downloader=name, + title=torrent.get("name"), + path=torrent_path, + hash=torrent.get("hash"), + tags=torrent.get("tags"), + ) + ) + finally: + torrents.clear() + del torrents + elif status == TorrentStatus.DOWNLOADING: + # 获取正在下载的任务 + for name, server in servers.items(): + torrents = ( + server.get_downloading_torrents(tags=settings.TORRENT_TAG) or [] + ) + try: + for torrent in torrents: + meta = MetaInfo(torrent.get("name")) + dlspeed = torrent.get("dlspeed", 0) + upspeed = torrent.get("upspeed", 0) + total_size = torrent.get("total_size", 0) + completed = torrent.get("completed", 0) + ret_torrents.append( + DownloadingTorrent( + downloader=name, + hash=torrent.get("hash"), + title=torrent.get("name"), + name=meta.name, + year=meta.year, + season_episode=meta.season_episode, + progress=torrent.get("progress", 0), + size=total_size, + state="paused" + if torrent.get("state") == 0 + else "downloading", + dlspeed=StringUtils.str_filesize(dlspeed), + upspeed=StringUtils.str_filesize(upspeed), + left_time=StringUtils.str_secends( + (total_size - completed) / dlspeed + ) + if dlspeed > 0 + else "", + ) + ) + finally: + torrents.clear() + del torrents + else: + return None + return ret_torrents # noqa + + def transfer_completed( + self, hashs: Union[str, list], downloader: Optional[str] = None + ) -> None: + """ + 转移完成后的处理 + :param hashs: 种子Hash + :param downloader: 下载器 + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + # 获取原标签 + org_tags = server.get_torrent_tags(ids=hashs) + # 种子打上已整理标签 + if org_tags: + tags = org_tags + ["已整理"] + else: + tags = ["已整理"] + # 直接设置完整标签(覆盖) + server.set_torrents_tag(ids=hashs, tags=tags, overwrite=True) + return None + + def remove_torrents( + self, + hashs: Union[str, list], + delete_file: Optional[bool] = True, + downloader: Optional[str] = None, + ) -> Optional[bool]: + """ + 删除下载器种子 + :param hashs: 种子Hash + :param delete_file: 是否删除文件 + :param downloader: 下载器 + :return: bool + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.delete_torrents(delete_file=delete_file, ids=hashs) + + def start_torrents( + self, hashs: Union[list, str], downloader: Optional[str] = None + ) -> Optional[bool]: + """ + 开始下载 + :param hashs: 种子Hash + :param downloader: 下载器 + :return: bool + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.start_torrents(ids=hashs) + + def stop_torrents( + self, hashs: Union[list, str], downloader: Optional[str] = None + ) -> Optional[bool]: + """ + 停止下载 + :param hashs: 种子Hash + :param downloader: 下载器 + :return: bool + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.stop_torrents(ids=hashs) + + def torrent_files( + self, tid: str, downloader: Optional[str] = None + ) -> Optional[List[Dict]]: + """ + 获取种子文件列表 + """ + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + return server.get_files(tid=tid) + + def downloader_info( + self, downloader: Optional[str] = None + ) -> Optional[List[schemas.DownloaderInfo]]: + """ + 下载器信息 + """ + if downloader: + server: Rtorrent = self.get_instance(downloader) + if not server: + return None + servers = [server] + else: + servers = self.get_instances().values() + ret_info = [] + for server in servers: + info = server.transfer_info() + if not info: + continue + ret_info.append( + schemas.DownloaderInfo( + download_speed=info.get("dl_info_speed"), + upload_speed=info.get("up_info_speed"), + download_size=info.get("dl_info_data"), + upload_size=info.get("up_info_data"), + ) + ) + return ret_info diff --git a/app/modules/rtorrent/rtorrent.py b/app/modules/rtorrent/rtorrent.py new file mode 100644 index 00000000..6166c484 --- /dev/null +++ b/app/modules/rtorrent/rtorrent.py @@ -0,0 +1,549 @@ +import socket +import traceback +import xmlrpc.client +from pathlib import Path +from typing import Optional, Union, Tuple, List, Dict +from urllib.parse import urlparse + +from app.log import logger + + +class SCGITransport(xmlrpc.client.Transport): + """ + 通过SCGI协议与rTorrent通信的Transport + """ + + def single_request(self, host, handler, request_body, verbose=False): + # 建立socket连接 + parsed = urlparse(f"scgi://{host}") + sock = socket.create_connection( + (parsed.hostname, parsed.port or 5000), timeout=60 + ) + try: + # 构造SCGI请求头 + headers = ( + f"CONTENT_LENGTH\x00{len(request_body)}\x00" + f"SCGI\x001\x00" + f"REQUEST_METHOD\x00POST\x00" + f"REQUEST_URI\x00/RPC2\x00" + ) + # netstring格式: "len:headers," + netstring = f"{len(headers)}:{headers},".encode() + # 发送请求 + sock.sendall(netstring + request_body) + # 读取响应 + response = b"" + while True: + chunk = sock.recv(4096) + if not chunk: + break + response += chunk + finally: + sock.close() + + # 跳过HTTP响应头 + header_end = response.find(b"\r\n\r\n") + if header_end != -1: + response = response[header_end + 4 :] + + # 解析XML-RPC响应 + return self.parse_response(self._build_response(response)) + + @staticmethod + def _build_response(data: bytes): + """ + 构造类文件对象用于parse_response + """ + import io + import http.client + + class _FakeSocket(io.BytesIO): + def makefile(self, *args, **kwargs): + return self + + raw = b"HTTP/1.0 200 OK\r\nContent-Type: text/xml\r\n\r\n" + data + response = http.client.HTTPResponse(_FakeSocket(raw)) # noqa + response.begin() + return response + + +class Rtorrent: + """ + rTorrent下载器 + """ + + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + username: Optional[str] = None, + password: Optional[str] = None, + **kwargs, + ): + self._proxy = None + if host and port: + self._host = f"{host}:{port}" + elif host: + self._host = host + else: + logger.error("rTorrent配置不完整!") + return + self._username = username + self._password = password + self._proxy = self.__login_rtorrent() + + def __login_rtorrent(self) -> Optional[xmlrpc.client.ServerProxy]: + """ + 连接rTorrent + """ + if not self._host: + return None + try: + url = self._host + if url.startswith("scgi://"): + # SCGI直连模式 + parsed = urlparse(url) + logger.info(f"正在通过SCGI连接 rTorrent:{url}") + proxy = xmlrpc.client.ServerProxy(url, transport=SCGITransport()) + else: + # HTTP模式 (通过nginx/ruTorrent代理) + if not url.startswith("http"): + url = f"http://{url}" + # 注入认证信息到URL + if self._username and self._password: + parsed = urlparse(url) + url = f"{parsed.scheme}://{self._username}:{self._password}@{parsed.hostname}" + if parsed.port: + url += f":{parsed.port}" + url += parsed.path or "/RPC2" + logger.info( + f"正在通过HTTP连接 rTorrent:{url.split('@')[-1] if '@' in url else url}" + ) + proxy = xmlrpc.client.ServerProxy(url) + + # 测试连接 + proxy.system.client_version() + return proxy + except Exception as err: + stack_trace = "".join( + traceback.format_exception(None, err, err.__traceback__) + )[:2000] + logger.error(f"rTorrent 连接出错:{str(err)}\n{stack_trace}") + return None + + def is_inactive(self) -> bool: + """ + 判断是否需要重连 + """ + if not self._host: + return False + return True if not self._proxy else False + + def reconnect(self): + """ + 重连 + """ + self._proxy = self.__login_rtorrent() + + def get_torrents( + self, + ids: Optional[Union[str, list]] = None, + status: Optional[str] = None, + tags: Optional[Union[str, list]] = None, + ) -> Tuple[List[Dict], bool]: + """ + 获取种子列表 + :return: 种子列表, 是否发生异常 + """ + if not self._proxy: + return [], True + try: + # 使用d.multicall2获取种子列表 + fields = [ + "d.hash=", + "d.name=", + "d.size_bytes=", + "d.completed_bytes=", + "d.down.rate=", + "d.up.rate=", + "d.state=", + "d.complete=", + "d.directory=", + "d.custom1=", + "d.is_active=", + "d.is_open=", + "d.ratio=", + "d.base_path=", + ] + # 获取所有种子 + results = self._proxy.d.multicall2("", "main", *fields) + torrents = [] + for r in results: + torrent = { + "hash": r[0], + "name": r[1], + "total_size": r[2], + "completed": r[3], + "dlspeed": r[4], + "upspeed": r[5], + "state": r[6], # 0=stopped, 1=started + "complete": r[7], # 0=incomplete, 1=complete + "save_path": r[8], + "tags": r[9], # d.custom1 用于标签 + "is_active": r[10], + "is_open": r[11], + "ratio": int(r[12]) / 1000.0 if r[12] else 0, + "content_path": r[13], # base_path 即完整内容路径 + } + # 计算进度 + if torrent["total_size"] > 0: + torrent["progress"] = ( + torrent["completed"] / torrent["total_size"] * 100 + ) + else: + torrent["progress"] = 0 + + # ID过滤 + if ids: + if isinstance(ids, str): + ids_list = [ids.upper()] + else: + ids_list = [i.upper() for i in ids] + if torrent["hash"].upper() not in ids_list: + continue + + # 标签过滤 + if tags: + torrent_tags = [ + t.strip() for t in torrent["tags"].split(",") if t.strip() + ] + if isinstance(tags, str): + tags_list = [t.strip() for t in tags.split(",")] + else: + tags_list = tags + if not set(tags_list).issubset(set(torrent_tags)): + continue + + torrents.append(torrent) + return torrents, False + except Exception as err: + logger.error(f"获取种子列表出错:{str(err)}") + return [], True + + def get_completed_torrents( + self, ids: Union[str, list] = None, tags: Union[str, list] = None + ) -> Optional[List[Dict]]: + """ + 获取已完成的种子 + """ + if not self._proxy: + return None + torrents, error = self.get_torrents(ids=ids, tags=tags) + if error: + return None + return [t for t in torrents if t.get("complete") == 1] + + def get_downloading_torrents( + self, ids: Union[str, list] = None, tags: Union[str, list] = None + ) -> Optional[List[Dict]]: + """ + 获取正在下载的种子 + """ + if not self._proxy: + return None + torrents, error = self.get_torrents(ids=ids, tags=tags) + if error: + return None + return [t for t in torrents if t.get("complete") == 0] + + def add_torrent( + self, + content: Union[str, bytes], + is_paused: Optional[bool] = False, + download_dir: Optional[str] = None, + tags: Optional[List[str]] = None, + cookie: Optional[str] = None, + **kwargs, + ) -> bool: + """ + 添加种子 + :param content: 种子内容(bytes)或磁力链接/URL(str) + :param is_paused: 添加后暂停 + :param download_dir: 下载路径 + :param tags: 标签列表 + :param cookie: Cookie + :return: bool + """ + if not self._proxy or not content: + return False + try: + # 构造命令参数 + commands = [] + if download_dir: + commands.append(f'd.directory.set="{download_dir}"') + if tags: + tag_str = ",".join(tags) + commands.append(f'd.custom1.set="{tag_str}"') + + if isinstance(content, bytes): + # 检查是否为磁力链接(bytes形式) + if content.startswith(b"magnet:"): + content = content.decode("utf-8", errors="ignore") + else: + # 种子文件内容,使用load.raw + raw = xmlrpc.client.Binary(content) + if is_paused: + self._proxy.load.raw("", raw, *commands) + else: + self._proxy.load.raw_start("", raw, *commands) + return True + + # URL或磁力链接 + if is_paused: + self._proxy.load.normal("", content, *commands) + else: + self._proxy.load.start("", content, *commands) + return True + except Exception as err: + logger.error(f"添加种子出错:{str(err)}") + return False + + def start_torrents(self, ids: Union[str, list]) -> bool: + """ + 启动种子 + """ + if not self._proxy: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + self._proxy.d.start(tid) + return True + except Exception as err: + logger.error(f"启动种子出错:{str(err)}") + return False + + def stop_torrents(self, ids: Union[str, list]) -> bool: + """ + 停止种子 + """ + if not self._proxy: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + self._proxy.d.stop(tid) + return True + except Exception as err: + logger.error(f"停止种子出错:{str(err)}") + return False + + def delete_torrents(self, delete_file: bool, ids: Union[str, list]) -> bool: + """ + 删除种子 + """ + if not self._proxy: + return False + if not ids: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + if delete_file: + # 先获取base_path用于删除文件 + try: + base_path = self._proxy.d.base_path(tid) + self._proxy.d.erase(tid) + if base_path: + import shutil + + path = Path(base_path) + if path.is_dir(): + shutil.rmtree(str(path), ignore_errors=True) + elif path.is_file(): + path.unlink(missing_ok=True) + except Exception as e: + logger.warning(f"删除种子文件出错:{str(e)}") + self._proxy.d.erase(tid) + else: + self._proxy.d.erase(tid) + return True + except Exception as err: + logger.error(f"删除种子出错:{str(err)}") + return False + + def get_files(self, tid: str) -> Optional[List[Dict]]: + """ + 获取种子文件列表 + """ + if not self._proxy: + return None + if not tid: + return None + try: + files = self._proxy.f.multicall( + tid, + "", + "f.path=", + "f.size_bytes=", + "f.priority=", + "f.completed_chunks=", + "f.size_chunks=", + ) + result = [] + for idx, f in enumerate(files): + result.append( + { + "id": idx, + "name": f[0], + "size": f[1], + "priority": f[2], + "progress": int(f[3]) / int(f[4]) * 100 if int(f[4]) > 0 else 0, + } + ) + return result + except Exception as err: + logger.error(f"获取种子文件列表出错:{str(err)}") + return None + + def set_files( + self, torrent_hash: str = None, file_ids: list = None, priority: int = 0 + ) -> bool: + """ + 设置下载文件的优先级,priority为0为不下载,priority为1为普通 + """ + if not self._proxy: + return False + if not torrent_hash or not file_ids: + return False + try: + for file_id in file_ids: + self._proxy.f.priority.set(f"{torrent_hash}:f{file_id}", priority) + # 更新种子优先级 + self._proxy.d.update_priorities(torrent_hash) + return True + except Exception as err: + logger.error(f"设置种子文件状态出错:{str(err)}") + return False + + def set_torrents_tag( + self, ids: Union[str, list], tags: List[str], overwrite: bool = False + ) -> bool: + """ + 设置种子标签(使用d.custom1) + :param ids: 种子Hash + :param tags: 标签列表 + :param overwrite: 是否覆盖现有标签,默认为合并 + """ + if not self._proxy: + return False + if not ids: + return False + try: + if isinstance(ids, str): + ids = [ids] + for tid in ids: + if overwrite: + # 直接覆盖标签 + self._proxy.d.custom1.set(tid, ",".join(tags)) + else: + # 获取现有标签 + existing = self._proxy.d.custom1(tid) + existing_tags = ( + [t.strip() for t in existing.split(",") if t.strip()] + if existing + else [] + ) + # 合并标签 + merged = list(set(existing_tags + tags)) + self._proxy.d.custom1.set(tid, ",".join(merged)) + return True + except Exception as err: + logger.error(f"设置种子Tag出错:{str(err)}") + return False + + def remove_torrents_tag(self, ids: Union[str, list], tag: Union[str, list]) -> bool: + """ + 移除种子标签 + """ + if not self._proxy: + return False + if not ids: + return False + try: + if isinstance(ids, str): + ids = [ids] + if isinstance(tag, str): + tag = [tag] + for tid in ids: + existing = self._proxy.d.custom1(tid) + existing_tags = ( + [t.strip() for t in existing.split(",") if t.strip()] + if existing + else [] + ) + new_tags = [t for t in existing_tags if t not in tag] + self._proxy.d.custom1.set(tid, ",".join(new_tags)) + return True + except Exception as err: + logger.error(f"移除种子Tag出错:{str(err)}") + return False + + def get_torrent_tags(self, ids: str) -> List[str]: + """ + 获取种子标签 + """ + if not self._proxy: + return [] + try: + existing = self._proxy.d.custom1(ids) + return ( + [t.strip() for t in existing.split(",") if t.strip()] + if existing + else [] + ) + except Exception as err: + logger.error(f"获取种子标签出错:{str(err)}") + return [] + + def get_torrent_id_by_tag( + self, tags: Union[str, list], status: Optional[str] = None + ) -> Optional[str]: + """ + 通过标签多次尝试获取刚添加的种子ID,并移除标签 + """ + import time + + if isinstance(tags, str): + tags = [tags] + torrent_id = None + for i in range(1, 10): + time.sleep(3) + torrents, error = self.get_torrents(tags=tags) + if not error and torrents: + torrent_id = torrents[0].get("hash") + # 移除查找标签 + for tag in tags: + self.remove_torrents_tag(ids=torrent_id, tag=[tag]) + break + return torrent_id + + def transfer_info(self) -> Optional[Dict]: + """ + 获取传输信息 + """ + if not self._proxy: + return None + try: + return { + "dl_info_speed": self._proxy.throttle.global_down.rate(), + "up_info_speed": self._proxy.throttle.global_up.rate(), + "dl_info_data": self._proxy.throttle.global_down.total(), + "up_info_data": self._proxy.throttle.global_up.total(), + } + except Exception as err: + logger.error(f"获取传输信息出错:{str(err)}") + return None diff --git a/app/schemas/system.py b/app/schemas/system.py index 2b8e9222..6097d441 100644 --- a/app/schemas/system.py +++ b/app/schemas/system.py @@ -9,6 +9,7 @@ class ServiceInfo: """ 封装服务相关信息的数据类 """ + # 名称 name: Optional[str] = None # 实例 @@ -25,6 +26,7 @@ class MediaServerConf(BaseModel): """ 媒体服务器配置 """ + # 名称 name: Optional[str] = None # 类型 emby/jellyfin/plex @@ -41,9 +43,10 @@ class DownloaderConf(BaseModel): """ 下载器配置 """ + # 名称 name: Optional[str] = None - # 类型 qbittorrent/transmission + # 类型 qbittorrent/transmission/rtorrent type: Optional[str] = None # 是否默认 default: Optional[bool] = False @@ -59,6 +62,7 @@ class NotificationConf(BaseModel): """ 通知配置 """ + # 名称 name: Optional[str] = None # 类型 telegram/wechat/vocechat/synologychat/slack/webpush @@ -75,16 +79,18 @@ class NotificationSwitchConf(BaseModel): """ 通知场景开关配置 """ + # 场景名称 type: str = None # 通知范围 all/user/admin - action: Optional[str] = 'all' + action: Optional[str] = "all" class StorageConf(BaseModel): """ 存储配置 """ + # 类型 local/alipan/u115/rclone/alist type: Optional[str] = None # 名称 @@ -97,6 +103,7 @@ class TransferDirectoryConf(BaseModel): """ 文件整理目录配置 """ + # 名称 name: Optional[str] = None # 优先级 @@ -116,7 +123,7 @@ class TransferDirectoryConf(BaseModel): # 监控方式 downloader/monitor,None为不监控 monitor_type: Optional[str] = None # 监控模式 fast / compatibility - monitor_mode: Optional[str] = 'fast' + monitor_mode: Optional[str] = "fast" # 整理方式 move/copy/link/softlink transfer_type: Optional[str] = None # 文件覆盖模式 always/size/never/latest diff --git a/app/schemas/types.py b/app/schemas/types.py index eb19100d..c9073b0f 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -293,6 +293,8 @@ class DownloaderType(Enum): Qbittorrent = "Qbittorrent" # Transmission Transmission = "Transmission" + # Rtorrent + Rtorrent = "Rtorrent" # Aria2 # Aria2 = "Aria2"