diff --git a/app/api/endpoints/subscribe.py b/app/api/endpoints/subscribe.py index 2ea8a752..72d16fe3 100644 --- a/app/api/endpoints/subscribe.py +++ b/app/api/endpoints/subscribe.py @@ -17,7 +17,7 @@ from app.db.models.subscribe import Subscribe from app.db.models.subscribehistory import SubscribeHistory from app.db.models.user import User from app.db.systemconfig_oper import SystemConfigOper -from app.db.user_oper import get_current_active_user +from app.db.user_oper import get_current_active_user_async from app.helper.subscribe import SubscribeHelper from app.scheduler import Scheduler from app.schemas.types import MediaType, EventType, SystemConfigKey @@ -53,10 +53,10 @@ async def list_subscribes(_: Annotated[str, Depends(verify_apitoken)]) -> Any: @router.post("/", summary="新增订阅", response_model=schemas.Response) -def create_subscribe( +async def create_subscribe( *, subscribe_in: schemas.Subscribe, - current_user: User = Depends(get_current_active_user), + current_user: User = Depends(get_current_active_user_async), ) -> schemas.Response: """ 新增订阅 @@ -78,10 +78,10 @@ def create_subscribe( title = None # 订阅用户 subscribe_in.username = current_user.name - sid, message = SubscribeChain().add(mtype=mtype, - title=title, - exist_ok=True, - **subscribe_in.dict()) + sid, message = await SubscribeChain().async_add(mtype=mtype, + title=title, + exist_ok=True, + **subscribe_in.dict()) return schemas.Response( success=bool(sid), message=message, data={"id": sid} ) @@ -495,9 +495,9 @@ async def subscribe_share_delete( @router.post("/fork", summary="复用订阅", response_model=schemas.Response) -def subscribe_fork( +async def subscribe_fork( sub: schemas.SubscribeShare, - current_user: User = Depends(get_current_active_user)) -> Any: + current_user: User = Depends(get_current_active_user_async)) -> Any: """ 复用订阅 """ @@ -506,10 +506,10 @@ def subscribe_fork( for key in list(sub_dict.keys()): if not hasattr(schemas.Subscribe(), key): sub_dict.pop(key) - result = create_subscribe(subscribe_in=schemas.Subscribe(**sub_dict), - current_user=current_user) + result = await create_subscribe(subscribe_in=schemas.Subscribe(**sub_dict), + current_user=current_user) if result.success: - SubscribeHelper().sub_fork(share_id=sub.id) + await SubscribeHelper().async_sub_fork(share_id=sub.id) return result diff --git a/app/api/servarr.py b/app/api/servarr.py index d4efc9d6..387bd323 100644 --- a/app/api/servarr.py +++ b/app/api/servarr.py @@ -6,8 +6,8 @@ from sqlalchemy.orm import Session from app import schemas from app.chain.media import MediaChain -from app.chain.tvdb import TvdbChain from app.chain.subscribe import SubscribeChain +from app.chain.tvdb import TvdbChain from app.core.metainfo import MetaInfo from app.core.security import verify_apikey from app.db import get_db, get_async_db @@ -307,7 +307,8 @@ def arr_movie_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: S @arr_router.get("/movie/{mid}", summary="电影订阅详情", response_model=schemas.RadarrMovie) -async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: +async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], + db: AsyncSession = Depends(get_async_db)) -> Any: """ 查询Rardar电影订阅 """ @@ -333,25 +334,25 @@ async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Asy @arr_router.post("/movie", summary="新增电影订阅") -def arr_add_movie(_: Annotated[str, Depends(verify_apikey)], - movie: RadarrMovie, - db: Session = Depends(get_db) - ) -> Any: +async def arr_add_movie(_: Annotated[str, Depends(verify_apikey)], + movie: RadarrMovie, + db: AsyncSession = Depends(get_async_db) + ) -> Any: """ 新增Rardar电影订阅 """ # 检查订阅是否已存在 - subscribe = Subscribe.get_by_tmdbid(db, movie.tmdbId) + subscribe = await Subscribe.async_get_by_tmdbid(db, movie.tmdbId) if subscribe: return { "id": subscribe.id } # 添加订阅 - sid, message = SubscribeChain().add(title=movie.title, - year=movie.year, - mtype=MediaType.MOVIE, - tmdbid=movie.tmdbId, - username="Seerr") + sid, message = await SubscribeChain().async_add(title=movie.title, + year=movie.year, + mtype=MediaType.MOVIE, + tmdbid=movie.tmdbId, + username="Seerr") if sid: return { "id": sid @@ -364,7 +365,8 @@ def arr_add_movie(_: Annotated[str, Depends(verify_apikey)], @arr_router.delete("/movie/{mid}", summary="删除电影订阅", response_model=schemas.Response) -async def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: +async def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], + db: AsyncSession = Depends(get_async_db)) -> Any: """ 删除Rardar电影订阅 """ @@ -606,7 +608,8 @@ def arr_series_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: @arr_router.get("/series/{tid}", summary="剧集详情") -async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: +async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], + db: AsyncSession = Depends(get_async_db)) -> Any: """ 查询Sonarr剧集 """ @@ -640,17 +643,17 @@ async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: Asy @arr_router.post("/series", summary="新增剧集订阅") -def arr_add_series(tv: schemas.SonarrSeries, - _: Annotated[str, Depends(verify_apikey)], - db: Session = Depends(get_db)) -> Any: +async def arr_add_series(tv: schemas.SonarrSeries, + _: Annotated[str, Depends(verify_apikey)], + db: AsyncSession = Depends(get_async_db)) -> Any: """ 新增Sonarr剧集订阅 """ # 检查订阅是否存在 left_seasons = [] for season in tv.seasons: - subscribe = Subscribe.get_by_tmdbid(db, tmdbid=tv.tmdbId, - season=season.get("seasonNumber")) + subscribe = await Subscribe.async_get_by_tmdbid(db, tmdbid=tv.tmdbId, + season=season.get("seasonNumber")) if subscribe: continue left_seasons.append(season) @@ -665,12 +668,12 @@ def arr_add_series(tv: schemas.SonarrSeries, for season in left_seasons: if not season.get("monitored"): continue - sid, message = SubscribeChain().add(title=tv.title, - year=tv.year, - season=season.get("seasonNumber"), - tmdbid=tv.tmdbId, - mtype=MediaType.TV, - username="Seerr") + sid, message = await SubscribeChain().async_add(title=tv.title, + year=tv.year, + season=season.get("seasonNumber"), + tmdbid=tv.tmdbId, + mtype=MediaType.TV, + username="Seerr") if sid: return { @@ -684,15 +687,16 @@ def arr_add_series(tv: schemas.SonarrSeries, @arr_router.put("/series", summary="更新剧集订阅") -def arr_update_series(tv: schemas.SonarrSeries) -> Any: +async def arr_update_series(tv: schemas.SonarrSeries, _: Annotated[str, Depends(verify_apikey)]) -> Any: """ 更新Sonarr剧集订阅 """ - return arr_add_series(tv) + return await arr_add_series(tv) @arr_router.delete("/series/{tid}", summary="删除剧集订阅") -async def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any: +async def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], + db: AsyncSession = Depends(get_async_db)) -> Any: """ 删除Sonarr剧集订阅 """ diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 0d6e73df..d508b498 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -922,6 +922,85 @@ class ChainBase(metaclass=ABCMeta): self.messagequeue.send_message("post_message", message=message, immediately=True if message.userid else False) + async def async_post_message(self, + message: Optional[Notification] = None, + meta: Optional[MetaBase] = None, + mediainfo: Optional[MediaInfo] = None, + torrentinfo: Optional[TorrentInfo] = None, + transferinfo: Optional[TransferInfo] = None, + **kwargs) -> None: + """ + 异步发送消息 + :param message: Notification实例 + :param meta: 元数据 + :param mediainfo: 媒体信息 + :param torrentinfo: 种子信息 + :param transferinfo: 文件整理信息 + :param kwargs: 其他参数(覆盖业务对象属性值) + :return: 成功或失败 + """ + # 渲染消息 + message = MessageTemplateHelper.render(message=message, meta=meta, mediainfo=mediainfo, + torrentinfo=torrentinfo, transferinfo=transferinfo, **kwargs) + # 保存消息 + self.messagehelper.put(message, role="user", title=message.title) + await self.messageoper.async_add(**message.dict()) + # 发送消息按设置隔离 + if not message.userid and message.mtype: + # 消息隔离设置 + notify_action = ServiceConfigHelper.get_notification_switch(message.mtype) + if notify_action: + # 'admin' 'user,admin' 'user' 'all' + actions = notify_action.split(",") + # 是否已发送管理员标志 + admin_sended = False + send_orignal = False + useroper = UserOper() + for action in actions: + send_message = copy.deepcopy(message) + if action == "admin" and not admin_sended: + # 仅发送管理员 + logger.info(f"{send_message.mtype} 的消息已设置发送给管理员") + # 读取管理员消息IDS + send_message.targets = useroper.get_settings(settings.SUPERUSER) + admin_sended = True + elif action == "user" and send_message.username: + # 发送对应用户 + logger.info(f"{send_message.mtype} 的消息已设置发送给用户 {send_message.username}") + # 读取用户消息IDS + send_message.targets = useroper.get_settings(send_message.username) + if send_message.targets is None: + # 没有找到用户 + if not admin_sended: + # 回滚发送管理员 + logger.info(f"用户 {send_message.username} 不存在,消息将发送给管理员") + # 读取管理员消息IDS + send_message.targets = useroper.get_settings(settings.SUPERUSER) + admin_sended = True + else: + # 管理员发过了,此消息不发了 + logger.info(f"用户 {send_message.username} 不存在,消息无法发送到对应用户") + continue + elif send_message.username == settings.SUPERUSER: + # 管理员同名已发送 + admin_sended = True + else: + # 按原消息发送全体 + if not admin_sended: + send_orignal = True + break + # 按设定发送 + await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, + data={**send_message.dict(), "type": send_message.mtype}) + await self.messagequeue.async_send_message("post_message", message=send_message) + if not send_orignal: + return + # 发送消息事件 + await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype}) + # 按原消息发送 + await self.messagequeue.async_send_message("post_message", message=message, + immediately=True if message.userid else False) + def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None: """ 发送媒体信息选择列表 diff --git a/app/chain/subscribe.py b/app/chain/subscribe.py index 53ce22c8..7cceffb2 100644 --- a/app/chain/subscribe.py +++ b/app/chain/subscribe.py @@ -41,6 +41,82 @@ class SubscribeChain(ChainBase): # 避免莫名原因导致长时间持有锁 _LOCK_TIMOUT = 3600 * 2 + @staticmethod + def __get_event_meida(_mediaid: str, _meta: MetaBase) -> Optional[MediaInfo]: + """ + 广播事件解析媒体信息 + """ + event_data = MediaRecognizeConvertEventData( + mediaid=_mediaid, + convert_type=settings.RECOGNIZE_SOURCE + ) + event = eventmanager.send_event(ChainEventType.MediaRecognizeConvert, event_data) + # 使用事件返回的上下文数据 + if event and event.event_data: + event_data: MediaRecognizeConvertEventData = event.event_data + if event_data.media_dict: + mediachain = MediaChain() + new_id = event_data.media_dict.get("id") + if event_data.convert_type == "themoviedb": + return mediachain.recognize_media(meta=_meta, tmdbid=new_id) + elif event_data.convert_type == "douban": + return mediachain.recognize_media(meta=_meta, doubanid=new_id) + return None + + @staticmethod + async def __async_get_event_meida(_mediaid: str, _meta: MetaBase) -> Optional[MediaInfo]: + """ + 广播事件解析媒体信息 + """ + event_data = MediaRecognizeConvertEventData( + mediaid=_mediaid, + convert_type=settings.RECOGNIZE_SOURCE + ) + event = await eventmanager.async_send_event(ChainEventType.MediaRecognizeConvert, event_data) + # 使用事件返回的上下文数据 + if event and event.event_data: + event_data: MediaRecognizeConvertEventData = event.event_data + if event_data.media_dict: + mediachain = MediaChain() + new_id = event_data.media_dict.get("id") + if event_data.convert_type == "themoviedb": + return await mediachain.async_recognize_media(meta=_meta, tmdbid=new_id) + elif event_data.convert_type == "douban": + return await mediachain.async_recognize_media(meta=_meta, doubanid=new_id) + return None + + def __get_default_kwargs(self, mtype: MediaType, **kwargs) -> dict: + """ + 获取订阅默认配置 + :param mtype: 媒体类型 + :param key: 配置键 + :return: 配置值 + """ + return { + 'quality': self.__get_default_subscribe_config(mtype, "quality") if not kwargs.get( + "quality") else kwargs.get("quality"), + 'resolution': self.__get_default_subscribe_config(mtype, "resolution") if not kwargs.get( + "resolution") else kwargs.get("resolution"), + 'effect': self.__get_default_subscribe_config(mtype, "effect") if not kwargs.get( + "effect") else kwargs.get("effect"), + 'include': self.__get_default_subscribe_config(mtype, "include") if not kwargs.get( + "include") else kwargs.get("include"), + 'exclude': self.__get_default_subscribe_config(mtype, "exclude") if not kwargs.get( + "exclude") else kwargs.get("exclude"), + 'best_version': self.__get_default_subscribe_config(mtype, "best_version") if not kwargs.get( + "best_version") else kwargs.get("best_version"), + 'search_imdbid': self.__get_default_subscribe_config(mtype, "search_imdbid") if not kwargs.get( + "search_imdbid") else kwargs.get("search_imdbid"), + 'sites': self.__get_default_subscribe_config(mtype, "sites") or None if not kwargs.get( + "sites") else kwargs.get("sites"), + 'downloader': self.__get_default_subscribe_config(mtype, "downloader") if not kwargs.get( + "downloader") else kwargs.get("downloader"), + 'save_path': self.__get_default_subscribe_config(mtype, "save_path") if not kwargs.get( + "save_path") else kwargs.get("save_path"), + 'filter_groups': self.__get_default_subscribe_config(mtype, "filter_groups") if not kwargs.get( + "filter_groups") else kwargs.get("filter_groups") + } + def add(self, title: str, year: str, mtype: MediaType = None, tmdbid: Optional[int] = None, @@ -60,27 +136,6 @@ class SubscribeChain(ChainBase): 识别媒体信息并添加订阅 """ - def __get_event_meida(_mediaid: str, _meta: MetaBase) -> Optional[MediaInfo]: - """ - 广播事件解析媒体信息 - """ - event_data = MediaRecognizeConvertEventData( - mediaid=_mediaid, - convert_type=settings.RECOGNIZE_SOURCE - ) - event = eventmanager.send_event(ChainEventType.MediaRecognizeConvert, event_data) - # 使用事件返回的上下文数据 - if event and event.event_data: - event_data: MediaRecognizeConvertEventData = event.event_data - if event_data.media_dict: - mediachain = MediaChain() - new_id = event_data.media_dict.get("id") - if event_data.convert_type == "themoviedb": - return mediachain.recognize_media(meta=_meta, tmdbid=new_id) - elif event_data.convert_type == "douban": - return mediachain.recognize_media(meta=_meta, doubanid=new_id) - return None - logger.info(f'开始添加订阅,标题:{title} ...') mediainfo = None @@ -103,7 +158,7 @@ class SubscribeChain(ChainBase): mediainfo = MediaInfo(tmdb_info=tmdbinfo) elif mediaid: # 未知前缀,广播事件解析媒体信息 - mediainfo = __get_event_meida(mediaid, metainfo) + mediainfo = self.__get_event_meida(mediaid, metainfo) else: # 使用TMDBID识别 mediainfo = self.recognize_media(meta=metainfo, mtype=mtype, tmdbid=tmdbid, @@ -114,7 +169,7 @@ class SubscribeChain(ChainBase): mediainfo = self.recognize_media(meta=metainfo, mtype=mtype, doubanid=doubanid, cache=False) elif mediaid: # 未知前缀,广播事件解析媒体信息 - mediainfo = __get_event_meida(mediaid, metainfo) + mediainfo = self.__get_event_meida(mediaid, metainfo) if mediainfo: # 豆瓣标题处理 meta = MetaInfo(mediainfo.title) @@ -176,30 +231,8 @@ class SubscribeChain(ChainBase): mediainfo.bangumi_id = bangumiid # 添加订阅 - kwargs.update({ - 'quality': self.__get_default_subscribe_config(mediainfo.type, "quality") if not kwargs.get( - "quality") else kwargs.get("quality"), - 'resolution': self.__get_default_subscribe_config(mediainfo.type, "resolution") if not kwargs.get( - "resolution") else kwargs.get("resolution"), - 'effect': self.__get_default_subscribe_config(mediainfo.type, "effect") if not kwargs.get( - "effect") else kwargs.get("effect"), - 'include': self.__get_default_subscribe_config(mediainfo.type, "include") if not kwargs.get( - "include") else kwargs.get("include"), - 'exclude': self.__get_default_subscribe_config(mediainfo.type, "exclude") if not kwargs.get( - "exclude") else kwargs.get("exclude"), - 'best_version': self.__get_default_subscribe_config(mediainfo.type, "best_version") if not kwargs.get( - "best_version") else kwargs.get("best_version"), - 'search_imdbid': self.__get_default_subscribe_config(mediainfo.type, "search_imdbid") if not kwargs.get( - "search_imdbid") else kwargs.get("search_imdbid"), - 'sites': self.__get_default_subscribe_config(mediainfo.type, "sites") or None if not kwargs.get( - "sites") else kwargs.get("sites"), - 'downloader': self.__get_default_subscribe_config(mediainfo.type, "downloader") if not kwargs.get( - "downloader") else kwargs.get("downloader"), - 'save_path': self.__get_default_subscribe_config(mediainfo.type, "save_path") if not kwargs.get( - "save_path") else kwargs.get("save_path"), - 'filter_groups': self.__get_default_subscribe_config(mediainfo.type, "filter_groups") if not kwargs.get( - "filter_groups") else kwargs.get("filter_groups") - }) + kwargs.update(self.__get_default_kwargs(mediainfo.type, **kwargs)) + # 操作数据库 sid, err_msg = SubscribeOper().add(mediainfo=mediainfo, season=season, username=username, **kwargs) if not sid: @@ -261,6 +294,183 @@ class SubscribeChain(ChainBase): # 返回结果 return sid, "" + async def async_add(self, title: str, year: str, + mtype: MediaType = None, + tmdbid: Optional[int] = None, + doubanid: Optional[str] = None, + bangumiid: Optional[int] = None, + mediaid: Optional[str] = None, + episode_group: Optional[str] = None, + season: Optional[int] = None, + channel: MessageChannel = None, + source: Optional[str] = None, + userid: Optional[str] = None, + username: Optional[str] = None, + message: Optional[bool] = True, + exist_ok: Optional[bool] = False, + **kwargs) -> Tuple[Optional[int], str]: + """ + 异步识别媒体信息并添加订阅 + """ + + logger.info(f'开始添加订阅,标题:{title} ...') + + mediainfo = None + metainfo = MetaInfo(title) + if year: + metainfo.year = year + if mtype: + metainfo.type = mtype + if season: + metainfo.type = MediaType.TV + metainfo.begin_season = season + # 识别媒体信息 + if settings.RECOGNIZE_SOURCE == "themoviedb": + # TMDB识别模式 + if not tmdbid: + if doubanid: + # 将豆瓣信息转换为TMDB信息 + tmdbinfo = await MediaChain().async_get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=mtype) + if tmdbinfo: + mediainfo = MediaInfo(tmdb_info=tmdbinfo) + elif mediaid: + # 未知前缀,广播事件解析媒体信息 + mediainfo = await self.__async_get_event_meida(mediaid, metainfo) + else: + # 使用TMDBID识别 + mediainfo = await self.async_recognize_media(meta=metainfo, mtype=mtype, tmdbid=tmdbid, + episode_group=episode_group, cache=False) + else: + if doubanid: + # 豆瓣识别模式,不使用缓存 + mediainfo = await self.async_recognize_media(meta=metainfo, mtype=mtype, doubanid=doubanid, cache=False) + elif mediaid: + # 未知前缀,广播事件解析媒体信息 + mediainfo = await self.__async_get_event_meida(mediaid, metainfo) + if mediainfo: + # 豆瓣标题处理 + meta = MetaInfo(mediainfo.title) + mediainfo.title = meta.name + if not season: + season = meta.begin_season + + # 使用名称识别兜底 + if not mediainfo: + mediainfo = await self.async_recognize_media(meta=metainfo, episode_group=episode_group) + + # 识别失败 + if not mediainfo: + logger.warn(f'未识别到媒体信息,标题:{title},tmdbid:{tmdbid},doubanid:{doubanid}') + return None, "未识别到媒体信息" + + # 总集数 + if mediainfo.type == MediaType.TV: + if not season: + season = 1 + # 总集数 + if not kwargs.get('total_episode'): + if not mediainfo.seasons or episode_group: + # 补充媒体信息 + mediainfo = await self.async_recognize_media(mtype=mediainfo.type, + tmdbid=mediainfo.tmdb_id, + doubanid=mediainfo.douban_id, + bangumiid=mediainfo.bangumi_id, + episode_group=episode_group, + cache=False) + if not mediainfo: + logger.error(f"媒体信息识别失败!") + return None, "媒体信息识别失败" + if not mediainfo.seasons: + logger.error(f"媒体信息中没有季集信息,标题:{title},tmdbid:{tmdbid},doubanid:{doubanid}") + return None, "媒体信息中没有季集信息" + total_episode = len(mediainfo.seasons.get(season) or []) + if not total_episode: + logger.error(f'未获取到总集数,标题:{title},tmdbid:{tmdbid}, doubanid:{doubanid}') + return None, f"未获取到第 {season} 季的总集数" + kwargs.update({ + 'total_episode': total_episode + }) + # 缺失集 + if not kwargs.get('lack_episode'): + kwargs.update({ + 'lack_episode': kwargs.get('total_episode') + }) + else: + # 避免season为0的问题 + season = None + + # 更新媒体图片 + await self.async_obtain_images(mediainfo=mediainfo) + # 合并信息 + if doubanid: + mediainfo.douban_id = doubanid + if bangumiid: + mediainfo.bangumi_id = bangumiid + + # 列新默认参数 + kwargs.update(self.__get_default_kwargs(mediainfo.type, **kwargs)) + + # 操作数据库 + sid, err_msg = await SubscribeOper().async_add(mediainfo=mediainfo, season=season, username=username, **kwargs) + if not sid: + logger.error(f'{mediainfo.title_year} {err_msg}') + if not exist_ok and message: + # 失败发回原用户 + await self.async_post_message(schemas.Notification(channel=channel, + source=source, + mtype=NotificationType.Subscribe, + title=f"{mediainfo.title_year} {metainfo.season} " + f"添加订阅失败!", + text=f"{err_msg}", + image=mediainfo.get_message_image(), + userid=userid)) + return None, err_msg + elif message: + if mediainfo.type == MediaType.TV: + link = settings.MP_DOMAIN('#/subscribe/tv?tab=mysub') + else: + link = settings.MP_DOMAIN('#/subscribe/movie?tab=mysub') + # 订阅成功按规则发送消息 + await self.async_post_message( + schemas.Notification( + channel=channel, + source=source, + mtype=NotificationType.Subscribe, + ctype=ContentType.SubscribeAdded, + image=mediainfo.get_message_image(), + link=link, + userid=userid, + username=username + ), + meta=metainfo, + mediainfo=mediainfo, + username=username + ) + # 发送事件 + await eventmanager.async_send_event(EventType.SubscribeAdded, { + "subscribe_id": sid, + "username": username, + "mediainfo": mediainfo.to_dict(), + }) + # 统计订阅 + await SubscribeHelper().async_sub_reg({ + "name": title, + "year": year, + "type": metainfo.type.value, + "tmdbid": mediainfo.tmdb_id, + "imdbid": mediainfo.imdb_id, + "tvdbid": mediainfo.tvdb_id, + "doubanid": mediainfo.douban_id, + "bangumiid": mediainfo.bangumi_id, + "season": metainfo.begin_season, + "poster": mediainfo.get_poster_image(), + "backdrop": mediainfo.get_backdrop_image(), + "vote": mediainfo.vote_average, + "description": mediainfo.overview + }) + # 返回结果 + return sid, "" + @staticmethod def exists(mediainfo: MediaInfo, meta: MetaBase = None): """ diff --git a/app/db/message_oper.py b/app/db/message_oper.py index 62a8a5d7..aff7dd25 100644 --- a/app/db/message_oper.py +++ b/app/db/message_oper.py @@ -29,7 +29,7 @@ class MessageOper(DbOper): note: Union[list, dict] = None, **kwargs): """ - 新增媒体服务器数据 + 新增消息 :param channel: 消息渠道 :param source: 来源 :param mtype: 消息类型 @@ -57,11 +57,47 @@ class MessageOper(DbOper): # 从kwargs中去掉Message中没有的字段 for k in list(kwargs.keys()): - if k not in Message.__table__.columns.keys(): # noqa + if k not in Message.__table__.columns.keys(): # noqa kwargs.pop(k) Message(**kwargs).create(self._db) + async def async_add(self, + channel: MessageChannel = None, + source: Optional[str] = None, + mtype: NotificationType = None, + title: Optional[str] = None, + text: Optional[str] = None, + image: Optional[str] = None, + link: Optional[str] = None, + userid: Optional[str] = None, + action: Optional[int] = 1, + note: Union[list, dict] = None, + **kwargs): + """ + 异步新增消息 + """ + kwargs.update({ + "channel": channel.value if channel else '', + "source": source, + "mtype": mtype.value if mtype else '', + "title": title, + "text": text, + "image": image, + "link": link, + "userid": userid, + "action": action, + "reg_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "note": note or {} + }) + + # 从kwargs中去掉Message中没有的字段 + for k in list(kwargs.keys()): + if k not in Message.__table__.columns.keys(): # noqa + kwargs.pop(k) + + await Message(**kwargs).async_create(self._db) + def list_by_page(self, page: Optional[int] = 1, count: Optional[int] = 30) -> Optional[str]: """ 获取媒体服务器数据ID diff --git a/app/db/subscribe_oper.py b/app/db/subscribe_oper.py index 1d4b2a36..7f4bb3be 100644 --- a/app/db/subscribe_oper.py +++ b/app/db/subscribe_oper.py @@ -48,6 +48,42 @@ class SubscribeOper(DbOper): else: return subscribe.id, "订阅已存在" + async def async_add(self, mediainfo: MediaInfo, **kwargs) -> Tuple[int, str]: + """ + 异步新增订阅 + """ + subscribe = await Subscribe.async_exists(self._db, + tmdbid=mediainfo.tmdb_id, + doubanid=mediainfo.douban_id, + season=kwargs.get('season')) + kwargs.update({ + "name": mediainfo.title, + "year": mediainfo.year, + "type": mediainfo.type.value, + "tmdbid": mediainfo.tmdb_id, + "imdbid": mediainfo.imdb_id, + "tvdbid": mediainfo.tvdb_id, + "doubanid": mediainfo.douban_id, + "bangumiid": mediainfo.bangumi_id, + "episode_group": mediainfo.episode_group, + "poster": mediainfo.get_poster_image(), + "backdrop": mediainfo.get_backdrop_image(), + "vote": mediainfo.vote_average, + "description": mediainfo.overview, + "date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + }) + if not subscribe: + subscribe = Subscribe(**kwargs) + await subscribe.async_create(self._db) + # 查询订阅 + subscribe = await Subscribe.async_exists(self._db, + tmdbid=mediainfo.tmdb_id, + doubanid=mediainfo.douban_id, + season=kwargs.get('season')) + return subscribe.id, "新增订阅成功" + else: + return subscribe.id, "订阅已存在" + def exists(self, tmdbid: Optional[int] = None, doubanid: Optional[str] = None, season: Optional[int] = None) -> bool: """ diff --git a/app/helper/message.py b/app/helper/message.py index f1c6bf7d..042a9bb3 100644 --- a/app/helper/message.py +++ b/app/helper/message.py @@ -657,6 +657,17 @@ class MessageQueueManager(metaclass=SingletonClass): }) logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}") + async def async_send_message(self, *args, **kwargs) -> None: + """ + 异步发送消息(直接加入队列) + """ + kwargs.pop("immediately", False) + self.queue.put({ + "args": args, + "kwargs": kwargs + }) + logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}") + def _send(self, *args, **kwargs) -> None: """ 实际发送消息(可通过回调函数自定义) diff --git a/app/helper/subscribe.py b/app/helper/subscribe.py index d8b1d06a..8d5cd238 100644 --- a/app/helper/subscribe.py +++ b/app/helper/subscribe.py @@ -173,6 +173,20 @@ class SubscribeHelper(metaclass=WeakSingleton): return True return False + async def async_sub_reg(self, sub: dict) -> bool: + """ + 异步新增订阅统计 + """ + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: + return False + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5, headers={ + "Content-Type": "application/json" + }).post_res(self._sub_reg, json=sub) + if res and res.status_code == 200: + return True + return False + def sub_done(self, sub: dict) -> bool: """ 完成订阅统计