mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-14 10:30:35 +08:00
Merge pull request #12 from IceKyrin/main
rss识别v1.6(保留标点,支持日文,bug:中日文有时未分离)
This commit is contained in:
@@ -6,23 +6,38 @@ import logging
|
||||
|
||||
class RSSInfoCleaner:
|
||||
class Name:
|
||||
raw_name = None
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
raw = None
|
||||
conv = None
|
||||
zh = None
|
||||
en = None
|
||||
jp = None
|
||||
clean = None
|
||||
|
||||
class Info:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
group = None
|
||||
dpi = None
|
||||
season = None
|
||||
episode = None
|
||||
vision = None
|
||||
lang = None
|
||||
|
||||
class Tag:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
dpi = None
|
||||
ass = None
|
||||
lang = None
|
||||
type = None
|
||||
code = None
|
||||
source = None
|
||||
zh = None
|
||||
en = None
|
||||
clean_name = None
|
||||
|
||||
def __init__(self, file_name):
|
||||
self.Name.file_name = file_name # 接收文件名参数
|
||||
self.Name.raw = file_name # 接收文件名参数
|
||||
self.clean() # 清理广告等杂质
|
||||
# 加载日志,匹配特征等
|
||||
logging.basicConfig(level=logging.DEBUG,
|
||||
@@ -42,19 +57,19 @@ class RSSInfoCleaner:
|
||||
self.pre_analyse = None
|
||||
# 匹配字幕组特征
|
||||
self.recognize_group()
|
||||
self.Name.group = self.get_group()
|
||||
self.Name.dpi = self.get_dpi()
|
||||
self.Name.season = self.get_season()
|
||||
self.Name.episode = self.get_episode()
|
||||
self.Name.vision = self.get_vision()
|
||||
self.Name.lang = self.get_language()
|
||||
self.Name.ass = self.get_ass()
|
||||
self.Name.type = self.get_type()
|
||||
self.Name.code = self.get_code()
|
||||
self.Name.source = self.get_source()
|
||||
self.Info.group = self.get_group()
|
||||
self.Tag.dpi = self.get_dpi()
|
||||
self.Info.season = self.get_season()
|
||||
self.Info.episode = self.get_episode()
|
||||
self.Info.vision = self.get_vision()
|
||||
self.Tag.lang = self.get_language()
|
||||
self.Tag.ass = self.get_ass()
|
||||
self.Tag.type = self.get_type()
|
||||
self.Tag.code = self.get_code()
|
||||
self.Tag.source = self.get_source()
|
||||
self.Name.zh = None
|
||||
self.Name.en = None
|
||||
self.Name.clean_name = None
|
||||
self.Name.clean = None
|
||||
self.get_info()
|
||||
|
||||
# 获取字符串出现位置
|
||||
@@ -73,7 +88,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 清理原链接(中文字符替换为英文)
|
||||
def clean(self):
|
||||
file_name = zhconv.convert(self.Name.file_name, 'zh-cn')
|
||||
file_name = zhconv.convert(self.Name.raw, 'zh-cn')
|
||||
# 去广告
|
||||
file_name = re.sub("[((\[【]?(字幕)?[\u4e00-\u9fa5]{0,3}(新人|招募?新?)[\u4e00-\u9fa5]{0,5}[))\]】]?", "", file_name)
|
||||
# 除杂
|
||||
@@ -85,7 +100,7 @@ class RSSInfoCleaner:
|
||||
strip = ["复制磁连", "兼容", "配音", "网盘", "\u200b", "[]", "★"]
|
||||
for i in strip:
|
||||
file_name = file_name.replace(i, "")
|
||||
self.Name.file_name = str(file_name).replace(':', ':').replace('【', '[').replace('】', ']').replace('-', '-') \
|
||||
self.Name.raw = str(file_name).replace(':', ':').replace('【', '[').replace('】', ']').replace('-', '-') \
|
||||
.replace('(', '(').replace(')', ')').replace("&", "&").replace("X", "x").replace("×", "x") \
|
||||
.replace("Ⅹ", "x").replace("-", " ").replace("_", " ")
|
||||
|
||||
@@ -98,12 +113,12 @@ class RSSInfoCleaner:
|
||||
character = group + character
|
||||
# !强规则,人工录入标准名,区分大小写,优先匹配
|
||||
for char in rule:
|
||||
if "[%s]" % char in self.Name.file_name:
|
||||
if "[%s]" % char in self.Name.raw:
|
||||
self.pre_analyse = char.lower()
|
||||
return "enforce"
|
||||
# 如果文件名以 [字幕组名] 开头
|
||||
if self.Name.file_name[0] == "[":
|
||||
str_split = self.Name.file_name.lower().split("]")
|
||||
if self.Name.raw[0] == "[":
|
||||
str_split = self.Name.raw.lower().split("]")
|
||||
# 检索特征值是否位于文件名第1、2、最后一段
|
||||
for char in character:
|
||||
if char in str_split[0] or char in str_split[1] or char in str_split[-1]:
|
||||
@@ -118,16 +133,16 @@ class RSSInfoCleaner:
|
||||
self.pre_analyse = None
|
||||
return False
|
||||
# 文件名以 -字幕组名 结尾
|
||||
elif "-" in self.Name.file_name:
|
||||
elif "-" in self.Name.raw:
|
||||
for char in character:
|
||||
if char in self.Name.file_name.lower().split("-")[-1]:
|
||||
self.pre_analyse = self.Name.file_name.lower().split("-")[-1]
|
||||
if char in self.Name.raw.lower().split("-")[-1]:
|
||||
self.pre_analyse = self.Name.raw.lower().split("-")[-1]
|
||||
return "reserve"
|
||||
self.pre_analyse = None
|
||||
return False
|
||||
# 文件名以空格分隔 字幕组名为第一段
|
||||
else:
|
||||
first_str = self.Name.file_name.lower().split(" ")[0]
|
||||
first_str = self.Name.raw.lower().split(" ")[0]
|
||||
for char in character:
|
||||
if char in first_str:
|
||||
self.pre_analyse = first_str
|
||||
@@ -148,14 +163,14 @@ class RSSInfoCleaner:
|
||||
# 大部分情况
|
||||
elif status == "success":
|
||||
# 如果是 [字幕组名] ,这么标准的格式直接else送走吧,剩下的匹配一下
|
||||
if "[%s]" % res_char not in self.Name.file_name.lower():
|
||||
if self.Name.file_name[0] == "[":
|
||||
if "[%s]" % res_char not in self.Name.raw.lower():
|
||||
if self.Name.raw[0] == "[":
|
||||
try:
|
||||
# 以特征值为中心,匹配最近的中括号,八成就这个了
|
||||
gp = self.get_gp(res_char, self.Name.file_name.lower())
|
||||
gp = self.get_gp(res_char, self.Name.raw.lower())
|
||||
return gp
|
||||
except Exception as e:
|
||||
print("bug -- res_char:%s,%s,%s" % (res_char, self.Name.file_name.lower(), e))
|
||||
logging.warning("bug -- res_char:%s,%s,%s" % (res_char, self.Name.raw.lower(), e))
|
||||
else:
|
||||
return res_char
|
||||
# 再见
|
||||
@@ -163,7 +178,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 扒了6W数据,硬找的参数,没啥说的
|
||||
def get_dpi(self):
|
||||
file_name = self.Name.file_name
|
||||
file_name = self.Name.raw
|
||||
dpi_list = ["4k", "2160p", "1440p", "1080p", "1036p", "816p", "810p", "720p", "576p", "544P", "540p", "480p",
|
||||
"1080i", "1080+",
|
||||
"3840x2160", "1920x1080", "1920x1036", "1920x804", "1920x800", "1536x864", "1452x1080", "1440x1080",
|
||||
@@ -177,7 +192,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 获取语种
|
||||
def get_language(self):
|
||||
file_name = self.Name.file_name
|
||||
file_name = self.Name.raw
|
||||
lang = []
|
||||
# 中文标示
|
||||
try:
|
||||
@@ -199,7 +214,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 文件种类
|
||||
def get_type(self):
|
||||
file_name = self.Name.file_name
|
||||
file_name = self.Name.raw
|
||||
type_list = []
|
||||
# 英文标示
|
||||
try:
|
||||
@@ -214,7 +229,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 编码格式
|
||||
def get_code(self):
|
||||
file_name = self.Name.file_name
|
||||
file_name = self.Name.raw
|
||||
code = []
|
||||
# 英文标示
|
||||
try:
|
||||
@@ -229,7 +244,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 来源
|
||||
def get_source(self):
|
||||
file_name = str(self.Name.file_name).lower()
|
||||
file_name = str(self.Name.raw).lower()
|
||||
type_list = []
|
||||
# 英文标示
|
||||
for _ in range(3):
|
||||
@@ -250,7 +265,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 获取季度
|
||||
def get_season(self):
|
||||
file_name = self.Name.file_name.lower()
|
||||
file_name = self.Name.raw.lower()
|
||||
season = []
|
||||
# 中文标示
|
||||
try:
|
||||
@@ -270,7 +285,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 获取集数
|
||||
def get_episode(self):
|
||||
file_name = self.Name.file_name.lower()
|
||||
file_name = self.Name.raw.lower()
|
||||
episode = []
|
||||
# [10 11]集点名批评这种命名方法,几个国漫的组
|
||||
try:
|
||||
@@ -305,7 +320,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 获取版本
|
||||
def get_vision(self):
|
||||
file_name = self.Name.file_name.lower()
|
||||
file_name = self.Name.raw.lower()
|
||||
vision = []
|
||||
# 中文
|
||||
try:
|
||||
@@ -332,7 +347,7 @@ class RSSInfoCleaner:
|
||||
|
||||
# 获取字幕类型
|
||||
def get_ass(self):
|
||||
file_name = self.Name.file_name.lower()
|
||||
file_name = self.Name.raw.lower()
|
||||
ass = []
|
||||
# 中文标示
|
||||
try:
|
||||
@@ -367,6 +382,14 @@ class RSSInfoCleaner:
|
||||
else:
|
||||
return False
|
||||
|
||||
def has_jp(self, str):
|
||||
my_re = re.compile(r'[\u3040-\u31ff]', re.S)
|
||||
res = re.findall(my_re, str)
|
||||
if len(res):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
# 粗略识别失败,re强制匹配
|
||||
def extract_title(self, raw_name):
|
||||
title = {
|
||||
@@ -431,13 +454,15 @@ class RSSInfoCleaner:
|
||||
if "/" not in clean_name:
|
||||
if '\u4e00' <= clean_name[0] <= '\u9fff':
|
||||
try:
|
||||
res = re.search("(^[a\u4e00-\u9fa5: ]{1,10} ?)([a-z:]{1,20} ?){1,10}", clean_name).group(1)
|
||||
res = re.search("(^[\u4e00-\u9fa5\u3040-\u31ff: .。,!!]{1,20} ?)([a-z: .。,,!!]{1,20} ?)*",
|
||||
clean_name).group(1)
|
||||
clean_name = clean_name.replace(res, res.strip(" ") + "/")
|
||||
except Exception as e:
|
||||
logging.info(e)
|
||||
else:
|
||||
try:
|
||||
res = re.search("^(([a-z:]{1,20} ?){1,10} )[\u4e00-\u9fa5: a]{1,20}", clean_name).group(1)
|
||||
res = re.search("^(([a-z: .。,,!!]{1,20} ?)* ?)[\u4e00-\u9fa5\u3040-\u31ff: .。,,!!]{1,20}",
|
||||
clean_name).group(1)
|
||||
clean_name = clean_name.replace(res, res.strip(" ") + "/")
|
||||
except Exception as e:
|
||||
logging.info(e)
|
||||
@@ -446,43 +471,51 @@ class RSSInfoCleaner:
|
||||
return clean_name
|
||||
|
||||
# 对以/分隔的多个翻译名,进行简单提取
|
||||
def easy_split(self, clean_name, zh_list, en_list):
|
||||
def easy_split(self, clean_name, zh_list, en_list, jp_list):
|
||||
if "/" in clean_name:
|
||||
n_list = clean_name.split("/")
|
||||
for i in n_list:
|
||||
if self.has_zh(i) is False:
|
||||
en_list.append(i.strip(" "))
|
||||
elif self.has_en(i) is False:
|
||||
zh_list.append(i.strip(" "))
|
||||
if self.has_jp(i):
|
||||
jp_list.append(i.strip(" "))
|
||||
else:
|
||||
# 如果还是同时包含中英文的情况,递龟一下
|
||||
i = self.add_separator(i)
|
||||
self.easy_split(i, zh_list, en_list)
|
||||
if self.has_zh(i) is False:
|
||||
en_list.append(i.strip(" "))
|
||||
elif self.has_en(i) is False:
|
||||
zh_list.append(i.strip(" "))
|
||||
elif self.has_zh(i) and self.has_en(i):
|
||||
# 如果还是同时包含中英文的情况,递龟一下
|
||||
i = self.add_separator(i)
|
||||
self.easy_split(i, zh_list, en_list, jp_list)
|
||||
else:
|
||||
self.easy_split(i, zh_list, en_list, jp_list)
|
||||
else:
|
||||
if self.has_zh(clean_name) is False:
|
||||
en_list.append(clean_name.strip(" "))
|
||||
elif self.has_en(clean_name) is False:
|
||||
zh_list.append(clean_name.strip(" "))
|
||||
if self.has_jp(clean_name):
|
||||
jp_list.append(clean_name.strip(" "))
|
||||
else:
|
||||
if self.has_zh(clean_name) is False:
|
||||
en_list.append(clean_name.strip(" "))
|
||||
elif self.has_en(clean_name) is False:
|
||||
zh_list.append(clean_name.strip(" "))
|
||||
|
||||
# 汇总信息
|
||||
def get_info(self):
|
||||
# 获取到的信息
|
||||
info = {
|
||||
"group": self.Name.group,
|
||||
"dpi": self.Name.dpi,
|
||||
"season": self.Name.season,
|
||||
"episode": self.Name.episode,
|
||||
"vision": self.Name.vision,
|
||||
"lang": self.Name.lang,
|
||||
"ass": self.Name.ass,
|
||||
"type": self.Name.type,
|
||||
"code": self.Name.code,
|
||||
"source": self.Name.source
|
||||
"group": self.Info.group,
|
||||
"dpi": self.Tag.dpi,
|
||||
"season": self.Info.season,
|
||||
"episode": self.Info.episode,
|
||||
"vision": self.Info.vision,
|
||||
"lang": self.Tag.lang,
|
||||
"ass": self.Tag.ass,
|
||||
"type": self.Tag.type,
|
||||
"code": self.Tag.code,
|
||||
"source": self.Tag.source
|
||||
}
|
||||
|
||||
# 字母全部小写
|
||||
clean_name = self.Name.file_name.lower()
|
||||
# clean_name = self.Name.file_name
|
||||
clean_name = self.Name.raw.lower()
|
||||
|
||||
# 去除拿到的有效信息
|
||||
for k, v in info.items():
|
||||
if v is not None:
|
||||
@@ -503,20 +536,84 @@ class RSSInfoCleaner:
|
||||
|
||||
# 剩下来的几乎就是干净番名了,再刮不到不管了
|
||||
info["clean_name"] = clean_name
|
||||
clean_name = re.sub('[^a-zA-Z\u4e00-\u9fa5:@#$%^&*()\[\]/ ]', "", clean_name)
|
||||
clean_name = re.sub('[^a-zA-Z\u4e00-\u9fa5\u3040-\u31ff:*()\[\]/ .。,,!!]', "", clean_name)
|
||||
clean_name = re.sub(' +', ' ', clean_name).strip(" ")
|
||||
clean_name = re.sub("([(\[] *| *[)\]])", "", clean_name)
|
||||
|
||||
zh_list = []
|
||||
en_list = []
|
||||
raw_zh_list = []
|
||||
raw_jp_list = []
|
||||
raw_en_list = []
|
||||
|
||||
clean_name = self.add_separator(clean_name)
|
||||
clean_name = re.sub("(/ */)", "", clean_name)
|
||||
self.easy_split(clean_name, zh_list, en_list)
|
||||
|
||||
self.easy_split(clean_name, raw_zh_list, raw_en_list, raw_jp_list)
|
||||
self.Name.clean = clean_name
|
||||
zh_list = []
|
||||
en_list = []
|
||||
jp_list = []
|
||||
for res in raw_zh_list:
|
||||
correct_res = re.search(res, self.Name.raw.lower())
|
||||
if correct_res:
|
||||
zh_list.append(correct_res.group())
|
||||
for res in raw_en_list:
|
||||
correct_res = re.search(res, self.Name.raw.lower())
|
||||
if correct_res:
|
||||
en_list.append(correct_res.group())
|
||||
for res in raw_jp_list:
|
||||
correct_res = re.search(res, self.Name.raw.lower())
|
||||
if correct_res:
|
||||
jp_list.append(correct_res.group())
|
||||
|
||||
if jp_list:
|
||||
temp_name = self.Name.clean
|
||||
for i in jp_list:
|
||||
temp_name = temp_name.replace(i, "")
|
||||
self.easy_split(temp_name, zh_list, en_list, jp_list)
|
||||
if zh_list and en_list == []:
|
||||
temp_name = self.Name.clean
|
||||
for i in zh_list:
|
||||
temp_name = temp_name.replace(i, "")
|
||||
self.easy_split(temp_name, zh_list, en_list, jp_list)
|
||||
elif zh_list == [] and en_list:
|
||||
temp_name = self.Name.clean
|
||||
for i in en_list:
|
||||
temp_name = temp_name.replace(i, "")
|
||||
self.easy_split(temp_name, zh_list, en_list, jp_list)
|
||||
elif zh_list == [] and en_list == []:
|
||||
self.extract_title(clean_name)
|
||||
|
||||
self.Name.zh = zh_list if zh_list else None
|
||||
self.Name.en = en_list if en_list else None
|
||||
if self.Name.zh is None and self.Name.en is None:
|
||||
self.extract_title(clean_name)
|
||||
self.Name.jp = jp_list if jp_list else None
|
||||
|
||||
return info
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import csv
|
||||
|
||||
|
||||
def read_data(file_name, rows):
|
||||
if file_name == "mikan":
|
||||
with open('mikan.csv', 'r', encoding='utf-8') as csv_file:
|
||||
reader = csv.reader(csv_file)
|
||||
raw_data = [row[3] for row in reader][0:rows]
|
||||
return raw_data
|
||||
elif file_name == "dmhy":
|
||||
with open('dmhy.csv', 'r', encoding='utf-8') as csv_file:
|
||||
reader = csv.reader(csv_file)
|
||||
raw_data = [row[4] for row in reader][1:rows + 1]
|
||||
return raw_data
|
||||
|
||||
|
||||
# mikan/dmhy 获取数据,dmhy 最多1w行,mikan最多3w行
|
||||
name_list = read_data("dmhy", 100)
|
||||
for name in name_list:
|
||||
print(name)
|
||||
print("pure_name:%s" % RSSInfoCleaner(name).Name.raw)
|
||||
print("clean_name:%s" % RSSInfoCleaner(name).Name.clean)
|
||||
print("zh:%s" % RSSInfoCleaner(name).Name.zh)
|
||||
print("en:%s" % RSSInfoCleaner(name).Name.en)
|
||||
print("jp:%s" % RSSInfoCleaner(name).Name.jp)
|
||||
print()
|
||||
|
||||
|
Can't render this file because it is too large.
|
|
Can't render this file because it is too large.
|
@@ -49,7 +49,8 @@
|
||||
"nvacg",
|
||||
"RHxDymy",
|
||||
"PoInSu",
|
||||
"DHR百合組"
|
||||
"DHR百合組",
|
||||
"东京不够热"
|
||||
],
|
||||
"name_position": 1
|
||||
}
|
||||
|
||||
@@ -1,322 +0,0 @@
|
||||
from pymysql import connect
|
||||
from pymysql.cursors import DictCursor
|
||||
from config import config_const
|
||||
|
||||
|
||||
class OpenDB(object):
|
||||
def __init__(self):
|
||||
super(OpenDB, self).__init__()
|
||||
# 初始化
|
||||
self.conn = connect(
|
||||
host=config_const['db_config']['host'],
|
||||
port=config_const['db_config']['port'],
|
||||
user=config_const['db_config']['user'],
|
||||
passwd=config_const['db_config']['passwd'],
|
||||
db=config_const['db_config']['db'],
|
||||
charset=config_const['db_config']['charset']
|
||||
)
|
||||
# 获取游标
|
||||
self.cs = self.conn.cursor(DictCursor)
|
||||
|
||||
def __enter__(self):
|
||||
# 返回游标进行执行操作
|
||||
return self.cs
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
# 结束提交数据并关闭数据库
|
||||
self.conn.commit()
|
||||
self.cs.close()
|
||||
self.conn.close()
|
||||
|
||||
|
||||
class ChainDb(object):
|
||||
def __init__(self, database):
|
||||
super(ChainDb, self).__init__()
|
||||
self.where_keys = ''
|
||||
self.where_vals = []
|
||||
self.params = '*'
|
||||
self.limit = ''
|
||||
self.order_by = ''
|
||||
self.group_by = ''
|
||||
if config_const['db_config']['pre']:
|
||||
self.database = config_const['db_config']['pre'] + database
|
||||
else:
|
||||
self.database = database
|
||||
|
||||
# 查询条件拼接
|
||||
@staticmethod
|
||||
def _select(where, where_keys, where_vals, method='and'):
|
||||
is_method = 0
|
||||
where_method, where_compare = [], []
|
||||
for item in where.values():
|
||||
if isinstance(item, list) and len(item) > 1:
|
||||
# 判断条件类型,默认是and
|
||||
if len(item) > 2:
|
||||
if len(item) == 3 and item[2] and (item[2] == 'and' or item[2] == 'or'):
|
||||
where_method.append(item[2])
|
||||
else:
|
||||
where_method.append('and')
|
||||
else:
|
||||
where_method.append('and')
|
||||
# 判断条件方式,大于或等于等等
|
||||
sign_res = False
|
||||
if item[0]:
|
||||
compare_sign = ['>', '<', '=', '<>', '>=', '<=']
|
||||
for sign in compare_sign:
|
||||
if item[0] == sign:
|
||||
sign_res = True
|
||||
break
|
||||
if sign_res:
|
||||
where_compare.append(item[0])
|
||||
else:
|
||||
where_compare.append('=')
|
||||
# 条件值
|
||||
where_vals.append(item[1])
|
||||
is_method = 1
|
||||
else:
|
||||
where_vals.append(item)
|
||||
is_method = 0
|
||||
if is_method == 1:
|
||||
# 拼接and或or条件,最后一个去掉
|
||||
keys = list(where.keys())
|
||||
method_len = len(where_vals)
|
||||
for i in range(method_len):
|
||||
where_keys += keys[i] + where_compare[i] + '%s ' + (
|
||||
where_method[i] if i <= method_len - 2 else '') + ' '
|
||||
else:
|
||||
# 拼接条件
|
||||
where_keys = (' ' + method + ' ').join([item + '=%s' for item in where.keys()])
|
||||
where_vals = tuple(where_vals)
|
||||
return {'keys': where_keys, 'vals': where_vals}
|
||||
|
||||
# 执行mysql
|
||||
@staticmethod
|
||||
def query(sql=None, tuple_values=None):
|
||||
if not sql:
|
||||
raise Exception('sql语句不能为空')
|
||||
if tuple_values and not isinstance(tuple_values, tuple):
|
||||
raise Exception('参数类型必须是元组')
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
if tuple_values:
|
||||
cs.execute(sql, tuple_values)
|
||||
else:
|
||||
cs.execute(sql)
|
||||
return cs
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
# 查询条件
|
||||
def where(self, where=None, value=None, method='and'):
|
||||
where_keys, where_vals = '', []
|
||||
if value is not None:
|
||||
where = {where: value}
|
||||
# 判断参数
|
||||
if where and not isinstance(where, dict):
|
||||
raise Exception('查询条件必须是字典类型')
|
||||
if where:
|
||||
# 查询字段
|
||||
_select = self._select(where, where_keys, where_vals, method)
|
||||
where_keys, where_vals = _select['keys'], _select['vals']
|
||||
if hasattr(self, 'where_keys') and self.where_keys:
|
||||
self.where_keys += ' ' + method + ' ' + where_keys
|
||||
self.where_vals += where_vals
|
||||
else:
|
||||
self.where_keys = where_keys
|
||||
self.where_vals = where_vals
|
||||
return self
|
||||
|
||||
# 查询字段
|
||||
def field(self, params=None):
|
||||
if not params:
|
||||
self.params = '*'
|
||||
else:
|
||||
if isinstance(params, list):
|
||||
params = ','.join(params)
|
||||
self.params = params
|
||||
return self
|
||||
|
||||
# 分页
|
||||
def page(self, page=1, rows=10):
|
||||
self.limit = str((page - 1) * rows) + ',' + str(rows)
|
||||
return self
|
||||
|
||||
# 归组
|
||||
def group(self, param=None):
|
||||
if not param:
|
||||
return self
|
||||
if not isinstance(param, str):
|
||||
raise Exception('参数必须是字符串类型')
|
||||
self.group_by = param
|
||||
return self
|
||||
|
||||
# 排序
|
||||
def order(self, order_by=None):
|
||||
if not order_by:
|
||||
return self
|
||||
self.order_by = order_by
|
||||
return self
|
||||
|
||||
# 格式化sql
|
||||
def format_sql(self):
|
||||
if not hasattr(self, 'params'):
|
||||
self.params = '*'
|
||||
# select * from user u left join class c on u.user_id = c.user_id where u.user_id = 1
|
||||
sql = "SELECT {} FROM {}".format(self.params, self.database)
|
||||
if hasattr(self, 'where_keys') and self.where_keys:
|
||||
sql += " WHERE {}".format(self.where_keys)
|
||||
if hasattr(self, 'order_by') and self.order_by:
|
||||
sql += " ORDER BY {}".format(self.order_by)
|
||||
if hasattr(self, 'group_by') and self.group_by:
|
||||
sql += " GROUP BY {}".format(self.group_by)
|
||||
if hasattr(self, 'limit') and self.limit:
|
||||
sql += " limit {}".format(self.limit)
|
||||
return sql
|
||||
|
||||
# 查询多个数据
|
||||
def select(self):
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
sql = self.format_sql()
|
||||
if hasattr(self, 'where_vals') and self.where_vals:
|
||||
print(sql % self.where_vals)
|
||||
cs.execute(sql, self.where_vals)
|
||||
else:
|
||||
cs.execute(sql)
|
||||
return cs.fetchall()
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
# 查询单个数据
|
||||
def find(self):
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
sql = self.format_sql()
|
||||
if self.where_vals:
|
||||
cs.execute(sql, self.where_vals)
|
||||
else:
|
||||
cs.execute(sql)
|
||||
return cs.fetchone()
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
# 查找单个字段
|
||||
def value(self, field=None):
|
||||
if not field:
|
||||
raise Exception('查询字段不能为空')
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
sql = self.format_sql()
|
||||
if self.where_vals:
|
||||
cs.execute(sql, self.where_vals)
|
||||
else:
|
||||
cs.execute(sql)
|
||||
row = cs.fetchone()
|
||||
if field not in row:
|
||||
raise Exception('没有查询到该字段')
|
||||
return row[field]
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
# 新增数据
|
||||
def insert(self, data):
|
||||
if not data:
|
||||
raise Exception('新增内容不能为空')
|
||||
if not isinstance(data, dict) and not isinstance(data, list):
|
||||
raise Exception('参数必须是字典类型')
|
||||
if isinstance(data, list):
|
||||
# 批量插入数据处理
|
||||
if not data:
|
||||
raise Exception('列表内容不能为空')
|
||||
# 参数赋值
|
||||
long_item, long_data, values_list = 0, [], []
|
||||
# 循环获取字段最多的字典
|
||||
for item in data:
|
||||
item_len = len(item)
|
||||
if item_len > long_item:
|
||||
long_item = item_len
|
||||
long_data = item
|
||||
# 字段最多的字典不存在
|
||||
if not long_data:
|
||||
raise Exception('新增字段不能为空')
|
||||
# 获取字典keys值
|
||||
keys_list = list(long_data.keys())
|
||||
# 获取新增keys值和values占位符
|
||||
keys = ','.join(long_data.keys())
|
||||
values = ','.join(['%s' for i in range(len(long_data))])
|
||||
# 循环获取字段
|
||||
for item in data:
|
||||
item_new = []
|
||||
# 循环字段最多到列表
|
||||
for key in keys_list:
|
||||
# 判断字段是否存在
|
||||
has_key = item.get(key, -1)
|
||||
if has_key == -1:
|
||||
# 字段不存在
|
||||
item_new.append('')
|
||||
else:
|
||||
# 字段存在
|
||||
item_new.append(item[key])
|
||||
# 转换为元组后新增到新的元组
|
||||
values_list.append(tuple(item_new))
|
||||
values_list = tuple(values_list)
|
||||
is_more = 1
|
||||
else:
|
||||
# 单个插入数据处理
|
||||
keys = ','.join(data.keys())
|
||||
values_list = tuple(data.values())
|
||||
# 根据字段个数来生成相应的字符串占位符
|
||||
values = ','.join(['%s' for i in range(len(data))])
|
||||
is_more = 0
|
||||
# 添加数据
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
sql = "INSERT INTO {} ({}) VALUES ({})".format(self.database, keys, values)
|
||||
if is_more == 1:
|
||||
# 批量插入
|
||||
cs.executemany(sql, values_list)
|
||||
else:
|
||||
# 单个插入
|
||||
cs.execute(sql, values_list)
|
||||
if cs.rowcount <= 0:
|
||||
raise Exception('新增内容为空,插入数据库失败')
|
||||
last_id = cs.lastrowid if cs.rowcount == 1 else cs.rowcount
|
||||
return last_id
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
# 更新内容
|
||||
def update(self, data=None):
|
||||
# 条件判断
|
||||
if not data:
|
||||
raise Exception('更新内容不能为空')
|
||||
if not isinstance(data, dict):
|
||||
raise Exception('参数必须是字典类型')
|
||||
if not hasattr(self, 'where_vals'):
|
||||
raise Exception('查询条件不能为空')
|
||||
# 更新字段
|
||||
set_keys = ','.join([item + '=%s' for item in data.keys()])
|
||||
set_vals = [item for item in data.values()]
|
||||
# 合并dict并转化为元组
|
||||
val_list = tuple(set_vals + list(self.where_vals))
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
sql = "UPDATE {} SET {} WHERE {}".format(self.database, set_keys, self.where_keys)
|
||||
row = cs.execute(sql, val_list)
|
||||
return row
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
# 删除数据
|
||||
def delete(self):
|
||||
try:
|
||||
with OpenDB() as cs:
|
||||
sql = "DELETE FROM {}".format(self.database)
|
||||
if hasattr(self, 'where_keys'):
|
||||
sql += " WHERE {}".format(self.where_keys)
|
||||
row = cs.execute(sql, self.where_vals)
|
||||
else:
|
||||
row = cs.execute(sql)
|
||||
return row
|
||||
except Exception as e:
|
||||
return e
|
||||
Reference in New Issue
Block a user