From 4891af12505e4069ed4cabd379302b33b2a957d8 Mon Sep 17 00:00:00 2001 From: ngfchl Date: Tue, 11 Oct 2022 11:53:56 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9B=A0=E5=90=AF=E5=8A=A8dj?= =?UTF-8?q?ango=E5=AD=90=E7=BA=BF=E7=A8=8B=E5=AF=BC=E8=87=B4=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E4=BB=BB=E5=8A=A1=E5=A4=B1=E6=95=88=E7=9A=84bug?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E4=BC=98=E5=8C=96=E8=87=AA=E5=8A=A8=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- auto_pt/admin.py | 116 ++++--- ..._alter_taskjob_expression_time_and_more.py | 58 ++++ pt_site/views.py | 315 +++++++++--------- ptools/settings.py | 1 + 4 files changed, 285 insertions(+), 205 deletions(-) create mode 100644 auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py diff --git a/auto_pt/admin.py b/auto_pt/admin.py index b98d573..20e194a 100644 --- a/auto_pt/admin.py +++ b/auto_pt/admin.py @@ -12,11 +12,13 @@ from pt_site import views as tasks from pt_site.views import pt_spider from ptools.base import Trigger +scheduler = tasks.scheduler + # Register your models here. -# @admin.register(Task) +@admin.register(Task) class TaskAdmin(admin.ModelAdmin): # instead of ModelAdmin # 显示字段 list_display = ( @@ -64,58 +66,74 @@ class TaskJobAdmin(admin.ModelAdmin): # instead of ModelAdmin list_editable = ('task_exec',) def save_model(self, request, obj: TaskJob, form, change): - obj.save() # 从字符串获取function func = getattr(tasks, obj.task.name) # 检查任务是否存在,已存在就删除任务 - exist_job = tasks.scheduler.get_job(obj.job_id) - if exist_job: - logger.info(exist_job.id + '任务已存在,将移除后重新添加!') - exist_job.remove() - logger.info(exist_job.id + '任务移除成功!') - if not obj.task_exec: - logger.info(exist_job.id + '任务未开启,将只入库不执行!') - super().save_model(request, obj, form, change) - else: - try: - # 添加任务 - logger.info(exist_job.id + ' 任务添加中!') - if obj.trigger == Trigger.cron: - new_job = tasks.scheduler.add_job(func, - trigger=CronTrigger.from_crontab(obj.expression_time), - id=obj.job_id, - replace_existing=obj.replace_existing, - misfire_grace_time=obj.misfire_grace_time, - jitter=obj.jitter, ) - if obj.trigger == Trigger.interval: - time_delta = 1 - time_str = obj.expression_time.split('*') - for i in time_str: - time_delta *= int(i) - new_job = tasks.scheduler.add_job(func, - trigger=obj.trigger, - id=obj.job_id, - seconds=time_delta, - replace_existing=obj.replace_existing, - misfire_grace_time=obj.misfire_grace_time, - jitter=obj.jitter, ) + exist_job = scheduler.get_job(obj.job_id) + logger.info('当前任务:{} | {}'.format(obj.job_id, exist_job)) + try: + if not obj.task_exec: + logger.info(obj.job_id + '任务未开启!') + super().save_model(request, obj, form, change) + # else: + # 添加任务 - # print(new_job.pending) - # pt_spider.send_text('计划任务:' + new_job.id + info) - # messages.success(request, new_job.id + info) - # 如果任务未启用,只保存,不入库,已存在任务就删除 - logger.info(exist_job.id + ' 任务添加成功!') - logger.info(obj.job_id + ' 任务状态是否暂停:' + str(new_job.pending)) - logger.info('当前存在的所有自动任务:') - logger.info(tasks.scheduler.get_jobs()) - messages.success(request, - obj.job_id + ' 保存成功!' + ('如需执行任务,请勾选开启任务!' if obj.task_exec else '')) - except Exception as e: - obj.task_exec = False - obj.save() - # raise - pt_spider.send_text('计划任务:' + obj.job_id + '任务添加失败!原因:' + str(e)) - messages.error(request, obj.job_id + '任务添加失败!原因:' + str(e)) + if obj.trigger == Trigger.cron: + if exist_job: + logger.info(obj.job_id + '任务已存在,修改中!') + exist_job.reschedule(trigger=CronTrigger.from_crontab(obj.expression_time)) + logger.info(exist_job.id + '任务修改成功!') + else: + logger.info(obj.job_id + ' 任务添加中!') + exist_job = scheduler.add_job( + func, + trigger=CronTrigger.from_crontab(obj.expression_time), + id=obj.job_id, + max_instances=1, + replace_existing=obj.replace_existing, + misfire_grace_time=obj.misfire_grace_time, + jitter=obj.jitter + ) + if obj.trigger == Trigger.interval: + time_delta = 1 + time_str = obj.expression_time.split('*') + for i in time_str: + time_delta *= int(i) + if exist_job: + logger.info(obj.job_id + '任务已存在,修改中!') + exist_job.reschedule(trigger=obj.trigger, seconds=time_delta) + logger.info(exist_job.id + '任务修改成功!') + else: + logger.info(obj.job_id + ' 任务添加中!') + exist_job = scheduler.add_job( + func, + trigger=obj.trigger, + id=obj.job_id, + seconds=time_delta, + max_instances=1, + replace_existing=obj.replace_existing, + misfire_grace_time=obj.misfire_grace_time, + jitter=obj.jitter + ) + logger.info('当前操作:{} 成功!'.format(exist_job)) + if not obj.task_exec: + exist_job.pause() + else: + exist_job.resume() + logger.info(obj.job_id + ' 任务暂停:' + str(exist_job.pending)) + logger.info('当前自动任务:') + logger.info(scheduler.get_jobs()) + super().save_model(request, obj, form, change) + messages.success(request, + obj.job_id + ' 保存成功!' + ('' if obj.task_exec else '如需执行任务,请勾选开启任务!')) + except Exception as e: + obj.task_exec = False + obj.save() + raise + msg = obj.job_id + '任务添加失败!原因:' + str(e) + logger.error(msg) + pt_spider.send_text('计划任务:' + msg) + messages.error(request, msg) def delete_model(self, request, obj): print(obj) diff --git a/auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py b/auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py new file mode 100644 index 0000000..278f209 --- /dev/null +++ b/auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py @@ -0,0 +1,58 @@ +# Generated by Django 4.1 on 2022-10-11 09:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("auto_pt", "0001_initial"), + ] + + operations = [ + migrations.AlterField( + model_name="notify", + name="name", + field=models.CharField( + choices=[ + ("wechat_work_push", "企业微信通知"), + ("wxpusher_push", "WxPusher通知"), + ("pushdeer_push", "PushDeer通知"), + ("bark_push", "Bark通知"), + ("iyuu_push", "爱语飞飞"), + ], + default="wechat_work_push", + max_length=64, + verbose_name="通知方式", + ), + ), + migrations.AlterField( + model_name="taskjob", + name="expression_time", + field=models.CharField( + help_text="在间隔任务表示间隔时长使用数字,单位:秒,corn任务中为五位corn表达式:“15 8 * * 2022”", + max_length=64, + verbose_name="时间表达式", + ), + ), + migrations.AlterField( + model_name="taskjob", + name="jitter", + field=models.IntegerField( + default=1200, help_text="增强时间随机性", verbose_name="时间浮动参数" + ), + ), + migrations.AlterField( + model_name="taskjob", + name="misfire_grace_time", + field=models.IntegerField( + default=600, + help_text="强制执行结束的时间, 为避免撞车导致任务丢失, 没执行完就别执行了", + verbose_name="任务运行时间", + ), + ), + migrations.AlterField( + model_name="taskjob", + name="task_exec", + field=models.BooleanField(default=True, verbose_name="开启任务"), + ), + ] diff --git a/pt_site/views.py b/pt_site/views.py index a5b3378..2b21f80 100644 --- a/pt_site/views.py +++ b/pt_site/views.py @@ -25,179 +25,182 @@ scheduler.add_jobstore(DjangoJobStore(), 'default') pool = ThreadPoolExecutor(2) pt_spider = PtSpider() logger = logging.getLogger('ptools') + + # Create your views here. -try: - - def auto_sign_in(): - """自动签到""" - start = time.time() - # 获取本人所有站点 - queryset = MySite.objects.all() - message_list = pt_spider.do_sign_in(pool, queryset) - end = time.time() - consuming = '> {} 任务运行成功!耗时:{}完成时间:{} \n'.format( - '自动签到', end - start, - time.strftime("%Y-%m-%d %H:%M:%S") - ) - - if message_list == 0: - logger.info('已经全部签到咯!!') - else: - logger.info(message_list + consuming) - pt_spider.send_text(message_list + consuming) - logger.info('{} 任务运行成功!完成时间:{}'.format('自动签到', time.strftime("%Y-%m-%d %H:%M:%S"))) - def auto_get_status(): - """ - 更新个人数据 - """ - start = time.time() - message_list = '' - queryset = MySite.objects.all() - site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support] - results = pool.map(pt_spider.send_status_request, site_list) - message_template = MessageTemplate.status_message_template - for my_site, result in zip(site_list, results): - if result.code == StatusCodeEnum.OK.code: - res = pt_spider.parse_status_html(my_site, result.data) - logger.info('自动更新个人数据: {}, {}'.format(my_site.site, res)) - if res.code == StatusCodeEnum.OK.code: - status = res.data[0] - message = message_template.format( - my_site.my_level, - status.my_sp, - my_site.sp_hour, - status.my_bonus, - status.ratio, - FileSizeConvert.parse_2_file_size(status.downloaded), - FileSizeConvert.parse_2_file_size(status.uploaded), - my_site.seed, - my_site.leech, - my_site.invitation, - my_site.my_hr - ) - logger.info('组装Message:{}'.format(message)) - message_list += ('> ' + my_site.site.name + ' 信息更新成功!' + message + ' \n') - # pt_spider.send_text(my_site.site.name + ' 信息更新成功!' + message) - logger.info(my_site.site.name + '信息更新成功!' + message) - else: - print(res) - message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + res.msg + ' \n' - message_list = message + message_list - # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(res[0])) - logger.warning(my_site.site.name + '信息更新失败!原因:' + res.msg) - else: - # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(result[1])) - message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + result.msg + ' \n' - message_list = message + message_list - logger.warning(my_site.site.name + '信息更新失败!原因:' + result.msg) - end = time.time() - consuming = '> {} 任务运行成功!耗时:{} 完成时间:{} \n'.format( - '自动更新个人数据', end - start, - time.strftime("%Y-%m-%d %H:%M:%S") - ) - logger.info(message_list + consuming) - pt_spider.send_text(text=message_list + consuming) +def auto_sign_in(): + """自动签到""" + start = time.time() + # 获取本人所有站点 + queryset = MySite.objects.all() + message_list = pt_spider.do_sign_in(pool, queryset) + end = time.time() + consuming = '> {} 任务运行成功!耗时:{}完成时间:{} \n'.format( + '自动签到', end - start, + time.strftime("%Y-%m-%d %H:%M:%S") + ) - - def auto_update_torrents(): - """ - 拉取最新种子 - """ - start = time.time() - message_list = '' - queryset = MySite.objects.all() - site_list = [my_site for my_site in queryset if my_site.site.get_torrent_support] - results = pool.map(pt_spider.send_torrent_info_request, site_list) - for my_site, result in zip(site_list, results): - logger.info('获取种子:{}{}'.format(my_site.site.name, result)) - # print(result is tuple[int]) - if result.code == StatusCodeEnum.OK.code: - res = pt_spider.get_torrent_info_list(my_site, result.data) - # 通知推送 - if res.code == StatusCodeEnum.OK.code: - message = '> {} 种子抓取成功!新增种子{}条,更新种子{}条! \n'.format(my_site.site.name, res.data[0], - res.data[1]) - message_list += message - else: - message = '> ' + my_site.site.name + '抓取种子信息失败!原因:' + res.msg + ' \n' - message_list = message + message_list - # 日志 - logger.info( - '{} 种子抓取成功!新增种子{}条,更新种子{}条! '.format(my_site.site.name, res.data[0], res.data[ - 1]) if res.code == StatusCodeEnum.OK.code else my_site.site.name + '抓取种子信息失败!原因:' + res.msg) - else: - # pt_spider.send_text(my_site.site.name + ' 抓取种子信息失败!原因:' + result[0]) - message = '> ' + my_site.site.name + ' 抓取种子信息失败!原因:' + result.msg + ' \n' - message_list = message + message_list - logger.info(my_site.site.name + '抓取种子信息失败!原因:' + result.msg) - end = time.time() - consuming = '> {} 任务运行成功!耗时:{} 当前时间:{} \n'.format( - '拉取最新种子', - end - start, - time.strftime("%Y-%m-%d %H:%M:%S")) + if message_list == 0: + logger.info('已经全部签到咯!!') + else: logger.info(message_list + consuming) pt_spider.send_text(message_list + consuming) + logger.info('{} 任务运行成功!完成时间:{}'.format('自动签到', time.strftime("%Y-%m-%d %H:%M:%S"))) - def auto_remove_expire_torrents(): - """ - 删除过期种子 - """ - start = time.time() - torrent_info_list = TorrentInfo.objects.all().filter(downloader__isnull=False) - for torrent_info in torrent_info_list: - expire_time = torrent_info.sale_expire - if '无限期' in expire_time: - # ToDo 先更新种子信息,然后再判断 +def auto_get_status(): + """ + 更新个人数据 + """ + start = time.time() + message_list = '' + queryset = MySite.objects.all() + site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support] + results = pool.map(pt_spider.send_status_request, site_list) + message_template = MessageTemplate.status_message_template + for my_site, result in zip(site_list, results): + if result.code == StatusCodeEnum.OK.code: + res = pt_spider.parse_status_html(my_site, result.data) + logger.info('自动更新个人数据: {}, {}'.format(my_site.site, res)) + if res.code == StatusCodeEnum.OK.code: + status = res.data[0] + message = message_template.format( + my_site.my_level, + status.my_sp, + my_site.sp_hour, + status.my_bonus, + status.ratio, + FileSizeConvert.parse_2_file_size(status.downloaded), + FileSizeConvert.parse_2_file_size(status.uploaded), + my_site.seed, + my_site.leech, + my_site.invitation, + my_site.my_hr + ) + logger.info('组装Message:{}'.format(message)) + message_list += ('> ' + my_site.site.name + ' 信息更新成功!' + message + ' \n') + # pt_spider.send_text(my_site.site.name + ' 信息更新成功!' + message) + logger.info(my_site.site.name + '信息更新成功!' + message) + else: + print(res) + message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + res.msg + ' \n' + message_list = message + message_list + # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(res[0])) + logger.warning(my_site.site.name + '信息更新失败!原因:' + res.msg) + else: + # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(result[1])) + message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + result.msg + ' \n' + message_list = message + message_list + logger.warning(my_site.site.name + '信息更新失败!原因:' + result.msg) + end = time.time() + consuming = '> {} 任务运行成功!耗时:{} 完成时间:{} \n'.format( + '自动更新个人数据', end - start, + time.strftime("%Y-%m-%d %H:%M:%S") + ) + logger.info(message_list + consuming) + pt_spider.send_text(text=message_list + consuming) + + +def auto_update_torrents(): + """ + 拉取最新种子 + """ + start = time.time() + message_list = '' + queryset = MySite.objects.all() + site_list = [my_site for my_site in queryset if my_site.site.get_torrent_support] + results = pool.map(pt_spider.send_torrent_info_request, site_list) + for my_site, result in zip(site_list, results): + logger.info('获取种子:{}{}'.format(my_site.site.name, result)) + # print(result is tuple[int]) + if result.code == StatusCodeEnum.OK.code: + res = pt_spider.get_torrent_info_list(my_site, result.data) + # 通知推送 + if res.code == StatusCodeEnum.OK.code: + message = '> {} 种子抓取成功!新增种子{}条,更新种子{}条! \n'.format(my_site.site.name, res.data[0], + res.data[1]) + message_list += message + else: + message = '> ' + my_site.site.name + '抓取种子信息失败!原因:' + res.msg + ' \n' + message_list = message + message_list + # 日志 + logger.info( + '{} 种子抓取成功!新增种子{}条,更新种子{}条! '.format(my_site.site.name, res.data[0], res.data[ + 1]) if res.code == StatusCodeEnum.OK.code else my_site.site.name + '抓取种子信息失败!原因:' + res.msg) + else: + # pt_spider.send_text(my_site.site.name + ' 抓取种子信息失败!原因:' + result[0]) + message = '> ' + my_site.site.name + ' 抓取种子信息失败!原因:' + result.msg + ' \n' + message_list = message + message_list + logger.info(my_site.site.name + '抓取种子信息失败!原因:' + result.msg) + end = time.time() + consuming = '> {} 任务运行成功!耗时:{} 当前时间:{} \n'.format( + '拉取最新种子', + end - start, + time.strftime("%Y-%m-%d %H:%M:%S")) + logger.info(message_list + consuming) + pt_spider.send_text(message_list + consuming) + + +def auto_remove_expire_torrents(): + """ + 删除过期种子 + """ + start = time.time() + torrent_info_list = TorrentInfo.objects.all().filter(downloader__isnull=False) + for torrent_info in torrent_info_list: + expire_time = torrent_info.sale_expire + if '无限期' in expire_time: + # ToDo 先更新种子信息,然后再判断 + continue + if expire_time.endswith(':'): + expire_time += '00' + time_now = datetime.datetime.now() + expire_time_parse = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S') + + if time_now >= expire_time_parse: + if not torrent_info.downloader: + # 未推送到下载器,跳过或删除? continue - if expire_time.endswith(':'): - expire_time += '00' - time_now = datetime.datetime.now() - expire_time_parse = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S') - - if time_now >= expire_time_parse: - if not torrent_info.downloader: - # 未推送到下载器,跳过或删除? - continue - if pt_spider.get_torrent_info_from_downloader(torrent_info).code == StatusCodeEnum.OK.code: - # todo 设定任务规则: - # 免费到期后,下载完毕的种子是删除还是保留? - # 未下载完成的,是暂停还是删除? - torrent_info.delete() - end = time.time() - pt_spider.send_text( - '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S"))) + if pt_spider.get_torrent_info_from_downloader(torrent_info).code == StatusCodeEnum.OK.code: + # todo 设定任务规则: + # 免费到期后,下载完毕的种子是删除还是保留? + # 未下载完成的,是暂停还是删除? + torrent_info.delete() + end = time.time() + pt_spider.send_text( + '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S"))) - def auto_push_to_downloader(): - """推送到下载器""" - start = time.time() - print('推送到下载器') - end = time.time() - pt_spider.send_text( - '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S"))) +def auto_push_to_downloader(): + """推送到下载器""" + start = time.time() + print('推送到下载器') + end = time.time() + pt_spider.send_text( + '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S"))) - def auto_get_torrent_hash(): - """自动获取种子HASH""" - start = time.time() - print('自动获取种子HASH') - time.sleep(5) - end = time.time() - pt_spider.send_text( - '> {} 任务运行成功!耗时:{}{} \n'.format('获取种子HASH', end - start, time.strftime("%Y-%m-%d %H:%M:%S"))) +def auto_get_torrent_hash(): + """自动获取种子HASH""" + start = time.time() + print('自动获取种子HASH') + time.sleep(5) + end = time.time() + pt_spider.send_text( + '> {} 任务运行成功!耗时:{}{} \n'.format('获取种子HASH', end - start, time.strftime("%Y-%m-%d %H:%M:%S"))) +try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 44444)) - scheduler.start() - logger.info('启动后台任务') + logger.info('启动Django主线程') except socket.error: - logger.info("!!!scheduler started, DO NOTHING") + logger.info('启动后台任务') + scheduler.start() except Exception as e: - logger.info(e) - logger.info('启动后台任务启动任务失败!') + logger.info('启动后台任务启动任务失败!{}'.format(e)) # 有错误就停止定时器 - # scheduler.shutdown() + pt_spider.send_text(text='启动后台任务启动任务失败!') + scheduler.shutdown() diff --git a/ptools/settings.py b/ptools/settings.py index 6a64287..f6d9112 100644 --- a/ptools/settings.py +++ b/ptools/settings.py @@ -233,6 +233,7 @@ DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' # DATA_UPLOAD_MAX_MEMORY_SIZE:这个设置翻译过来就是:数据上传最大内存大小 DATA_UPLOAD_MAX_MEMORY_SIZE = 10485760 APSCHEDULER_DATETIME_FORMAT = 'Y-m-d H:i:s' # Default +APSCHEDULER_RUN_NOW_TIMEOUT = 300 # 任务执行超时时间 # 自定义配置 SIMPLEUI_HOME_TITLE = 'PT一下你就知道' SIMPLEUI_HOME_ICON = 'fa fa-optin-monster'