rss识别v1.7.1(重构目录)

This commit is contained in:
IceKyrin
2022-05-28 16:10:43 +08:00
parent 51ecc01f2f
commit 045f1d855f
2 changed files with 126 additions and 116 deletions

View File

@@ -2,6 +2,7 @@ import re
import json
import zhconv
import logging
from RssFilter.fliter_base import *
class RSSInfoCleaner:
@@ -76,25 +77,6 @@ class RSSInfoCleaner:
self.en_list = []
self.get_info()
def del_rules(self, raw_name, rule_list):
for i in rule_list:
raw_name = raw_name.replace(i, "")
return raw_name
# 获取字符串出现位置
def get_str_location(self, char, target):
locate = []
for index, value in enumerate(char):
if target == value:
locate.append(index)
return locate
# 匹配某字符串最近的括号
def get_gp(self, char, string):
begin = [x for x in self.get_str_location(string, "[") if int(x) < int(string.find(char))][-1] + 1
end = [x for x in self.get_str_location(string, "]") if int(x) > int(string.find(char))][0]
return string[begin:end]
# 清理原链接(中文字符替换为英文)
def clean(self):
file_name = zhconv.convert(self.Name.raw, 'zh-cn')
@@ -107,7 +89,7 @@ class RSSInfoCleaner:
# 除杂x3
file_name = re.sub("[(\[【]检索.*[)\]】]?", "", file_name)
strip = ["特效歌词", "复制磁连", "兼容", "配音", "网盘", "\u200b", "[PSV&PC]", "Fin]", "Fin ", "[mkv]", "[]", ""]
file_name = self.del_rules(file_name, strip)
file_name = del_rules(file_name, strip)
self.Name.raw = str(file_name).replace('', ':').replace('', '[').replace('', ']').replace('-', '-') \
.replace('', '(').replace('', ')').replace("", "&").replace("X", "x").replace("×", "x") \
.replace("", "x").replace("__", "/").replace("_", "/")
@@ -175,7 +157,7 @@ class RSSInfoCleaner:
if self.Name.raw[0] == "[":
try:
# 以特征值为中心,匹配最近的中括号,八成就这个了
gp = self.get_gp(res_char, self.Name.raw.lower())
gp = get_gp(res_char, self.Name.raw.lower())
return gp
except Exception as e:
logging.warning("bug -- res_char:%s,%s,%s" % (res_char, self.Name.raw.lower(), e))
@@ -376,30 +358,6 @@ class RSSInfoCleaner:
else:
return None
def has_en(self, str):
my_re = re.compile(r'[a-z]', re.S)
res = re.findall(my_re, str)
if len(res):
return True
else:
return False
def has_zh(self, str):
my_re = re.compile(r'[\u4e00-\u9fa5]', re.S)
res = re.findall(my_re, str)
if len(res):
return True
else:
return False
def has_jp(self, str):
my_re = re.compile(r'[\u3040-\u31ff]', re.S)
res = re.findall(my_re, str)
if len(res):
return True
else:
return False
# 粗略识别失败re强制匹配
def extract_title(self, raw_name):
title = {
@@ -408,7 +366,7 @@ class RSSInfoCleaner:
}
clean_name = raw_name
if self.has_en(clean_name) and self.has_zh(clean_name):
if has_en(clean_name) and has_zh(clean_name):
# 中英
try:
res = re.search("(([\u4e00-\u9fa5]{2,12}[ /:]{0,3}){1,5}) {0,5}(( ?[a-z':]{1,15}){1,15})", clean_name)
@@ -437,14 +395,14 @@ class RSSInfoCleaner:
except Exception as e:
logging.info(e)
else:
if self.has_zh(clean_name):
if has_zh(clean_name):
# 中文
try:
res = re.search("(([\u4e00-\u9fa5:]{2,15}[ /]?){1,5}) *", clean_name)
title["zh"] = res.group(1).strip(" ")
except Exception as e:
logging.info(e)
elif self.has_en(clean_name):
elif has_en(clean_name):
# 英文
try:
res = re.search("(([a-z:]{2,15}[ /]?){1,15}) *", clean_name)
@@ -458,74 +416,38 @@ class RSSInfoCleaner:
self.Name.zh = title["zh"]
self.Name.en = title["en"]
# 以 / 代替空格分隔中英文名
def add_separator(self, clean_name):
try:
if "/" not in clean_name:
if '\u4e00' <= clean_name[0] <= '\u9fff':
try:
res = re.search(
"(^[\u4e00-\u9fa5\u3040-\u31ff: \-.。,!]{1,20}[ -]{0,5})([a-z: \-.。,!]{1,20} ?)*",
clean_name).group(1)
clean_name = clean_name.replace(res, res.strip(" ") + "/")
except Exception as e:
logging.info(e)
else:
try:
res = re.search(
"^(([a-z: \-.。,!]{1,20} ?)*[ -]{0,5})[\u4e00-\u9fa5\u3040-\u31ff: \-.。,!]{1,20}",
clean_name).group(1)
clean_name = clean_name.replace(res, res.strip(" ") + "/")
except Exception as e:
logging.info(e)
except Exception as e:
logging.info(e)
return clean_name
# 对以/分隔的多个翻译名,进行简单提取
def easy_split(self, clean_name, zh_list, en_list, jp_list):
if "/" in clean_name:
n_list = clean_name.split("/")
for i in n_list:
if self.has_jp(i):
if has_jp(i):
jp_list.append(i.strip(" "))
else:
if self.has_zh(i) is False:
if has_zh(i) is False:
en_list.append(i.strip(" "))
elif self.has_en(i) is False:
elif has_en(i) is False:
zh_list.append(i.strip(" "))
elif self.has_zh(i) and self.has_en(i):
elif has_zh(i) and has_en(i):
# 如果还是同时包含中英文的情况,递龟一下
i = self.add_separator(i)
i = add_separator(i)
self.easy_split(i, zh_list, en_list, jp_list)
else:
self.easy_split(i, zh_list, en_list, jp_list)
else:
if self.has_jp(clean_name):
if has_jp(clean_name):
jp_list.append(clean_name.strip(" "))
else:
if self.has_zh(clean_name) is False:
if has_zh(clean_name) is False:
en_list.append(clean_name.strip(" "))
elif self.has_en(clean_name) is False:
elif has_en(clean_name) is False:
zh_list.append(clean_name.strip(" "))
# 单list验证
def re_verity(self, raw_list, raw_name):
correct_list = []
for c_res in raw_list:
if type(raw_name) is list:
if c_res in raw_name[0].lower() and c_res in raw_name[1].lower():
correct_list.append(c_res)
else:
if c_res in raw_name.lower():
correct_list.append(c_res)
return correct_list
# 混合验证
def all_verity(self, raw_name):
self.zh_list = self.re_verity(self.zh_list, raw_name)
self.en_list = self.re_verity(self.en_list, raw_name)
self.jp_list = self.re_verity(self.jp_list, raw_name)
self.zh_list = re_verity(self.zh_list, raw_name)
self.en_list = re_verity(self.en_list, raw_name)
self.jp_list = re_verity(self.jp_list, raw_name)
# 汇总信息
def get_info(self):
@@ -570,8 +492,8 @@ class RSSInfoCleaner:
clean_name = re.sub(' +', ' ', clean_name).strip(" ").strip("-")
clean_name = re.sub("([(\[] *| *[)\]])", "", clean_name)
if (self.has_zh(clean_name) or self.has_jp(clean_name)) and self.has_en(clean_name):
clean_name = self.add_separator(clean_name)
if (has_zh(clean_name) or has_jp(clean_name)) and has_en(clean_name):
clean_name = add_separator(clean_name)
clean_name = re.sub("(/ */)", "", clean_name)
clean_name = re.sub(" +- +", "/", clean_name)
@@ -583,13 +505,13 @@ class RSSInfoCleaner:
# 去除正确结果后,重新识别其他部分
if self.jp_list:
temp_name = self.del_rules(self.Name.raw, self.jp_list)
temp_name = del_rules(self.Name.raw, self.jp_list)
self.easy_split(temp_name, self.zh_list, self.en_list, self.jp_list)
if self.zh_list and self.en_list == []:
temp_name = self.del_rules(self.Name.clean, self.zh_list)
temp_name = del_rules(self.Name.clean, self.zh_list)
self.easy_split(temp_name, self.zh_list, self.en_list, self.jp_list)
elif self.zh_list == [] and self.en_list:
temp_name = self.del_rules(self.Name.raw, self.en_list)
temp_name = del_rules(self.Name.raw, self.en_list)
self.easy_split(temp_name, self.zh_list, self.en_list, self.jp_list)
elif self.zh_list == [] and self.en_list == []:
self.extract_title(clean_name)
@@ -627,22 +549,6 @@ class RSSInfoCleaner:
if __name__ == "__main__":
import csv
def read_data(file_name, start, rows):
if file_name == "mikan":
with open('RssFilter/mikan.csv', 'r', encoding='utf-8') as csv_file:
reader = csv.reader(csv_file)
raw_data = [row[3] for row in reader][start:start + rows]
return raw_data
elif file_name == "dmhy":
with open('RssFilter/dmhy.csv', 'r', encoding='utf-8') as csv_file:
reader = csv.reader(csv_file)
raw_data = [row[4] for row in reader][start + 1:start + rows + 1]
return raw_data
# mikan/dmhy 获取数据dmhy 最多1w行mikan最多3w行
# site,start,row_nums
name_list = read_data("mikan", 0, 1000)

