From 5cd48d5447bc4e19e5416ad6eae7b6725c6df783 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 24 Sep 2023 12:41:59 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E4=BC=98=E5=8C=96=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E8=B0=83=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/dashboard.py | 34 +----- app/api/endpoints/site.py | 15 +-- app/api/endpoints/subscribe.py | 32 +++--- app/api/endpoints/system.py | 16 ++- app/chain/cookiecloud.py | 18 +--- app/chain/mediaserver.py | 11 -- app/chain/subscribe.py | 39 ------- app/command.py | 118 +++++++++++++------- app/scheduler.py | 192 +++++++++++++++++++++++++++------ 9 files changed, 268 insertions(+), 207 deletions(-) diff --git a/app/api/endpoints/dashboard.py b/app/api/endpoints/dashboard.py index 44832127..0d7894ac 100644 --- a/app/api/endpoints/dashboard.py +++ b/app/api/endpoints/dashboard.py @@ -11,9 +11,7 @@ from app.core.security import verify_token from app.db import get_db from app.db.models.transferhistory import TransferHistory from app.scheduler import Scheduler -from app.utils.string import StringUtils from app.utils.system import SystemUtils -from app.utils.timer import TimerUtils router = APIRouter() @@ -83,37 +81,7 @@ def schedule(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询后台服务信息 """ - # 返回计时任务 - schedulers = [] - # 去重 - added = [] - jobs = Scheduler().list() - # 按照下次运行时间排序 - jobs.sort(key=lambda x: x.next_run_time) - for job in jobs: - if job.name not in added: - added.append(job.name) - else: - continue - if not StringUtils.is_chinese(job.name): - continue - if not job.next_run_time: - status = "已停止" - next_run = "" - else: - next_run = TimerUtils.time_difference(job.next_run_time) - if not next_run: - status = "正在运行" - else: - status = "阻塞" if job.pending else "等待" - schedulers.append(schemas.ScheduleInfo( - id=job.id, - name=job.name, - status=status, - next_run=next_run - )) - - return schedulers + return Scheduler().list() @router.get("/transfer", summary="文件整理统计", response_model=List[int]) diff --git a/app/api/endpoints/site.py b/app/api/endpoints/site.py index 92316cc4..c7876d0e 100644 --- a/app/api/endpoints/site.py +++ b/app/api/endpoints/site.py @@ -5,7 +5,6 @@ from sqlalchemy.orm import Session from starlette.background import BackgroundTasks from app import schemas -from app.chain.cookiecloud import CookieCloudChain from app.chain.site import SiteChain from app.chain.torrents import TorrentsChain from app.core.event import EventManager @@ -15,19 +14,13 @@ from app.db.models.site import Site from app.db.models.siteicon import SiteIcon from app.db.systemconfig_oper import SystemConfigOper from app.helper.sites import SitesHelper +from app.scheduler import Scheduler from app.schemas.types import SystemConfigKey, EventType from app.utils.string import StringUtils router = APIRouter() -def start_cookiecloud_sync(db: Session): - """ - 后台启动CookieCloud站点同步 - """ - CookieCloudChain(db).process(manual=True) - - @router.get("/", summary="所有站点", response_model=List[schemas.Site]) def read_sites(db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> List[dict]: @@ -101,12 +94,11 @@ def delete_site( @router.get("/cookiecloud", summary="CookieCloud同步", response_model=schemas.Response) def cookie_cloud_sync(background_tasks: BackgroundTasks, - db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 运行CookieCloud同步站点信息 """ - background_tasks.add_task(start_cookiecloud_sync, db) + background_tasks.add_task(Scheduler().start, job_id="cookiecloud") return schemas.Response(success=True, message="CookieCloud同步任务已启动!") @@ -119,7 +111,8 @@ def cookie_cloud_sync(db: Session = Depends(get_db), Site.reset(db) SystemConfigOper().set(SystemConfigKey.IndexerSites, []) SystemConfigOper().set(SystemConfigKey.RssSites, []) - CookieCloudChain().process(manual=True) + # 启动定时服务 + Scheduler().start("cookiecloud", manual=True) # 插件站点删除 EventManager().send_event(EventType.SiteDeleted, { diff --git a/app/api/endpoints/subscribe.py b/app/api/endpoints/subscribe.py index 8cb05512..fcf22a25 100644 --- a/app/api/endpoints/subscribe.py +++ b/app/api/endpoints/subscribe.py @@ -1,5 +1,5 @@ import json -from typing import List, Any, Optional +from typing import List, Any from fastapi import APIRouter, Request, BackgroundTasks, Depends, HTTPException, Header from sqlalchemy.orm import Session @@ -12,6 +12,7 @@ from app.db import get_db from app.db.models.subscribe import Subscribe from app.db.models.user import User from app.db.userauth import get_current_active_user +from app.scheduler import Scheduler from app.schemas.types import MediaType router = APIRouter() @@ -26,13 +27,6 @@ def start_subscribe_add(db: Session, title: str, year: str, mtype=mtype, tmdbid=tmdbid, season=season, username=username) -def start_subscribe_search(db: Session, sid: Optional[int], state: Optional[str]): - """ - 启动订阅搜索任务 - """ - SubscribeChain(db).search(sid=sid, state=state, manual=True) - - @router.get("/", summary="所有订阅", response_model=List[schemas.Subscribe]) def read_subscribes( db: Session = Depends(get_db), @@ -140,35 +134,36 @@ def subscribe_mediaid( @router.get("/refresh", summary="刷新订阅", response_model=schemas.Response) def refresh_subscribes( - db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 刷新所有订阅 """ - SubscribeChain(db).refresh() + Scheduler().start("subscribe_refresh") return schemas.Response(success=True) @router.get("/check", summary="刷新订阅 TMDB 信息", response_model=schemas.Response) def check_subscribes( - db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ - 刷新所有订阅 + 刷新订阅 TMDB 信息 """ - SubscribeChain(db).check() + Scheduler().start("subscribe_tmdb") return schemas.Response(success=True) @router.get("/search", summary="搜索所有订阅", response_model=schemas.Response) def search_subscribes( background_tasks: BackgroundTasks, - db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 搜索所有订阅 """ - background_tasks.add_task(start_subscribe_search, db=db, sid=None, state='R') + background_tasks.add_task( + Scheduler().start, + job_id="subscribe_search", + sid=None, state='R' + ) return schemas.Response(success=True) @@ -176,12 +171,15 @@ def search_subscribes( def search_subscribe( subscribe_id: int, background_tasks: BackgroundTasks, - db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 根据订阅编号搜索订阅 """ - background_tasks.add_task(start_subscribe_search, db=db, sid=subscribe_id, state=None) + background_tasks.add_task( + Scheduler().start, + job_id="subscribe_search", + sid=subscribe_id, state=None + ) return schemas.Response(success=True) diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index dd8bfb10..15826531 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,9 +1,9 @@ import json import time -import tailer from datetime import datetime from typing import Union +import tailer from fastapi import APIRouter, HTTPException, Depends from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session @@ -11,13 +11,13 @@ from sqlalchemy.orm import Session from app import schemas from app.chain.search import SearchChain from app.core.config import settings -from app.core.event import eventmanager from app.core.security import verify_token from app.db import get_db from app.db.systemconfig_oper import SystemConfigOper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper -from app.schemas.types import SystemConfigKey, EventType +from app.scheduler import Scheduler +from app.schemas.types import SystemConfigKey from app.utils.http import RequestUtils from app.utils.system import SystemUtils from version import APP_VERSION @@ -214,15 +214,13 @@ def restart_system(_: schemas.TokenPayload = Depends(verify_token)): return schemas.Response(success=ret, message=msg) -@router.get("/command", summary="执行命令", response_model=schemas.Response) -def execute_command(cmd: str, +@router.get("/runscheduler", summary="运行服务", response_model=schemas.Response) +def execute_command(jobid: str, _: schemas.TokenPayload = Depends(verify_token)): """ 执行命令 """ - if not cmd: + if not jobid: return schemas.Response(success=False, message="命令不能为空!") - eventmanager.send_event(etype=EventType.CommandExcute, data={ - "cmd": cmd - }) + Scheduler().start(jobid) return schemas.Response(success=True) diff --git a/app/chain/cookiecloud.py b/app/chain/cookiecloud.py index 65b5f7a9..f1943ab8 100644 --- a/app/chain/cookiecloud.py +++ b/app/chain/cookiecloud.py @@ -1,5 +1,5 @@ import base64 -from typing import Tuple, Optional, Union +from typing import Tuple, Optional from urllib.parse import urljoin from lxml import etree @@ -16,7 +16,6 @@ from app.helper.message import MessageHelper from app.helper.rss import RssHelper from app.helper.sites import SitesHelper from app.log import logger -from app.schemas import Notification, NotificationType, MessageChannel from app.utils.http import RequestUtils from app.utils.site import SiteUtils @@ -40,21 +39,6 @@ class CookieCloudChain(ChainBase): password=settings.COOKIECLOUD_PASSWORD ) - def remote_sync(self, channel: MessageChannel, userid: Union[int, str]): - """ - 远程触发同步站点,发送消息 - """ - self.post_message(Notification(channel=channel, mtype=NotificationType.SiteMessage, - title="开始同步CookieCloud站点 ...", userid=userid)) - # 开始同步 - success, msg = self.process() - if success: - self.post_message(Notification(channel=channel, mtype=NotificationType.SiteMessage, - title=f"同步站点成功,{msg}", userid=userid)) - else: - self.post_message(Notification(channel=channel, mtype=NotificationType.SiteMessage, - title=f"同步站点失败:{msg}", userid=userid)) - def process(self, manual=False) -> Tuple[bool, str]: """ 通过CookieCloud同步站点Cookie diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index 094d2e3b..37503a1a 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -10,7 +10,6 @@ from app.core.config import settings from app.db import SessionFactory from app.db.mediaserver_oper import MediaServerOper from app.log import logger -from app.schemas import MessageChannel, Notification lock = threading.Lock() @@ -41,16 +40,6 @@ class MediaServerChain(ChainBase): """ return self.run_module("mediaserver_tv_episodes", server=server, item_id=item_id) - def remote_sync(self, channel: MessageChannel, userid: Union[int, str]): - """ - 同步豆瓣想看数据,发送消息 - """ - self.post_message(Notification(channel=channel, - title="开始媒体服务器 ...", userid=userid)) - self.sync() - self.post_message(Notification(channel=channel, - title="同步媒体服务器完成!", userid=userid)) - def sync(self): """ 同步媒体库所有数据到本地数据库 diff --git a/app/chain/subscribe.py b/app/chain/subscribe.py index 5b40bd6b..3ec07464 100644 --- a/app/chain/subscribe.py +++ b/app/chain/subscribe.py @@ -132,45 +132,6 @@ class SubscribeChain(ChainBase): return True return False - def remote_refresh(self, channel: MessageChannel, userid: Union[str, int] = None): - """ - 远程刷新订阅,发送消息 - """ - self.post_message(Notification(channel=channel, - title=f"开始刷新订阅 ...", userid=userid)) - self.refresh() - self.post_message(Notification(channel=channel, - title=f"订阅刷新完成!", userid=userid)) - - def remote_search(self, arg_str: str, channel: MessageChannel, userid: Union[str, int] = None): - """ - 远程搜索订阅,发送消息 - """ - if arg_str and not str(arg_str).isdigit(): - self.post_message(Notification(channel=channel, - title="请输入正确的命令格式:/subscribe_search [id]," - "[id]为订阅编号,不输入订阅编号时搜索所有订阅", userid=userid)) - return - if arg_str: - sid = int(arg_str) - subscribe = self.subscribeoper.get(sid) - if not subscribe: - self.post_message(Notification(channel=channel, - title=f"订阅编号 {sid} 不存在!", userid=userid)) - return - self.post_message(Notification(channel=channel, - title=f"开始搜索 {subscribe.name} ...", userid=userid)) - # 搜索订阅 - self.search(sid=int(arg_str)) - self.post_message(Notification(channel=channel, - title=f"{subscribe.name} 搜索完成!", userid=userid)) - else: - self.post_message(Notification(channel=channel, - title=f"开始搜索所有订阅 ...", userid=userid)) - self.search(state='R') - self.post_message(Notification(channel=channel, - title=f"订阅搜索完成!", userid=userid)) - def search(self, sid: int = None, state: str = 'N', manual: bool = False): """ 订阅搜索 diff --git a/app/command.py b/app/command.py index b86e8d85..4e4be4dc 100644 --- a/app/command.py +++ b/app/command.py @@ -1,11 +1,9 @@ import traceback from threading import Thread, Event -from typing import Any, Union +from typing import Any, Union, Dict from app.chain import ChainBase -from app.chain.cookiecloud import CookieCloudChain from app.chain.download import DownloadChain -from app.chain.mediaserver import MediaServerChain from app.chain.site import SiteChain from app.chain.subscribe import SubscribeChain from app.chain.system import SystemChain @@ -15,6 +13,8 @@ from app.core.event import eventmanager, EventManager from app.core.plugin import PluginManager from app.db import SessionFactory from app.log import logger +from app.scheduler import Scheduler +from app.schemas import Notification from app.schemas.types import EventType, MessageChannel from app.utils.object import ObjectUtils from app.utils.singleton import Singleton @@ -49,13 +49,15 @@ class Command(metaclass=Singleton): self.pluginmanager = PluginManager() # 处理链 self.chain = CommandChian(self._db) + # 定时服务管理 + self.scheduler = Scheduler() # 内置命令 self._commands = { "/cookiecloud": { - "func": CookieCloudChain(self._db).remote_sync, + "id": "cookiecloud", + "type": "scheduler", "description": "同步站点", - "category": "站点", - "data": {} + "category": "站点" }, "/sites": { "func": SiteChain(self._db).remote_list, @@ -79,10 +81,10 @@ class Command(metaclass=Singleton): "data": {} }, "/mediaserver_sync": { - "func": MediaServerChain(self._db).remote_sync, + "id": "mediaserver_sync", + "type": "scheduler", "description": "同步媒体服务器", - "category": "管理", - "data": {} + "category": "管理" }, "/subscribes": { "func": SubscribeChain(self._db).remote_list, @@ -91,16 +93,16 @@ class Command(metaclass=Singleton): "data": {} }, "/subscribe_refresh": { - "func": SubscribeChain(self._db).remote_refresh, + "id": "subscribe_refresh", + "type": "scheduler", "description": "刷新订阅", - "category": "订阅", - "data": {} + "category": "订阅" }, "/subscribe_search": { - "func": SubscribeChain(self._db).remote_search, + "id": "subscribe_search", + "type": "scheduler", "description": "搜索订阅", - "category": "订阅", - "data": {} + "category": "订阅" }, "/subscribe_delete": { "func": SubscribeChain(self._db).remote_delete, @@ -108,9 +110,9 @@ class Command(metaclass=Singleton): "data": {} }, "/subscribe_tmdb": { - "func": SubscribeChain(self._db).check, - "description": "订阅TMDB数据刷新", - "data": {} + "id": "subscribe_tmdb", + "type": "scheduler", + "description": "订阅元数据更新" }, "/downloading": { "func": DownloadChain(self._db).remote_downloading, @@ -119,10 +121,10 @@ class Command(metaclass=Singleton): "data": {} }, "/transfer": { - "func": TransferChain(self._db).process, + "id": "transfer", + "type": "scheduler", "description": "下载文件整理", - "category": "管理", - "data": {} + "category": "管理" }, "/redo": { "func": TransferChain(self._db).remote_transfer, @@ -180,6 +182,56 @@ class Command(metaclass=Singleton): except Exception as e: logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") + def __run_command(self, command: Dict[str, any], + data_str: str = "", + channel: MessageChannel = None, userid: Union[str, int] = None): + """ + 运行定时服务 + """ + if command.get("type") == "scheduler": + # 定时服务 + if userid: + self.chain.post_message( + Notification( + channel=channel, + title=f"开始执行 {command.get('description')} ...", + userid=userid + ) + ) + + # 执行定时任务 + self.scheduler.start(job_id=command.get("id")) + + if userid: + self.chain.post_message( + Notification( + channel=channel, + title=f"{command.get('description')} 执行完成", + userid=userid + ) + ) + else: + # 命令 + cmd_data = command['data'] if command.get('data') else {} + args_num = ObjectUtils.arguments(command['func']) + if args_num > 0: + if cmd_data: + # 有内置参数直接使用内置参数 + data = cmd_data.get("data") or {} + data['channel'] = channel + data['user'] = userid + cmd_data['data'] = data + command['func'](**cmd_data) + elif args_num == 2: + # 没有输入参数,只输入渠道和用户ID + command['func'](channel, userid) + elif args_num > 2: + # 多个输入参数:用户输入、用户ID + command['func'](data_str, channel, userid) + else: + # 没有参数 + command['func']() + def stop(self): """ 停止事件处理线程 @@ -225,25 +277,11 @@ class Command(metaclass=Singleton): logger.info(f"用户 {userid} 开始执行:{command.get('description')} ...") else: logger.info(f"开始执行:{command.get('description')} ...") - cmd_data = command['data'] if command.get('data') else {} - args_num = ObjectUtils.arguments(command['func']) - if args_num > 0: - if cmd_data: - # 有内置参数直接使用内置参数 - data = cmd_data.get("data") or {} - data['channel'] = channel - data['user'] = userid - cmd_data['data'] = data - command['func'](**cmd_data) - elif args_num == 2: - # 没有输入参数,只输入渠道和用户ID - command['func'](channel, userid) - elif args_num > 2: - # 多个输入参数:用户输入、用户ID - command['func'](data_str, channel, userid) - else: - # 没有参数 - command['func']() + + # 执行命令 + self.__run_command(command, data_str=data_str, + channel=channel, userid=userid) + if userid: logger.info(f"用户 {userid} {command.get('description')} 执行完成") else: diff --git a/app/scheduler.py b/app/scheduler.py index 3f93f52b..d764d3b9 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -1,10 +1,12 @@ import logging from datetime import datetime, timedelta +from typing import List import pytz from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler +from app import schemas from app.chain import ChainBase from app.chain.cookiecloud import CookieCloudChain from app.chain.mediaserver import MediaServerChain @@ -40,65 +42,153 @@ class Scheduler(metaclass=Singleton): def __init__(self): # 数据库连接 self._db = SessionFactory() + # 各服务的运行状态 + self._jobs = { + "cookiecloud": { + "func": CookieCloudChain(self._db).process, + "running": False, + }, + "mediaserver_sync": { + "func": MediaServerChain(self._db).sync, + "running": False, + }, + "subscribe_tmdb": { + "func": SubscribeChain(self._db).check, + "running": False, + }, + "subscribe_search": { + "func": SubscribeChain(self._db).search, + "running": False, + }, + "subscribe_refresh": { + "func": SubscribeChain(self._db).refresh, + "running": False, + }, + "transfer": { + "func": TransferChain(self._db).process, + "running": False, + } + } + # 调试模式不启动定时服务 if settings.DEV: return + # CookieCloud定时同步 if settings.COOKIECLOUD_INTERVAL: - self._scheduler.add_job(CookieCloudChain(self._db).process, - "interval", - minutes=settings.COOKIECLOUD_INTERVAL, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1), - id="cookiecloud", - name="同步CookieCloud站点") + self._scheduler.add_job( + self.start, + "interval", + id="cookiecloud", + name="同步CookieCloud站点", + minutes=settings.COOKIECLOUD_INTERVAL, + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1), + kwargs={ + 'job_id': 'cookiecloud' + } + ) # 媒体服务器同步 if settings.MEDIASERVER_SYNC_INTERVAL: - self._scheduler.add_job(MediaServerChain(self._db).sync, "interval", - hours=settings.MEDIASERVER_SYNC_INTERVAL, - next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5), - id="mediaserver_sync", - name="同步媒体服务器") + self._scheduler.add_job( + self.start, + "interval", + id="mediaserver_sync", + name="同步媒体服务器", + hours=settings.MEDIASERVER_SYNC_INTERVAL, + next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5), + kwargs={ + 'job_id': 'mediaserver_sync' + } + ) # 新增订阅时搜索(5分钟检查一次) - self._scheduler.add_job(SubscribeChain(self._db).search, "interval", - minutes=5, kwargs={'state': 'N'}) + self._scheduler.add_job( + self.start, + "interval", + minutes=5, + kwargs={ + 'job_id': 'subscribe_search', + 'state': 'N' + } + ) # 检查更新订阅TMDB数据(每隔6小时) - self._scheduler.add_job(SubscribeChain(self._db).check, "interval", hours=6, - id="subscribe_tmdb", name="订阅元数据更新") + self._scheduler.add_job( + self.start, + "interval", + id="subscribe_tmdb", + name="订阅元数据更新", + hours=6, + kwargs={ + 'job_id': 'subscribe_tmdb' + } + ) # 订阅状态每隔24小时搜索一次 if settings.SUBSCRIBE_SEARCH: - self._scheduler.add_job(SubscribeChain(self._db).search, "interval", - hours=24, kwargs={'state': 'R'}, - id="subscribe_search", name="订阅搜索") + self._scheduler.add_job( + self.start, + "interval", + id="subscribe_search", + name="订阅搜索", + hours=24, + kwargs={ + 'job_id': 'subscribe_search', + 'state': 'R' + } + ) if settings.SUBSCRIBE_MODE == "spider": # 站点首页种子定时刷新模式 triggers = TimerUtils.random_scheduler(num_executions=30) for trigger in triggers: - self._scheduler.add_job(SubscribeChain(self._db).refresh, "cron", - hour=trigger.hour, minute=trigger.minute, - id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}", - name="订阅刷新") + self._scheduler.add_job( + self.start, + "cron", + id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}", + name="订阅刷新", + hour=trigger.hour, + minute=trigger.minute, + kwargs={ + 'job_id': 'subscribe_refresh' + }) else: # RSS订阅模式 if not settings.SUBSCRIBE_RSS_INTERVAL: settings.SUBSCRIBE_RSS_INTERVAL = 30 elif settings.SUBSCRIBE_RSS_INTERVAL < 5: settings.SUBSCRIBE_RSS_INTERVAL = 5 - self._scheduler.add_job(SubscribeChain(self._db).refresh, "interval", - minutes=settings.SUBSCRIBE_RSS_INTERVAL, - id="subscribe_refresh", name="订阅刷新") + self._scheduler.add_job( + self.start, + "interval", + id="subscribe_refresh", + name="RSS订阅刷新", + minutes=settings.SUBSCRIBE_RSS_INTERVAL, + kwargs={ + 'job_id': 'subscribe_refresh' + } + ) # 下载器文件转移(每5分钟) if settings.DOWNLOADER_MONITOR: - self._scheduler.add_job(TransferChain(self._db).process, "interval", minutes=5, - id="transfer", name="下载文件整理") + self._scheduler.add_job( + self.start, + "interval", + id="transfer", + name="下载文件整理", + minutes=5, + kwargs={ + 'job_id': 'transfer' + } + ) # 公共定时服务 - self._scheduler.add_job(SchedulerChain(self._db).scheduler_job, "interval", minutes=10) + self._scheduler.add_job( + SchedulerChain(self._db).scheduler_job, + "interval", + minutes=10 + ) # 打印服务 logger.debug(self._scheduler.print_jobs()) @@ -106,11 +196,53 @@ class Scheduler(metaclass=Singleton): # 启动定时服务 self._scheduler.start() - def list(self): + def start(self, job_id: str, *args, **kwargs): + """ + 启动定时服务 + """ + # 处理job_id格式 + job = self._jobs.get(job_id) + if not job: + return + if job.get("running"): + logger.warning(f"定时任务 {job_id} 正在运行 ...") + return + self._jobs[job_id]["running"] = True + try: + job["func"](*args, **kwargs) + except Exception as e: + logger.error(f"定时任务 {job_id} 执行失败:{e}") + self._jobs[job_id]["running"] = False + + def list(self) -> List[schemas.ScheduleInfo]: """ 当前所有任务 """ - return self._scheduler.get_jobs() + # 返回计时任务 + schedulers = [] + # 去重 + added = [] + jobs = self._scheduler.get_jobs() + # 按照下次运行时间排序 + jobs.sort(key=lambda x: x.next_run_time) + for job in jobs: + if job.name not in added: + added.append(job.name) + else: + continue + if not self._jobs.get(job.id): + continue + # 任务状态 + status = "正在运行" if self._jobs[job.id].get("running") else "等待" + # 下次运行时间 + next_run = TimerUtils.time_difference(job.next_run_time) + schedulers.append(schemas.ScheduleInfo( + id=job.id, + name=job.name, + status=status, + next_run=next_run + )) + return schedulers def stop(self): """