From 3d514a3a8d6c71e545232527e818442b30e682d8 Mon Sep 17 00:00:00 2001 From: xiaolang <996625688@qq.com> Date: Fri, 27 May 2022 03:42:41 +0800 Subject: [PATCH] =?UTF-8?q?rss=E8=AF=86=E5=88=AB=E7=89=88=E6=9C=AC1.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Windows/chain_db.py | 322 ++++++++++++++++++++++++++++++++++++++++++++ Windows/rename.py | 172 +++++++++++++++-------- 2 files changed, 438 insertions(+), 56 deletions(-) create mode 100644 Windows/chain_db.py diff --git a/Windows/chain_db.py b/Windows/chain_db.py new file mode 100644 index 00000000..fe82bf5a --- /dev/null +++ b/Windows/chain_db.py @@ -0,0 +1,322 @@ +from pymysql import connect +from pymysql.cursors import DictCursor +from config import config_const + + +class OpenDB(object): + def __init__(self): + super(OpenDB, self).__init__() + # 初始化 + self.conn = connect( + host=config_const['db_config']['host'], + port=config_const['db_config']['port'], + user=config_const['db_config']['user'], + passwd=config_const['db_config']['passwd'], + db=config_const['db_config']['db'], + charset=config_const['db_config']['charset'] + ) + # 获取游标 + self.cs = self.conn.cursor(DictCursor) + + def __enter__(self): + # 返回游标进行执行操作 + return self.cs + + def __exit__(self, exc_type, exc_val, exc_tb): + # 结束提交数据并关闭数据库 + self.conn.commit() + self.cs.close() + self.conn.close() + + +class ChainDb(object): + def __init__(self, database): + super(ChainDb, self).__init__() + self.where_keys = '' + self.where_vals = [] + self.params = '*' + self.limit = '' + self.order_by = '' + self.group_by = '' + if config_const['db_config']['pre']: + self.database = config_const['db_config']['pre'] + database + else: + self.database = database + + # 查询条件拼接 + @staticmethod + def _select(where, where_keys, where_vals, method='and'): + is_method = 0 + where_method, where_compare = [], [] + for item in where.values(): + if isinstance(item, list) and len(item) > 1: + # 判断条件类型,默认是and + if len(item) > 2: + if len(item) == 3 and item[2] and (item[2] == 'and' or item[2] == 'or'): + where_method.append(item[2]) + else: + where_method.append('and') + else: + where_method.append('and') + # 判断条件方式,大于或等于等等 + sign_res = False + if item[0]: + compare_sign = ['>', '<', '=', '<>', '>=', '<='] + for sign in compare_sign: + if item[0] == sign: + sign_res = True + break + if sign_res: + where_compare.append(item[0]) + else: + where_compare.append('=') + # 条件值 + where_vals.append(item[1]) + is_method = 1 + else: + where_vals.append(item) + is_method = 0 + if is_method == 1: + # 拼接and或or条件,最后一个去掉 + keys = list(where.keys()) + method_len = len(where_vals) + for i in range(method_len): + where_keys += keys[i] + where_compare[i] + '%s ' + ( + where_method[i] if i <= method_len - 2 else '') + ' ' + else: + # 拼接条件 + where_keys = (' ' + method + ' ').join([item + '=%s' for item in where.keys()]) + where_vals = tuple(where_vals) + return {'keys': where_keys, 'vals': where_vals} + + # 执行mysql + @staticmethod + def query(sql=None, tuple_values=None): + if not sql: + raise Exception('sql语句不能为空') + if tuple_values and not isinstance(tuple_values, tuple): + raise Exception('参数类型必须是元组') + try: + with OpenDB() as cs: + if tuple_values: + cs.execute(sql, tuple_values) + else: + cs.execute(sql) + return cs + except Exception as e: + return e + + # 查询条件 + def where(self, where=None, value=None, method='and'): + where_keys, where_vals = '', [] + if value is not None: + where = {where: value} + # 判断参数 + if where and not isinstance(where, dict): + raise Exception('查询条件必须是字典类型') + if where: + # 查询字段 + _select = self._select(where, where_keys, where_vals, method) + where_keys, where_vals = _select['keys'], _select['vals'] + if hasattr(self, 'where_keys') and self.where_keys: + self.where_keys += ' ' + method + ' ' + where_keys + self.where_vals += where_vals + else: + self.where_keys = where_keys + self.where_vals = where_vals + return self + + # 查询字段 + def field(self, params=None): + if not params: + self.params = '*' + else: + if isinstance(params, list): + params = ','.join(params) + self.params = params + return self + + # 分页 + def page(self, page=1, rows=10): + self.limit = str((page - 1) * rows) + ',' + str(rows) + return self + + # 归组 + def group(self, param=None): + if not param: + return self + if not isinstance(param, str): + raise Exception('参数必须是字符串类型') + self.group_by = param + return self + + # 排序 + def order(self, order_by=None): + if not order_by: + return self + self.order_by = order_by + return self + + # 格式化sql + def format_sql(self): + if not hasattr(self, 'params'): + self.params = '*' + # select * from user u left join class c on u.user_id = c.user_id where u.user_id = 1 + sql = "SELECT {} FROM {}".format(self.params, self.database) + if hasattr(self, 'where_keys') and self.where_keys: + sql += " WHERE {}".format(self.where_keys) + if hasattr(self, 'order_by') and self.order_by: + sql += " ORDER BY {}".format(self.order_by) + if hasattr(self, 'group_by') and self.group_by: + sql += " GROUP BY {}".format(self.group_by) + if hasattr(self, 'limit') and self.limit: + sql += " limit {}".format(self.limit) + return sql + + # 查询多个数据 + def select(self): + try: + with OpenDB() as cs: + sql = self.format_sql() + if hasattr(self, 'where_vals') and self.where_vals: + print(sql % self.where_vals) + cs.execute(sql, self.where_vals) + else: + cs.execute(sql) + return cs.fetchall() + except Exception as e: + return e + + # 查询单个数据 + def find(self): + try: + with OpenDB() as cs: + sql = self.format_sql() + if self.where_vals: + cs.execute(sql, self.where_vals) + else: + cs.execute(sql) + return cs.fetchone() + except Exception as e: + return e + + # 查找单个字段 + def value(self, field=None): + if not field: + raise Exception('查询字段不能为空') + try: + with OpenDB() as cs: + sql = self.format_sql() + if self.where_vals: + cs.execute(sql, self.where_vals) + else: + cs.execute(sql) + row = cs.fetchone() + if field not in row: + raise Exception('没有查询到该字段') + return row[field] + except Exception as e: + return e + + # 新增数据 + def insert(self, data): + if not data: + raise Exception('新增内容不能为空') + if not isinstance(data, dict) and not isinstance(data, list): + raise Exception('参数必须是字典类型') + if isinstance(data, list): + # 批量插入数据处理 + if not data: + raise Exception('列表内容不能为空') + # 参数赋值 + long_item, long_data, values_list = 0, [], [] + # 循环获取字段最多的字典 + for item in data: + item_len = len(item) + if item_len > long_item: + long_item = item_len + long_data = item + # 字段最多的字典不存在 + if not long_data: + raise Exception('新增字段不能为空') + # 获取字典keys值 + keys_list = list(long_data.keys()) + # 获取新增keys值和values占位符 + keys = ','.join(long_data.keys()) + values = ','.join(['%s' for i in range(len(long_data))]) + # 循环获取字段 + for item in data: + item_new = [] + # 循环字段最多到列表 + for key in keys_list: + # 判断字段是否存在 + has_key = item.get(key, -1) + if has_key == -1: + # 字段不存在 + item_new.append('') + else: + # 字段存在 + item_new.append(item[key]) + # 转换为元组后新增到新的元组 + values_list.append(tuple(item_new)) + values_list = tuple(values_list) + is_more = 1 + else: + # 单个插入数据处理 + keys = ','.join(data.keys()) + values_list = tuple(data.values()) + # 根据字段个数来生成相应的字符串占位符 + values = ','.join(['%s' for i in range(len(data))]) + is_more = 0 + # 添加数据 + try: + with OpenDB() as cs: + sql = "INSERT INTO {} ({}) VALUES ({})".format(self.database, keys, values) + if is_more == 1: + # 批量插入 + cs.executemany(sql, values_list) + else: + # 单个插入 + cs.execute(sql, values_list) + if cs.rowcount <= 0: + raise Exception('新增内容为空,插入数据库失败') + last_id = cs.lastrowid if cs.rowcount == 1 else cs.rowcount + return last_id + except Exception as e: + return e + + # 更新内容 + def update(self, data=None): + # 条件判断 + if not data: + raise Exception('更新内容不能为空') + if not isinstance(data, dict): + raise Exception('参数必须是字典类型') + if not hasattr(self, 'where_vals'): + raise Exception('查询条件不能为空') + # 更新字段 + set_keys = ','.join([item + '=%s' for item in data.keys()]) + set_vals = [item for item in data.values()] + # 合并dict并转化为元组 + val_list = tuple(set_vals + list(self.where_vals)) + try: + with OpenDB() as cs: + sql = "UPDATE {} SET {} WHERE {}".format(self.database, set_keys, self.where_keys) + row = cs.execute(sql, val_list) + return row + except Exception as e: + return e + + # 删除数据 + def delete(self): + try: + with OpenDB() as cs: + sql = "DELETE FROM {}".format(self.database) + if hasattr(self, 'where_keys'): + sql += " WHERE {}".format(self.where_keys) + row = cs.execute(sql, self.where_vals) + else: + row = cs.execute(sql) + return row + except Exception as e: + return e diff --git a/Windows/rename.py b/Windows/rename.py index 8423a692..00250831 100644 --- a/Windows/rename.py +++ b/Windows/rename.py @@ -7,6 +7,8 @@ import requests import logging import pandas as pd +from chain_db import ChainDb + class Rename: def __init__(self, file_name): @@ -27,10 +29,10 @@ class Rename: self.file_info = {} self.pre_analyse = None - self.regognize_group() + self.recognize_group() # 获取字符串出现位置 - def getStrInfo(self, char, target): + def get_str_location(self, char, target): locate = [] for index, value in enumerate(char): if target == value: @@ -39,8 +41,8 @@ class Rename: # 匹配某字符串最近的括号 def get_gp(self, char, string): - start = [x for x in self.getStrInfo(string, "[") if int(x) < int(string.find(char))][-1] + 1 - end = [x for x in self.getStrInfo(string, "]") if int(x) > int(string.find(char))][0] + start = [x for x in self.get_str_location(string, "[") if int(x) < int(string.find(char))][-1] + 1 + end = [x for x in self.get_str_location(string, "]") if int(x) > int(string.find(char))][0] return string[start:end] # 清理原链接(中文字符替换为英文) @@ -49,7 +51,7 @@ class Rename: # 去广告 file_name = re.sub("[((\[【]?(字幕)?[\u4e00-\u9fa5]{0,3}(新人|招募?新?)[\u4e00-\u9fa5]{0,5}[))\]】]?", "", file_name) # 除杂 - file_name = re.sub("[((\[【]?★?(\d{4}[年][春夏秋冬]?)?[\d一二三四五六七八九十]{1,2}月新?番?★?[))\]】]?", "", file_name) + file_name = re.sub("[((\[【]?★?(\d{4}年[春夏秋冬]?)?[\d一二三四五六七八九十]{1,2}月新?番?★?[))\]】]?", "", file_name) # 除杂x2 file_name = re.sub("[((\[【]?(2(\d{3}[年.][春夏秋冬]?)\d{1,2}\.?\d{1,2})[))\]】]?", "", file_name) # 除杂x3 @@ -67,11 +69,10 @@ class Rename: for s2 in str2: if s1 in s2: return [True, s2[1:]] - else: - return [False, name] + return [False, name] # 检索字幕组特征 - def regognize_group(self): + def recognize_group(self): character = self.group_character group = self.group_char rule = self.group_rule @@ -123,7 +124,7 @@ class Rename: # 获取字幕组名 def get_group(self): # 是否匹配成功(哪种方式匹配成功) - status = self.regognize_group() + status = self.recognize_group() # 检索到的特征值 res_char = self.pre_analyse # 强条 @@ -138,9 +139,7 @@ class Rename: # 以特征值为中心,匹配最近的中括号,八成就这个了 gp = self.get_gp(res_char, self.file_name.lower()) # 防止太长炸了,一般不会这么长的字幕组名 - if len(gp) < 30: - pass - else: + if len(gp) > 30: print("name:%s\r\nchar:%s,gp:%s" % (self.file_name, res_char, gp)) return gp except Exception as e: @@ -234,7 +233,7 @@ class Rename: file_name = str(self.file_name).lower() type_list = [] # 英文标示 - for i in range(3): + for _ in range(3): try: res = re.search( "[((\[【]?((bd|remux|(viu)?tvb?|bilibili|b ?global|baha|web[ -]?(dl|rip))[ -]?(iso|mut|rip)?)[))\]】]?", @@ -369,52 +368,13 @@ class Rename: else: return False - # 拿到的数据挨个测试 - def get_info(self): - # 获取到的信息 - info = { - "group": self.get_group(), - "dpi": self.get_dpi(), - "season": self.get_season(), - "episode": self.get_episode(), - "vision": self.get_vision(), - "lang": self.get_language(), - "ass": self.get_ass(), - "type": self.get_type(), - "code": self.get_code(), - "source": self.get_source() - } - - # 字母全部小写 - clean_name = self.file_name.lower() - # 去除拿到的有效信息 - for k, v in info.items(): - if v is not None: - if type(v) is list: - for i in v: - if i is not None: - clean_name = clean_name.replace(i, "") - else: - clean_name = clean_name.replace(v, "") - # 除杂 - clean_list = ["pc&psp", "pc&psv", "fin", "opus", "movie", "tvb", "end", "bangumi.online", "donghua", - "话全", "第话", "第集", "全集", " 话", " 集", "+", "@", "。"] - for i in clean_list: - clean_name = clean_name.replace(i, "").replace(" ]", "]").replace("[ ", "[").replace(" ", "") - # 分隔各字段 - clean_name = clean_name.replace("[", "").replace("]", " ").replace("()", "").replace("( )", "") - # 去除多余空格 - clean_name = re.sub(' +', ' ', clean_name).strip(" ") - # 剩下来的几乎就是干净番名了,再刮不到不管了 - info["clean_name"] = clean_name - + def extract_title(self, raw_name): title = { "zh": None, "en": None, } - clean_name = re.sub('[^a-zA-Z\u4e00-\u9fa5:@#$%^&*()\[\]/ ]', "", clean_name) - clean_name = re.sub(' +', ' ', clean_name).strip(" ") + clean_name = raw_name if self.has_en(clean_name) and self.has_zh(clean_name): # 中英 try: @@ -462,10 +422,110 @@ class Rename: if v is not None and "/" in v: zh_list = v.split("/") title[k] = zh_list[0].strip(" ") + return title + + def add_separator(self, clean_name): + if "/" not in clean_name: + if '\u4e00' <= clean_name[0] <= '\u9fff': + try: + res = re.search("(^[a\u4e00-\u9fa5: ]{1,10} ?)([a-z:]{1,20} ?){1,10}", clean_name).group(1) + clean_name = clean_name.replace(res, res.strip(" ") + "/") + print("zh_pre:%s" % clean_name) + except Exception as e: + print(e) + else: + try: + res = re.search("^(([a-z:]{1,20} ?){1,10} )[\u4e00-\u9fa5: a]{1,20}", clean_name).group(1) + clean_name = clean_name.replace(res, res.strip(" ") + "/") + print("en_pre:%s" % clean_name) + except Exception as e: + print(e) + return clean_name + + def easy_split(self, clean_name, zh_list, en_list): + if "/" in clean_name: + n_list = clean_name.split("/") + for i in n_list: + if self.has_zh(i) is False: + en_list.append(i.strip(" ")) + elif self.has_en(i) is False: + zh_list.append(i.strip(" ")) + else: + # 如果还是同时包含中英文的情况,递龟一下 + i = self.add_separator(i) + self.easy_split(i, zh_list, en_list) + else: + if self.has_zh(clean_name) is False: + en_list.append(clean_name.strip(" ")) + elif self.has_en(clean_name) is False: + zh_list.append(clean_name.strip(" ")) + + # 拿到的数据挨个测试 + def get_info(self): + # 获取到的信息 + info = { + "group": self.get_group(), + "dpi": self.get_dpi(), + "season": self.get_season(), + "episode": self.get_episode(), + "vision": self.get_vision(), + "lang": self.get_language(), + "ass": self.get_ass(), + "type": self.get_type(), + "code": self.get_code(), + "source": self.get_source() + } + + # 字母全部小写 + clean_name = self.file_name.lower() + # 去除拿到的有效信息 + for k, v in info.items(): + if v is not None: + if type(v) is list: + for i in v: + clean_name = clean_name.replace(i, "") if i is not None else clean_name + else: + clean_name = clean_name.replace(v, "") + # 除杂 + clean_list = ["pc&psp", "pc&psv", "fin", "opus", "movie", "tvb", "end", "web", "bangumi.online", "donghua", + "话全", "第话", "第集", "全集", "话", "集", "+", "@", "轨", "。"] + for i in clean_list: + clean_name = clean_name.replace(i, "") + # 去除多余空格 + clean_name = re.sub(' +', ' ', clean_name).strip(" ") + # 分隔各字段 + clean_name = re.sub("([(\[] *| *[)\]])", "", clean_name) + + # 剩下来的几乎就是干净番名了,再刮不到不管了 + info["clean_name"] = clean_name + clean_name = re.sub('[^a-zA-Z\u4e00-\u9fa5:@#$%^&*()\[\]/ ]', "", clean_name) + clean_name = re.sub(' +', ' ', clean_name).strip(" ") + clean_name = re.sub("([(\[] *| *[)\]])", "", clean_name) + print(clean_name) + + title = { + "zh": None, + "en": None + } + zh_list = [] + en_list = [] + clean_name = self.add_separator(clean_name) + self.easy_split(clean_name, zh_list, en_list) + title["zh"] = zh_list if zh_list else None + title["en"] = en_list if en_list else None + if title["zh"] is None and title["en"] is None: + title = self.extract_title(clean_name) + print(title) info["title"] = title return info if __name__ == "__main__": - # 使用方法 - print(Rename(name).get_info()) + raw = ChainDb("spider_dmhy").page(1, 1000).field("name").select() + name_list = [x["name"] for x in raw] + start = time.time() + for name in name_list: + print(name) + Rename(name).get_info() + print() + print("%s" % (time.time() - start))