Merge pull request #2863 from InfinityPacer/feature/setup

This commit is contained in:
jxxghp
2024-10-18 06:54:03 +08:00
committed by GitHub
10 changed files with 91 additions and 76 deletions

View File

@@ -1,14 +1,15 @@
import asyncio
import io
import json
import tempfile
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Optional, Union
import tailer
import aiofiles
from PIL import Image
from fastapi import APIRouter, Depends, HTTPException, Header, Response
from fastapi import APIRouter, Depends, HTTPException, Header, Request, Response
from fastapi.responses import StreamingResponse
from app import schemas
@@ -224,19 +225,22 @@ def set_env_setting(env: dict,
@router.get("/progress/{process_type}", summary="实时进度")
def get_progress(process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_progress(request: Request, process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取处理进度返回格式为SSE
"""
progress = ProgressHelper()
def event_generator():
while True:
if global_vars.is_system_stopped:
break
detail = progress.get(process_type)
yield 'data: %s\n\n' % json.dumps(detail)
time.sleep(0.2)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = progress.get(process_type)
yield f"data: {json.dumps(detail)}\n\n"
await asyncio.sleep(0.2)
except asyncio.CancelledError:
return
return StreamingResponse(event_generator(), media_type="text/event-stream")
@@ -273,26 +277,29 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None,
@router.get("/message", summary="实时消息")
def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_message(request: Request, role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统消息返回格式为SSE
"""
message = MessageHelper()
def event_generator():
while True:
if global_vars.is_system_stopped:
break
detail = message.get(role)
yield 'data: %s\n\n' % (detail or '')
time.sleep(3)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = message.get(role)
yield f"data: {detail or ''}\n\n"
await asyncio.sleep(3)
except asyncio.CancelledError:
return
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/logging", summary="实时日志")
def get_logging(length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_logging(request: Request, length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统日志
length = -1 时, 返回text/plain
@@ -306,27 +313,40 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log",
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Not Found")
def log_generator():
# 读取文件末尾50行不使用tailer模块
with open(log_path, 'r', encoding='utf-8') as f:
for line in f.readlines()[-max(length, 50):]:
yield 'data: %s\n\n' % line
while True:
if global_vars.is_system_stopped:
break
for t in tailer.follow(open(log_path, 'r', encoding='utf-8')):
yield 'data: %s\n\n' % (t or '')
time.sleep(1)
async def log_generator():
try:
# 使用固定大小的双向队列来限制内存使用
lines_queue = deque(maxlen=max(length, 50))
# 使用 aiofiles 异步读取文件
async with aiofiles.open(log_path, mode="r", encoding="utf-8") as f:
# 逐行读取文件,将每一行存入队列
file_content = await f.read()
for line in file_content.splitlines():
lines_queue.append(line)
for line in lines_queue:
yield f"data: {line}\n\n"
# 移动文件指针到文件末尾,继续监听新增内容
await f.seek(0, 2)
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
line = await f.readline()
if not line:
await asyncio.sleep(0.5)
continue
yield f"data: {line}\n\n"
except asyncio.CancelledError:
return
# 根据length参数返回不同的响应
if length == -1:
# 返回全部日志作为文本响应
if not log_path.exists():
return Response(content="日志文件不存在!", media_type="text/plain")
with open(log_path, 'r', encoding='utf-8') as file:
with open(log_path, "r", encoding='utf-8') as file:
text = file.read()
# 倒序输出
text = '\n'.join(text.split('\n')[::-1])
text = "\n".join(text.split("\n")[::-1])
return Response(content=text, media_type="text/plain")
else:
# 返回SSE流响应

View File

@@ -3,6 +3,7 @@ from typing import List, Union, Optional, Generator
from app import schemas
from app.chain import ChainBase
from app.core.config import global_vars
from app.db.mediaserver_oper import MediaServerOper
from app.helper.service import ServiceConfigHelper
from app.log import logger
@@ -134,6 +135,8 @@ class MediaServerChain(ChainBase):
logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...")
library_count = 0
for item in self.items(server=server_name, library_id=library.id):
if global_vars.is_system_stopped:
return
if not item or not item.item_id:
continue
logger.debug(f"正在同步 {item.title} ...")

View File

@@ -20,7 +20,7 @@ from app.utils.singleton import Singleton
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的最大超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 5 # 事件队列空闲时的最大超时时间(秒)
class Event:

View File

@@ -20,7 +20,8 @@ from app.db.init import init_db, update_db
# uvicorn服务
Server = uvicorn.Server(Config(app, host=settings.HOST, port=settings.PORT,
reload=settings.DEV, workers=multiprocessing.cpu_count()))
reload=settings.DEV, workers=multiprocessing.cpu_count(),
timeout_graceful_shutdown=5))
def start_tray():

View File

@@ -64,7 +64,7 @@ class Telegram:
logger.error(f"Telegram消息接收服务异常{str(err)}")
# 启动线程来运行 infinity_polling
self._polling_thread = threading.Thread(target=run_polling)
self._polling_thread = threading.Thread(target=run_polling, daemon=True)
self._polling_thread.start()
logger.info("Telegram消息接收服务启动")

View File

@@ -113,7 +113,7 @@ class Monitor(metaclass=Singleton):
self.stop()
# 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer)
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()
# 读取目录配置

View File

@@ -14,18 +14,24 @@ async def lifespan(app: FastAPI):
定义应用的生命周期事件
"""
print("Starting up...")
# 启动模块
start_modules(app)
# 初始化路由
init_routers(app)
# 初始化插件
plugin_init_task = asyncio.create_task(init_plugins_async())
try:
# 在此处 yield表示应用已经启动控制权交回 FastAPI 主事件循环
yield
finally:
print("Shutting down...")
try:
# 取消插件初始化
plugin_init_task.cancel()
await plugin_init_task
except asyncio.CancelledError:
print("Plugin installation task cancelled.")
except Exception as e:
print(f"Error during plugin installation shutdown: {e}")
# 清理模块
shutdown_modules(app)

View File

@@ -1,10 +1,8 @@
import signal
import sys
from types import FrameType
from fastapi import FastAPI
from app.core.config import settings, global_vars
from app.core.config import global_vars, settings
from app.core.module import ModuleManager
from app.utils.system import SystemUtils
@@ -89,27 +87,12 @@ def check_auth():
)
def singal_handle():
"""
监听停止信号
"""
def stop_event(signum: int, _: FrameType):
"""
SIGTERM信号处理
"""
print(f"接收到停止信号:{signum},正在停止系统...")
global_vars.stop_system()
# 设置信号处理程序
signal.signal(signal.SIGTERM, stop_event)
signal.signal(signal.SIGINT, stop_event)
def shutdown_modules(_: FastAPI):
"""
服务关闭
"""
# 停止信号
global_vars.stop_system()
# 停止模块
ModuleManager().stop()
# 停止插件
@@ -159,5 +142,3 @@ def start_modules(_: FastAPI):
start_frontend()
# 检查认证状态
check_auth()
# 监听停止信号
singal_handle()

View File

@@ -45,7 +45,6 @@ psutil~=5.9.4
python-dotenv~=1.0.1
python-hosts~=1.0.7
watchdog~=3.0.0
tailer~=0.4.1
openai~=0.27.2
cacheout~=0.14.1
click~=8.1.6
@@ -61,4 +60,5 @@ Pinyin2Hanzi~=0.1.1
pywebpush~=2.0.0
py115j~=0.0.7
oss2~=2.18.6
aligo~=6.2.4
aligo~=6.2.4
aiofiles~=24.1.0

View File

@@ -4,6 +4,8 @@
#
# pip-compile requirements.in
#
aiofiles==24.1.0
# via -r requirements.in
aiohappyeyeballs==2.4.0
# via aiohttp
aiohttp==3.10.5
@@ -63,6 +65,11 @@ click==8.1.7
# uvicorn
cn2an==0.5.22
# via -r requirements.in
colorama==0.4.6
# via
# click
# qrcode
# tqdm
coloredlogs==15.0.1
# via aligo
crcmod==1.7
@@ -95,7 +102,9 @@ frozenlist==1.4.1
func-timeout==4.3.5
# via -r requirements.in
greenlet==2.0.2
# via playwright
# via
# playwright
# sqlalchemy
h11==0.14.0
# via
# httpcore
@@ -139,7 +148,9 @@ openai==0.27.10
oss2==2.18.6
# via -r requirements.in
packaging==24.1
# via docker
# via
# docker
# qbittorrent-api
parse==1.19.1
# via -r requirements.in
passlib==1.7.4
@@ -181,14 +192,6 @@ pyee==9.0.4
# via playwright
pyjwt==2.7.0
# via -r requirements.in
pyobjc-core==10.3.1
# via
# pyobjc-framework-cocoa
# pyobjc-framework-quartz
pyobjc-framework-cocoa==10.3.1
# via pyobjc-framework-quartz
pyobjc-framework-quartz==10.3.1
# via pystray
pyotp==2.9.0
# via -r requirements.in
pyparsing==3.0.9
@@ -197,6 +200,8 @@ pypng==0.20220715.0
# via qrcode
pyquery==2.0.0
# via -r requirements.in
pyreadline3==3.5.4
# via humanfriendly
pysocks==1.7.1
# via requests
pystray==0.19.5
@@ -223,7 +228,7 @@ pyvirtualdisplay==3.0
# via -r requirements.in
pywebpush==2.0.0
# via -r requirements.in
qbittorrent-api==2023.5.48
qbittorrent-api==2024.9.67
# via -r requirements.in
qrcode[pil]==7.4.2
# via
@@ -262,7 +267,6 @@ six==1.16.0
# pystray
# python-dateutil
# pywebpush
# qbittorrent-api
slack-bolt==1.18.0
# via -r requirements.in
slack-sdk==3.21.3
@@ -283,8 +287,6 @@ starlette==0.27.0
# via
# -r requirements.in
# fastapi
tailer==0.4.1
# via -r requirements.in
torrentool==1.2.0
# via -r requirements.in
tqdm==4.66.5
@@ -301,6 +303,8 @@ typing-extensions==4.12.2
# qrcode
# sqlalchemy
# transmission-rpc
tzdata==2024.2
# via tzlocal
tzlocal==5.2
# via
# apscheduler