Refine transfer history batch actions

This commit is contained in:
jxxghp
2026-06-29 07:07:33 +08:00
parent 9b1bdb0cb2
commit 494f809ef0
17 changed files with 1194 additions and 143 deletions

View File

@@ -177,6 +177,23 @@ async def schedule(_: Any = Depends(get_current_active_superuser)) -> Any:
return Scheduler().list()
@router.get(
"/schedule/{job_id}/progress",
summary="后台服务进度",
response_model=schemas.Response,
)
async def schedule_progress(
job_id: str, _: Any = Depends(get_current_active_superuser)
) -> Any:
"""
查询指定后台服务的执行进度。
"""
progress = Scheduler().get_progress(job_id)
if not progress:
return schemas.Response(success=False, message="后台服务不存在")
return schemas.Response(success=True, data=progress.model_dump())
@router.get(
"/schedule2",
summary="后台服务API_TOKEN",
@@ -189,6 +206,23 @@ async def schedule2(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
return Scheduler().list()
@router.get(
"/schedule2/{job_id}/progress",
summary="后台服务进度API_TOKEN",
response_model=schemas.Response,
)
async def schedule_progress2(
job_id: str, _: Annotated[str, Depends(verify_apitoken)]
) -> Any:
"""
查询指定后台服务的执行进度 API_TOKEN认证?token=xxx
"""
progress = Scheduler().get_progress(job_id)
if not progress:
return schemas.Response(success=False, message="后台服务不存在")
return schemas.Response(success=True, data=progress.model_dump())
@router.get("/transfer", summary="文件整理统计", response_model=List[int])
async def transfer(
days: Optional[int] = 7,
@@ -218,22 +252,26 @@ def cpu2(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
return SystemUtils.cpu_usage()
@router.get("/memory", summary="获取当前内存使用量和使用率", response_model=List[int])
@router.get(
"/memory",
summary="获取当前系统内存信息",
response_model=schemas.DashboardMemoryInfo,
)
def memory(_: Any = Depends(get_current_active_superuser)) -> Any:
"""
获取当前内存使用率
获取当前系统内存信息
"""
return SystemUtils.memory_usage()
@router.get(
"/memory2",
summary="获取当前内存使用量和使用率API_TOKEN",
response_model=List[int],
summary="获取当前系统内存信息API_TOKEN",
response_model=schemas.DashboardMemoryInfo,
)
def memory2(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""
获取当前内存使用率 API_TOKEN认证?token=xxx
获取当前系统内存信息 API_TOKEN认证?token=xxx
"""
return SystemUtils.memory_usage()

View File

@@ -1,6 +1,6 @@
import threading
from datetime import datetime
from typing import List, Union, Optional, Generator, Any
from typing import Callable, List, Union, Optional, Generator, Any
from app.chain import ChainBase
from app.core.config import global_vars
@@ -191,13 +191,17 @@ class MediaServerChain(ChainBase):
"mediaserver_image_cookies", server=server, image_url=image_url
)
def sync(self):
def sync(self, progress_callback: Optional[Callable[..., None]] = None) -> None:
"""
同步媒体库所有数据到本地数据库
:param progress_callback: 定时服务进度更新回调
"""
# 设置的媒体服务器
mediaservers = ServiceConfigHelper.get_mediaserver_configs()
if not mediaservers:
if progress_callback:
progress_callback(value=100, text="未配置媒体服务器,跳过同步")
return
with lock:
# 汇总统计
@@ -206,7 +210,19 @@ class MediaServerChain(ChainBase):
enabled_servers = [mediaserver.name for mediaserver in mediaservers
if mediaserver and mediaserver.enabled and mediaserver.name]
dboper.delete_excluded_servers(enabled_servers)
total_servers = len(enabled_servers)
if progress_callback:
progress_callback(
value=0,
text=f"开始同步媒体服务器,共 {total_servers} 个 ...",
data={"total": total_servers, "finished": 0},
)
if not total_servers:
if progress_callback:
progress_callback(value=100, text="没有已启用的媒体服务器")
return
# 遍历媒体服务器
server_index = 0
for mediaserver in mediaservers:
if not mediaserver:
continue
@@ -214,15 +230,36 @@ class MediaServerChain(ChainBase):
if not mediaserver.enabled:
logger.info(f"媒体服务器 {mediaserver.name} 未启用,跳过")
continue
server_index += 1
server_name = mediaserver.name
if progress_callback:
progress_callback(
value=(server_index - 1) / total_servers * 100,
text=(
f"正在同步媒体服务器"
f"{server_index}/{total_servers}{server_name} ..."
),
data={
"total": total_servers,
"finished": server_index - 1,
"current": server_name,
},
)
sync_libraries = mediaserver.sync_libraries or []
logger.info(f"开始同步媒体服务器 {server_name} 的数据 ...")
libraries = self.librarys(server_name)
if not libraries:
logger.info(f"没有获取到媒体服务器 {server_name} 的媒体库,跳过")
if progress_callback:
progress_callback(
value=server_index / total_servers * 100,
text=f"媒体服务器 {server_name} 无可同步媒体库",
data={"total": total_servers, "finished": server_index},
)
continue
sync_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
for library in libraries:
total_libraries = len(libraries)
for library_index, library in enumerate(libraries, start=1):
if sync_libraries \
and "all" not in sync_libraries \
and str(library.id) not in sync_libraries:
@@ -255,6 +292,36 @@ class MediaServerChain(ChainBase):
logger.info(f"{server_name} 媒体库 {library.name} 同步完成,共同步数量:{library_count}")
# 总数累加
total_count += library_count
if progress_callback:
progress_callback(
value=(
(server_index - 1 + library_index / total_libraries)
/ total_servers
* 100
),
text=(
f"{server_name} 媒体库"
f"{library_index}/{total_libraries}{library.name} 同步完成"
),
data={
"total": total_servers,
"finished": server_index - 1,
"current": server_name,
"library_total": total_libraries,
"library_finished": library_index,
},
)
stale_count = dboper.delete_stale(server=server_name, sync_time=sync_time)
logger.info(f"媒体服务器 {server_name} 清理陈旧数据完成,删除数量:{stale_count}")
logger.info(f"媒体服务器 {server_name} 数据同步完成,总同步数量:{total_count}")
if progress_callback:
progress_callback(
value=server_index / total_servers * 100,
text=(
f"媒体服务器({server_index}/{total_servers}"
f"{server_name} 同步完成"
),
data={"total": total_servers, "finished": server_index},
)
if progress_callback:
progress_callback(value=100, text="媒体服务器同步完成")

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Callable, List, Optional
import pillow_avif # noqa 用于自动注册AVIF支持
@@ -27,11 +27,16 @@ class RecommendChain(ChainBase, metaclass=Singleton):
# 推荐缓存区域
recommend_cache_region = "recommend"
def refresh_recommend(self, manual: bool = False):
def refresh_recommend(
self,
manual: bool = False,
progress_callback: Optional[Callable[..., None]] = None,
) -> None:
"""
刷新推荐
:param manual: 手动触发
:param progress_callback: 定时服务进度更新回调
"""
logger.debug("Starting to refresh Recommend data.")
@@ -56,6 +61,14 @@ class RecommendChain(ChainBase, metaclass=Singleton):
recommends = []
# 记录哪些方法已完成
methods_finished = set()
total_requests = len(recommend_methods) * self.cache_max_pages
finished_requests = 0
if progress_callback:
progress_callback(
value=0,
text=f"开始刷新推荐缓存,共 {total_requests} 个数据分页 ...",
data={"total": total_requests, "finished": 0},
)
# 这里避免区间内连续调用相同来源,因此遍历方案为每页遍历所有推荐来源,再进行页数遍历
for page in range(1, self.cache_max_pages + 1):
for method in recommend_methods:
@@ -67,6 +80,21 @@ class RecommendChain(ChainBase, metaclass=Singleton):
# 手动触发的刷新,总是需要获取最新数据
with fresh(manual):
data = method(page=page)
finished_requests += 1
if progress_callback:
progress_callback(
value=finished_requests / total_requests * 90,
text=(
f"正在刷新推荐缓存"
f"{finished_requests}/{total_requests}..."
),
data={
"total": total_requests,
"finished": finished_requests,
"current": method.__name__,
"page": page,
},
)
if not data:
logger.debug("All recommendation methods have finished fetching data. Ending pagination early.")
methods_finished.add(method)
@@ -77,24 +105,40 @@ class RecommendChain(ChainBase, metaclass=Singleton):
break
# 缓存收集到的海报
self.__cache_posters(recommends)
if progress_callback:
progress_callback(value=90, text="推荐数据刷新完成,正在缓存海报 ...")
self.__cache_posters(recommends, progress_callback=progress_callback)
logger.debug("Recommend data refresh completed.")
if progress_callback:
progress_callback(value=100, text="推荐缓存刷新完成")
def __cache_posters(self, datas: List[dict]):
def __cache_posters(
self,
datas: List[dict],
progress_callback: Optional[Callable[..., None]] = None,
) -> None:
"""
提取 poster_path 并缓存图片
:param datas: 数据列表
:param progress_callback: 定时服务进度更新回调
"""
if not settings.GLOBAL_IMAGE_CACHE:
return
for data in datas:
total_num = len(datas)
for index, data in enumerate(datas, start=1):
if global_vars.is_system_stopped:
return
poster_path = data.get("poster_path")
if poster_path:
poster_url = poster_path.replace("original", "w500")
self.__fetch_and_save_image(poster_url)
if progress_callback:
progress_callback(
value=90 + (index / total_num * 10 if total_num else 10),
text=f"正在缓存推荐海报({index}/{total_num}...",
data={"poster_total": total_num, "poster_finished": index},
)
@staticmethod
def __fetch_and_save_image(url: str):

View File

@@ -1,7 +1,7 @@
import base64
import re
from datetime import datetime
from typing import List, Optional, Tuple, Union, Dict
from typing import Callable, List, Optional, Tuple, Union, Dict
from urllib.parse import urljoin
from app.helper.sites import SitesHelper # noqa
@@ -103,24 +103,54 @@ class SiteChain(ChainBase):
))
return userdata
def refresh_userdatas(self) -> Optional[Dict[str, SiteUserData]]:
def refresh_userdatas(
self,
progress_callback: Optional[Callable[..., None]] = None,
) -> Optional[Dict[str, SiteUserData]]:
"""
刷新所有站点的用户数据
:param progress_callback: 定时服务进度更新回调
"""
any_site_updated = False
result = {}
for site in SitesHelper().get_indexers():
sites = [site for site in SitesHelper().get_indexers() if site.get("is_active")]
total_num = len(sites)
if progress_callback:
progress_callback(
value=0,
text=f"开始刷新站点数据,共 {total_num} 个站点 ...",
data={"total": total_num, "finished": 0},
)
for index, site in enumerate(sites, start=1):
if global_vars.is_system_stopped:
return None
if site.get("is_active"):
userdata = self.refresh_userdata(site)
if userdata:
any_site_updated = True
result[site.get("name")] = userdata
if progress_callback:
progress_callback(
value=(index - 1) / total_num * 100 if total_num else 100,
text=f"正在刷新站点数据({index}/{total_num}{site.get('name')} ...",
data={
"total": total_num,
"finished": index - 1,
"current": site.get("id"),
},
)
userdata = self.refresh_userdata(site)
if userdata:
any_site_updated = True
result[site.get("name")] = userdata
if progress_callback:
progress_callback(
value=index / total_num * 100 if total_num else 100,
text=f"站点数据({index}/{total_num})刷新完成",
data={"total": total_num, "finished": index},
)
if any_site_updated:
eventmanager.send_event(EventType.SiteRefreshed, {
"site_id": "*"
})
if progress_callback:
progress_callback(value=100, text="站点数据刷新完成")
return result
@@ -323,9 +353,16 @@ class SiteChain(ChainBase):
del html
return favicon_url, None
def sync_cookies(self, manual=False) -> Tuple[bool, str]:
def sync_cookies(
self,
manual: bool = False,
progress_callback: Optional[Callable[..., None]] = None,
) -> Tuple[bool, str]:
"""
通过CookieCloud同步站点Cookie
:param manual: 是否手动同步
:param progress_callback: 定时服务进度更新回调
"""
def __indexer_domain(inx: dict, sub_domain: str) -> str:
@@ -340,9 +377,13 @@ class SiteChain(ChainBase):
return sub_domain
logger.info("开始同步CookieCloud站点 ...")
if progress_callback:
progress_callback(value=0, text="开始下载 CookieCloud 数据 ...")
cookies, msg = CookieCloudHelper().download()
if not cookies:
logger.error(f"CookieCloud同步失败{msg}")
if progress_callback:
progress_callback(value=100, text=f"CookieCloud同步失败{msg}")
if manual:
self.messagehelper.put(msg, title="CookieCloud同步失败", role="system")
return False, msg
@@ -353,11 +394,22 @@ class SiteChain(ChainBase):
siteshelper = SitesHelper()
siteoper = SiteOper()
rsshelper = RssHelper()
for domain, cookie in cookies.items():
total_num = len(cookies)
for index, (domain, cookie) in enumerate(cookies.items(), start=1):
# 检查系统是否停止
if global_vars.is_system_stopped:
logger.info("系统正在停止中断CookieCloud同步")
return False, "系统正在停止,同步被中断"
if progress_callback:
progress_callback(
value=(index - 1) / total_num * 100 if total_num else 100,
text=f"正在同步 CookieCloud 站点({index}/{total_num}{domain} ...",
data={
"total": total_num,
"finished": index - 1,
"current": domain,
},
)
# 索引器信息
indexer = siteshelper.get_indexer(domain)
@@ -465,6 +517,12 @@ class SiteChain(ChainBase):
eventmanager.send_event(EventType.SiteUpdated, {
"domain": domain,
})
if progress_callback:
progress_callback(
value=index / total_num * 100 if total_num else 100,
text=f"CookieCloud 站点({index}/{total_num})同步完成",
data={"total": total_num, "finished": index},
)
# 处理完成
ret_msg = f"更新了{_update_count}个站点,新增了{_add_count}个站点"
if _fail_count > 0:
@@ -472,6 +530,8 @@ class SiteChain(ChainBase):
if manual:
self.messagehelper.put(ret_msg, title="CookieCloud同步成功", role="system")
logger.info(f"CookieCloud同步成功{ret_msg}")
if progress_callback:
progress_callback(value=100, text=f"CookieCloud同步成功{ret_msg}")
return True, ret_msg
@eventmanager.register(EventType.SiteUpdated)

View File

@@ -5,7 +5,7 @@ import re
import threading
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union, Tuple
from typing import Any, Callable, Dict, List, Optional, Union, Tuple
from app import schemas
from app.chain import ChainBase
@@ -979,12 +979,19 @@ class SubscribeChain(ChainBase):
return True
return False
def search(self, sid: Optional[int] = None, state: Optional[str] = 'N', manual: Optional[bool] = False):
def search(
self,
sid: Optional[int] = None,
state: Optional[str] = 'N',
manual: Optional[bool] = False,
progress_callback: Optional[Callable[..., None]] = None,
) -> None:
"""
订阅搜索
:param sid: 订阅ID有值时只处理该订阅
:param state: 订阅状态 N:新建, R:订阅中, P:待定, S:暂停
:param manual: 是否手动搜索
:param progress_callback: 定时服务进度更新回调
:return: 更新订阅状态为R或删除订阅
"""
lock_acquired = False
@@ -1002,12 +1009,29 @@ class SubscribeChain(ChainBase):
subscribes = [subscribe] if subscribe else []
else:
subscribes = subscribeoper.list(self.get_states_for_search(state))
total_num = len(subscribes)
if progress_callback:
progress_callback(
value=0,
text=f"开始订阅搜索,共 {total_num} 个订阅 ...",
data={"total": total_num, "finished": 0},
)
try:
# 遍历订阅
for subscribe in subscribes:
for index, subscribe in enumerate(subscribes, start=1):
if global_vars.is_system_stopped:
break
if progress_callback:
progress_callback(
value=(index - 1) / total_num * 100 if total_num else 100,
text=f"正在搜索订阅({index}/{total_num}{subscribe.name} ...",
data={
"total": total_num,
"finished": index - 1,
"current": subscribe.id,
},
)
mediakey = subscribe.tmdbid or subscribe.doubanid
custom_word_list = subscribe.custom_words.split("\n") if subscribe.custom_words else None
# 校验当前时间减订阅创建时间是否大于1分钟否则跳过先留出编辑订阅的时间
@@ -1021,6 +1045,10 @@ class SubscribeChain(ChainBase):
if not sid and state in ['R', 'P']:
sleep_time = random.randint(60, 300)
logger.info(f'订阅搜索随机休眠 {sleep_time} 秒 ...')
if progress_callback:
progress_callback(
text=f"订阅搜索随机休眠 {sleep_time} 秒后继续 ..."
)
time.sleep(sleep_time)
try:
logger.info(f'开始搜索订阅,标题:{subscribe.name} ...')
@@ -1168,6 +1196,12 @@ class SubscribeChain(ChainBase):
# 如果状态为N则更新为R
if subscribe and subscribe.state == 'N':
subscribeoper.update(subscribe.id, {'state': 'R'})
if progress_callback:
progress_callback(
value=index / total_num * 100 if total_num else 100,
text=f"订阅搜索({index}/{total_num})处理完成",
data={"total": total_num, "finished": index},
)
# 手动触发时发送系统消息
if manual:
@@ -1178,6 +1212,8 @@ class SubscribeChain(ChainBase):
self.messagehelper.put('所有订阅搜索完成!', title="订阅搜索", role="system")
else:
self.messagehelper.put('没有找到订阅!', title="订阅搜索", role="system")
if progress_callback:
progress_callback(value=100, text="订阅搜索完成")
finally:
subscribes.clear()
@@ -1294,17 +1330,54 @@ class SubscribeChain(ChainBase):
elif not downloads:
logger.info(f'{mediainfo.title_year} 继续洗版 ...')
def refresh(self):
def refresh(self, progress_callback: Optional[Callable[..., None]] = None) -> None:
"""
订阅刷新
:param progress_callback: 定时服务进度更新回调
"""
# 触发刷新站点资源,从缓存中匹配订阅
sites = self.get_subscribed_sites()
if sites is None:
if progress_callback:
progress_callback(value=100, text="没有订阅需要刷新")
return
self.match(
TorrentsChain().refresh(sites=sites)
def _update_refresh_progress(
value: Optional[float] = None,
text: Optional[str] = None,
data: Optional[dict] = None,
) -> None:
"""将站点刷新进度映射到订阅刷新的前半阶段。"""
if progress_callback:
progress_callback(
value=(value or 0) * 0.6,
text=text,
data=data,
)
def _update_match_progress(
value: Optional[float] = None,
text: Optional[str] = None,
data: Optional[dict] = None,
) -> None:
"""将订阅匹配进度映射到订阅刷新的后半阶段。"""
if progress_callback:
progress_callback(
value=60 + (value or 0) * 0.4,
text=text,
data=data,
)
torrents = TorrentsChain().refresh(
sites=sites,
progress_callback=_update_refresh_progress if progress_callback else None,
)
self.match(
torrents,
progress_callback=_update_match_progress if progress_callback else None,
)
if progress_callback:
progress_callback(value=100, text="订阅刷新完成")
@staticmethod
def get_sub_sites(subscribe: Subscribe) -> List[int]:
@@ -1349,14 +1422,26 @@ class SubscribeChain(ChainBase):
return ret_sites
def match(self, torrents: Dict[str, List[Context]]):
def match(
self,
torrents: Dict[str, List[Context]],
progress_callback: Optional[Callable[..., None]] = None,
) -> None:
"""
从缓存中匹配订阅,并自动下载
:param torrents: 按站点分组的资源上下文
:param progress_callback: 订阅匹配进度更新回调
"""
if not torrents:
logger.warn('没有缓存资源,无法匹配订阅')
if progress_callback:
progress_callback(value=100, text="没有缓存资源,跳过订阅匹配")
return
if progress_callback:
progress_callback(value=0, text="正在预处理订阅资源 ...")
lock_acquired = False
try:
if lock_acquired := self._rlock.acquire(
@@ -1409,10 +1494,32 @@ class SubscribeChain(ChainBase):
# 所有订阅
subscribes = SubscribeOper().list(self.get_states_for_search('R'))
total_num = len(subscribes)
if progress_callback:
progress_callback(
value=20,
text=f"资源预处理完成,开始匹配 {total_num} 个订阅 ...",
data={"total": total_num, "finished": 0},
)
try:
for subscribe in subscribes:
for index, subscribe in enumerate(subscribes, start=1):
if global_vars.is_system_stopped:
break
if progress_callback:
progress_callback(
value=20 + (
(index - 1) / total_num * 80 if total_num else 80
),
text=(
f"正在匹配订阅({index}/{total_num}"
f"{subscribe.name} ..."
),
data={
"total": total_num,
"finished": index - 1,
"current": subscribe.id,
},
)
logger.info(f'开始匹配订阅,标题:{subscribe.name} ...')
mediakey = subscribe.tmdbid or subscribe.doubanid
try:
@@ -1693,22 +1800,48 @@ class SubscribeChain(ChainBase):
del processed_torrents
subscribes.clear()
del subscribes
if progress_callback:
progress_callback(
value=100,
text="订阅资源匹配完成",
data={"total": total_num, "finished": total_num},
)
finally:
if lock_acquired:
self._rlock.release()
logger.debug(f"match Lock released at {datetime.now()}")
def check(self):
def check(self, progress_callback: Optional[Callable[..., None]] = None) -> None:
"""
定时检查订阅,更新订阅信息
:param progress_callback: 定时服务进度更新回调
"""
# 查询所有订阅
subscribeoper = SubscribeOper()
subscribes = subscribeoper.list()
total_num = len(subscribes)
if progress_callback:
progress_callback(
value=0,
text=f"开始更新订阅元数据,共 {total_num} 个订阅 ...",
data={"total": total_num, "finished": 0},
)
# 遍历订阅
for subscribe in subscribeoper.list():
for index, subscribe in enumerate(subscribes, start=1):
if global_vars.is_system_stopped:
break
logger.info(f'开始更新订阅元数据:{subscribe.name} ...')
if progress_callback:
progress_callback(
value=(index - 1) / total_num * 100 if total_num else 100,
text=f"正在更新订阅元数据({index}/{total_num}{subscribe.name} ...",
data={
"total": total_num,
"finished": index - 1,
"current": subscribe.id,
},
)
try:
meta = build_subscribe_meta(subscribe)
except ValueError:
@@ -1773,6 +1906,14 @@ class SubscribeChain(ChainBase):
subscribe.lack_episode = lack_episode
subscribeoper.update(subscribe.id, update_data)
logger.info(f'{subscribe.name} 订阅元数据更新完成')
if progress_callback:
progress_callback(
value=index / total_num * 100 if total_num else 100,
text=f"订阅元数据({index}/{total_num})更新完成",
data={"total": total_num, "finished": index},
)
if progress_callback:
progress_callback(value=100, text="订阅元数据更新完成")
def get_subscribe_by_source(self, source: str) -> Optional[Subscribe]:
"""
@@ -1788,19 +1929,37 @@ class SubscribeChain(ChainBase):
return SubscribeOper().get_by(**valid_fields)
@staticmethod
def follow():
def follow(progress_callback: Optional[Callable[..., None]] = None) -> None:
"""
刷新follow的用户分享并自动添加订阅
:param progress_callback: 定时服务进度更新回调
"""
follow_users: List[str] = SystemConfigOper().get(SystemConfigKey.FollowSubscribers)
if not follow_users:
if progress_callback:
progress_callback(value=100, text="未配置 Follow 订阅用户,跳过刷新")
return
logger.info(f'开始刷新follow用户分享订阅 ...')
success_count = 0
subscribeoper = SubscribeOper()
for share_sub in MoviePilotServerHelper.get_subscribe_shares():
share_subscribes = MoviePilotServerHelper.get_subscribe_shares() or []
total_num = len(share_subscribes)
if progress_callback:
progress_callback(
value=0,
text=f"开始刷新 Follow 订阅分享,共 {total_num} 条 ...",
data={"total": total_num, "finished": 0},
)
for index, share_sub in enumerate(share_subscribes, start=1):
if global_vars.is_system_stopped:
break
if progress_callback:
progress_callback(
value=(index - 1) / total_num * 100 if total_num else 100,
text=f"正在处理 Follow 订阅分享({index}/{total_num}...",
data={"total": total_num, "finished": index - 1},
)
uid = share_sub.get("share_uid")
if uid and uid in follow_users:
# 订阅已存在则跳过
@@ -1852,16 +2011,49 @@ class SubscribeChain(ChainBase):
else:
logger.error(f'follow用户分享订阅 {title} 添加失败:{message}')
logger.info(f'follow用户分享订阅刷新完成共添加 {success_count} 个订阅')
if progress_callback:
progress_callback(
value=100,
text=f"Follow 订阅分享刷新完成,新增 {success_count} 个订阅",
data={
"total": total_num,
"finished": total_num,
"added": success_count,
},
)
async def cache_calendar(self):
async def cache_calendar(
self,
progress_callback: Optional[Callable[..., None]] = None,
) -> None:
"""
预缓存订阅日历,实际上就是查询一遍所有订阅的媒体信息
前端请示是异常的,所以需要使用异步缓存方法
:param progress_callback: 定时服务进度更新回调
"""
logger.info(f'开始预缓存订阅日历 ...')
for subscribe in await SubscribeOper().async_list():
subscribes = await SubscribeOper().async_list()
total_num = len(subscribes)
if progress_callback:
progress_callback(
value=0,
text=f"开始预缓存订阅日历,共 {total_num} 个订阅 ...",
data={"total": total_num, "finished": 0},
)
for index, subscribe in enumerate(subscribes, start=1):
if global_vars.is_system_stopped:
break
if progress_callback:
progress_callback(
value=(index - 1) / total_num * 100 if total_num else 100,
text=f"正在预缓存订阅日历({index}/{total_num}{subscribe.name} ...",
data={
"total": total_num,
"finished": index - 1,
"current": subscribe.id,
},
)
try:
mtype = MediaType(subscribe.type)
except ValueError:
@@ -1887,7 +2079,15 @@ class SubscribeChain(ChainBase):
logger.warn(
f'未识别到季集信息,标题:{subscribe.name}tmdbid{subscribe.tmdbid}豆瓣ID{subscribe.doubanid},季:{subscribe.season}')
continue
if progress_callback:
progress_callback(
value=index / total_num * 100 if total_num else 100,
text=f"订阅日历({index}/{total_num})预缓存完成",
data={"total": total_num, "finished": index},
)
logger.info(f'订阅日历预缓存完成')
if progress_callback:
progress_callback(value=100, text="订阅日历预缓存完成")
@staticmethod
def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]):

View File

@@ -1,6 +1,6 @@
import re
import traceback
from typing import Dict, List, Union, Optional
from typing import Callable, Dict, List, Union, Optional
from app.helper.sites import SitesHelper # noqa
@@ -192,11 +192,17 @@ class TorrentsChain(ChainBase):
del rss_items
return ret_torrents
def refresh(self, stype: Optional[str] = None, sites: List[int] = None) -> Dict[str, List[Context]]:
def refresh(
self,
stype: Optional[str] = None,
sites: List[int] = None,
progress_callback: Optional[Callable[..., None]] = None,
) -> Dict[str, List[Context]]:
"""
刷新站点最新资源,识别并缓存起来
:param stype: 强制指定缓存类型spider:爬虫缓存rss:rss缓存
:param sites: 强制指定站点ID列表为空则读取设置的订阅站点
:param progress_callback: 资源刷新进度更新回调
"""
def __is_no_cache_site(_domain: str) -> bool:
@@ -226,13 +232,34 @@ class TorrentsChain(ChainBase):
# 需要刷新的站点domain
domains = []
indexers = [
indexer for indexer in SitesHelper().get_indexers()
if not sites or indexer.get("id") in sites
]
total_indexers = len(indexers)
if progress_callback:
progress_callback(
value=0,
text=f"开始刷新站点资源,共 {total_indexers} 个站点 ...",
data={"total": total_indexers, "finished": 0},
)
# 遍历站点缓存资源
for indexer in SitesHelper().get_indexers():
for index, indexer in enumerate(indexers, start=1):
if global_vars.is_system_stopped:
break
# 未开启的站点不刷新
if sites and indexer.get("id") not in sites:
continue
if progress_callback:
progress_callback(
value=(index - 1) / total_indexers * 100 if total_indexers else 100,
text=(
f"正在刷新站点资源({index}/{total_indexers}"
f"{indexer.get('name')} ..."
),
data={
"total": total_indexers,
"finished": index - 1,
"current": indexer.get("id"),
},
)
domain = StringUtils.get_url_domain(indexer.get("domain"))
domains.append(domain)
if stype == "spider":
@@ -335,6 +362,13 @@ class TorrentsChain(ChainBase):
if sites and torrents_cache:
torrents_cache = {k: v for k, v in torrents_cache.items() if k in domains}
if progress_callback:
progress_callback(
value=100,
text="站点资源刷新完成",
data={"total": total_indexers, "finished": total_indexers},
)
return torrents_cache
@staticmethod

View File

@@ -1917,9 +1917,11 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
sample_files.append(item)
return sample_files
def process(self) -> bool:
def process(self, progress_callback: Optional[Callable[..., None]] = None) -> bool:
"""
获取下载器中的种子列表,并执行整理
:param progress_callback: 定时服务进度更新回调
"""
# 全局锁,避免定时服务重复
with downloader_lock:
@@ -1931,9 +1933,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
dir_info.monitor_type == "downloader" and dir_info.storage == "local"
for dir_info in download_dirs
):
if progress_callback:
progress_callback(value=100, text="未配置下载器监控目录,跳过整理")
return True
logger.info("开始整理下载器中已经完成下载的文件 ...")
if progress_callback:
progress_callback(value=0, text="正在查询已完成下载任务 ...")
# 从下载器获取种子列表
if torrents_list := self.list_torrents(status=TorrentStatus.TRANSFER):
@@ -1951,14 +1957,38 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
if not torrents:
logger.info("没有已完成下载但未整理的任务")
if progress_callback:
progress_callback(value=100, text="没有已完成下载但未整理的任务")
return False
logger.info(f"获取到 {len(torrents)} 个已完成的下载任务")
if progress_callback:
progress_callback(
value=0,
text=f"获取到 {len(torrents)} 个已完成下载任务",
data={"total": len(torrents), "finished": 0},
)
try:
for torrent in torrents:
total_num = len(torrents)
for index, torrent in enumerate(torrents, start=1):
if global_vars.is_system_stopped:
break
if progress_callback:
torrent_name = (
getattr(torrent, "title", None)
or getattr(torrent, "name", None)
or torrent.hash
)
progress_callback(
value=(index - 1) / total_num * 100,
text=f"正在整理下载任务({index}/{total_num}{torrent_name} ...",
data={
"total": total_num,
"finished": index - 1,
"current": torrent.hash,
},
)
# 文件路径
file_path = torrent.path
@@ -2022,9 +2052,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
extension=file_path.suffix.lstrip("."),
),
mediainfo=mediainfo,
downloader=torrent.downloader,
download_hash=torrent.hash,
)
downloader=torrent.downloader,
download_hash=torrent.hash,
)
if progress_callback:
progress_callback(
value=index / total_num * 100,
text=f"下载任务({index}/{total_num})整理处理完成",
data={"total": total_num, "finished": index},
)
finally:
torrents.clear()

