# -*- coding: utf-8 -*- # from django.core.files.storage import FileSystemStorage from django.conf import settings from django.utils.encoding import filepath_to_uri from collections import OrderedDict import os import sys import logging import datetime import time import hashlib from email.utils import formatdate import calendar import threading import uuid import requests import simplejson as json import ftplib import socket from urllib.parse import urlparse import pytz import smtplib from email.mime.text import MIMEText from email.header import Header import telegram from urllib import parse from wechatpy.enterprise import WeChatClient from PIL import Image, ImageDraw, ImageFont import re import random logger = logging.getLogger('django') def download_file(url, fname): """ 下载文件 """ download_path = os.path.join(settings.BASE_DIR, 'downloads') if not os.path.exists(download_path): os.makedirs(download_path) download_file = os.path.join(download_path, fname) data = requests.get(url, stream=True) try: data.raise_for_status() with open(download_file, "wb") as updatefile: for chunk in data.iter_content(chunk_size=1024): if chunk: updatefile.write(chunk) return True, download_file except Exception as exc: print('There was a problem: %s' % (exc)) return False, None def check_filename(name, extname = ['rpm','tar','tar.gz','gz'], is_url=True): """ 检查文件是否是需要的扩展名 返回: True/False, 文件名 """ if is_url: #解析url parse_url = urlparse(name) url_path=parse_url.path pkg_name = (os.path.basename(url_path)) #file_name, extension_name = os.path.splitext(pkg_name) extension_name = os.path.splitext(pkg_name)[-1][1:] else: pkg_name = (os.path.basename(name)) extension_name = os.path.splitext(pkg_name)[-1][1:] if extension_name in extname: return True, pkg_name else: return False, pkg_name def get_logger(name=None): return logging.getLogger('%s' % name) def get_clientip(request): if 'HTTP_X_FORWARDED_FOR' in request.META: clientip = request.META['HTTP_X_FORWARDED_FOR'] else: clientip = request.META['REMOTE_ADDR'] return {'clientip': clientip, 'user': request.user} def timesince(dt, since='', default="just now"): """ Returns string representing "time since" e.g. 3 days, 5 hours. """ if since == '': since = datetime.datetime.utcnow() if since == None: return default diff = since - dt periods = ( (diff.days / 365, "year", "years"), (diff.days / 30, "month", "months"), (diff.days / 7, "week", "weeks"), (diff.days, "day", "days"), (diff.seconds / 3600, "hour", "hours"), (diff.seconds / 60, "minute", "minutes"), (diff.seconds, "second", "seconds"), ) for period, singular, plural in periods: if period: return "%d %s" % (period, singular if period == 1 else plural) return default _STRPTIME_LOCK = threading.Lock() def now_date(): """当前时间""" return datetime.datetime.now() def date_to_unixtime(datetime): """将datetime转换成unixtime""" return time.mktime(datetime.timetuple()) def to_unixtime_old(time_string, format_string): time_string = time_string.decode("ascii") with _STRPTIME_LOCK: return int(calendar.timegm(time.strptime(time_string, format_string))) def to_unixtime(time_string, format_string): """ 时间字符串转成unix时间戳 """ return int(calendar.timegm(time.strptime(time_string, format_string))) def http_date(timeval=None): """返回符合HTTP标准的GMT时间字符串,用strftime的格式表示就是"%a, %d %b %Y %H:%M:%S GMT"。 但不能使用strftime,因为strftime的结果是和locale相关的。 """ return formatdate(timeval, usegmt=True) def http_to_unixtime(time_string): """把HTTP Date格式的字符串转换为UNIX时间(自1970年1月1日UTC零点的秒数)。 HTTP Date形如 `Sat, 05 Dec 2015 11:10:29 GMT` 。 """ _GMT_FORMAT = "%a, %d %b %Y %H:%M:%S GMT" return to_unixtime(time_string, _GMT_FORMAT) def cst_to_unixtime(time_string): """ 2018-10-17 12:39:30 CST+0800 """ _CST_FORMAT = "%Y-%m-%d %H:%M:%S CST%z" return to_unixtime(time_string, _CST_FORMAT) def iso8601_to_unixtime(time_string): """把ISO8601时间字符串(形如,2012-02-24T06:07:48.000Z)转换为UNIX时间,精确到秒。""" _ISO8601_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z" return to_unixtime(time_string, _ISO8601_FORMAT) def str2datetime(st, s_format='%Y-%m-%d %H:%M:%S'): """字符串转成datetime CST_FORMAT = "%Y-%m-%d %H:%M:%S CST%z" """ return datetime.datetime.strptime(st, s_format) def str2utcdatetime(st,from_format='%Y-%m-%d %H:%M:%S', utc_format='%Y-%m-%dT%H:%M:%SZ',tz=True): """字符串时间转UTC时间(-8:00)""" local_st = datetime.datetime.strptime(st, from_format) time_struct = time.mktime(local_st.timetuple()) utc_st = datetime.datetime.utcfromtimestamp(time_struct) if tz: return utc_st.strftime(utc_format) else: return utc_st def datetime2utc(local_st): """本地时间转UTC时间(-8:00)""" time_struct = time.mktime(local_st.timetuple()) utc_st = datetime.datetime.utcfromtimestamp(time_struct) return utc_st def utc2datetime(utc_st): """UTC时间转本地时间(+8:00)""" now_stamp = time.time() local_time = datetime.datetime.fromtimestamp(now_stamp) utc_time = datetime.datetime.utcfromtimestamp(now_stamp) offset = local_time - utc_time local_st = utc_st + offset return local_st def utc_to_local_unixtime(utc_time_str, utc_format='%Y-%m-%dT%H:%M:%SZ'): """ utc字符转转换成unix时间戳 '2019-01-16T11:30:00Z' '2019-01-16 11:30:00+00:00', utc_format='%Y-%m-%d %H:%M:%S+00:00' """ local_tz = pytz.timezone('Asia/Shanghai') local_format = "%Y-%m-%d %H:%M:%S" utc_dt = datetime.datetime.strptime(utc_time_str, utc_format) local_dt = utc_dt.replace(tzinfo=pytz.utc).astimezone(local_tz) time_str = local_dt.strftime(local_format) return int(time.mktime(time.strptime(time_str, local_format))) def capacity_convert(size, expect='auto', rate=1024): """ :param size: '100MB', '1G' :param expect: 'K, M, G, T :param rate: Default 1024 :return: """ rate_mapping = ( ('K', rate), ('KB', rate), ('M', rate**2), ('MB', rate**2), ('G', rate**3), ('GB', rate**3), ('T', rate**4), ('TB', rate**4), ) rate_mapping = OrderedDict(rate_mapping) std_size = 0 # To KB for unit in rate_mapping: if size.endswith(unit): try: std_size = float(size.strip(unit).strip()) * rate_mapping[unit] except ValueError: pass if expect == 'auto': for unit, rate_ in rate_mapping.items(): if rate > std_size/rate_ > 1: expect = unit break expect_size = std_size / rate_mapping[expect] return expect_size, expect def filesizeformat(value, baseMB=False): """将字节转换为 1.00KB 1.00Mb 1.00gb """ try: bytes = float(value) except: return 0 if baseMB is True: bytes = bytes * 1024 * 1024 base = 1024 if bytes == 0: return '0' ret = '0' if bytes < base: ret = '%d Bytes' % (bytes) elif bytes < base * base: ret = '%.2f KB' % (bytes / base) elif bytes < base * base * base: ret = '%.2f MB' % (bytes / (base * base)) elif bytes < base * base * base * base: if bytes % (base * base * base) >= 0: ret = '%.2f GB' % (bytes / (base * base * base)) else: ret = "%.2f MB" % (bytes / (base * base)) else: ret = '%.2f TB' % (bytes / (base * base * base * base)) return ret def sum_capacity(cap_list): total = 0 for cap in cap_list: size, _ = capacity_convert(cap, expect='K') total += size total = '{} K'.format(total) return capacity_convert(total, expect='auto') def str_to_md5(strs): m=hashlib.md5() m.update(strs.encode("utf8")) return m.hexdigest() def int_random(): """使用uuid生产随机数,返回128位整数""" u = uuid.uuid1() return u.int def hex_random(): """使用uuid生产随机数,返回32位十六进制""" u = uuid.uuid1() return u.hex def iso8601_to_time(time_string): """把ISO8601时间字符串(形如,2012-02-24T06:07:48.000Z)转换为UNIX时间,精确到秒。""" return time_string.strftime('%Y-%m-%d %H:%M:%S') def str_to_datetime(string): """字符串 2017-10-21 00:00:00 格式化为datetime""" return datetime.datetime.strptime(string,"%Y-%m-%d %H:%M:%S") def sumplus(*args): """求和""" r = 0.0 for n in args: r += float(n) return "%.2f" % r def sumplusInt(*args): """求和""" r = 0 for n in args: r += int(n) return r class ImageStorage(FileSystemStorage): """ 图片上传 """ def __init__(self, location=settings.MEDIA_ROOT, base_url=settings.MEDIA_URL): #初始化 super(ImageStorage, self).__init__(location, base_url) #重写 _save方法 def _save(self, name, content): #文件扩展名 ext = os.path.splitext(name)[1] #文件目录 d = os.path.dirname(name) #定义文件名,年月日时分秒随机数 year = time.strftime('%Y') month = time.strftime('%m') fn = time.strftime('%Y%m%d%H%M%S') fn = fn + str(hex_random()) #重写合成文件名 name = os.path.join('images', year,month, d, fn + ext) #调用父类方法 return super(ImageStorage, self)._save(name, content) def htmlfilesave(name, content, gameid): """ 保存文件,返回url地址 """ #文件扩展名 ext = os.path.splitext(name)[1] #定义文件名,年月日时分秒随机数 year = time.strftime('%Y') month = time.strftime('%m') fn = time.strftime('%Y%m%d%H%M%S') fn = fn + str(hex_random()) #重写合成文件名 name = os.path.join('html', year,month, gameid, fn + ext) #完整的文件路径 full_name = os.path.join(settings.MEDIA_ROOT, name) #得到url url = filepath_to_uri(os.path.join(settings.MEDIA_URL, name)) directory = os.path.dirname(full_name) if not os.path.exists(directory): try: os.makedirs(directory) except FileNotFoundError: pass if not os.path.isdir(directory): raise IOError("%s exists and is not a directory." % directory) #写入文件 #open(full_name,'w', encoding='utf8').writelines(content.replace(settings.HTML_HREF_REPLACE_KEY, app_url)) open(full_name,'w', encoding='utf8').writelines(content) return url def writefile(content, apk_url, ext='html'): """ 写文件,返回文件路径 """ ext = '.' + ext fn = time.strftime('%Y%m%d%H%M%S') fn = fn + str(hex_random()) #重写合成文件名 name = os.path.join('cdnfile', fn + ext) #完整的文件路径 full_name = os.path.join(settings.MEDIA_ROOT, name) directory = os.path.dirname(full_name) if not os.path.exists(directory): os.makedirs(directory) #写入文件 open(full_name,'w', encoding='utf8').writelines(content.replace(settings.HTML_HREF_REPLACE_KEY, apk_url)) return full_name def ftpupload(host, port, username, password, upload_dir, upload_file, rmfile=False): """上传文件到ftp""" timeout = 10 ftp = ftplib.FTP() filename = os.path.basename(upload_file) try: #连接ftp ftp.connect(host=host, port=port, timeout=timeout) except (socket.error, socket.gaierror): return False, "连接FTP失败 %s" % host try: #登录 ftp.login(username, password) except ftplib.error_perm: return False, "ftp用户名或密码错误" try: #切换到操作目录 ftp.cwd(upload_dir) except: #切换目录失败,目录不存在 #创建目录 ftp.mkd(upload_dir) #切换目录 ftp.cwd(upload_dir) fd=open(upload_file,'rb') #上传文件 ftp.storbinary('STOR %s' % filename, fd) fd.close() ftp.quit() #删除文件 if rmfile: os.remove(upload_file) #返回写入的ftp地址 url = filepath_to_uri(os.path.join('/', upload_dir, filename)) return True,url class ExcelStorage(FileSystemStorage): """ excel文件上传 """ def __init__(self, location=settings.MEDIA_ROOT, base_url=settings.MEDIA_URL): #初始化 super(ExcelStorage, self).__init__(location, base_url) #重写 _save方法 def _save(self, name, content): #文件扩展名 ext = os.path.splitext(name)[1] #文件目录 d = os.path.dirname(name) #定义文件名,年月日时分秒随机数 year = time.strftime('%Y') month = time.strftime('%m') fn = time.strftime('%Y%m%d%H%M%S') #fn = fn + str(hex_random()) #重写合成文件名 name = os.path.join('excel', year,month, d, fn + ext) #调用父类方法 return super(ExcelStorage, self)._save(name, content) def send_email(mail_type,smtp_user,smtp_password,receiver_users = [],send_data = [] ,isTest=False, title='PT助手提示'): """ 发送邮件 """ now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M:%S") if isTest: content = "

