chore: style with black

This commit is contained in:
100gle
2023-06-09 13:10:10 +08:00
parent aefcbee8e8
commit e336fd7402
29 changed files with 123 additions and 89 deletions

View File

@@ -27,7 +27,7 @@ uvicorn_logging_config = {
def create_app() -> ASGIApp:
app = FastAPI()
# mount routers
app.include_router(web_router)
app.include_router(proxy_router)
@@ -35,6 +35,7 @@ def create_app() -> ASGIApp:
return app
app = create_app()
if __name__ == "__main__":

View File

@@ -38,4 +38,5 @@ def locked(func):
def wrapper(*args, **kwargs):
with lock:
return func(*args, **kwargs)
return wrapper
return wrapper

View File

@@ -7,7 +7,7 @@ from .download import router as download_router
from .log import router as log_router
from .program import router as program_router
__all__ = 'v1'
__all__ = "v1"
# API 1.0
v1 = APIRouter(prefix="/v1")

View File

@@ -19,9 +19,7 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
username = form_data.username
password = form_data.password
auth_user(username, password)
token = create_access_token(
data={"sub": username}, expires_delta=timedelta(days=1)
)
token = create_access_token(data={"sub": username}, expires_delta=timedelta(days=1))
return {"access_token": token, "token_type": "bearer", "expire": 86400}
@@ -32,10 +30,7 @@ async def refresh(current_user: User = Depends(get_current_user)):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token"
)
token = create_access_token(
data = {"sub": current_user.username}
)
token = create_access_token(data={"sub": current_user.username})
return {"access_token": token, "token_type": "bearer", "expire": 86400}

View File