View File

@@ -0,0 +1,104 @@
import re
import logging
import csv
def read_data(file_name, start, rows):
if file_name == "mikan":
with open('RssFilter/mikan.csv', 'r', encoding='utf-8') as csv_file:
reader = csv.reader(csv_file)
raw_data = [row[3] for row in reader][start:start + rows]
return raw_data
elif file_name == "dmhy":
with open('RssFilter/dmhy.csv', 'r', encoding='utf-8') as csv_file:
reader = csv.reader(csv_file)
raw_data = [row[4] for row in reader][start + 1:start + rows + 1]
return raw_data
# 以 / 代替空格分隔中英文名
def add_separator(clean_name):
try:
if "/" not in clean_name:
if '\u4e00' <= clean_name[0] <= '\u9fff':
try:
res = re.search(
"(^[\u4e00-\u9fa5\u3040-\u31ff: \-.。,!]{1,20}[ -]{0,5})([a-z: \-.。,!]{1,20} ?)*",
clean_name).group(1)
clean_name = clean_name.replace(res, res.strip(" ") + "/")
except Exception as e:
logging.info(e)
else:
try:
res = re.search(
"^(([a-z: \-.。,!]{1,20} ?)*[ -]{0,5})[\u4e00-\u9fa5\u3040-\u31ff: \-.。,!]{1,20}",
clean_name).group(1)
clean_name = clean_name.replace(res, res.strip(" ") + "/")
except Exception as e:
logging.info(e)
except Exception as e:
logging.info(e)
return clean_name
def del_rules(raw_name, rule_list):
for i in rule_list:
raw_name = raw_name.replace(i, "")
return raw_name
# 获取字符串出现位置
def get_str_location(char, target):
locate = []
for index, value in enumerate(char):
if target == value:
locate.append(index)
return locate
# 匹配某字符串最近的括号
def get_gp(char, string):
begin = [x for x in get_str_location(string, "[") if int(x) < int(string.find(char))][-1] + 1
end = [x for x in get_str_location(string, "]") if int(x) > int(string.find(char))][0]
return string[begin:end]
def has_en(str):
my_re = re.compile(r'[a-z]', re.S)
res = re.findall(my_re, str)
if len(res):
return True
else:
return False
def has_zh(str):
my_re = re.compile(r'[\u4e00-\u9fa5]', re.S)
res = re.findall(my_re, str)
if len(res):
return True
else:
return False
def has_jp(str):
my_re = re.compile(r'[\u3040-\u31ff]', re.S)
res = re.findall(my_re, str)
if len(res):
return True
else:
return False
# 单list验证
def re_verity(raw_list, raw_name):
correct_list = []
for c_res in raw_list:
if type(raw_name) is list:
if c_res in raw_name[0].lower() and c_res in raw_name[1].lower():
correct_list.append(c_res)
else:
if c_res in raw_name.lower():
correct_list.append(c_res)
return correct_list