Files
MoviePilot/app/modules/themoviedb/tmdbv3api/tmdb.py
Attente b609567c38 feat(cache): 引入 fresh 和 async_fresh 以控制缓存行为
- 新增 `fresh` 和 `async_fresh` 用于在同步和异步函数中
临时禁用缓存。
- 通过 `_fresh` 这一 contextvars 变量实现上下文感知的
缓存刷新机制
- 修改了 `cached` 装饰器逻辑,在 `is_fresh()` 为 True
时跳过缓存读取。

- 修复 download 模块中路径处理问题,使用 `Path.as_posix()` 确保跨平台兼容性。
2025-10-19 22:31:50 +08:00

275 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import asyncio
import logging
import time
from datetime import datetime
import requests
import requests.exceptions
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils, AsyncRequestUtils
from .exceptions import TMDbException
logger = logging.getLogger(__name__)
class TMDb(object):
def __init__(self, session=None, language=None):
self._api_key = settings.TMDB_API_KEY
self._language = language or settings.TMDB_LOCALE or "en-US"
self._session_id = None
self._session = session
self._wait_on_rate_limit = True
self._debug_enabled = False
self._proxies = settings.PROXY
self._domain = settings.TMDB_API_DOMAIN
self._page = None
self._total_results = None
self._total_pages = None
if not self._session:
self._session = requests.Session()
self._req = RequestUtils(ua=settings.NORMAL_USER_AGENT, session=self._session, proxies=self.proxies)
self._async_req = AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT, proxies=self.proxies)
self._remaining = 40
self._reset = None
self._timeout = 15
self.__clear_async_cache__ = False
@property
def page(self):
return self._page
@property
def total_results(self):
return self._total_results
@property
def total_pages(self):
return self._total_pages
@property
def api_key(self):
return self._api_key
@property
def domain(self):
return self._domain
@property
def proxies(self):
return self._proxies
@proxies.setter
def proxies(self, proxies):
self._proxies = proxies
@api_key.setter
def api_key(self, api_key):
self._api_key = str(api_key)
@domain.setter
def domain(self, domain):
self._domain = str(domain)
@property
def language(self):
return self._language
@language.setter
def language(self, language):
self._language = language
@property
def has_session(self):
return True if self._session_id else False
@property
def session_id(self):
if not self._session_id:
raise TMDbException("Must Authenticate to create a session run Authentication(username, password)")
return self._session_id
@session_id.setter
def session_id(self, session_id):
self._session_id = session_id
@property
def wait_on_rate_limit(self):
return self._wait_on_rate_limit
@wait_on_rate_limit.setter
def wait_on_rate_limit(self, wait_on_rate_limit):
self._wait_on_rate_limit = bool(wait_on_rate_limit)
@property
def debug(self):
return self._debug_enabled
@debug.setter
def debug(self, debug):
self._debug_enabled = bool(debug)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
def cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
return self.request(method, url, data, json)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
async def async_cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
if self.__clear_async_cache__:
self.__clear_async_cache__ = False
await self.async_cached_request.cache_clear()
return await self.async_request(method, url, data, json)
def request(self, method, url, data, json):
if method == "GET":
req = self._req.get_res(url, params=data, json=json)
else:
req = self._req.post_res(url, data=data, json=json)
if req is None:
raise TMDbException("无法连接TheMovieDb请检查网络连接")
return req
async def async_request(self, method, url, data, json):
if method == "GET":
req = await self._async_req.get_res(url, params=data, json=json)
else:
req = await self._async_req.post_res(url, data=data, json=json)
if req is None:
raise TMDbException("无法连接TheMovieDb请检查网络连接")
return req
def cache_clear(self):
self.__clear_async_cache__ = True
return self.cached_request.cache_clear()
def _validate_api_key(self):
if self.api_key is None or self.api_key == "":
raise TMDbException("TheMovieDb API Key 未设置!")
def _build_url(self, action, params=""):
return "https://%s/3%s?api_key=%s&%s&language=%s" % (
self.domain,
action,
self.api_key,
params,
self.language,
)
def _handle_headers(self, headers):
if "X-RateLimit-Remaining" in headers:
self._remaining = int(headers["X-RateLimit-Remaining"])
if "X-RateLimit-Reset" in headers:
self._reset = int(headers["X-RateLimit-Reset"])
def _handle_rate_limit(self):
if self._remaining < 1:
current_time = int(time.time())
sleep_time = self._reset - current_time
if self.wait_on_rate_limit:
logger.warning("达到请求频率限制,休眠:%d 秒..." % sleep_time)
return abs(sleep_time)
else:
raise TMDbException("达到请求频率限制,请稍后再试!")
return 0
def _process_json_response(self, json_data, is_async=False):
if "page" in json_data:
self._page = json_data["page"]
if "total_results" in json_data:
self._total_results = json_data["total_results"]
if "total_pages" in json_data:
self._total_pages = json_data["total_pages"]
if self.debug:
logger.info(json_data)
if is_async:
logger.info(self.async_cached_request.cache_info())
else:
logger.info(self.cached_request.cache_info())
@staticmethod
def _handle_errors(json_data):
if "errors" in json_data:
raise TMDbException(json_data["errors"])
if "success" in json_data and json_data["success"] is False:
raise TMDbException(json_data["status_message"])
def _request_obj(self, action, params="", call_cached=True,
method="GET", data=None, json=None, key=None):
self._validate_api_key()
url = self._build_url(action, params)
if call_cached and method != "POST":
req = self.cached_request(method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d'))
else:
req = self.request(method, url, data, json)
if req is None:
return None
self._handle_headers(req.headers)
rate_limit_result = self._handle_rate_limit()
if rate_limit_result:
logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result)
time.sleep(rate_limit_result)
return self._request_obj(action, params, call_cached, method, data, json, key)
json_data = req.json()
self._process_json_response(json_data, is_async=False)
self._handle_errors(json_data)
if key:
return json_data.get(key)
return json_data
async def _async_request_obj(self, action, params="", call_cached=True,
method="GET", data=None, json=None, key=None):
self._validate_api_key()
url = self._build_url(action, params)
if call_cached and method != "POST":
req = await self.async_cached_request(method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d'))
else:
req = await self.async_request(method, url, data, json)
if req is None:
return None
self._handle_headers(req.headers)
rate_limit_result = self._handle_rate_limit()
if rate_limit_result:
logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result)
await asyncio.sleep(rate_limit_result)
return await self._async_request_obj(action, params, call_cached, method, data, json, key)
json_data = req.json()
self._process_json_response(json_data, is_async=True)
self._handle_errors(json_data)
if key:
return json_data.get(key)
return json_data
def close(self):
if self._session:
self._session.close()