@@ -8,9 +8,7 @@ from module.security import get_current_user
router = APIRouter(prefix="/bangumi", tags=["bangumi"])
@router.get(
"/getAll", response_model=list[BangumiData]
)
@router.get("/getAll", response_model=list[BangumiData])
async def get_all_data(current_user=Depends(get_current_user)):
if not current_user:
raise HTTPException(
@@ -20,9 +18,7 @@ async def get_all_data(current_user=Depends(get_current_user)):
return torrent.search_all()
@router.get(
"/getData/{bangumi_id}", response_model=BangumiData
)
@router.get("/getData/{bangumi_id}", response_model=BangumiData)
async def get_data(bangumi_id: str, current_user=Depends(get_current_user)):
if not current_user:
raise HTTPException(
@@ -43,7 +39,9 @@ async def update_rule(data: BangumiData, current_user=Depends(get_current_user))
@router.delete("/deleteRule/{bangumi_id}")
async def delete_rule(bangumi_id: str, file: bool = False, current_user=Depends(get_current_user)):
async def delete_rule(
bangumi_id: str, file: bool = False, current_user=Depends(get_current_user)
):
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token"

View File

@@ -8,6 +8,7 @@ from module.security import get_current_user
router = APIRouter(prefix="/download", tags=["download"])
@router.post("/analysis")
async def analysis(link: RssLink, current_user=Depends(get_current_user)):
if not current_user:

View File

@@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException, Response, status
from module.conf import LOG_PATH
from module.security import get_current_user
router = APIRouter(prefix='/log', tags=["log"])
router = APIRouter(prefix="/log", tags=["log"])
@router.get("")

View File

@@ -29,7 +29,9 @@ if VERSION != "DEV_VERSION":
def index(request: Request):
context = {"request": request}
return templates.TemplateResponse("index.html", context)
else:
@router.get("/", status_code=302, tags=["html"])
def index():
return RedirectResponse("/docs")

View File

@@ -1,9 +1,7 @@
import os.path
from module.downloader import DownloadClient
from module.network import RequestContent
from module.conf import settings
from module.downloader import DownloadClient
from module.models import Config
from module.network import RequestContent
class Checker:

View File

@@ -19,11 +19,12 @@ figlet = r"""
|___/
"""
class Program(RenameThread, RSSThread):
@staticmethod
def __start_info():
for line in figlet.splitlines():
logger.info(line.strip("\n"))
logger.info(line.strip("\n"))
logger.info(
f"Version {VERSION} Author: EstrellaXD Twitter: https://twitter.com/Estrella_Pan"
)

View File

@@ -78,7 +78,9 @@ class QbDownloader:
@qb_connect_failed_wait
def torrents_info(self, status_filter, category, tag=None):
return self._client.torrents_info(status_filter=status_filter, category=category, tag=tag)
return self._client.torrents_info(
status_filter=status_filter, category=category, tag=tag
)
def torrents_add(self, urls, save_path, category, torrent_files=None):
resp = self._client.torrents_add(
@@ -87,7 +89,7 @@ class QbDownloader:
torrent_files=torrent_files,
save_path=save_path,
category=category,
use_auto_torrent_management=False
use_auto_torrent_management=False,
)
return resp == "Ok."

View File

@@ -99,7 +99,9 @@ class DownloadClient(TorrentPath):
logger.debug("[Downloader] Finished.")
def get_torrent_info(self, category="Bangumi", status_filter="completed", tag=None):
return self.client.torrents_info(status_filter=status_filter, category=category, tag=tag)
return self.client.torrents_info(
status_filter=status_filter, category=category, tag=tag
)
def rename_torrent_file(self, _hash, old_path, new_path) -> bool:
logger.info(f"{old_path} >> {new_path}")
@@ -116,7 +118,7 @@ class DownloadClient(TorrentPath):
urls=torrent.get("urls"),
torrent_files=torrent.get("torrent_files"),
save_path=torrent.get("save_path"),
category="Bangumi"
category="Bangumi",
):
logger.debug(f"[Downloader] Add torrent: {torrent.get('save_path')}")
return True

View File

@@ -32,8 +32,12 @@ class SeasonCollector(DownloadClient):
torrents = st.get_torrents(link, _filter="|".join(data.filter))
torrent_files = None
if proxy:
torrent_files = [st.get_content(torrent.torrent_link) for torrent in torrents]
return self.add_season_torrents(data=data, torrents=torrents, torrent_files=torrent_files)
torrent_files = [
st.get_content(torrent.torrent_link) for torrent in torrents
]
return self.add_season_torrents(
data=data, torrents=torrents, torrent_files=torrent_files
)
def subscribe_season(self, data: BangumiData):
with BangumiDatabase() as db:

View File

@@ -36,9 +36,7 @@ class Renamer(DownloadClient):
elif method == "advance":
return f"{bangumi_name} S{season}E{episode}{file_info.suffix}"
elif method == "normal":
logger.warning(
"[Renamer] Normal rename method is deprecated."
)
logger.warning("[Renamer] Normal rename method is deprecated.")
return file_info.media_path
elif method == "subtitle_pn":
return f"{file_info.title} S{season}E{episode}.{file_info.language}{file_info.suffix}"
@@ -71,10 +69,10 @@ class Renamer(DownloadClient):
)
if renamed:
n = Notification(
official_title=bangumi_name,
season=ep.season,
episode=ep.episode,
)
official_title=bangumi_name,
season=ep.season,
episode=ep.episode,
)
return n
else:
logger.warning(f"[Renamer] {media_path} parse failed")
@@ -176,6 +174,7 @@ class Renamer(DownloadClient):
if __name__ == "__main__":
from module.conf import setup_logger
settings.log.debug_enable = True
setup_logger()
with Renamer() as renamer:

View File