View File

@@ -1155,11 +1155,16 @@ class WorkflowChain(ChainBase):
self.process(workflow_id, from_begin=False)
@staticmethod
def process(workflow_id: int, from_begin: Optional[bool] = True) -> Tuple[bool, str]:
def process(
workflow_id: int,
from_begin: Optional[bool] = True,
progress_callback: Optional[Callable[..., None]] = None,
) -> Tuple[bool, str]:
"""
处理工作流
:param workflow_id: 工作流ID
:param from_begin: 是否从头开始默认为True
:param progress_callback: 定时服务进度更新回调
"""
workflowoper = WorkflowOper()
@@ -1173,6 +1178,23 @@ class WorkflowChain(ChainBase):
context=_serialize_workflow_context(context),
execution_state=_serialize_workflow_value(execution_state)
)
if progress_callback:
runtime = execution_state.get("runtime") or {}
finished_actions = int(runtime.get("finished_actions") or 0)
total_actions = len(workflow.actions)
progress_callback(
value=finished_actions / total_actions * 100,
text=(
f"工作流动作({finished_actions}/{total_actions}"
f"{action.name or action.type or action.id} "
f"{'执行完成' if completed else '执行中'}"
),
data={
"total": total_actions,
"finished": finished_actions,
"current": action.id,
},
)
# 重置工作流
if from_begin:
@@ -1191,6 +1213,12 @@ class WorkflowChain(ChainBase):
return False, "工作流无流程"
logger.info(f"开始执行工作流 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
if progress_callback:
progress_callback(
value=0,
text=f"开始执行工作流 {workflow.name} ...",
data={"total": len(workflow.actions), "finished": 0},
)
workflowoper.start(workflow_id)
# 执行工作流
@@ -1207,6 +1235,8 @@ class WorkflowChain(ChainBase):
return False, executor.errmsg
logger.info(f"工作流 {workflow.name} 执行完成")
workflowoper.success(workflow_id)
if progress_callback:
progress_callback(value=100, text=f"工作流 {workflow.name} 执行完成")
return True, ""
@staticmethod

View File

@@ -1624,60 +1624,99 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
return " ".join(normalized_labels) or None
return None
async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
async def async_get_online_plugins(
self,
force: bool = False,
progress_callback: Optional[Callable[..., None]] = None,
) -> List[schemas.Plugin]:
"""
异步获取所有在线插件信息
:param force: 是否强制刷新(忽略缓存)
:param progress_callback: 定时服务进度更新回调
"""
if not settings.PLUGIN_MARKET:
if progress_callback:
progress_callback(value=100, text="未配置插件市场,跳过刷新")
return []
# 用于存储高于 v1 版本的插件(如 v2, v3 等)
async def fetch_market(
market: str,
package_version: Optional[str],
result_version: str,
task_index: int,
) -> Tuple[int, str, List[schemas.Plugin]]:
"""
获取单个市场版本的插件列表并保留结果分组。
"""
plugins = await self.async_get_plugins_from_market(
market,
package_version,
force,
)
return task_index, result_version, plugins or []
higher_version_plugins = []
# 用于存储 v1 版本插件
base_version_plugins = []
# 使用异步并发获取线上插件
import asyncio
tasks = []
task_to_version = {}
for m in settings.PLUGIN_MARKET.split(","):
if not m:
for market in settings.PLUGIN_MARKET.split(","):
if not market:
continue
# 创建任务获取 v1 版本插件
base_task = asyncio.create_task(self.async_get_plugins_from_market(m, None, force))
tasks.append(base_task)
task_to_version[base_task] = "base_version"
# 创建任务获取高版本插件(如 v2、v3
tasks.append(
asyncio.create_task(
fetch_market(market, None, "base_version", len(tasks))
)
)
if settings.VERSION_FLAG:
higher_version_task = asyncio.create_task(
self.async_get_plugins_from_market(m, settings.VERSION_FLAG, force))
tasks.append(higher_version_task)
task_to_version[higher_version_task] = "higher_version"
tasks.append(
asyncio.create_task(
fetch_market(
market,
settings.VERSION_FLAG,
"higher_version",
len(tasks),
)
)
)
# 并发执行所有任务
if tasks:
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(completed_tasks):
task = tasks[i]
version = task_to_version[task]
# 检查是否有异常
if isinstance(result, Exception):
logger.error(f"获取插件市场数据失败:{str(result)}")
continue
plugins = result
total_tasks = len(tasks)
finished_tasks = 0
task_results = {}
if progress_callback:
progress_callback(
value=0,
text=f"开始刷新插件市场,共 {total_tasks} 个请求 ...",
data={"total": total_tasks, "finished": 0},
)
for completed_task in asyncio.as_completed(tasks):
try:
task_index, version, plugins = await completed_task
task_results[task_index] = (version, plugins)
except Exception as err:
logger.error(f"获取插件市场数据失败:{str(err)}")
finished_tasks += 1
if progress_callback:
progress_callback(
value=finished_tasks / total_tasks * 100,
text=(
f"插件市场请求"
f"{finished_tasks}/{total_tasks})处理完成"
),
data={"total": total_tasks, "finished": finished_tasks},
)
for task_index in sorted(task_results):
version, plugins = task_results[task_index]
if plugins:
if version == "higher_version":
higher_version_plugins.extend(plugins) # 收集高版本插件
higher_version_plugins.extend(plugins)
else:
base_version_plugins.extend(plugins) # 收集 v1 版本插件
base_version_plugins.extend(plugins)
result = self.process_plugins_list(higher_version_plugins, base_version_plugins)
logger.info(f"获取到 {len(result)} 个线上插件")
if progress_callback:
progress_callback(value=100, text="插件市场缓存刷新完成")
return result
async def async_get_plugins_from_market(self, market: str,

View File

@@ -10,13 +10,13 @@ class ProgressHelper:
处理进度辅助类
"""
def __init__(self, key: Union[ProgressKey, str]):
def __init__(self, key: Union[ProgressKey, str]) -> None:
if isinstance(key, Enum):
key = key.value
self._key = key
self._progress = TTLCache(region="progress", maxsize=1024, ttl=24 * 60 * 60)
def __reset(self):
def __reset(self) -> None:
"""
重置进度
"""
@@ -27,7 +27,7 @@ class ProgressHelper:
"data": {}
}
def start(self):
def start(self) -> None:
"""
开始进度
"""
@@ -38,38 +38,52 @@ class ProgressHelper:
current['enable'] = True
self._progress[self._key] = current
def end(self):
def end(
self,
text: Optional[str] = "",
data: Optional[dict] = None,
value: Optional[Union[float, int]] = 100,
) -> None:
"""
结束进度
"""
current = self._progress.get(self._key)
if not current:
return
current.update(
{
"enable": False,
"value": 100,
"text": ""
}
)
if data is not None:
if not current.get('data'):
current['data'] = {}
current['data'].update(data)
current["enable"] = False
if value is not None:
current["value"] = max(min(float(value), 100), 0)
current["text"] = text or ""
self._progress[self._key] = current
def update(self, value: Union[float, int] = None, text: Optional[str] = None, data: dict = None):
def update(
self,
value: Optional[Union[float, int]] = None,
text: Optional[str] = None,
data: Optional[dict] = None,
) -> None:
"""
更新进度
"""
current = self._progress.get(self._key)
if not current or not current.get('enable'):
return
if value:
current['value'] = value
if text:
if value is not None:
current['value'] = max(min(float(value), 100), 0)
if text is not None:
current['text'] = text
if data:
if data is not None:
if not current.get('data'):
current['data'] = {}
current['data'].update(data)
self._progress[self._key] = current
def get(self) -> dict:
def get(self) -> Optional[dict]:
"""
获取当前进度
"""
return self._progress.get(self._key)

View File

@@ -35,6 +35,7 @@ from app.db.models.transferhistory import TransferHistory
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.image import WallpaperHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.sites import SitesHelper # noqa
from app.helper.server import MoviePilotServerHelper
from app.log import logger
@@ -46,6 +47,7 @@ from app.utils.singleton import SingletonClass
from app.utils.timer import TimerUtils
lock = threading.Lock()
SCHEDULER_PROGRESS_PREFIX = "scheduler"
class SchedulerChain(ChainBase):
@@ -55,7 +57,11 @@ class SchedulerChain(ChainBase):
# 每批处理的记录数,避免一次性删除过多数据导致性能问题
DEFAULT_BATCH_SIZE = 500
def cleanup(self, batch_size: Optional[int] = None) -> Dict[str, Any]:
def cleanup(
self,
batch_size: Optional[int] = None,
progress_callback: Optional[Callable[..., None]] = None,
) -> Dict[str, Any]:
"""
按配置保留期执行分批清理。
"""
@@ -80,9 +86,12 @@ class SchedulerChain(ChainBase):
errors = []
plans = self._build_cleanup_plans(started_at=started_at, batch_size=batch_size)
total_plans = len(plans)
if progress_callback:
progress_callback(value=0, text="开始清理数据表 ...")
with SessionFactory() as db:
for plan in plans:
for plan_index, plan in enumerate(plans):
name = plan["name"]
retention_days = plan["retention_days"]
if retention_days <= 0:
@@ -94,9 +103,19 @@ class SchedulerChain(ChainBase):
"skipped": True,
"reason": "retention_days<=0",
}
if progress_callback:
progress_callback(
value=(plan_index + 1) / total_plans * 100,
text=f"数据表 {name} 跳过清理",
)
continue
try:
if progress_callback:
progress_callback(
value=plan_index / total_plans * 100,
text=f"正在清理数据表 {name} ...",
)
table_report = self._cleanup_in_batches(
db=db,
table_name=name,
@@ -116,6 +135,12 @@ class SchedulerChain(ChainBase):
"retention_days": retention_days,
"error": str(err),
}
finally:
if progress_callback:
progress_callback(
value=(plan_index + 1) / total_plans * 100,
text=f"数据表 {name} 清理处理完成",
)
if errors:
report["errors"] = errors
@@ -284,13 +309,33 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
# 初始化
self.init()
def on_config_changed(self):
def on_config_changed(self) -> None:
"""
配置变更后重新初始化定时服务。
"""
self.init()
def get_reload_name(self):
def get_reload_name(self) -> str:
"""
获取配置重载日志中的服务名称。
"""
return "定时服务"
def init(self):
@staticmethod
def _get_progress_key(job_id: str) -> str:
"""
获取定时服务进度缓存键。
"""
return f"{SCHEDULER_PROGRESS_PREFIX}:{job_id}"
@staticmethod
def _format_time(value: Optional[datetime] = None) -> str:
"""
格式化进度事件时间。
"""
return (value or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
def init(self) -> None:
"""
初始化定时服务
"""
@@ -675,6 +720,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
"""
准备定时任务
"""
started_at = self._format_time()
with self._lock:
job = self._jobs.get(job_id)
if not job:
@@ -683,45 +729,242 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
logger.warning(f"定时任务 {job_id} - {job.get('name')} 正在运行 ...")
return None
self._jobs[job_id]["running"] = True
self._jobs[job_id]["last_started_at"] = started_at
self._jobs[job_id]["last_finished_at"] = None
self._jobs[job_id]["last_error"] = None
progress = ProgressHelper(self._get_progress_key(job_id))
progress.start()
progress.update(
value=0,
text=f"{job.get('name') or job_id} 开始执行 ...",
data={
"id": job_id,
"name": job.get("name"),
"provider": job.get("provider_name", "[系统]"),
"status": "running",
"success": None,
"started_at": started_at,
"finished_at": None,
"error": None,
},
)
return job
def __finish_job(self, job_id: str):
def __finish_job(
self,
job_id: str,
success: bool = True,
error: Optional[str] = None,
) -> None:
"""
完成定时任务
"""
finished_at = self._format_time()
job = None
with self._lock:
try:
self._jobs[job_id]["running"] = False
except KeyError:
pass
job = self._jobs.get(job_id)
if job:
job["running"] = False
job["last_finished_at"] = finished_at
job["last_error"] = error
job_name = job.get("name") if job else job_id
progress = ProgressHelper(self._get_progress_key(job_id))
current_progress = progress.get() or {}
progress_value = 100 if success else current_progress.get("value", 0)
progress.end(
text=f"{job_name} {'执行完成' if success else '执行失败'}",
data={
"id": job_id,
"name": job_name,
"provider": job.get("provider_name", "[系统]") if job else None,
"status": "success" if success else "failed",
"success": success,
"finished_at": finished_at,
"error": error,
},
value=progress_value,
)
def start(self, job_id: str, *args, **kwargs):
def get_progress(self, job_id: str) -> Optional[schemas.ScheduleProgress]:
"""
查询指定定时服务的执行进度。
"""
if not job_id:
return None
with self._lock:
job = self._jobs.get(job_id)
job_name = job.get("name") if job else job_id
provider_name = job.get("provider_name", "[系统]") if job else None
running = bool(job.get("running")) if job else False
last_started_at = job.get("last_started_at") if job else None
last_finished_at = job.get("last_finished_at") if job else None
last_error = job.get("last_error") if job else None
detail = ProgressHelper(self._get_progress_key(job_id)).get() or {}
if not job and not detail:
return None
data = detail.get("data") or {}
value = detail.get("value", 0)
try:
value = float(value)
except (TypeError, ValueError):
value = 0.0
return schemas.ScheduleProgress(
id=job_id,
name=data.get("name") or job_name,
provider=data.get("provider") or provider_name,
enable=bool(detail.get("enable", running)),
value=max(min(value, 100), 0),
text=detail.get("text"),
status=data.get("status") or ("running" if running else "waiting"),
success=data.get("success"),
started_at=data.get("started_at") or last_started_at,
finished_at=data.get("finished_at") or last_finished_at,
error=data.get("error") or last_error,
data=data,
)
def __handle_job_error(self, job_id: str, job: dict, error: Exception) -> None:
"""
记录定时任务执行异常并发送系统错误事件。
"""
logger.error(
f"定时任务 {job.get('name')} 执行失败:{str(error)} - {traceback.format_exc()}"
)
MessageHelper().put(
title=f"{job.get('name')} 执行失败", message=str(error), role="system"
)
eventmanager.send_event(
EventType.SystemError,
{
"type": "scheduler",
"scheduler_id": job_id,
"scheduler_name": job.get("name"),
"error": str(error),
"traceback": traceback.format_exc(),
},
)
def __build_progress_callback(self, job_id: str, job: dict) -> Callable[..., None]:
"""
构建传递给定时任务内部的进度更新回调。
"""
def update_progress(
value: Optional[float] = None,
text: Optional[str] = None,
data: Optional[dict] = None,
) -> None:
"""
更新当前定时任务进度。
"""
progress_data = {
"id": job_id,
"name": job.get("name"),
"provider": job.get("provider_name", "[系统]"),
"status": "running",
"success": None,
}
if data:
progress_data.update(data)
ProgressHelper(self._get_progress_key(job_id)).update(
value=value,
text=text,
data=progress_data,
)
return update_progress
@staticmethod
def __supports_progress_callback(func: Callable[..., Any]) -> bool:
"""
判断定时任务函数是否显式支持进度回调参数。
"""
try:
parameters = inspect.signature(func).parameters
except (TypeError, ValueError):
return False
return "progress_callback" in parameters
@staticmethod
def __get_result_error(result: Any) -> Optional[str]:
"""
从定时任务标准失败返回值中提取错误信息。
"""
if (
isinstance(result, tuple)
and result
and isinstance(result[0], bool)
and result[0] is False
):
return str(result[1]) if len(result) > 1 and result[1] else "定时任务返回失败"
return None
async def __run_coro_job(self, coro, job_id: str, job: dict) -> None:
"""
在当前事件循环内执行协程定时任务并在真实完成后收敛状态。
"""
success = True
error = None
try:
result = await coro
error = self.__get_result_error(result)
success = error is None
except Exception as err:
success = False
error = str(err)
self.__handle_job_error(job_id=job_id, job=job, error=err)
finally:
self.__finish_job(job_id=job_id, success=success, error=error)
def start(self, job_id: str, *args, **kwargs) -> None:
"""
启动定时服务
"""
def __start_coro(coro):
def __start_coro(coro) -> bool:
"""
启动协程
启动协程,返回是否由异步回调自行收敛任务状态。
"""
return asyncio.run_coroutine_threadsafe(coro, global_vars.loop)
try:
running_loop = asyncio.get_running_loop()
except RuntimeError:
running_loop = None
target_loop = global_vars.loop
if running_loop:
asyncio.create_task(self.__run_coro_job(coro=coro, job_id=job_id, job=job))
return True
if target_loop and target_loop.is_running():
asyncio.run_coroutine_threadsafe(
self.__run_coro_job(coro=coro, job_id=job_id, job=job),
target_loop,
)
return True
asyncio.run(coro)
return False
# 获取定时任务
job = self.__prepare_job(job_id)
if not job:
return
success = True
error = None
deferred_finish = False
# 开始运行
try:
if not kwargs:
kwargs = job.get("kwargs") or {}
kwargs = dict(job.get("kwargs") or {})
func = job.get("func")
if not func:
return
if self.__supports_progress_callback(func) and "progress_callback" not in kwargs:
kwargs["progress_callback"] = self.__build_progress_callback(
job_id=job_id, job=job
)
# 是否多进程运行
run_in_process = job.get("run_in_process", False)
if inspect.iscoroutinefunction(func):
# 协程函数
__start_coro(func(*args, **kwargs))
deferred_finish = __start_coro(func(*args, **kwargs))
elif run_in_process:
# 多进程运行
p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
@@ -729,26 +972,17 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
p.join()
else:
# 普通函数
job["func"](*args, **kwargs)
result = func(*args, **kwargs)
error = self.__get_result_error(result)
success = error is None
except Exception as e:
logger.error(
f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}"
)
MessageHelper().put(
title=f"{job.get('name')} 执行失败", message=str(e), role="system"
)
eventmanager.send_event(
EventType.SystemError,
{
"type": "scheduler",
"scheduler_id": job_id,
"scheduler_name": job.get("name"),
"error": str(e),
"traceback": traceback.format_exc(),
},
)
# 运行结束
self.__finish_job(job_id)
success = False
error = str(e)
self.__handle_job_error(job_id=job_id, job=job, error=e)
finally:
if not deferred_finish:
# 运行结束
self.__finish_job(job_id=job_id, success=success, error=error)
def init_plugin_jobs(self):
"""
@@ -961,12 +1195,17 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
if service.get("running") and name and provider_name:
if job_id not in added:
added.append(job_id)
progress = self.get_progress(job_id)
schedulers.append(
schemas.ScheduleInfo(
id=job_id,
name=name,
provider=provider_name,
status="正在运行",
progress=progress.value if progress else 0,
progress_text=progress.text if progress else None,
progress_enable=progress.enable if progress else False,
progress_detail=progress,
)
)
# 获取其他待执行任务
@@ -983,6 +1222,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
status = "正在运行" if service.get("running") else "等待"
# 下次运行时间
next_run = TimerUtils.time_difference(job.next_run_time)
progress = self.get_progress(job_id)
schedulers.append(
schemas.ScheduleInfo(
id=job_id,
@@ -990,6 +1230,10 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
provider=service.get("provider_name", "[系统]"),
status=status,
next_run=next_run,
progress=progress.value if progress else 0,
progress_text=progress.text if progress else None,
progress_enable=progress.enable if progress else False,
progress_detail=progress,
)
)
return schedulers

View File

@@ -1,6 +1,6 @@
from typing import Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
class Statistic(BaseModel):
@@ -65,6 +65,50 @@ class DownloaderInfo(BaseModel):
free_space: Optional[float] = 0.0
class DashboardMemoryInfo(BaseModel):
"""仪表板系统内存统计。"""
# 总内存字节数
total: int = 0
# 已使用内存字节数,不包含缓存
used: int = 0
# 缓存与缓冲区占用字节数
cached: int = 0
# 可用内存字节数
available: int = 0
# 已使用内存占总内存百分比,不包含缓存
usage: float = 0.0
class ScheduleProgress(BaseModel):
"""后台服务执行进度信息。"""
# ID
id: Optional[str] = None
# 名称
name: Optional[str] = None
# 提供者
provider: Optional[str] = None
# 是否正在执行
enable: Optional[bool] = False
# 当前完成百分比
value: Optional[float] = 0.0
# 当前进度文本
text: Optional[str] = None
# 执行状态 waiting/running/success/failed
status: Optional[str] = None
# 最近一次执行是否成功
success: Optional[bool] = None
# 最近一次开始时间
started_at: Optional[str] = None
# 最近一次结束时间
finished_at: Optional[str] = None
# 最近一次错误信息
error: Optional[str] = None
# 扩展数据
data: Optional[dict] = Field(default_factory=dict)
class ScheduleInfo(BaseModel):
"""仪表板后台服务信息。"""
@@ -78,6 +122,14 @@ class ScheduleInfo(BaseModel):
status: Optional[str] = None
# 下次执行时间
next_run: Optional[str] = None
# 当前完成百分比
progress: Optional[float] = 0.0
# 进度文本
progress_text: Optional[str] = None
# 是否正在更新进度
progress_enable: Optional[bool] = False
# 进度详情
progress_detail: Optional[ScheduleProgress] = None
class DashboardSystemInfo(BaseModel):

View File

@@ -687,15 +687,27 @@ class SystemUtils:
return psutil.cpu_percent()
@staticmethod
def memory_usage() -> List[int]:
def memory_usage() -> schemas.DashboardMemoryInfo:
"""
获取当前程序的内存使用量和使用率
获取系统已使用、缓存、可用和总内存信息。
"""
current_process = psutil.Process()
process_memory = current_process.memory_info().rss
system_memory = psutil.virtual_memory().total
process_memory_percent = (process_memory / system_memory) * 100
return [process_memory, int(process_memory_percent)]
memory = psutil.virtual_memory()
total = max(0, int(memory.total))
used = max(0, int(memory.used))
cached = max(
0,
int(getattr(memory, "cached", 0) or 0)
+ int(getattr(memory, "buffers", 0) or 0),
)
available = max(0, total - used - cached)
usage = used / total * 100 if total else 0.0
return schemas.DashboardMemoryInfo(
total=total,
used=used,
cached=cached,
available=available,
usage=usage,
)
@staticmethod
def network_usage() -> List[int]:

View File

@@ -114,6 +114,9 @@ MoviePilot 也提供普通 REST API 给前端和自动化客户端使用。所
| :--- | :--- | :--- |
| GET | `/api/v1/system/ping` | 登录用户服务存活检测,用于前端重启后轮询恢复状态 |
| GET | `/api/v1/dashboard/system` | 查询仪表板系统摘要包括主机名称、操作系统、MoviePilot 运行时间和后端版本 |
| GET | `/api/v1/dashboard/schedule` | 查询所有后台定时服务,包含当前完成百分比、进度文本和执行状态 |
| GET | `/api/v1/dashboard/schedule/{job_id}/progress` | 查询指定后台定时服务的实时进度详情 |
| GET | `/api/v1/dashboard/schedule2/{job_id}/progress` | 使用 API_TOKEN 查询指定后台定时服务的实时进度详情 |
| GET | `/api/v1/system/setting/public/{key}` | 登录用户读取白名单内非敏感系统设置仅支持目录、存储、站点范围、默认订阅规则、Follow 订阅者和插件市场地址等前端必需配置 |
| POST | `/api/v1/system/setting/PLUGIN_MARKET/sync-wiki` | 管理员从 MoviePilot Wiki 的插件文档同步公开插件仓库清单,和本地 `PLUGIN_MARKET` 合并去重后写入配置 |

View File

@@ -282,7 +282,7 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| POST | `/api/v1/transfer/manual` | Manual transfer. Params: `background`. Body: ManualTransferItem JSON |
| GET | `/api/v1/transfer/now` | Run immediate transfer |
### Dashboard (17 endpoints)
### Dashboard (19 endpoints)
| Method | Path | Description |
|--------|------|-------------|
@@ -296,6 +296,8 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| GET | `/api/v1/dashboard/downloader2` | Downloader info (API_TOKEN) |
| GET | `/api/v1/dashboard/schedule` | Scheduled services |
| GET | `/api/v1/dashboard/schedule2` | Scheduled services (API_TOKEN) |
| GET | `/api/v1/dashboard/schedule/{job_id}/progress` | Scheduled service real-time progress |
| GET | `/api/v1/dashboard/schedule2/{job_id}/progress` | Scheduled service real-time progress (API_TOKEN) |
| GET | `/api/v1/dashboard/transfer` | Transfer statistics. Params: `days` |
| GET | `/api/v1/dashboard/cpu` | CPU usage |
| GET | `/api/v1/dashboard/cpu2` | CPU usage (API_TOKEN) |

View File

@@ -30,6 +30,28 @@ def test_dashboard_system_info_returns_runtime_environment(monkeypatch):
assert result.version == "v2.13.16"
def test_memory_usage_returns_used_cached_and_available(monkeypatch):
"""内存统计应拆分已使用、缓存和可用容量,并返回已使用百分比。"""
class FakeMemory:
"""提供固定系统内存值的桩。"""
total = 16 * 1024**3
used = 5 * 1024**3
cached = 3 * 1024**3
buffers = 512 * 1024**2
monkeypatch.setattr(system_module.psutil, "virtual_memory", FakeMemory)
result = SystemUtils.memory_usage()
assert result.total == 16 * 1024**3
assert result.used == 5 * 1024**3
assert result.cached == int(3.5 * 1024**3)
assert result.available == int(7.5 * 1024**3)
assert result.usage == 31.25
def test_monthly_media_statistics_counts_successful_unique_media():
"""本月新增统计应只计算成功记录,并按媒体去重。"""
month = system_module.time.strftime("%Y-%m-", system_module.time.localtime())

View File

@@ -0,0 +1,154 @@
import asyncio
import threading
from uuid import uuid4
from app.core.config import global_vars
from app.scheduler import Scheduler
def _build_scheduler(job_id, func):
"""构造不启动 APScheduler 的定时服务测试对象。"""
scheduler = object.__new__(Scheduler)
scheduler._lock = threading.RLock()
scheduler._jobs = {
job_id: {
"name": "测试定时服务",
"provider_name": "测试",
"func": func,
"running": False,
}
}
return scheduler
def test_scheduler_records_live_and_completed_progress():
"""定时服务应在执行中更新进度,并在成功后收敛为 100%"""
job_id = f"test-success-{uuid4()}"
snapshots = []
def task(progress_callback):
"""上报一次中间进度。"""
progress_callback(value=42, text="正在处理", data={"finished": 2})
snapshots.append(scheduler.get_progress(job_id))
scheduler = _build_scheduler(job_id, task)
scheduler.start(job_id)
assert snapshots[0].enable is True
assert snapshots[0].value == 42
assert snapshots[0].status == "running"
assert snapshots[0].data["finished"] == 2
progress = scheduler.get_progress(job_id)
assert progress.enable is False
assert progress.value == 100
assert progress.status == "success"
assert progress.success is True
assert progress.started_at
assert progress.finished_at
def test_scheduler_failure_preserves_last_progress(monkeypatch):
"""定时服务异常时应保留失败位置,而不是伪装成 100% 完成。"""
job_id = f"test-failure-{uuid4()}"
def task(progress_callback):
"""上报进度后抛出异常。"""
progress_callback(value=37, text="处理失败")
raise RuntimeError("预期失败")
scheduler = _build_scheduler(job_id, task)
monkeypatch.setattr(
scheduler,
"_Scheduler__handle_job_error",
lambda **kwargs: None,
)
scheduler.start(job_id)
progress = scheduler.get_progress(job_id)
assert progress.enable is False
assert progress.value == 37
assert progress.status == "failed"
assert progress.success is False
assert progress.error == "预期失败"
def test_scheduler_treats_standard_failure_result_as_failed():
"""返回 `(False, message)` 的定时服务应记录为业务失败。"""
job_id = f"test-result-failure-{uuid4()}"
def task(progress_callback):
"""返回标准失败结果。"""
progress_callback(value=55, text="业务校验失败")
return False, "业务失败"
scheduler = _build_scheduler(job_id, task)
scheduler.start(job_id)
progress = scheduler.get_progress(job_id)
assert progress.value == 55
assert progress.status == "failed"
assert progress.error == "业务失败"
def test_scheduler_runs_async_job_without_running_global_loop(monkeypatch):
"""全局事件循环未运行时,异步定时服务仍应正常执行并收敛进度。"""
job_id = f"test-async-{uuid4()}"
async def task(progress_callback):
"""上报异步任务进度。"""
progress_callback(value=65, text="异步处理中")
scheduler = _build_scheduler(job_id, task)
target_loop = asyncio.new_event_loop()
monkeypatch.setattr(global_vars, "CURRENT_EVENT_LOOP", target_loop)
try:
scheduler.start(job_id)
finally:
target_loop.close()
progress = scheduler.get_progress(job_id)
assert progress.enable is False
assert progress.value == 100
assert progress.status == "success"
def test_scheduler_runs_async_job_from_current_event_loop(monkeypatch):
"""在异步入口中手动触发定时服务时,不应嵌套调用 `asyncio.run`。"""
job_id = f"test-current-loop-{uuid4()}"
async def task(progress_callback):
"""在当前事件循环中上报进度。"""
progress_callback(value=75, text="当前循环处理中")
async def run_task():
"""从已运行的事件循环启动定时服务。"""
scheduler.start(job_id)
await asyncio.sleep(0)
scheduler = _build_scheduler(job_id, task)
target_loop = asyncio.new_event_loop()
monkeypatch.setattr(global_vars, "CURRENT_EVENT_LOOP", target_loop)
try:
asyncio.run(run_task())
finally:
target_loop.close()
progress = scheduler.get_progress(job_id)
assert progress.enable is False
assert progress.value == 100
assert progress.status == "success"
def test_scheduler_returns_none_for_unknown_job():
"""未注册且无历史进度的定时服务应返回空。"""
job_id = f"test-unknown-{uuid4()}"
scheduler = object.__new__(Scheduler)
scheduler._lock = threading.RLock()
scheduler._jobs = {}
assert scheduler.get_progress(job_id) is None