Files
Auto_Bangumi/Windows/win_main.py
2022-05-23 11:20:46 +08:00

271 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import sys
import time
import json
import qbittorrentapi
import requests
from bs4 import BeautifulSoup
class EnvInfo:
if getattr(sys, 'frozen', False):
application_path = os.path.dirname(sys.executable)
elif __file__:
application_path = os.path.dirname(__file__)
path = os.path.join(application_path, 'config.json')
with open(path) as f:
data = f.read()
info = json.loads(data)
host_ip = info["host_ip"]
sleep_time = float(info["time"])
user_name = info["user_name"]
password = info["password"]
rss_link = info["rss_link"]
download_path = info["download_path"]
method = info["method"]
# rss_link = "https://mikanani.me/RSS/MyBangumi?token=Td8ceWZZv3s2OZm5ji9RoMer8vk5VS3xzC1Hmg8A26E%3d"
rule_url = "https://raw.githubusercontent.com/EstrellaXD/Bangumi_Auto_Collector/main/AutoBangumi/config/rule.json"
time_show_obj = time.strftime('%Y-%m-%d %X')
bangumi_info = info["bangumi_info"]
class SetRule:
def __init__(self):
self.bangumi_info = EnvInfo.bangumi_info
self.rss_link = EnvInfo.rss_link
self.host_ip = EnvInfo.host_ip
self.user_name = EnvInfo.user_name
self.password = EnvInfo.password
self.download_path = EnvInfo.download_path
self.qb = qbittorrentapi.Client(host=self.host_ip, username=self.user_name, password=self.password)
try:
self.qb.auth_log_in()
except qbittorrentapi.LoginFailed as e:
print(e)
def set_rule(self, bangumi_name, season):
rule = {
'enable': True,
'mustContain': bangumi_name,
'mustNotContain': '720',
'useRegx': True,
'episodeFilter': '',
'smartFilter': False,
'previouslyMatchedEpisodes': [],
'affectedFeeds': [self.rss_link],
'ignoreDays': 0,
'lastMatch': '',
'addPaused': False,
'assignedCategory': 'Bangumi',
'savePath': str(os.path.join(self.download_path, bangumi_name, season))
}
self.qb.rss_set_rule(rule_name=bangumi_name, rule_def=rule)
def rss_feed(self):
try:
self.qb.rss_remove_item(item_path="Mikan_RSS")
self.qb.rss_add_feed(url=self.rss_link, item_path="Mikan_RSS")
sys.stdout.write(f"[{EnvInfo.time_show_obj}] Successes adding RSS Feed." + "\n")
except ConnectionError:
sys.stdout.write(f"[{EnvInfo.time_show_obj}] Error with adding RSS Feed." + "\n")
except qbittorrentapi.exceptions.Conflict409Error:
sys.stdout.write(f"[{EnvInfo.time_show_obj}] RSS Already exists." + "\n")
def run(self):
sys.stdout.write(f"[{EnvInfo.time_show_obj}] Start adding rules." + "\n")
sys.stdout.flush()
for info in self.bangumi_info:
self.set_rule(info["title"], info["season"])
sys.stdout.write(f"[{EnvInfo.time_show_obj}] Finished." + "\n")
sys.stdout.flush()
class MatchRule:
split_rule = r"\[|\]|\【|\】|\★|\|\|\(|\)"
last_rule = r"(.*)( \-)"
sub_title = r"[^\x00-\xff]{1,}| \d{1,2}^.*|\·"
match_rule = r"(S\d{1,2}(.*))"
season_match = r"(.*)(Season \d{1,2}|S\d{1,2}|第.*季|第.*期)"
class CollectRSS:
def __init__(self):
self.bangumi_list = []
try:
self.rules = requests.get(EnvInfo.rule_url).json()
except ConnectionError:
sys.stdout.write(f"[{EnvInfo.time_show_obj}] Get rules Erroe=r")
rss = requests.get(EnvInfo.rss_link, 'utf-8')
soup = BeautifulSoup(rss.text, 'xml')
self.items = soup.find_all('item')
self.info = EnvInfo.bangumi_info
def get_info_list(self):
for item in self.items:
name = item.title.string
# debug 用
# print(name)
exit_flag = False
for rule in self.rules:
for group in rule["group_name"]:
if re.search(group, name):
exit_flag = True
n = re.split(MatchRule.split_rule, name)
while '' in n:
n.remove('')
while ' ' in n:
n.remove(' ')
try:
bangumi_title = n[rule['name_position']].strip()
except IndexError:
continue
sub_title = re.sub(MatchRule.sub_title, "", bangumi_title)
b = re.split(r"\/|\_", sub_title)
while '' in b:
b.remove('')
pre_name = max(b, key=len, default='').strip()
if len(pre_name.encode()) > 3:
bangumi_title = pre_name
for i in range(2):
match_obj = re.match(MatchRule.last_rule, bangumi_title, re.I)
if match_obj is not None:
bangumi_title = match_obj.group(1).strip()
match_obj = re.match(MatchRule.match_rule, bangumi_title, re.I)
if match_obj is not None:
bangumi_title = match_obj.group(2).strip()
if bangumi_title not in self.bangumi_list:
self.bangumi_list.append(bangumi_title)
# debug
# print(bangumi_title)
break
if exit_flag:
break
if not exit_flag:
print(f"[{EnvInfo.time_show_obj}] ERROR Not match with {name}")
def put_info_json(self):
had_data = []
for data in self.info:
had_data.append(data["title"])
for title in self.bangumi_list:
match_title_season = re.match(MatchRule.season_match, title, re.I)
if match_title_season is not None:
json_title = match_title_season.group(1).strip()
json_season = match_title_season.group(2)
else:
json_season = ''
json_title = title
if json_title not in had_data:
self.info.append({
"title": json_title,
"season": json_season
})
sys.stdout.write(f"[{EnvInfo.time_show_obj}] add {json_title} {json_season}" + "\n")
sys.stdout.flush()
EnvInfo.info["bangumi_info"] = self.info
with open(EnvInfo.path, 'w', encoding='utf8') as f:
data = json.dumps(EnvInfo.info, indent=4, separators=(',', ': '), ensure_ascii=False)
f.write(data)
def run(self):
self.get_info_list()
self.put_info_json()
class qBittorrentRename:
def __init__(self):
self.qbt_client = qbittorrentapi.Client(host=EnvInfo.host_ip,
username=EnvInfo.user_name,
password=EnvInfo.password)
try:
self.qbt_client.auth_log_in()
except qbittorrentapi.LoginFailed as e:
print(e)
self.recent_info = self.qbt_client.torrents_info(status_filter='completed',category="Bangumi")
self.hash = None
self.name = None
self.new_name = None
self.path_name = None
self.count = 0
self.rename_count = 0
self.torrent_count = len(self.recent_info)
self.rules = [r'(.*)\[(\d{1,3}|\d{1,3}\.\d{1,2})(?:v\d{1,2})?(?:END)?\](.*)',
r'(.*)\[E(\d{1,3}|\d{1,3}\.\d{1,2})(?:v\d{1,2})?(?:END)?\](.*)',
r'(.*)\[第(\d*\.*\d*)话(?:END)?\](.*)',
r'(.*)\[第(\d*\.*\d*)話(?:END)?\](.*)',
r'(.*)第(\d*\.*\d*)话(?:END)?(.*)',
r'(.*)第(\d*\.*\d*)話(?:END)?(.*)',
r'(.*)- (\d{1,3}|\d{1,3}\.\d{1,2})(?:v\d{1,2})?(?:END)? (.*)']
def rename_normal(self, idx):
self.name = self.recent_info[idx].name
self.hash = self.recent_info[idx].hash
self.path_name = self.recent_info[idx].content_path.split("/")[-1]
file_name = self.name
for rule in self.rules:
matchObj = re.match(rule, file_name, re.I)
if matchObj is not None:
self.new_name = f'{matchObj.group(1).strip()} E{matchObj.group(2)}{matchObj.group(3)}'
def rename_pn(self, idx):
self.name = self.recent_info[idx].name
self.hash = self.recent_info[idx].hash
self.path_name = self.recent_info[idx].content_path.split("/")[-1]
n = re.split(r'\[|\]', self.name)
file_name = self.name.replace(f'[{n[1]}]', '')
for rule in self.rules:
matchObj = re.match(rule, file_name, re.I)
if matchObj is not None:
self.new_name = re.sub(r'\[|\]', '', f'{matchObj.group(1).strip()} E{matchObj.group(2)}{n[-1]}')
def rename(self):
if self.path_name != self.new_name:
self.qbt_client.torrents_rename_file(torrent_hash=self.hash, old_path=self.path_name, new_path=self.new_name)
sys.stdout.write(f"[{time.strftime('%Y-%m-%d %X')}] {self.path_name} >> {self.new_name}")
self.count += 1
else:
return
def clear_info(self):
self.name = None
self.hash = None
self.new_name = None
self.path_name = None
def print_result(self):
sys.stdout.write(f"[{EnvInfo.time_show_obj}] 已完成对{self.torrent_count}个文件的检查" + '\n')
sys.stdout.write(f"[{EnvInfo.time_show_obj}] 已对其中{self.count}个文件进行重命名" + '\n')
sys.stdout.write(f"[{EnvInfo.time_show_obj}] 完成" + '\n')
sys.stdout.flush()
def run(self):
if EnvInfo.method not in ['pn', 'normal']:
print('error method')
elif EnvInfo.method == 'normal':
for i in range(0, self.torrent_count + 1):
try:
self.rename_normal(i)
self.rename()
self.clear_info()
except:
self.print_result()
elif EnvInfo.method == 'pn':
for i in range(0, self.torrent_count + 1):
try:
self.rename_pn(i)
self.rename()
self.clear_info()
except:
self.print_result()
if __name__ == "__main__":
while True:
CollectRSS().run()
SetRule().run()
qBittorrentRename().run()
time.sleep(EnvInfo.sleep_time)