@@ -14,7 +14,9 @@ class TorrentManager(BangumiDatabase):
def __match_torrents_list(data: BangumiData) -> list:
with DownloadClient() as client:
torrents = client.get_torrent_info(status_filter=None)
return [torrent.hash for torrent in torrents if torrent.save_path == data.save_path]
return [
torrent.hash for torrent in torrents if torrent.save_path == data.save_path
]
def delete_torrents(self, data: BangumiData, client: DownloadClient):
hash_list = self.__match_torrents_list(data)
@@ -34,17 +36,21 @@ class TorrentManager(BangumiDatabase):
self.delete_one(int(_id))
if file:
torrent_message = self.delete_torrents(data, client)
return JSONResponse(status_code=200, content={
"msg": f"Delete {data.official_title} rule. {torrent_message}"
})
return JSONResponse(
status_code=200,
content={
"msg": f"Delete {data.official_title} rule. {torrent_message}"
},
)
logger.info(f"[Manager] Delete rule for {data.official_title}")
return JSONResponse(status_code=200, content={
"msg": f"Delete rule for {data.official_title}"
})
return JSONResponse(
status_code=200,
content={"msg": f"Delete rule for {data.official_title}"},
)
else:
return JSONResponse(status_code=406, content={
"msg": f"Can't find id {_id}"
})
return JSONResponse(
status_code=406, content={"msg": f"Can't find id {_id}"}
)
def disable_rule(self, _id: str | int, file: bool = False):
data = self.search_id(int(_id))
@@ -55,17 +61,23 @@ class TorrentManager(BangumiDatabase):
self.update_one(data)
if file:
torrent_message = self.delete_torrents(data, client)
return JSONResponse(status_code=200, content={
"msg": f"Disable {data.official_title} rule. {torrent_message}"
})
return JSONResponse(
status_code=200,
content={
"msg": f"Disable {data.official_title} rule. {torrent_message}"
},
)
logger.info(f"[Manager] Disable rule for {data.official_title}")
return JSONResponse(status_code=200, content={
"msg": f"Disable {data.official_title} rule.",
})
return JSONResponse(
status_code=200,
content={
"msg": f"Disable {data.official_title} rule.",
},
)
else:
return JSONResponse(status_code=406, content={
"msg": f"Can't find id {_id}"
})
return JSONResponse(
status_code=406, content={"msg": f"Can't find id {_id}"}
)
def enable_rule(self, _id: str | int):
data = self.search_id(int(_id))
@@ -75,21 +87,24 @@ class TorrentManager(BangumiDatabase):
with DownloadClient() as client:
client.set_rule(data)
logger.info(f"[Manager] Enable rule for {data.official_title}")
return JSONResponse(status_code=200, content={
"msg": f"Enable {data.official_title} rule.",
})
return JSONResponse(
status_code=200,
content={
"msg": f"Enable {data.official_title} rule.",
},
)
else:
return JSONResponse(status_code=406, content={
"msg": f"Can't find bangumi id {_id}"
})
return JSONResponse(
status_code=406, content={"msg": f"Can't find bangumi id {_id}"}
)
def update_rule(self, data: BangumiData):
old_data = self.search_id(data.id)
if not old_data:
logger.error(f"[Manager] Can't find data with {data.id}")
return JSONResponse(status_code=406, content={
"msg": f"Can't find data with {data.id}"
})
return JSONResponse(
status_code=406, content={"msg": f"Can't find data with {data.id}"}
)
else:
# Move torrent
match_list = self.__match_torrents_list(data)
@@ -101,9 +116,12 @@ class TorrentManager(BangumiDatabase):
client.remove_rule(data.rule_name)
client.set_rule(data)
self.update_one(data)
return JSONResponse(status_code=200, content={
"msg": f"Set new path for {data.official_title}",
})
return JSONResponse(
status_code=200,
content={
"msg": f"Set new path for {data.official_title}",
},
)
def search_all_bangumi(self):
datas = self.search_all()

View File

@@ -7,4 +7,3 @@ class RSSTorrents(BaseModel):
analyze: bool = Field(..., alias="analyze")
enabled: bool = Field(..., alias="enabled")
torrents: list[str] = Field(..., alias="torrents")

View File

@@ -52,7 +52,11 @@ class RequestContent(RequestURL):
torrent_titles, torrent_urls, torrent_homepage
):
if re.search(_filter, _title) is None:
torrents.append(TorrentInfo(name=_title, torrent_link=torrent_url, homepage=homepage))
torrents.append(
TorrentInfo(
name=_title, torrent_link=torrent_url, homepage=homepage
)
)
return torrents
except ConnectionError:
return []

View File

@@ -23,7 +23,9 @@ class RequestURL:
req.raise_for_status()
return req
except requests.RequestException:
logger.warning(f"[Network] Cannot connect to {url}. Wait for 5 seconds.")
logger.warning(
f"[Network] Cannot connect to {url}. Wait for 5 seconds."
)
try_time += 1
if try_time >= retry:
break
@@ -45,7 +47,9 @@ class RequestURL:
req.raise_for_status()
return req
except requests.RequestException:
logger.warning(f"[Network] Cannot connect to {url}. Wait for 5 seconds.")
logger.warning(
f"[Network] Cannot connect to {url}. Wait for 5 seconds."
)
try_time += 1
if try_time >= retry:
break

View File

@@ -1 +1 @@
from .mikan import mikan_parser
from .mikan import mikan_parser

View File

