rss识别版本1.1

This commit is contained in:
xiaolang
2022-05-27 03:42:41 +08:00
parent 2cc34e6cae
commit 3d514a3a8d
2 changed files with 438 additions and 56 deletions

322
Windows/chain_db.py Normal file
View File

@@ -0,0 +1,322 @@
from pymysql import connect
from pymysql.cursors import DictCursor
from config import config_const
class OpenDB(object):
    """Context manager that opens a MySQL connection and yields a dict cursor.

    Connection settings are read from ``config_const['db_config']``.
    Typical use::

        with OpenDB() as cs:
            cs.execute(sql, params)

    On leaving the ``with`` block the transaction is committed when the body
    finished normally and rolled back when it raised; the cursor and the
    connection are closed in either case.
    """

    def __init__(self):
        super(OpenDB, self).__init__()
        db_config = config_const['db_config']
        # Open the connection
        self.conn = connect(
            host=db_config['host'],
            port=db_config['port'],
            user=db_config['user'],
            passwd=db_config['passwd'],
            db=db_config['db'],
            charset=db_config['charset']
        )
        # Cursor that returns each row as a dict
        self.cs = self.conn.cursor(DictCursor)

    def __enter__(self):
        """Return the cursor used to run statements inside the ``with`` body."""
        return self.cs

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the transaction and release the cursor and the connection.

        Returns None, so an exception raised inside the ``with`` body keeps
        propagating to the caller.
        """
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                # Do not persist a half-finished unit of work.
                self.conn.rollback()
        finally:
            # Always release both resources, even if commit/rollback failed.
            try:
                self.cs.close()
            finally:
                self.conn.close()
class ChainDb(object):
    """Chainable builder for single-table MySQL statements.

    Builder methods (``where``, ``field``, ``page``, ``group``, ``order``)
    store SQL fragments on the instance and return ``self``. Terminal methods
    (``select``, ``find``, ``value``, ``insert``, ``update``, ``delete``)
    open a fresh connection through ``OpenDB`` and run the statement.

    Error contract: argument validation raises ``Exception``; anything that
    goes wrong while the statement runs is caught and *returned* instead of
    raised, so callers must check ``isinstance(result, Exception)``.

    Only condition values and inserted/updated values are sent as query
    parameters. Table name, column names, the field list and the GROUP BY /
    ORDER BY text are interpolated into the SQL as-is and must never come
    from untrusted input.
    """

    # Operators accepted in the ``[operator, value, connector]`` condition form.
    _COMPARE_SIGNS = ('>', '<', '=', '<>', '>=', '<=')

    def __init__(self, database):
        """Create a builder for table ``database``.

        The table name is prefixed with ``config_const['db_config']['pre']``
        when that setting is non-empty.
        """
        super(ChainDb, self).__init__()
        self.where_keys = ''
        self.where_vals = []
        self.params = '*'
        self.limit = ''
        self.order_by = ''
        self.group_by = ''
        prefix = config_const['db_config']['pre']
        self.database = prefix + database if prefix else database

    # Build the WHERE fragment
    @staticmethod
    def _select(where, where_keys, where_vals, method='and'):
        """Turn a condition dict into a WHERE fragment and its parameters.

        Each value in ``where`` is either a plain value (``column = value``,
        joined to the next condition with ``method``) or a list
        ``[operator, value]`` / ``[operator, value, connector]``. An operator
        outside ``_COMPARE_SIGNS`` falls back to ``=``; a connector other
        than ``'and'`` / ``'or'`` falls back to ``'and'``. The connector of a
        condition links it to the condition that follows it. Both forms may
        be mixed in one dict.

        Args:
            where: dict mapping column name to a condition value.
            where_keys: text placed in front of the generated fragment.
            where_vals: parameters placed in front of the generated ones.
            method: connector used after plain ``column = value`` conditions.

        Returns:
            ``{'keys': <sql fragment>, 'vals': <tuple of parameters>}``.
        """
        values = list(where_vals)
        pieces = []
        for column, item in where.items():
            if isinstance(item, list) and len(item) > 1:
                operator = item[0] if item[0] in ChainDb._COMPARE_SIGNS else '='
                has_connector = len(item) == 3 and item[2] in ('and', 'or')
                connector = item[2] if has_connector else 'and'
                values.append(item[1])
            else:
                operator, connector = '=', method
                values.append(item)
            pieces.append(column + operator + '%s')
            pieces.append(connector)
        # The connector after the last condition links to nothing: drop it.
        return {'keys': where_keys + ' '.join(pieces[:-1]), 'vals': tuple(values)}

    # Run a raw statement
    @staticmethod
    def query(sql=None, tuple_values=None):
        """Run a raw SQL statement.

        Args:
            sql: statement text; must be non-empty.
            tuple_values: optional tuple of query parameters.

        Returns:
            The cursor the statement ran on (``OpenDB`` has already closed it
            by the time the caller receives it), or the caught exception when
            running the statement failed.

        Raises:
            Exception: ``sql`` is empty or ``tuple_values`` is not a tuple.
        """
        if not sql:
            raise Exception('sql语句不能为空')
        if tuple_values and not isinstance(tuple_values, tuple):
            raise Exception('参数类型必须是元组')
        try:
            with OpenDB() as cs:
                if tuple_values:
                    cs.execute(sql, tuple_values)
                else:
                    cs.execute(sql)
                return cs
        except Exception as e:
            return e

    # Conditions
    def where(self, where=None, value=None, method='and'):
        """Add conditions; repeated calls are joined with ``method``.

        Args:
            where: condition dict (see ``_select``), or a column name when
                ``value`` is given.
            value: condition value for the single column named by ``where``.
            method: connector between plain conditions and between this call
                and the conditions added by earlier calls.

        Returns:
            ``self``.

        Raises:
            Exception: ``where`` is non-empty and not a dict.
        """
        if value is not None:
            where = {where: value}
        if where and not isinstance(where, dict):
            raise Exception('查询条件必须是字典类型')
        if not where:
            # Nothing to add; appending here would leave a dangling connector.
            return self
        built = self._select(where, '', [], method)
        if getattr(self, 'where_keys', ''):
            self.where_keys += ' ' + method + ' ' + built['keys']
            self.where_vals = tuple(self.where_vals) + built['vals']
        else:
            self.where_keys = built['keys']
            self.where_vals = built['vals']
        return self

    # Selected fields
    def field(self, params=None):
        """Set the selected columns from a string or a list of names.

        An empty value selects ``*``. Returns ``self``.
        """
        if not params:
            self.params = '*'
        else:
            if isinstance(params, list):
                params = ','.join(params)
            self.params = params
        return self

    # Paging
    def page(self, page=1, rows=10):
        """Set LIMIT to the 1-based ``page`` of ``rows`` rows. Returns ``self``."""
        self.limit = str((page - 1) * rows) + ',' + str(rows)
        return self

    # Grouping
    def group(self, param=None):
        """Set the GROUP BY expression; an empty value is ignored.

        Returns ``self``.

        Raises:
            Exception: ``param`` is non-empty and not a string.
        """
        if not param:
            return self
        if not isinstance(param, str):
            raise Exception('参数必须是字符串类型')
        self.group_by = param
        return self

    # Ordering
    def order(self, order_by=None):
        """Set the ORDER BY expression; an empty value is ignored. Returns ``self``."""
        if not order_by:
            return self
        self.order_by = order_by
        return self

    # Assemble the SELECT statement
    def format_sql(self):
        """Return the SELECT statement for the current builder state.

        Clauses are emitted in the order WHERE, GROUP BY, ORDER BY, limit;
        MySQL rejects a statement that has ORDER BY in front of GROUP BY.
        """
        if not hasattr(self, 'params'):
            self.params = '*'
        sql = "SELECT {} FROM {}".format(self.params, self.database)
        if getattr(self, 'where_keys', ''):
            sql += " WHERE {}".format(self.where_keys)
        if getattr(self, 'group_by', ''):
            sql += " GROUP BY {}".format(self.group_by)
        if getattr(self, 'order_by', ''):
            sql += " ORDER BY {}".format(self.order_by)
        if getattr(self, 'limit', ''):
            sql += " limit {}".format(self.limit)
        return sql

    def _run_select(self, cs, echo=False):
        """Execute the formatted SELECT on ``cs``, optionally printing it first."""
        sql = self.format_sql()
        values = getattr(self, 'where_vals', None)
        if values:
            if echo:
                print(sql % values)
            cs.execute(sql, values)
        else:
            cs.execute(sql)

    # Fetch all rows
    def select(self):
        """Run the SELECT and return all rows, or the caught exception.

        When conditions are set, the statement with its values substituted
        is printed before it runs.
        """
        try:
            with OpenDB() as cs:
                self._run_select(cs, echo=True)
                return cs.fetchall()
        except Exception as e:
            return e

    # Fetch one row
    def find(self):
        """Run the SELECT and return the first row, or the caught exception."""
        try:
            with OpenDB() as cs:
                self._run_select(cs)
                return cs.fetchone()
        except Exception as e:
            return e

    # Fetch one column of one row
    def value(self, field=None):
        """Return column ``field`` of the first row.

        When no row is found or the row lacks ``field``, an ``Exception`` is
        returned (not raised), like any other failure while the query runs.

        Raises:
            Exception: ``field`` is empty.
        """
        if not field:
            raise Exception('查询字段不能为空')
        try:
            with OpenDB() as cs:
                self._run_select(cs)
                row = cs.fetchone()
                # fetchone() yields None when nothing matched.
                if row is None or field not in row:
                    raise Exception('没有查询到该字段')
                return row[field]
        except Exception as e:
            return e

    @staticmethod
    def _build_insert(data):
        """Return ``(columns, placeholders, values, is_batch)`` for ``data``.

        For a list of dicts the column set is taken from the first dict with
        the most keys; rows lacking one of those columns get ``''`` for it.

        Raises:
            Exception: ``data`` is a list whose dicts are all empty.
        """
        if isinstance(data, list):
            widest = max(data, key=len)
            if not widest:
                raise Exception('新增字段不能为空')
            columns = list(widest.keys())
            # Test membership instead of using a sentinel default, so that a
            # genuine value such as -1 is never mistaken for a missing column.
            rows = tuple(
                tuple(item[column] if column in item else '' for column in columns)
                for item in data
            )
            return ','.join(columns), ','.join(['%s'] * len(columns)), rows, True
        return (
            ','.join(data.keys()),
            ','.join(['%s'] * len(data)),
            tuple(data.values()),
            False,
        )

    # Insert rows
    def insert(self, data):
        """Insert one row (dict) or several rows (list of dicts).

        Returns:
            ``cursor.lastrowid`` when exactly one row was affected, otherwise
            the affected row count; the caught exception when the statement
            failed or affected no rows.

        Raises:
            Exception: ``data`` is empty, is neither a dict nor a list, or is
                a list of empty dicts.
        """
        if not data:
            raise Exception('新增内容不能为空')
        if not isinstance(data, (dict, list)):
            raise Exception('参数必须是字典类型')
        keys, values, values_list, is_batch = self._build_insert(data)
        try:
            with OpenDB() as cs:
                sql = "INSERT INTO {} ({}) VALUES ({})".format(self.database, keys, values)
                if is_batch:
                    cs.executemany(sql, values_list)
                else:
                    cs.execute(sql, values_list)
                if cs.rowcount <= 0:
                    raise Exception('新增内容为空,插入数据库失败')
                return cs.lastrowid if cs.rowcount == 1 else cs.rowcount
        except Exception as e:
            return e

    # Update rows
    def update(self, data=None):
        """Update the rows matching the current conditions with ``data``.

        Returns:
            The value returned by ``cursor.execute``, or the caught exception.

        Raises:
            Exception: ``data`` is empty or not a dict, or no condition has
                been set with ``where``.
        """
        if not data:
            raise Exception('更新内容不能为空')
        if not isinstance(data, dict):
            raise Exception('参数必须是字典类型')
        # where_vals always exists after __init__, so test the clause text:
        # without it the statement would end in a bare "WHERE".
        if not getattr(self, 'where_keys', ''):
            raise Exception('查询条件不能为空')
        set_keys = ','.join([key + '=%s' for key in data])
        # SET parameters first, then the WHERE parameters.
        val_list = tuple(data.values()) + tuple(self.where_vals)
        try:
            with OpenDB() as cs:
                sql = "UPDATE {} SET {} WHERE {}".format(self.database, set_keys, self.where_keys)
                return cs.execute(sql, val_list)
        except Exception as e:
            return e

    # Delete rows
    def delete(self):
        """Delete the rows matching the current conditions.

        Returns the value returned by ``cursor.execute``, or the caught
        exception.
        """
        try:
            with OpenDB() as cs:
                sql = "DELETE FROM {}".format(self.database)
                # NOTE(review): where_keys is always set by __init__, so this
                # test is always true and the else branch is unreachable; with
                # no conditions the statement ends in a bare "WHERE". Left
                # as-is on purpose: testing for a non-empty clause instead
                # would turn a condition-less delete() into a whole-table
                # delete. Confirm the intended behaviour before changing it.
                if hasattr(self, 'where_keys'):
                    sql += " WHERE {}".format(self.where_keys)
                    row = cs.execute(sql, self.where_vals)
                else:
                    row = cs.execute(sql)
                return row
        except Exception as e:
            return e

View File

@@ -7,6 +7,8 @@ import requests
import logging
import pandas as pd
from chain_db import ChainDb
class Rename:
def __init__(self, file_name):
@@ -27,10 +29,10 @@ class Rename:
self.file_info = {}
self.pre_analyse = None
self.regognize_group()
self.recognize_group()
# 获取字符串出现位置
def getStrInfo(self, char, target):
def get_str_location(self, char, target):
locate = []
for index, value in enumerate(char):
if target == value:
@@ -39,8 +41,8 @@ class Rename:
# Return the text between the square brackets that most closely surround
# `char` inside `string`.
#   char:   substring to locate (its position comes from string.find)
#   string: text that is searched
# The result is string[start:end]: start is one past the last "[" index that
# lies before the position of `char`, end is the first "]" index after it.
# The [-1] / [0] lookups raise IndexError when no such bracket index exists.
# NOTE(review): start/end are assigned twice - first through self.getStrInfo,
# then through self.get_str_location - and the second pair overwrites the
# first. This looks like the pre- and post-rename versions of the same two
# lines; confirm which pair should remain.
# NOTE(review): presumably both helpers return the indexes at which the given
# character occurs; their full bodies are not visible here - verify.
def get_gp(self, char, string):
start = [x for x in self.getStrInfo(string, "[") if int(x) < int(string.find(char))][-1] + 1
end = [x for x in self.getStrInfo(string, "]") if int(x) > int(string.find(char))][0]
start = [x for x in self.get_str_location(string, "[") if int(x) < int(string.find(char))][-1] + 1
end = [x for x in self.get_str_location(string, "]") if int(x) > int(string.find(char))][0]
return string[start:end]
# 清理原链接(中文字符替换为英文)
@@ -49,7 +51,7 @@ class Rename:
# 去广告
file_name = re.sub("[(\[【]?(字幕)?[\u4e00-\u9fa5]{0,3}(新人|招募?新?)[\u4e00-\u9fa5]{0,5}[)\]】]?", "", file_name)
# 除杂
file_name = re.sub("[(\[【]?★?(\d{4}[年][春夏秋冬]?)?[\d一二三四五六七八九十]{1,2}月新?番?★?[)\]】]?", "", file_name)
file_name = re.sub("[(\[【]?★?(\d{4}[春夏秋冬]?)?[\d一二三四五六七八九十]{1,2}月新?番?★?[)\]】]?", "", file_name)
# 除杂x2
file_name = re.sub("[(\[【]?(2(\d{3}[年.][春夏秋冬]?)\d{1,2}\.?\d{1,2})[)\]】]?", "", file_name)
# 除杂x3
@@ -67,11 +69,10 @@ class Rename:
for s2 in str2:
if s1 in s2:
return [True, s2[1:]]
else:
return [False, name]
return [False, name]
# 检索字幕组特征
def regognize_group(self):
def recognize_group(self):
character = self.group_character
group = self.group_char
rule = self.group_rule
@@ -123,7 +124,7 @@ class Rename:
# 获取字幕组名
def get_group(self):
# 是否匹配成功(哪种方式匹配成功)
status = self.regognize_group()
status = self.recognize_group()
# 检索到的特征值
res_char = self.pre_analyse
# 强条
@@ -138,9 +139,7 @@ class Rename:
# 以特征值为中心,匹配最近的中括号,八成就这个了
gp = self.get_gp(res_char, self.file_name.lower())
# 防止太长炸了,一般不会这么长的字幕组名
if len(gp) < 30:
pass
else:
if len(gp) > 30:
print("name:%s\r\nchar:%s,gp:%s" % (self.file_name, res_char, gp))
return gp
except Exception as e:
@@ -234,7 +233,7 @@ class Rename:
file_name = str(self.file_name).lower()
type_list = []
# 英文标示
for i in range(3):
for _ in range(3):
try:
res = re.search(
"[(\[【]?((bd|remux|(viu)?tvb?|bilibili|b ?global|baha|web[ -]?(dl|rip))[ -]?(iso|mut|rip)?)[)\]】]?",
@@ -369,52 +368,13 @@ class Rename:
else:
return False
# 拿到的数据挨个测试
def get_info(self):
# 获取到的信息
info = {
"group": self.get_group(),
"dpi": self.get_dpi(),
"season": self.get_season(),
"episode": self.get_episode(),
"vision": self.get_vision(),
"lang": self.get_language(),
"ass": self.get_ass(),
"type": self.get_type(),
"code": self.get_code(),
"source": self.get_source()
}
# 字母全部小写
clean_name = self.file_name.lower()
# 去除拿到的有效信息
for k, v in info.items():
if v is not None:
if type(v) is list:
for i in v:
if i is not None:
clean_name = clean_name.replace(i, "")
else:
clean_name = clean_name.replace(v, "")
# 除杂
clean_list = ["pc&psp", "pc&psv", "fin", "opus", "movie", "tvb", "end", "bangumi.online", "donghua",
"话全", "第话", "第集", "全集", "", "", "+", "@", ""]
for i in clean_list:
clean_name = clean_name.replace(i, "").replace(" ]", "]").replace("[ ", "[").replace(" ", "")
# 分隔各字段
clean_name = clean_name.replace("[", "").replace("]", " ").replace("()", "").replace("( )", "")
# 去除多余空格
clean_name = re.sub(' +', ' ', clean_name).strip(" ")
# 剩下来的几乎就是干净番名了,再刮不到不管了
info["clean_name"] = clean_name
def extract_title(self, raw_name):
title = {
"zh": None,
"en": None,
}
clean_name = re.sub('[^a-zA-Z\u4e00-\u9fa5:@#$%^&*()\[\]/ ]', "", clean_name)
clean_name = re.sub(' +', ' ', clean_name).strip(" ")
clean_name = raw_name
if self.has_en(clean_name) and self.has_zh(clean_name):
# 中英
try:
@@ -462,10 +422,110 @@ class Rename:
if v is not None and "/" in v:
zh_list = v.split("/")
title[k] = zh_list[0].strip(" ")
return title
def add_separator(self, clean_name):
    """Insert a "/" between the leading title and the title that follows it.

    A name that starts with a CJK character is matched as "Chinese part
    followed by lower-case Latin words"; any other name is matched as
    "lower-case Latin words followed by a Chinese part". Every occurrence of
    the matched leading part is replaced by that part, stripped of
    surrounding spaces, plus "/", and the result is printed with a
    "zh_pre:" or "en_pre:" prefix.

    Args:
        clean_name: cleaned-up name to split.

    Returns:
        The name with the separator inserted, or ``clean_name`` unchanged
        when it is empty, already contains "/", or matches neither layout.
    """
    # An empty name has no first character to inspect, and a name that
    # already has a separator needs no work.
    if not clean_name or "/" in clean_name:
        return clean_name
    if '\u4e00' <= clean_name[0] <= '\u9fff':
        pattern = "(^[a\u4e00-\u9fa5: ]{1,10} ?)([a-z:]{1,20} ?){1,10}"
        report = "zh_pre:%s"
    else:
        pattern = "^(([a-z:]{1,20} ?){1,10} )[\u4e00-\u9fa5: a]{1,20}"
        report = "en_pre:%s"
    found = re.search(pattern, clean_name)
    if found is None:
        # Neither layout applies: leave the name untouched.
        return clean_name
    head = found.group(1)
    clean_name = clean_name.replace(head, head.strip(" ") + "/")
    print(report % clean_name)
    return clean_name
def easy_split(self, clean_name, zh_list, en_list):
    """Sort the "/"-separated parts of a name into Chinese and English lists.

    A part for which ``self.has_zh`` is False is appended (space-stripped)
    to ``en_list``; otherwise a part for which ``self.has_en`` is False is
    appended to ``zh_list``. Inside a "/"-separated name, a part that fits
    neither case is passed through ``self.add_separator`` and split again
    recursively. A name without "/" that fits neither case is dropped.

    Args:
        clean_name: name to split.
        zh_list: list that collects Chinese titles (modified in place).
        en_list: list that collects English titles (modified in place).
    """
    if "/" not in clean_name:
        # Single segment: classify it directly, no further splitting.
        if self.has_zh(clean_name) is False:
            en_list.append(clean_name.strip(" "))
        elif self.has_en(clean_name) is False:
            zh_list.append(clean_name.strip(" "))
        return
    for segment in clean_name.split("/"):
        if self.has_zh(segment) is False:
            en_list.append(segment.strip(" "))
            continue
        if self.has_en(segment) is False:
            zh_list.append(segment.strip(" "))
            continue
        # Still mixed Chinese/English: try to separate it and recurse.
        self.easy_split(self.add_separator(segment), zh_list, en_list)
# Run every extractor on the file name, then derive the title from the rest
def get_info(self):
    """Collect all recognised fields of the file name plus its title.

    Returns:
        dict with the keys "group", "dpi", "season", "episode", "vision",
        "lang", "ass", "type", "code" and "source" (results of the matching
        ``get_*`` methods), "clean_name" (the lower-cased name with those
        results, the noise words and the brackets removed) and "title"
        (a dict with the keys "zh" and "en").
    """
    # Extracted fields
    info = {
        "group": self.get_group(),
        "dpi": self.get_dpi(),
        "season": self.get_season(),
        "episode": self.get_episode(),
        "vision": self.get_vision(),
        "lang": self.get_language(),
        "ass": self.get_ass(),
        "type": self.get_type(),
        "code": self.get_code(),
        "source": self.get_source()
    }
    # Work on a lower-cased copy of the name
    clean_name = self.file_name.lower()
    # Remove everything that was already recognised
    for found in info.values():
        if found is None:
            continue
        if type(found) is list:
            for piece in found:
                if piece is not None:
                    clean_name = clean_name.replace(piece, "")
        else:
            clean_name = clean_name.replace(found, "")
    # Remove known noise words
    noise_words = ["pc&psp", "pc&psv", "fin", "opus", "movie", "tvb", "end", "web", "bangumi.online", "donghua",
                   "话全", "第话", "第集", "全集", "", "", "+", "@", "", ""]
    for word in noise_words:
        clean_name = clean_name.replace(word, "")
    # Collapse repeated spaces
    clean_name = re.sub(' +', ' ', clean_name).strip(" ")
    # Drop brackets together with the spaces padding them
    clean_name = re.sub("([(\[] *| *[)\]])", "", clean_name)
    # What is left is close to the bare title; store it before further cleanup
    info["clean_name"] = clean_name

    clean_name = re.sub('[^a-zA-Z\u4e00-\u9fa5:@#$%^&*()\[\]/ ]', "", clean_name)
    clean_name = re.sub(' +', ' ', clean_name).strip(" ")
    clean_name = re.sub("([(\[] *| *[)\]])", "", clean_name)
    print(clean_name)

    zh_list, en_list = [], []
    clean_name = self.add_separator(clean_name)
    self.easy_split(clean_name, zh_list, en_list)
    title = {
        "zh": zh_list if zh_list else None,
        "en": en_list if en_list else None
    }
    # Fall back to extract_title when the simple split found nothing
    if title["zh"] is None and title["en"] is None:
        title = self.extract_title(clean_name)
    print(title)
    info["title"] = title
    return info
# Script entry point: parse names loaded from the database and time the run.
if __name__ == "__main__":
# Usage example
# NOTE(review): `name` is not assigned anywhere before this line in the
# visible code, so this call would fail with NameError. It looks like a
# leftover from an earlier revision - confirm whether it should be removed.
print(Rename(name).get_info())
# Read the "name" field of the first page (1000 rows) via ChainDb("spider_dmhy").
# NOTE(review): ChainDb.select() returns the caught exception instead of
# raising it, so on a database error `raw` is an Exception and the list
# comprehension below fails on it - consider checking `raw` first.
raw = ChainDb("spider_dmhy").page(1, 1000).field("name").select()
name_list = [x["name"] for x in raw]
start = time.time()
# Parse every name; get_info() prints its intermediate results itself.
for name in name_list:
print(name)
Rename(name).get_info()
print()
# Elapsed wall-clock time in seconds
print("%s" % (time.time() - start))