测试

" else: content = '''

%s

%s ''' % (time,"
".join(send_data)) #title = "PT助手" #receiver = ["249545020@qq.com"] receiver = receiver_users msg = '' msg = MIMEText(content, 'html', 'utf-8') # 设置发送人 msg['From'] = Header(smtp_user) # 设置接收人 msg['To'] = Header(str(";".join(receiver))) # 设置邮件标题 msg['Subject'] = Header(title) try: if mail_type == "qq": smtp_client = smtplib.SMTP_SSL("smtp.qq.com", 465) elif mail_type == "163": smtp_client = smtplib.SMTP_SSL("smtp.163.com", 465) elif mail_type == "sina": try: #sina不支持sslv3 smtp_client = smtplib.SMTP_SSL("smtp.sina.com", 465) except: #sslv3失败后使用明文 smtp_client = smtplib.SMTP("smtp.sina.com", 25) else: return False,"未知邮件类型" smtp_client.login(smtp_user, smtp_password) smtp_client.sendmail(smtp_user, receiver, msg.as_string()) smtp_client.quit() except smtplib.SMTPException as e: logging.error(str(e)) return False,str(e) return True,"发送成功" def send_telegram(tg_chat_id,tg_token,send_data = [] ,isTest=False, title='PT助手提示'): bot = telegram.Bot(token = tg_token) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M:%S") try: if isTest: bot.send_message(chat_id=tg_chat_id, text='%s[测试]' % time, parse_mode=telegram.ParseMode.HTML) else: bot.send_message(chat_id=tg_chat_id, text='%s\r\n%s' % (time,"\r\n".join(send_data).replace('font','a')), parse_mode=telegram.ParseMode.HTML) except: ex_type, ex_val, ex_stack = sys.exc_info() logging.error(str(ex_val)) if 'Chat not found' in str(ex_val): msg = "发送测试失败,未找到频道ID" elif 'Unauthorized' in str(ex_val): msg = "发送测试失败,令牌配置错误" else: msg = "发送测试失败,请联系管理员" return False, msg return True,"发送成功" def send_iyuu(iyuu_key,send_data = [] ,isTest=False, title='PT助手提示'): user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36' #针对不同通知类型进行处理 now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M:%S") headers = { 'user_agent': user_agent, 'Content-type': 'application/x-www-form-urlencoded' } now_time = parse.quote('
') + parse.unquote(time) + parse.quote('

') api = 'https://iyuu.cn/' + iyuu_key + '.send' if isTest: send_txts = parse.quote('
[测试]

') sen_url = api + '?text='+ parse.quote('测试提示') + '&desp=' + now_time + send_txts else: send_txts = parse.quote('
'.join(send_data)) sen_url = api + '?text='+ parse.quote(title) + '&desp=' + now_time + send_txts try: response = requests.get(sen_url, headers=headers ,verify=False) response_msg = json.loads(response.text) if response.status_code == 200 and response_msg['errcode'] == 0: msg = "发送成功" else: msg = "发送测试失败,%s" % response_msg['errmsg'] return False, msg except: logging.error("连接IYUU失败") return False, "连接IYUU失败" return True,"发送成功" def send_enwechat(corp_id = None, agent_id = None, agent_secret = None, user_ids = [], send_data = [] ,isTest=False, title='PT助手提示'): """发送企业微信消息""" weclient = EnWechat(corp_id=corp_id, agent_id=agent_id, agent_secret=agent_secret) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M:%S") if isTest: #发送测试消息 text = '
测试 %s
' % time flag, response = weclient.send_text(user_ids, text) if flag: msg = "发送成功" return True, msg else: msg = response["errmsg"] return False, msg else: #生成图片 image_file_path = create_image(send_data) #上传图片 flag, response = weclient.upload_image(image_file_path) if flag: #发送文本卡片 flag, response = weclient.send_text_card(user_ids, response) if flag: return True, "发送成功" else: return False, response else: return False, response def getSiteUrl(url): """ 获取官网url """ u = urlparse(url) site_url = u.scheme + "://" + u.netloc return site_url def parseUrl(url): """ 拆解url #scheme='https', netloc='127.0.0.1:8080', path='/', params='', query='', fragment='' #{'protocol': '', 'url': '', 'host': '', 'port': None, 'path': 'userdetails.php', 'params': '', 'query': {'id': ['16465']}} """ data = {} u = urlparse(url) # http,https data['protocol'] = u.scheme data['url'] = u.netloc data['host'] = u.netloc.split(":")[0] data['port'] = u.port data['path'] = u.path data['params'] = u.params #将查询字符串字典话 data['query'] = parse.parse_qs(u.query) return data def create_image(msg_content = []): """ 图片生成 msg 生成图片的文字 """ #f = open("e:/test/test.txt","r",encoding='utf-8').readlines() if sys.platform == "win32": # 设置字体及字号,加载宋体,否则中文乱码 font = ImageFont.truetype("simsun.ttc", 20, encoding="unic") else: #linux加载目录下字体 font = ImageFont.truetype(os.path.join(settings.BASE_DIR, "fonts", "simsun.ttc"), 20, encoding="unic") #图片宽度 image_width = 500 #图片高度,可以通过font.getbbox(line)[3]获取字体高度 21 image_high = len(msg_content) * 25 # 设置画布大小及背景色(白色) image = Image.new('RGB', (image_width, image_high), (255,255,255)) draw = ImageDraw.Draw(image) x = 10 y = 5 for line in msg_content: #print(line) if '成功' in line: fill='green' else: fill='red' #替换掉<>间的内容 line = re.sub(re.compile('<.*?>') , '', line) #通过x,y坐标循环写入 draw.text((x, y), line, font=font, fill=fill) #print (font.getbbox(line)) #获取行的字体高度 y += font.getbbox(line)[3] del draw image_file_name = os.path.join(settings.TMP_LOG_DIR, "msg.png") image.save(image_file_name,"PNG") # 保存图片 return image_file_name def cookie_parse(cookie_str): cookie_dict = {} cookies = cookie_str.split(";") for cookie in cookies: cookie = cookie.split("=") cookie_dict[cookie[0]] = cookie[1] return cookie_dict class EnWechat: def __init__(self, corp_id=None, agent_id=None, agent_secret=None): """企业微信""" self.corp_id = corp_id self.agent_id = agent_id self.agent_secret = agent_secret self.client = self.weclient() def weclient(self): return WeChatClient(self.corp_id, self.agent_secret) def getDepartmentUsers(self, department_id = None): """获取部门下的所有用户ID #返回:{姓名: 用户ID} """ try: return True, self.client.department.get_map_users(id = department_id) except Exception as e: return False, str(e) def getDepartmentUsersID(self, department_id = None): """获取部门下的所有用户ID """ try: #返回:{姓名: 用户ID} user_ids = self.client.department.get_map_users(id = department_id) return list(user_ids.values()) except Exception as e: return False, str(e) def send_text(self, user_ids = [], msg = None): """发送文本消息 返回 {'errcode': 0, 'errmsg': 'ok', 'msgid': 'fcLc6UhB2absSaoEDgOVFJc7rUS6HAUpkVR6y2ltzLhzzjUH_WfDNW5dVl7nScRT2AG7lQ475r8M9qyISlX9BQ'} """ r = self.client.message.send_text(self.agent_id, user_ids, msg) if r['errcode'] == 0: return True, r['msgid'] else: return True, r['errmsg'] def send_markdown(self, user_ids = [], msg = None): """发送markdown消息 https://developer.work.weixin.qq.com/document/path/90236#%E6%94%AF%E6%8C%81%E7%9A%84markdown%E8%AF%AD%E6%B3%95 """ return True, self.client.message.send_markdown(self.agent_id, user_ids, msg) #try: #return True, self.client.message.send_markdown(self.agent_id, user_ids, msg) #except Exception as e: #return False, str(e) def send_image(self, user_ids = [], media_id = None): """发送图片消息 """ return True, self.client.message.send_image(self.agent_id, user_ids, media_id) def send_text_card(self, user_ids = [], url = None): """发送文本卡片消息 url为上传微信的素材地址 """ now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M:%S") description = '
' + time + '
' title = '签到通知' return True, self.client.message.send_text_card(self.agent_id, user_ids, title, description, url) def upload_image(self, image_file_path = None): """ 上传图片素材(永久,url限制微信使用) image_file_path 要上传的本地图片路径 返回 url地址 {'errcode': 0, 'errmsg': 'ok', 'url': 'https://wework.qpic.cn/wwpic/592170_HhXMIUytR62CdSy_1662426978/0'} """ with open(image_file_path, 'rb') as f: image = f.read() #上传图片 reponse = self.client.media.upload_img(image) if reponse['errcode'] == 0: url = reponse['url'] return True, url else: msg = reponse['errmsg'] return False, msg def upload(self, media_type = "image", media_file = None): """ 上传临时素材(保留3天) media_type 媒体文件类型,分别有图片(image)、语音(voice)、视频(video)和缩略图(thumb) media_file – 要上传的文件,一个 File-object 返回 media_id {"errcode": 0, "errmsg": "ok", "type": "image", "media_id": "3Y7qMZUYZvhZG13oNoKMKMWaxtNupLKIR4lgUsDc0abqicTjGnuE9APt-NiivZ3tg", "created_at": "1662427932"} """ with open(media_file, 'rb') as f: file_content = f.read() #上传 reponse = self.client.media.upload(media_type, file_content) if reponse['errcode'] == 0: media_id = reponse['media_id'] return True, media_id else: msg = reponse['errmsg'] return False, msg def Unicode(): """ 随机产生汉字 """ val = random.randint(0x4e00, 0x9fbf) return chr(val) def GBK2312(): """ 随机产生汉字 """ head = random.randint(0xb0, 0xf7) body = random.randint(0xa1, 0xfe) val = f'{head:x} {body:x}' str = bytes.fromhex(val).decode('gb2312') return str