From fc96cfe8a0efc1610e2460b7e351911c4974a813 Mon Sep 17 00:00:00 2001 From: TimoYoung Date: Tue, 27 May 2025 17:32:25 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:tvdb=E6=A8=A1=E5=9D=97=E9=87=8D?= =?UTF-8?q?=E5=86=99=EF=BC=8C=E6=9B=B4=E6=8D=A2tvdbv4=20api,=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E6=90=9C=E7=B4=A2=E8=83=BD=E5=8A=9B=20sonarr=20/serie?= =?UTF-8?q?s/lookup=E6=8E=A5=E5=8F=A3=E9=87=8D=E5=86=99=EF=BC=8C=E7=9B=B4?= =?UTF-8?q?=E6=8E=A5=E7=94=A8=E6=A0=87=E9=A2=98=E5=9C=A8tvdb=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E5=89=A7=E9=9B=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/servarr.py | 123 +-- app/chain/tvdb.py | 13 + app/core/config.py | 3 +- app/modules/thetvdb/__init__.py | 64 +- app/modules/thetvdb/tvdb_v4_official.py | 465 ++++++++++ app/modules/thetvdb/tvdbapi.py | 1107 ----------------------- 6 files changed, 592 insertions(+), 1183 deletions(-) create mode 100644 app/chain/tvdb.py create mode 100644 app/modules/thetvdb/tvdb_v4_official.py delete mode 100644 app/modules/thetvdb/tvdbapi.py diff --git a/app/api/servarr.py b/app/api/servarr.py index fe91e081..37120d8a 100644 --- a/app/api/servarr.py +++ b/app/api/servarr.py @@ -5,6 +5,7 @@ from sqlalchemy.orm import Session from app import schemas from app.chain.media import MediaChain +from app.chain.tvdb import TvdbChain from app.chain.subscribe import SubscribeChain from app.core.metainfo import MetaInfo from app.core.security import verify_apikey @@ -520,87 +521,87 @@ def arr_series_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: """ # 季信息 seas: List[int] = [] - + # tvdbid 列表 + tvdbids: List[int] = [] # 获取TVDBID if not term.startswith("tvdb:"): - mediainfo = MediaChain().recognize_media(meta=MetaInfo(term), - mtype=MediaType.TV) - if not mediainfo: - return [SonarrSeries()] - - # 季信息 - if mediainfo.seasons: - seas = list(mediainfo.seasons) + title = term.replace("+", " ") + tvdbids = TvdbChain().get_tvdbid_by_name(title=title) else: tvdbid = int(term.replace("tvdb:", "")) + tvdbids.append(tvdbid) + sonarr_series_list = [] + for tvdbid in tvdbids: # 查询TVDB信息 tvdbinfo = MediaChain().tvdb_info(tvdbid=tvdbid) if not tvdbinfo: - return [SonarrSeries()] + continue - # 季信息 - sea_num = tvdbinfo.get('season') + # 季信息(只取默认季类型,排除特别季) + sea_num = len([season for season in tvdbinfo.get('seasons') if + season['type']['id'] == tvdbinfo.get('defaultSeasonType') and season['number'] > 0]) if sea_num: seas = list(range(1, int(sea_num) + 1)) # 根据TVDB查询媒体信息 - mediainfo = MediaChain().recognize_media(meta=MetaInfo(tvdbinfo.get('seriesName')), + mediainfo = MediaChain().recognize_media(meta=MetaInfo(tvdbinfo.get('name')), mtype=MediaType.TV) + if not mediainfo: + continue + # 查询是否存在 + exists = MediaChain().media_exists(mediainfo) + if exists: + hasfile = True + else: + hasfile = False - # 查询是否存在 - exists = MediaChain().media_exists(mediainfo) - if exists: - hasfile = True - else: - hasfile = False - - # 查询订阅信息 - seasons: List[dict] = [] - subscribes = Subscribe.get_by_tmdbid(db, mediainfo.tmdb_id) - if subscribes: - # 已监控 - monitored = True - # 已监控季 - sub_seas = [sub.season for sub in subscribes] - for sea in seas: - if sea in sub_seas: - seasons.append({ - "seasonNumber": sea, - "monitored": True, - }) - else: + # 查询订阅信息 + seasons: List[dict] = [] + subscribes = Subscribe.get_by_tmdbid(db, mediainfo.tmdb_id) + if subscribes: + # 已监控 + monitored = True + # 已监控季 + sub_seas = [sub.season for sub in subscribes] + for sea in seas: + if sea in sub_seas: + seasons.append({ + "seasonNumber": sea, + "monitored": True, + }) + else: + seasons.append({ + "seasonNumber": sea, + "monitored": False, + }) + subid = subscribes[-1].id + else: + subid = None + monitored = False + for sea in seas: seasons.append({ "seasonNumber": sea, "monitored": False, }) - subid = subscribes[-1].id - else: - subid = None - monitored = False - for sea in seas: - seasons.append({ - "seasonNumber": sea, - "monitored": False, - }) + sonarr_series = SonarrSeries( + id=subid, + title=mediainfo.title, + seasonCount=len(seasons), + seasons=seasons, + remotePoster=mediainfo.get_poster_image(), + year=mediainfo.year, + tmdbId=mediainfo.tmdb_id, + tvdbId=tvdbid, + imdbId=mediainfo.imdb_id, + profileId=1, + languageProfileId=1, + monitored=monitored, + hasFile=hasfile, + ) + sonarr_series_list.append(sonarr_series) - return [SonarrSeries( - id=subid, - title=mediainfo.title, - seasonCount=len(seasons), - seasons=seasons, - remotePoster=mediainfo.get_poster_image(), - year=mediainfo.year, - tmdbId=mediainfo.tmdb_id, - tvdbId=mediainfo.tvdb_id, - imdbId=mediainfo.imdb_id, - profileId=1, - languageProfileId=1, - qualityProfileId=1, - isAvailable=True, - monitored=monitored, - hasFile=hasfile - )] + return sonarr_series_list if sonarr_series_list else [SonarrSeries()] @arr_router.get("/series/{tid}", summary="剧集详情") diff --git a/app/chain/tvdb.py b/app/chain/tvdb.py new file mode 100644 index 00000000..2e1b3dae --- /dev/null +++ b/app/chain/tvdb.py @@ -0,0 +1,13 @@ +from typing import List +from app.chain import ChainBase +from app.utils.singleton import Singleton + + +class TvdbChain(ChainBase, metaclass=Singleton): + """ + Tvdb处理链,单例运行 + """ + + def get_tvdbid_by_name(self, title: str) -> List[int]: + tvdb_info_list = self.run_module("search_tvdb", title=title) + return [int(item["tvdb_id"]) for item in tvdb_info_list] diff --git a/app/core/config.py b/app/core/config.py index cd4820dc..3564965f 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -111,7 +111,8 @@ class ConfigModel(BaseModel): # TMDB API Key TMDB_API_KEY: str = "db55323b8d3e4154498498a75642b381" # TVDB API Key - TVDB_API_KEY: str = "6b481081-10aa-440c-99f2-21d17717ee02" + TVDB_V4_API_KEY: str = "ed2aa66b-7899-4677-92a7-67bc9ce3d93a" + TVDB_V4_API_PIN: str = "" # Fanart开关 FANART_ENABLE: bool = True # Fanart API Key diff --git a/app/modules/thetvdb/__init__.py b/app/modules/thetvdb/__init__.py index bd48e0b9..7b6239d0 100644 --- a/app/modules/thetvdb/__init__.py +++ b/app/modules/thetvdb/__init__.py @@ -3,19 +3,42 @@ from typing import Optional, Tuple, Union from app.core.config import settings from app.log import logger from app.modules import _ModuleBase -from app.modules.thetvdb import tvdbapi +from app.modules.thetvdb import tvdb_v4_official from app.schemas.types import ModuleType, MediaRecognizeType -from app.utils.http import RequestUtils class TheTvDbModule(_ModuleBase): - tvdb: tvdbapi.Tvdb = None + tvdb: tvdb_v4_official.TVDB = None def init_module(self) -> None: - self.tvdb = tvdbapi.Tvdb(apikey=settings.TVDB_API_KEY, - cache=False, - select_first=True, - proxies=settings.PROXY) + self._initialize_tvdb_session() + + def _initialize_tvdb_session(self) -> None: + """ + 创建或刷新 TVDB 登录会话 + """ + try: + self.tvdb = tvdb_v4_official.TVDB(apikey=settings.TVDB_V4_API_KEY, pin=settings.TVDB_V4_API_PIN) + except Exception as e: + logger.error(f"TVDB 登录失败: {str(e)}") + + def _handle_tvdb_call(self, func, *args, **kwargs): + """ + 包裹 TVDB 调用,处理 token 失效情况并尝试重新初始化 + """ + try: + return func(*args, **kwargs) + except ValueError as e: + # 检查错误信息中是否包含 token 失效相关描述 + if "failed to get" in str(e) or "Unauthorized" in str(e) or "NOT_MODIFIED" in str(e): + logger.warning("TVDB Token 可能已失效,正在尝试重新登录...") + self._initialize_tvdb_session() + return func(*args, **kwargs) + else: + raise + except Exception as e: + logger.error(f"TVDB 调用出错: {str(e)}") + raise @staticmethod def get_name() -> str: @@ -43,18 +66,17 @@ class TheTvDbModule(_ModuleBase): return 4 def stop(self): - self.tvdb.close() + pass def test(self) -> Tuple[bool, str]: """ 测试模块连接性 """ - ret = RequestUtils(proxies=settings.PROXY).get_res("https://api.thetvdb.com/series/81189") - if ret and ret.status_code == 200: + try: + self._handle_tvdb_call(self.tvdb.get_series, 81189) return True, "" - elif ret: - return False, f"无法连接 api.thetvdb.com,错误码:{ret.status_code}" - return False, "api.thetvdb.com 网络连接失败" + except Exception as e: + return False, str(e) def init_setting(self) -> Tuple[str, Union[str, bool]]: pass @@ -67,6 +89,20 @@ class TheTvDbModule(_ModuleBase): """ try: logger.info(f"开始获取TVDB信息: {tvdbid} ...") - return self.tvdb[tvdbid].data + return self._handle_tvdb_call(self.tvdb.get_series_extended, tvdbid) except Exception as err: logger.error(f"获取TVDB信息失败: {str(err)}") + return None + + def search_tvdb(self, title: str) -> list: + """ + 用标题搜索TVDB + :param title: 标题 + :return: TVDB信息 + """ + try: + logger.info(f"开始用标题搜索TVDB: {title} ...") + return self._handle_tvdb_call(self.tvdb.search, title) + except Exception as err: + logger.error(f"用标题搜索TVDB失败: {str(err)}") + return [] diff --git a/app/modules/thetvdb/tvdb_v4_official.py b/app/modules/thetvdb/tvdb_v4_official.py new file mode 100644 index 00000000..be5ab4d8 --- /dev/null +++ b/app/modules/thetvdb/tvdb_v4_official.py @@ -0,0 +1,465 @@ +"""Official python package for using the tvdb v4 api""" + +__author__ = "Weylin Wagnon" +__version__ = "1.0.12" + +import json +import string +import urllib +import urllib.request +from http import HTTPStatus +from urllib.error import HTTPError + + +class Auth: + def __init__(self, url, apikey, pin=""): + loginInfo = {"apikey": apikey} + if pin != "": + loginInfo["pin"] = pin + + loginInfoBytes = json.dumps(loginInfo, indent=2).encode("utf-8") + req = urllib.request.Request(url, data=loginInfoBytes) + req.add_header("Content-Type", "application/json") + try: + with urllib.request.urlopen(req, data=loginInfoBytes) as response: + res = json.load(response) + self.token = res["data"]["token"] + except HTTPError as e: + res = json.load(e) + raise Exception("Code:{}, {}".format(e, res["message"])) + + def get_token(self): + return self.token + + +class Request: + def __init__(self, auth_token): + self.auth_token = auth_token + self.links = None + + def make_request(self, url, if_modified_since=None): + req = urllib.request.Request(url) + req.add_header("Authorization", "Bearer {}".format(self.auth_token)) + if if_modified_since: + req.add_header("If-Modified-Since", "{}".format(if_modified_since)) + try: + with urllib.request.urlopen(req) as response: + res = json.load(response) + except HTTPError as e: + try: + if e.code == HTTPStatus.NOT_MODIFIED: + return { + "code": HTTPStatus.NOT_MODIFIED.real, + "message": "Not-Modified", + } + res = json.load(e) + except: + res = {} + data = res.get("data", None) + if data is not None and res.get("status", "failure") != "failure": + self.links = res.get("links", None) + return data + msg = res.get("message", None) + if not msg: + msg = "UNKNOWN FAILURE" + raise ValueError("failed to get " + url + "\n " + str(msg)) + + +class Url: + def __init__(self): + self.base_url = "https://api4.thetvdb.com/v4/" + + def construct( + self, url_sect, url_id=None, url_subsect=None, url_lang=None, **query + ): + url = self.base_url + url_sect + if url_id: + url += "/" + str(url_id) + if url_subsect: + url += "/" + url_subsect + if url_lang: + url += "/" + url_lang + if query: + query = {var: val for var, val in query.items() if val is not None} + if query: + url += "?" + urllib.parse.urlencode(query) + return url + + +class TVDB: + def __init__(self, apikey: str, pin=""): + self.url = Url() + login_url = self.url.construct("login") + self.auth = Auth(login_url, apikey, pin) + auth_token = self.auth.get_token() + self.request = Request(auth_token) + + def get_req_links(self) -> dict: + return self.request.links + + def get_artwork_statuses(self, meta=None, if_modified_since=None) -> list: + """Returns a list of artwork statuses""" + url = self.url.construct("artwork/statuses", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_artwork_types(self, meta=None, if_modified_since=None) -> list: + """Returns a list of artwork types""" + url = self.url.construct("artwork/types", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_artwork(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an artwork dictionary""" + url = self.url.construct("artwork", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_artwork_extended(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an artwork extended dictionary""" + url = self.url.construct("artwork", id, "extended", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_awards(self, meta=None, if_modified_since=None) -> list: + """Returns a list of awards""" + url = self.url.construct("awards", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_award(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an award dictionary""" + url = self.url.construct("awards", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_award_extended(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an award extended dictionary""" + url = self.url.construct("awards", id, "extended", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_award_categories(self, meta=None, if_modified_since=None) -> list: + """Returns a list of award categories""" + url = self.url.construct("awards/categories", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_award_category(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an award category dictionary""" + url = self.url.construct("awards/categories", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_award_category_extended( + self, id: int, meta=None, if_modified_since=None + ) -> dict: + """Returns an award category extended dictionary""" + url = self.url.construct("awards/categories", id, "extended", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_content_ratings(self, meta=None, if_modified_since=None) -> list: + """Returns a list of content ratings""" + url = self.url.construct("content/ratings", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_countries(self, meta=None, if_modified_since=None) -> list: + """Returns a list of countries""" + url = self.url.construct("countries", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_companies(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of companies""" + url = self.url.construct("companies", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_company_types(self, meta=None, if_modified_since=None) -> list: + """Returns a list of company types""" + url = self.url.construct("companies/types", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_company(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a company dictionary""" + url = self.url.construct("companies", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_series(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of series""" + url = self.url.construct("series", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_series(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a series dictionary""" + url = self.url.construct("series", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_series_by_slug( + self, slug: string, meta=None, if_modified_since=None + ) -> dict: + """Returns a series dictionary""" + url = self.url.construct("series/slug", slug, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_series_extended( + self, id: int, meta=None, short=False, if_modified_since=None + ) -> dict: + """Returns a series extended dictionary""" + url = self.url.construct("series", id, "extended", meta=meta, short=short) + return self.request.make_request(url, if_modified_since) + + def get_series_episodes( + self, + id: int, + season_type: str = "default", + page: int = 0, + lang: str = None, + meta=None, + if_modified_since=None, + **kwargs + ) -> dict: + """Returns a series episodes dictionary""" + url = self.url.construct( + "series", id, "episodes/" + season_type, lang, page=page, meta=meta, **kwargs + ) + return self.request.make_request(url, if_modified_since) + + def get_series_translation( + self, id: int, lang: str, meta=None, if_modified_since=None + ) -> dict: + """Returns a series translation dictionary""" + url = self.url.construct("series", id, "translations", lang, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_series_artworks( + self, id: int, lang: str, type=None, if_modified_since=None + ) -> dict: + """Returns a series record with an artwork array""" + url = self.url.construct("series", id, "artworks", lang=lang, type=type) + return self.request.make_request(url, if_modified_since) + + def get_series_nextAired(self, id: int, if_modified_since=None) -> dict: + """Returns a series extended dictionary""" + url = self.url.construct("series", id, "nextAired") + return self.request.make_request(url, if_modified_since) + + def get_all_movies(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of movies""" + url = self.url.construct("movies", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_movie(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a movie dictionary""" + url = self.url.construct("movies", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_movie_by_slug( + self, slug: string, meta=None, if_modified_since=None + ) -> dict: + """Returns a movie dictionary""" + url = self.url.construct("movies/slug", slug, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_movie_extended( + self, id: int, meta=None, short=False, if_modified_since=None + ) -> dict: + """Returns a movie extended dictionary""" + url = self.url.construct("movies", id, "extended", meta=meta, short=short) + return self.request.make_request(url, if_modified_since) + + def get_movie_translation( + self, id: int, lang: str, meta=None, if_modified_since=None + ) -> dict: + """Returns a movie translation dictionary""" + url = self.url.construct("movies", id, "translations", lang, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_seasons(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of seasons""" + url = self.url.construct("seasons", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_season(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a season dictionary""" + url = self.url.construct("seasons", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_season_extended(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a season extended dictionary""" + url = self.url.construct("seasons", id, "extended", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_season_types(self, meta=None, if_modified_since=None) -> list: + """Returns a list of season types""" + url = self.url.construct("seasons/types", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_season_translation( + self, id: int, lang: str, meta=None, if_modified_since=None + ) -> dict: + """Returns a seasons translation dictionary""" + url = self.url.construct("seasons", id, "translations", lang, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_episodes(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of episodes""" + url = self.url.construct("episodes", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_episode(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an episode dictionary""" + url = self.url.construct("episodes", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_episode_extended(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns an episode extended dictionary""" + url = self.url.construct("episodes", id, "extended", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_episode_translation( + self, id: int, lang: str, meta=None, if_modified_since=None + ) -> dict: + """Returns an episode translation dictionary""" + url = self.url.construct("episodes", id, "translations", lang, meta=meta) + return self.request.make_request(url, if_modified_since) + + get_episodes_translation = ( + get_episode_translation # Support the old name of the function. + ) + + def get_all_genders(self, meta=None, if_modified_since=None) -> list: + """Returns a list of genders""" + url = self.url.construct("genders", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_genres(self, meta=None, if_modified_since=None) -> list: + """Returns a list of genres""" + url = self.url.construct("genres", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_genre(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a genres dictionary""" + url = self.url.construct("genres", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_languages(self, meta=None, if_modified_since=None) -> list: + """Returns a list of languages""" + url = self.url.construct("languages", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_people(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of people""" + url = self.url.construct("people", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_person(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a people dictionary""" + url = self.url.construct("people", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_person_extended(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a people extended dictionary""" + url = self.url.construct("people", id, "extended", meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_person_translation( + self, id: int, lang: str, meta=None, if_modified_since=None + ) -> dict: + """Returns an people translation dictionary""" + url = self.url.construct("people", id, "translations", lang, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_character(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a character dictionary""" + url = self.url.construct("characters", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_people_types(self, meta=None, if_modified_since=None) -> list: + """Returns a list of people types""" + url = self.url.construct("people/types", meta=meta) + return self.request.make_request(url, if_modified_since) + + get_all_people_types = get_people_types # Support the old function name + + def get_source_types(self, meta=None, if_modified_since=None) -> list: + """Returns a list of source types""" + url = self.url.construct("sources/types", meta=meta) + return self.request.make_request(url, if_modified_since) + + get_all_sourcetypes = get_source_types # Support the old function name + + # kwargs accepts args such as: page=2, action='update', type='artwork' + def get_updates(self, since: int, **kwargs) -> list: + """Returns a list of updates""" + url = self.url.construct("updates", since=since, **kwargs) + return self.request.make_request(url) + + def get_all_tag_options(self, page=None, meta=None, if_modified_since=None) -> list: + """Returns a list of tag options""" + url = self.url.construct("tags/options", page=page, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_tag_option(self, id: int, meta=None, if_modified_since=None) -> dict: + """Returns a tag option dictionary""" + url = self.url.construct("tags/options", id, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_all_lists(self, page=None, meta=None) -> dict: + url = self.url.construct("lists", page=page, meta=meta) + return self.request.make_request(url) + + def get_list(self, id: int, meta=None, if_modified_since=None) -> dict: + url = self.url.construct("lists", id, meta=meta) + return self.request.make_request(url), if_modified_since + + def get_list_by_slug(self, slug: string, meta=None, if_modified_since=None) -> dict: + """Returns a movie dictionary""" + url = self.url.construct("lists/slug", slug, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_list_extended(self, id: int, meta=None, if_modified_since=None) -> dict: + url = self.url.construct("lists", id, "extended", meta=meta) + return self.request.make_request(url), if_modified_since + + def get_list_translation( + self, id: int, lang: str, meta=None, if_modified_since=None + ) -> dict: + """Returns an list translation dictionary""" + url = self.url.construct("lists", id, "translations", lang, meta=meta) + return self.request.make_request(url, if_modified_since) + + def get_inspiration_types(self, meta=None, if_modified_since=None) -> dict: + url = self.url.construct("inspiration/types", meta=meta) + return self.request.make_request(url), if_modified_since + + def search(self, query, **kwargs) -> list: + """Returns a list of search results""" + url = self.url.construct("search", query=query, **kwargs) + return self.request.make_request(url) + + def search_by_remote_id(self, remoteid: str) -> list: + """Returns a list of search results by remote id exact match""" + url = self.url.construct("search/remoteid", remoteid) + return self.request.make_request(url) + + def get_tags(self, slug: str, if_modified_since=None) -> dict: + """Returns a tag option dictionary""" + url = self.url.construct("entities", url_subsect=slug) + return self.request.make_request(url, if_modified_since) + + def get_entities_types(self, if_modified_since=None) -> dict: + """Returns a entities types dictionary""" + url = self.url.construct("entities") + return self.request.make_request(url, if_modified_since) + + def get_user_by_id(self, id: int) -> dict: + """Returns a user info dictionary""" + url = self.url.construct("user", id) + return self.request.make_request(url) + + def get_user(self) -> dict: + """Returns a user info dictionary""" + url = self.url.construct("user") + return self.request.make_request(url) + + def get_user_favorites(self) -> dict: + """Returns a user info dictionary""" + url = self.url.construct('user/favorites') + return self.request.make_request(url) + + +if __name__ == "__main__": + tvdb = TVDB("ed2aa66b-7899-4677-92a7-67bc9ce3d93a") + query = "romance in the alley" + res = tvdb.search(query) + print(res) diff --git a/app/modules/thetvdb/tvdbapi.py b/app/modules/thetvdb/tvdbapi.py deleted file mode 100644 index 308854f6..00000000 --- a/app/modules/thetvdb/tvdbapi.py +++ /dev/null @@ -1,1107 +0,0 @@ -""" - Simple-to-use Python interface to The TVDB's API (thetvdb.com) -""" - -__author__ = "dbr/Ben" -__version__ = "3.1.0" - -import getpass -import hashlib -import logging -import os -import sys -import tempfile -import time -import types -import warnings -from typing import Optional, Union - -import requests -import requests_cache -from requests_cache.backends.base import _to_bytes, _DEFAULT_HEADERS # noqa - -IS_PY2 = sys.version_info[0] == 2 - -if IS_PY2: - user_input = raw_input # noqa - from urllib import quote as url_quote # noqa -else: - from urllib.parse import quote as url_quote - - user_input = input - -if IS_PY2: - int_types = (int, long) # noqa - text_type = unicode # noqa -else: - int_types = int - text_type = str - -LOG = logging.getLogger("tvdb_api") - - -# Exceptions - - -class TvdbBaseException(Exception): - """Base exception for all tvdb_api errors - """ - - pass - - -class TvdbError(TvdbBaseException): - """An error with thetvdb.com (Cannot connect, for example) - """ - - pass - - -class TvdbUserAbort(TvdbBaseException): - """User aborted the interactive selection (via - the q command, ^c etc) - """ - - pass - - -class TvdbNotAuthorized(TvdbBaseException): - """An authorization error with thetvdb.com - """ - - pass - - -class TvdbDataNotFound(TvdbBaseException): - """Base for all attribute-not-found errors (e.gg missing show/season/episode/data) - """ - - pass - - -class TvdbShowNotFound(TvdbDataNotFound): - """Show cannot be found on thetvdb.com (non-existant show) - """ - - pass - - -class TvdbSeasonNotFound(TvdbDataNotFound): - """Season cannot be found on thetvdb.com - """ - - pass - - -class TvdbEpisodeNotFound(TvdbDataNotFound): - """Episode cannot be found on thetvdb.com - """ - - pass - - -class TvdbResourceNotFound(TvdbDataNotFound): - """Resource cannot be found on thetvdb.com - """ - - pass - - -class TvdbAttributeNotFound(TvdbDataNotFound): - """Raised if an episode does not have the requested - attribute (such as a episode name) - """ - - pass - - -# Backwards compatability re-bindings -tvdb_exception = TvdbBaseException # Deprecated, for backwards compatability -tvdb_error = TvdbError # Deprecated, for backwards compatability -tvdb_userabort = TvdbUserAbort # Deprecated, for backwards compatability -tvdb_notauthorized = TvdbNotAuthorized # Deprecated, for backwards compatability -tvdb_shownotfound = TvdbShowNotFound # Deprecated, for backwards compatability -tvdb_seasonnotfound = TvdbSeasonNotFound # Deprecated, for backwards compatability -tvdb_episodenotfound = TvdbEpisodeNotFound # Deprecated, for backwards compatability -tvdb_resourcenotfound = TvdbResourceNotFound # Deprecated, for backwards compatability -tvdb_attributenotfound = TvdbAttributeNotFound # Deprecated, for backwards compatability - -tvdb_invalidlanguage = TvdbError # Unused/removed. This exists for backwards compatability. - - -# UI - - -class BaseUI(object): - """Base user interface for Tvdb show selection. - - Selects first show. - - A UI is a callback. A class, it's __init__ function takes two arguments: - - - config, which is the Tvdb config dict, setup in tvdb_api.py - - log, which is Tvdb's logger instance (which uses the logging module). You can - call log.info() log.warning() etc - - It must have a method "selectSeries", this is passed a list of dicts, each dict - contains the the keys "name" (human readable show name), and "sid" (the shows - ID as on thetvdb.com). For example: - - [{'name': u'Lost', 'sid': u'73739'}, - {'name': u'Lost Universe', 'sid': u'73181'}] - - The "selectSeries" method must return the appropriate dict, or it can raise - tvdb_userabort (if the selection is aborted), tvdb_shownotfound (if the show - cannot be found). - - A simple example callback, which returns a random series: - - """ - - def __init__(self, config, log=None): - self.config = config - if log is not None: - warnings.warn( - "the UI's log parameter is deprecated, instead use\n" - "use import logging; logging.getLogger('ui').info('blah')\n" - "The self.log attribute will be removed in the next version" - ) - self.log = logging.getLogger(__name__) - - def selectSeries(self, allSeries): - return allSeries[0] - - -class ConsoleUI(BaseUI): - """Interactively allows the user to select a show from a console based UI - """ - - @staticmethod - def _displaySeries(allSeries, limit: Optional[int] = 6): - """Helper function, lists series with corresponding ID - """ - if limit is not None: - toshow = allSeries[:limit] - else: - toshow = allSeries - - print("TVDB Search Results:") - for i, cshow in enumerate(toshow): - i_show = i + 1 # Start at more human readable number 1 (not 0) - LOG.debug('Showing allSeries[%s], series %s)' % (i_show, allSeries[i]['seriesName'])) - if i == 0: - extra = " (default)" - else: - extra = "" - - output = "%s -> %s [%s] # http://thetvdb.com/?tab=series&id=%s%s" % ( - i_show, - cshow['seriesName'], - cshow['language'], - str(cshow['id']), - extra, - ) - if IS_PY2: - print(output.encode("UTF-8", "ignore")) - else: - print(output) - - def selectSeries(self, allSeries): - self._displaySeries(allSeries) - - if len(allSeries) == 1: - # Single result, return it! - print("Automatically selecting only result") - return allSeries[0] - - if self.config['select_first'] is True: - print("Automatically returning first search result") - return allSeries[0] - - while True: # return breaks this loop - try: - print("Enter choice (first number, return for default, 'all', ? for help):") - ans = user_input() - except KeyboardInterrupt: - raise tvdb_userabort("User aborted (^c keyboard interupt)") - except EOFError: - raise tvdb_userabort("User aborted (EOF received)") - - LOG.debug('Got choice of: %s' % ans) - try: - selected_id = int(ans) - 1 # The human entered 1 as first result, not zero - except ValueError: # Input was not number - if len(ans.strip()) == 0: - # Default option - LOG.debug('Default option, returning first series') - return allSeries[0] - if ans == "q": - LOG.debug('Got quit command (q)') - raise tvdb_userabort("User aborted ('q' quit command)") - elif ans == "?": - print("## Help") - print("# Enter the number that corresponds to the correct show.") - print("# a - display all results") - print("# all - display all results") - print("# ? - this help") - print("# q - abort tvnamer") - print("# Press return with no input to select first result") - elif ans.lower() in ["a", "all"]: - self._displaySeries(allSeries, limit=None) - else: - LOG.debug('Unknown keypress %s' % ans) - else: - LOG.debug('Trying to return ID: %d' % selected_id) - try: - return allSeries[selected_id] - except IndexError: - LOG.debug('Invalid show number entered!') - print("Invalid number (%s) selected!") - self._displaySeries(allSeries) - - -# Main API - - -class ShowContainer(dict): - """Simple dict that holds a series of Show instances - """ - - def __init__(self): - super().__init__() - self._stack = [] - self._lastgc = time.time() - - def __setitem__(self, key, value): - self._stack.append(key) - - # keep only the 100th latest results - if time.time() - self._lastgc > 20: - for o in self._stack[:-100]: - del self[o] - self._stack = self._stack[-100:] - - self._lastgc = time.time() - - super(ShowContainer, self).__setitem__(key, value) - - -class Show(dict): - """Holds a dict of seasons, and show data. - """ - - def __init__(self): - dict.__init__(self) - self.data = {} - - def __repr__(self): - return "" % ( - self.data.get(u'seriesName', 'instance'), - len(self), - ) - - def __getitem__(self, key): - if key in self: - # Key is an episode, return it - return dict.__getitem__(self, key) - - if key in self.data: - # Non-numeric request is for show-data - return dict.__getitem__(self.data, key) - - # Data wasn't found, raise appropriate error - if isinstance(key, int) or key.isdigit(): - # Episode number x was not found - raise tvdb_seasonnotfound("Could not find season %s" % (repr(key))) - else: - # If it's not numeric, it must be an attribute name, which - # doesn't exist, so attribute error. - raise tvdb_attributenotfound("Cannot find attribute %s" % (repr(key))) - - def aired_on(self, date): - ret = self.search(str(date), 'firstAired') - if len(ret) == 0: - raise tvdb_episodenotfound("Could not find any episodes that aired on %s" % date) - return ret - - def search(self, term=None, key=None): - """ - Search all episodes in show. Can search all data, or a specific key - (for example, episodename) - - Always returns an array (can be empty). First index contains the first - match, and so on. - - Each array index is an Episode() instance, so doing - search_results[0]['episodename'] will retrieve the episode name of the - first match. - - Search terms are converted to lower case (unicode) strings. - - """ - results = [] - for cur_season in self.values(): - searchresult = cur_season.search(term=term, key=key) - if len(searchresult) != 0: - results.extend(searchresult) - - return results - - -class Season(dict): - def __init__(self, show=None): - """The show attribute points to the parent show - """ - super().__init__() - self.show = show - - def __repr__(self): - return "" % (len(self.keys())) - - def __getitem__(self, episode_number): - if episode_number not in self: - raise tvdb_episodenotfound("Could not find episode %s" % (repr(episode_number))) - else: - return dict.__getitem__(self, episode_number) - - def search(self, term=None, key=None): - """Search all episodes in season, returns a list of matching Episode - instances. - - >>> t = Tvdb() - >>> t['scrubs'][1].search('first day') - [] - >>> - - See Show.search documentation for further information on search - """ - results = [] - for ep in self.values(): - searchresult = ep.search(term=term, key=key) - if searchresult is not None: - results.append(searchresult) - return results - - -class Episode(dict): - def __init__(self, season=None): - """The season attribute points to the parent season - """ - super().__init__() - self.season = season - - def __repr__(self): - seasno = self.get(u'airedSeason', 0) - epno = self.get(u'airedEpisodeNumber', 0) - epname = self.get(u'episodeName') - if epname is not None: - return "" % (seasno, epno, epname) - else: - return "" % (seasno, epno) - - def __getitem__(self, key): - try: - return dict.__getitem__(self, key) - except KeyError: - raise tvdb_attributenotfound("Cannot find attribute %s" % (repr(key))) - - def search(self, term=None, key=None): - """Search episode data for term, if it matches, return the Episode (self). - The key parameter can be used to limit the search to a specific element, - for example, episodename. - - This primarily for use use by Show.search and Season.search. See - Show.search for further information on search - - Simple example: - - >>> e = Episode() - >>> e['episodeName'] = "An Example" - >>> e.search("examp") - - >>> - - Limiting by key: - - >>> e.search("examp", key = "episodeName") - - >>> - """ - if term is None: - raise TypeError("must supply string to search for (contents)") - - term = text_type(term).lower() - for cur_key, cur_value in self.items(): - cur_key = text_type(cur_key) - cur_value = text_type(cur_value).lower() - if key is not None and cur_key != key: - # Do not search this key - continue - if cur_value.find(text_type(term)) > -1: - return self - - -class Actors(list): - """Holds all Actor instances for a show - """ - - pass - - -class Actor(dict): - """Represents a single actor. Should contain.. - - id, - image, - name, - role, - sortorder - """ - - def __repr__(self): - return "" % self.get("name") - - -def create_key(self, request): - """A new cache_key algo is required as the authentication token - changes with each run. Also there are other header params which - also change with each request (e.g. timestamp). Excluding all - headers means that Accept-Language is excluded which means - different language requests will return the cached response from - the wrong language. - - The _loadurl part checks the cache before a get is performed so - that an auth token can be obtained. If the response is already in - the cache, the auth token is not required. This prevents the need - to do a get which may access the host and fail because the session - is not yet not authorized. It is not necessary to authorize if the - cache is to be used thus saving host and network traffic. - """ - - if self._ignored_parameters: - url, body = self._remove_ignored_parameters(request) - else: - url, body = request.url, request.body - key = hashlib.sha256() - key.update(_to_bytes(request.method.upper())) - key.update(_to_bytes(url)) - if request.body: - key.update(_to_bytes(body)) - else: - if self._include_get_headers and request.headers != _DEFAULT_HEADERS: - for name, value in sorted(request.headers.items()): - # include only Accept-Language as it is important for context - if name in ['Accept-Language']: - key.update(_to_bytes(name)) - key.update(_to_bytes(value)) - return key.hexdigest() - - -class Tvdb: - """Create easy-to-use interface to name of season/episode name - >>> t = Tvdb() - >>> t['Scrubs'][1][24]['episodeName'] - u'My Last Day' - """ - - def __init__( - self, - interactive=False, - select_first=False, - cache: Union[str, bool, requests.Session] = True, - banners=False, - actors=False, - custom_ui=None, - language=None, - search_all_languages=False, - apikey=None, - username=None, - userkey=None, - forceConnect=None, # noqa - dvdorder=False, - proxies=None, - ): - - """interactive (True/False): - When True, uses built-in console UI is used to select the correct show. - When False, the first search result is used. - - select_first (True/False): - Automatically selects the first series search result (rather - than showing the user a list of more than one series). - Is overridden by interactive = False, or specifying a custom_ui - - cache (True/False/str/requests_cache.CachedSession): - - Retrieved URLs can be persisted to to disc. - - True/False enable or disable default caching. Passing - string specifies the directory where to store the - "tvdb.sqlite3" cache file. Alternatively a custom - requests.Session instance can be passed (e.g maybe a - customised instance of `requests_cache.CachedSession`) - - banners (True/False): - Retrieves the banners for a show. These are accessed - via the _banners key of a Show(), for example: - - >>> Tvdb(banners=True)['scrubs']['_banners'].keys() - [u'fanart', u'poster', u'seasonwide', u'season', u'series'] - - actors (True/False): - Retrieves a list of the actors for a show. These are accessed - via the _actors key of a Show(), for example: - - >>> t = Tvdb(actors=True) - >>> t['scrubs']['_actors'][0]['name'] - u'John C. McGinley' - - custom_ui (tvdb_ui.BaseUI subclass): - A callable subclass of tvdb_ui.BaseUI (overrides interactive option) - - language (2 character language abbreviation): - The 2 digit language abbreviation used for the returned data, - and is also used when searching. For a complete list, call - the `Tvdb.available_languages` method. - Default is "en" (English). - - search_all_languages (True/False): - By default, Tvdb will only search in the language specified using - the language option. When this is True, it will search for the - show in and language - - apikey (str/unicode): - Your API key for TheTVDB. You can easily register a key with in - a few minutes: - https://thetvdb.com/api-information - - username (str/unicode or None): - Specify a user account to use for actions which require - authentication (e.g marking a series as favourite, submitting ratings) - - userkey (str/unicode, or None): - User authentication key relating to "username". - - forceConnect: - DEPRECATED. Disabled the timeout-throttling logic. Now has no function - proxies: - proxy dict - """ - - if forceConnect is not None: - warnings.warn( - "forceConnect argument is deprecated and will be removed soon", - category=DeprecationWarning, - ) - - self.shows = ShowContainer() # Holds all Show classes - self.corrections = {} # Holds show-name to show_id mapping - - self.config = {} - - # Ability to pull key form env-var mostly for unit-tests - _test_key = os.getenv('TVDB_API_KEY') - if apikey is None and _test_key is not None: - apikey = _test_key - - if apikey is None: - raise ValueError( - ( - "apikey argument is now required - an API key can be easily registered " - "at https://thetvdb.com/api-information" - ) - ) - self.config['auth_payload'] = { - "apikey": apikey, - "username": username or "", - "userkey": userkey or "", - } - - self.config['custom_ui'] = custom_ui - - self.config['interactive'] = interactive # prompt for correct series? - - self.config['select_first'] = select_first - - self.config['search_all_languages'] = search_all_languages - - self.config['dvdorder'] = dvdorder - - if cache is True: - cache_dir = self._getTempDir() - LOG.debug("Caching using requests_cache to %s" % cache_dir) - self.session = requests_cache.CachedSession( - expire_after=21600, # 6 hours - backend='sqlite', - cache_name=cache_dir, - include_get_headers=True, - ) - self.session.cache.create_key = types.MethodType(create_key, self.session.cache) - self.session.remove_expired_responses() - self.config['cache_enabled'] = True - elif cache is False: - LOG.debug("Caching disabled") - self.session = requests.Session() - self.config['cache_enabled'] = False - elif isinstance(cache, str): - LOG.debug("Caching using requests_cache to specified directory %s" % cache) - # Specified cache path - self.session = requests_cache.CachedSession( - expire_after=21600, # 6 hours - backend='sqlite', - cache_name=os.path.join(cache, "tvdb_api"), - include_get_headers=True, - ) - self.session.cache.create_key = types.MethodType(create_key, self.session.cache) - self.session.remove_expired_responses() - else: - LOG.debug("Using specified requests.Session") - self.session = cache - try: - self.session.get # noqa - except AttributeError: - raise ValueError( - ( - "cache argument must be True/False, string as cache path " - "or requests.Session-type object (e.g from requests_cache.CachedSession)" - ) - ) - - self.config['banners_enabled'] = banners - self.config['actors_enabled'] = actors - - if language is None: - self.config['language'] = 'en' - else: - self.config['language'] = language - - # The following url_ configs are based of the - # https://api.thetvdb.com/swagger - self.config['base_url'] = "http://thetvdb.com" - self.config['api_url'] = "https://api.thetvdb.com" - - self.config['url_getSeries'] = u"%(api_url)s/search/series?name=%%s" % self.config - - self.config['url_epInfo'] = u"%(api_url)s/series/%%s/episodes" % self.config - - self.config['url_seriesInfo'] = u"%(api_url)s/series/%%s" % self.config - self.config['url_actorsInfo'] = u"%(api_url)s/series/%%s/actors" % self.config - - self.config['url_seriesBanner'] = u"%(api_url)s/series/%%s/images" % self.config - self.config['url_seriesBannerInfo'] = ( - u"%(api_url)s/series/%%s/images/query?keyType=%%s" % self.config - ) - self.config['url_artworkPrefix'] = u"%(base_url)s/banners/%%s" % self.config - - self.__authorized = False - self.headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - 'Accept-Language': self.config['language'], - } - self.proxies = proxies - - def close(self): - if self.session: - self.session.close() - - @staticmethod - def _getTempDir(): - """Returns the [system temp dir]/tvdb_api-u501 (or - tvdb_api-myuser) - """ - py_major = sys.version_info[ - 0 - ] # Prefix with major version as 2 and 3 write incompatible caches - if hasattr(os, 'getuid'): - uid = "-u%d" % (os.getuid()) - else: - # For Windows - uid = getpass.getuser() - - return os.path.join(tempfile.gettempdir(), "tvdb_api-%s-py%s" % (uid, py_major)) - - def _loadUrl(self, url, data=None, language=None): - """Return response from The TVDB API""" - - if not language: - language = self.config['language'] - self.headers['Accept-Language'] = language - - # TODO: Handle Exceptions - # TODO: Update Token - - # encoded url is used for hashing in the cache so - # python 2 and 3 generate the same hash - if not self.__authorized: - # only authorize of we haven't before and we - # don't have the url in the cache - cache_key = None - with requests.Session() as fake_session_for_key: - fake_session_for_key.headers['Accept-Language'] = language - try: - # in case the session class has no cache object, fail gracefully - cache_key = self.session.cache.create_key( - fake_session_for_key.prepare_request(requests.Request('GET', url)) - ) - except Exception: # noqa - # FIXME: Can this just check for hasattr(self.session, "cache") instead? - pass - - # fmt: off - # No fmt because mangles noqa comment - https://github.com/psf/black/issues/195 - if not cache_key or not self.session.cache.has_key( - cache_key): # noqa: not a dict, has_key is part of requests-cache API - self.authorize() - # fmt: on - - response = self.session.get(url, headers=self.headers, proxies=self.proxies) - r = response.json() - LOG.debug("loadurl: %s language=%s" % (url, language)) - LOG.debug("response:") - LOG.debug(r) - error = r.get('Error') - errors = r.get('errors') - r_data = r.get('data') - links = r.get('links') - - if error: - if error == u'Resource not found': - # raise(tvdb_resourcenotfound) - # handle no data at a different level so it is more specific - pass - elif error.lower() == u'not authorized': - # Note: Error string sometimes comes back as "Not authorized" or "Not Authorized" - raise tvdb_notauthorized() - elif error.startswith(u"ID: ") and error.endswith("not found"): - # FIXME: Refactor error out of in this method - raise tvdb_shownotfound("%s" % error) - else: - raise tvdb_error("%s" % error) - - if errors: - if errors and u'invalidLanguage' in errors: - # raise(tvdb_invalidlanguage(errors[u'invalidLanguage'])) - # invalidLanguage does not mean there is no data - # there is just less data (missing translations) - pass - - if data and isinstance(data, list): - data.extend(r_data) - else: - data = r_data - - if links and links['next']: - url = url.split('?')[0] - _url = url + "?page=%s" % links['next'] - self._loadUrl(_url, data) - - return data - - def authorize(self): - LOG.debug("auth") - r = self.session.post( - 'https://api.thetvdb.com/login', - json=self.config['auth_payload'], - headers=self.headers, - proxies=self.proxies, - ) - r_json = r.json() - error = r_json.get('Error') - if error: - if error == u'Not Authorized': - raise tvdb_notauthorized - token = r_json.get('token') - self.headers['Authorization'] = "Bearer %s" % text_type(token) - self.__authorized = True - - def _getetsrc(self, url, language=None): - """Loads a URL using caching, returns an ElementTree of the source - """ - src = self._loadUrl(url, language=language) - - return src - - def _setItem(self, sid, seas, ep, attrib, value): - """Creates a new episode, creating Show(), Season() and - Episode()s as required. Called by _getShowData to populate show - - Since the nice-to-use tvdb[1][24]['name] interface - makes it impossible to do tvdb[1][24]['name] = "name" - and still be capable of checking if an episode exists - so we can raise tvdb_shownotfound, we have a slightly - less pretty method of setting items.. but since the API - is supposed to be read-only, this is the best way to - do it! - The problem is that calling tvdb[1][24]['episodename'] = "name" - calls __getitem__ on tvdb[1], there is no way to check if - tvdb.__dict__ should have a key "1" before we auto-create it - """ - if sid not in self.shows: - self.shows[sid] = Show() - if seas not in self.shows[sid]: - self.shows[sid][seas] = Season(show=self.shows[sid]) - if ep not in self.shows[sid][seas]: - self.shows[sid][seas][ep] = Episode(season=self.shows[sid][seas]) - self.shows[sid][seas][ep][attrib] = value - - def _setShowData(self, sid, key, value): - """Sets self.shows[sid] to a new Show instance, or sets the data - """ - if sid not in self.shows: - self.shows[sid] = Show() - self.shows[sid].data[key] = value - - def search(self, series): - """This searches TheTVDB.com for the series name - and returns the result list - """ - series = url_quote(series.encode("utf-8")) - LOG.debug("Searching for show %s" % series) - series_resp = self._getetsrc(self.config['url_getSeries'] % series) - if not series_resp: - LOG.debug('Series result returned zero') - raise tvdb_shownotfound( - "Show-name search returned zero results (cannot find show on TVDB)" - ) - - all_series = [] - for series in series_resp: - series['language'] = self.config['language'] - LOG.debug('Found series %(seriesName)s' % series) - all_series.append(series) - - return all_series - - def _getSeries(self, series): - """This searches TheTVDB.com for the series name, - If a custom_ui UI is configured, it uses this to select the correct - series. If not, and interactive == True, ConsoleUI is used, if not - BaseUI is used to select the first result. - """ - all_series = self.search(series) - - if self.config['custom_ui'] is not None: - LOG.debug("Using custom UI %s" % (repr(self.config['custom_ui']))) - ui = self.config['custom_ui'](config=self.config) - else: - if not self.config['interactive']: - LOG.debug('Auto-selecting first search result using BaseUI') - ui = BaseUI(config=self.config) - else: - LOG.debug('Interactively selecting show using ConsoleUI') - ui = ConsoleUI(config=self.config) - - return ui.selectSeries(all_series) - - def available_languages(self): - """Returns a list of the available language abbreviations - which can be used in Tvdb(language="...") etc - """ - et = self._getetsrc("https://api.thetvdb.com/languages") - languages = [x['abbreviation'] for x in et] - return sorted(languages) - - def _parseBanners(self, sid): - """Parses banners XML, from - http://thetvdb.com/api/[APIKEY]/series/[SERIES ID]/banners.xml - - Banners are retrieved using t['show name]['_banners'], for example: - - >>> t = Tvdb(banners = True) - >>> t['scrubs']['_banners'].keys() - [u'fanart', u'poster', u'seasonwide', u'season', u'series'] - >>> t['scrubs']['_banners']['poster']['680x1000'][35308]['_bannerpath'] - u'http://thetvdb.com/banners/posters/76156-2.jpg' - >>> - - Any key starting with an underscore has been processed (not the raw - data from the XML) - - This interface will be improved in future versions. - """ - LOG.debug('Getting season banners for %s' % sid) - banners_resp = self._getetsrc(self.config['url_seriesBanner'] % sid) - banners = {} - for cur_banner in banners_resp.keys(): - btype = None - banners_info = self._getetsrc(self.config['url_seriesBannerInfo'] % (sid, cur_banner)) - for banner_info in banners_info: - bid = banner_info.get('id') - btype = banner_info.get('keyType') - btype2 = banner_info.get('resolution') - if btype is None or btype2 is None: - continue - - if btype not in banners: - banners[btype] = {} - if btype2 not in banners[btype]: - banners[btype][btype2] = {} - if bid not in banners[btype][btype2]: - banners[btype][btype2][bid] = {} - - banners[btype][btype2][bid]['bannerpath'] = banner_info['fileName'] - banners[btype][btype2][bid]['resolution'] = banner_info['resolution'] - banners[btype][btype2][bid]['subKey'] = banner_info['subKey'] - - for k, v in list(banners[btype][btype2][bid].items()): - if k.endswith("path"): - new_key = "_%s" % k - LOG.debug("Transforming %s to %s" % (k, new_key)) - new_url = self.config['url_artworkPrefix'] % v - banners[btype][btype2][bid][new_key] = new_url - if btype: - banners[btype]['raw'] = banners_info - self._setShowData(sid, "_banners", banners) - - def _parseActors(self, sid): - """Parsers actors XML, from - http://thetvdb.com/api/[APIKEY]/series/[SERIES ID]/actors.xml - - Any key starting with an underscore has been processed (not the raw - data from the XML) - """ - LOG.debug("Getting actors for %s" % sid) - actors_resp = self._getetsrc(self.config['url_actorsInfo'] % sid) - - cur_actors = Actors() - for cur_actor_item in actors_resp: - cur_actor = Actor() - for tag, value in cur_actor_item.items(): - if value is not None: - if tag == "image": - value = self.config['url_artworkPrefix'] % value - cur_actor[tag] = value - cur_actors.append(cur_actor) - self._setShowData(sid, '_actors', cur_actors) - - def _getShowData(self, sid, language): - """Takes a series ID, gets the epInfo URL and parses the TVDB - XML file into the shows dict in layout: - shows[series_id][season_number][episode_number] - """ - - if self.config['language'] is None: - LOG.debug('Config language is none, using show language') - if language is None: - raise tvdb_error("config['language'] was None, this should not happen") - else: - LOG.debug( - 'Configured language %s override show language of %s' - % (self.config['language'], language) - ) - - # Parse show information - LOG.debug('Getting all series data for %s' % sid) - series_info_resp = self._getetsrc(self.config['url_seriesInfo'] % sid) - for tag, value in series_info_resp.items(): - if value is not None: - if tag in ['banner', 'fanart', 'poster']: - value = self.config['url_artworkPrefix'] % value - - self._setShowData(sid, tag, value) - # set language - self._setShowData(sid, u'language', self.config['language']) - - # Parse banners - if self.config['banners_enabled']: - self._parseBanners(sid) - - # Parse actors - if self.config['actors_enabled']: - self._parseActors(sid) - - # Parse episode data - LOG.debug('Getting all episodes of %s' % sid) - - url = self.config['url_epInfo'] % sid - - eps_resp = self._getetsrc(url, language=language) - - for cur_ep in eps_resp: - - if self.config['dvdorder']: - LOG.debug('Using DVD ordering.') - use_dvd = ( - cur_ep.get('dvdSeason') is not None - and cur_ep.get('dvdEpisodeNumber') is not None - ) - else: - use_dvd = False - - if use_dvd: - elem_seasnum, elem_epno = cur_ep.get('dvdSeason'), cur_ep.get('dvdEpisodeNumber') - else: - elem_seasnum, elem_epno = cur_ep['airedSeason'], cur_ep['airedEpisodeNumber'] - - if elem_seasnum is None or elem_epno is None: - LOG.warning( - "An episode has incomplete season/episode number (season: %r, episode: %r)" - % (elem_seasnum, elem_epno) - ) - # TODO: Should this happen? - continue # Skip to next episode - - seas_no = elem_seasnum - ep_no = elem_epno - - for cur_item in cur_ep.keys(): - tag = cur_item - value = cur_ep[cur_item] - if value is not None: - if tag == 'filename': - value = self.config['url_artworkPrefix'] % value - self._setItem(sid, seas_no, ep_no, tag, value) - - def _nameToSid(self, name): - """Takes show name, returns the correct series ID (if the show has - already been grabbed), or grabs all episodes and returns - the correct SID. - """ - if name in self.corrections: - LOG.debug('Correcting %s to %s' % (name, self.corrections[name])) - sid = self.corrections[name] - else: - LOG.debug('Getting show %s' % name) - selected_series = self._getSeries(name) - sid = selected_series['id'] - LOG.debug('Got %(seriesName)s, id %(id)s' % selected_series) - - self.corrections[name] = sid - self._getShowData(selected_series['id'], self.config['language']) - - return sid - - def __getitem__(self, key): - """Handles tvdb_instance['seriesname'] calls. - The dict index should be the show id - """ - if isinstance(key, int_types): - sid = key - else: - sid = self._nameToSid(key) - LOG.debug('Got series id %s' % sid) - - if sid not in self.shows: - self._getShowData(sid, self.config['language']) - return self.shows[sid] - - def __repr__(self): - return repr(self.shows) - - -def main(): - """Simple example of using tvdb_api - it just - grabs an episode name interactively. - """ - import logging - - logging.basicConfig(level=logging.DEBUG) - - tvdb_instance = Tvdb(interactive=False, cache=False) - print(tvdb_instance['Lost']['seriesname']) - print(tvdb_instance['Lost'][1][4]['episodename']) - - -if __name__ == '__main__': - main() From 6bd86a724e92ff8c573c3567f1eefae643bee403 Mon Sep 17 00:00:00 2001 From: TimoYoung Date: Tue, 27 May 2025 17:58:37 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=E5=8C=BA=E5=88=86series=E5=92=8Cmovie?= =?UTF-8?q?=20id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/modules/thetvdb/__init__.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/app/modules/thetvdb/__init__.py b/app/modules/thetvdb/__init__.py index 7b6239d0..4ad07a16 100644 --- a/app/modules/thetvdb/__init__.py +++ b/app/modules/thetvdb/__init__.py @@ -30,10 +30,13 @@ class TheTvDbModule(_ModuleBase): return func(*args, **kwargs) except ValueError as e: # 检查错误信息中是否包含 token 失效相关描述 - if "failed to get" in str(e) or "Unauthorized" in str(e) or "NOT_MODIFIED" in str(e): + if "Unauthorized" in str(e): logger.warning("TVDB Token 可能已失效,正在尝试重新登录...") self._initialize_tvdb_session() return func(*args, **kwargs) + elif "NotFoundException" in str(e): + logger.warning("TVDB 剧集不存在") + return None else: raise except Exception as e: @@ -96,13 +99,14 @@ class TheTvDbModule(_ModuleBase): def search_tvdb(self, title: str) -> list: """ - 用标题搜索TVDB + 用标题搜索TVDB剧集 :param title: 标题 :return: TVDB信息 """ try: - logger.info(f"开始用标题搜索TVDB: {title} ...") - return self._handle_tvdb_call(self.tvdb.search, title) + logger.info(f"开始用标题搜索TVDB剧集: {title} ...") + res = self._handle_tvdb_call(self.tvdb.search, title) + return [item for item in res if item.get("type") == "series"] except Exception as err: - logger.error(f"用标题搜索TVDB失败: {str(err)}") + logger.error(f"用标题搜索TVDB剧集失败: {str(err)}") return []