@@ -6,4 +6,4 @@ def mikan_parser(soup):
torrent_titles.append(item.find("title").text)
torrent_urls.append(item.find("enclosure").attrib["url"])
torrent_homepage.append(item.find("link").text)
return torrent_titles, torrent_urls, torrent_homepage
return torrent_titles, torrent_urls, torrent_homepage

View File

@@ -1 +1 @@
from .notification import PostNotification
from .notification import PostNotification

View File

@@ -1,4 +1,4 @@
from .bark import BarkNotification
from .server_chan import ServerChanNotification
from .telegram import TelegramNotification
from .wecom import WecomNotification
from .wecom import WecomNotification

View File

@@ -24,4 +24,4 @@ class SlackNotification(RequestContent):
data = {"title": notify.official_title, "body": text, "device_key": self.token}
resp = self.post_data(self.notification_url, data)
logger.debug(f"Bark notification: {resp.status_code}")
return resp.status_code == 200
return resp.status_code == 200

View File

@@ -11,7 +11,7 @@ class WecomNotification(RequestContent):
def __init__(self, token, chat_id, **kwargs):
super().__init__()
#Chat_id is used as noti_url in this push tunnel
# Chat_id is used as noti_url in this push tunnel
self.notification_url = f"{chat_id}"
self.token = token

View File

@@ -2,8 +2,8 @@ import re
import time
from dataclasses import dataclass
from module.network import RequestContent
from module.conf import TMDB_API
from module.network import RequestContent
@dataclass
@@ -22,10 +22,10 @@ LANGUAGE = {
"en": "en-US"
}
search_url = lambda e: \
f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
info_url = lambda e, key: \
f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}"
def search_url(e):
return f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
def info_url(e, key):
return f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language={LANGUAGE[key]}"
def is_animation(tv_id, language) -> bool:

View File

@@ -61,7 +61,9 @@ class RSSAnalyser:
logger.debug(f"[RSS] New title found: {data.official_title}")
return new_data
def torrent_to_data(self, torrent: TorrentInfo, rss_link: str | None = None) -> BangumiData:
def torrent_to_data(
self, torrent: TorrentInfo, rss_link: str | None = None
) -> BangumiData:
data = self._title_analyser.raw_parser(raw=torrent.name, rss_link=rss_link)
if data:
try:
@@ -75,7 +77,9 @@ class RSSAnalyser:
self.official_title_parser(data, mikan_title)
return data
def rss_to_data(self, rss_link: str, database: BangumiDatabase, full_parse: bool = True) -> list[BangumiData]:
def rss_to_data(
self, rss_link: str, database: BangumiDatabase, full_parse: bool = True
) -> list[BangumiData]:
rss_torrents = self.get_rss_torrents(rss_link, full_parse)
torrents_to_add = database.match_list(rss_torrents, rss_link)
if not torrents_to_add:

View File

@@ -6,7 +6,6 @@ from module.network import RequestContent, TorrentInfo
class RSSPoller(RSSDatabase):
@staticmethod
def polling(rss_link, req: RequestContent) -> list[TorrentInfo]:
return req.get_torrents(rss_link)

View File

@@ -13,7 +13,9 @@ SEARCH_KEY = [
class SearchTorrent(RequestContent):
def search_torrents(self, keywords: list[str], site: str = "mikan") -> list[TorrentBase]:
def search_torrents(
self, keywords: list[str], site: str = "mikan"
) -> list[TorrentBase]:
url = search_url(site, keywords)
# TorrentInfo to TorrentBase
torrents = self.get_torrents(url)
@@ -25,6 +27,7 @@ class SearchTorrent(RequestContent):
"torrent_link": torrent.torrent_link,
"homepage": torrent.homepage,
}
return [TorrentBase(**d) for d in to_dict()]
def search_season(self, data: BangumiData):
@@ -33,7 +36,7 @@ class SearchTorrent(RequestContent):
return [torrent for torrent in torrents if data.title_raw in torrent.name]
if __name__ == '__main__':
if __name__ == "__main__":
with SearchTorrent() as st:
for t in st.search_torrents(["魔法科高校の劣等生"]):
print(t)
print(t)

View File

@@ -69,4 +69,3 @@ def test_raw_parser():
assert info.resolution == "720P"
assert info.episode == 5
assert info.season == 1