diff --git a/app/actions/fetch_medias.py b/app/actions/fetch_medias.py index 7cb74367..f60307d4 100644 --- a/app/actions/fetch_medias.py +++ b/app/actions/fetch_medias.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from pydantic import Field @@ -17,7 +17,9 @@ class FetchMediasParams(ActionParams): """ 获取媒体数据参数 """ - sources: List[str] = Field([], description="媒体数据来源") + source_type: Optional[str] = Field("ranking", description="来源") + sources: Optional[List[str]] = Field([], description="榜单") + api_path: Optional[str] = Field(None, description="API路径") class FetchMediasAction(BaseAction): @@ -129,27 +131,37 @@ class FetchMediasAction(BaseAction): 获取媒体数据,填充到medias """ params = FetchMediasParams(**params) - for name in params.sources: - if global_vars.is_workflow_stopped(workflow_id): - break - source = self.__get_source(name) - if not source: - continue - logger.info(f"获取媒体数据 {source} ...") - results = [] - if source.get("func"): - results = source['func']() - else: - # 调用内部API获取数据 - api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}" - res = RequestUtils(timeout=15).post_res(api_url) - if res: - results = res.json() - if results: - logger.info(f"{name} 获取到 {len(results)} 条数据") - self._medias.extend([MediaInfo(**r) for r in results]) - else: - logger.error(f"{name} 获取数据失败") + if params.source_type == "ranking": + for name in params.sources: + if global_vars.is_workflow_stopped(workflow_id): + break + source = self.__get_source(name) + if not source: + continue + logger.info(f"获取媒体数据 {source} ...") + results = [] + if source.get("func"): + results = source['func']() + else: + # 调用内部API获取数据 + api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}" + res = RequestUtils(timeout=15).post_res(api_url) + if res: + results = res.json() + if results: + logger.info(f"{name} 获取到 {len(results)} 条数据") + self._medias.extend([MediaInfo(**r) for r in results]) + else: + logger.error(f"{name} 获取数据失败") + else: + # 调用内部API获取数据 + api_url = f"http://127.0.0.1:{settings.PORT}{params.api_path}?token={settings.API_TOKEN}" + res = RequestUtils(timeout=15).post_res(api_url) + if res: + results = res.json() + if results: + logger.info(f"{params.api_path} 获取到 {len(results)} 条数据") + self._medias.extend([MediaInfo(**r) for r in results]) if self._medias: context.medias.extend(self._medias)