diff --git a/auto_pt/admin.py b/auto_pt/admin.py
index b98d573..20e194a 100644
--- a/auto_pt/admin.py
+++ b/auto_pt/admin.py
@@ -12,11 +12,13 @@ from pt_site import views as tasks
from pt_site.views import pt_spider
from ptools.base import Trigger
+scheduler = tasks.scheduler
+
# Register your models here.
-# @admin.register(Task)
+@admin.register(Task)
class TaskAdmin(admin.ModelAdmin): # instead of ModelAdmin
# 显示字段
list_display = (
@@ -64,58 +66,74 @@ class TaskJobAdmin(admin.ModelAdmin): # instead of ModelAdmin
list_editable = ('task_exec',)
def save_model(self, request, obj: TaskJob, form, change):
- obj.save()
# 从字符串获取function
func = getattr(tasks, obj.task.name)
# 检查任务是否存在,已存在就删除任务
- exist_job = tasks.scheduler.get_job(obj.job_id)
- if exist_job:
- logger.info(exist_job.id + '任务已存在,将移除后重新添加!')
- exist_job.remove()
- logger.info(exist_job.id + '任务移除成功!')
- if not obj.task_exec:
- logger.info(exist_job.id + '任务未开启,将只入库不执行!')
- super().save_model(request, obj, form, change)
- else:
- try:
- # 添加任务
- logger.info(exist_job.id + ' 任务添加中!')
- if obj.trigger == Trigger.cron:
- new_job = tasks.scheduler.add_job(func,
- trigger=CronTrigger.from_crontab(obj.expression_time),
- id=obj.job_id,
- replace_existing=obj.replace_existing,
- misfire_grace_time=obj.misfire_grace_time,
- jitter=obj.jitter, )
- if obj.trigger == Trigger.interval:
- time_delta = 1
- time_str = obj.expression_time.split('*')
- for i in time_str:
- time_delta *= int(i)
- new_job = tasks.scheduler.add_job(func,
- trigger=obj.trigger,
- id=obj.job_id,
- seconds=time_delta,
- replace_existing=obj.replace_existing,
- misfire_grace_time=obj.misfire_grace_time,
- jitter=obj.jitter, )
+ exist_job = scheduler.get_job(obj.job_id)
+ logger.info('当前任务:{} | {}'.format(obj.job_id, exist_job))
+ try:
+ if not obj.task_exec:
+ logger.info(obj.job_id + '任务未开启!')
+ super().save_model(request, obj, form, change)
+ # else:
+ # 添加任务
- # print(new_job.pending)
- # pt_spider.send_text('计划任务:' + new_job.id + info)
- # messages.success(request, new_job.id + info)
- # 如果任务未启用,只保存,不入库,已存在任务就删除
- logger.info(exist_job.id + ' 任务添加成功!')
- logger.info(obj.job_id + ' 任务状态是否暂停:' + str(new_job.pending))
- logger.info('当前存在的所有自动任务:')
- logger.info(tasks.scheduler.get_jobs())
- messages.success(request,
- obj.job_id + ' 保存成功!' + ('如需执行任务,请勾选开启任务!' if obj.task_exec else ''))
- except Exception as e:
- obj.task_exec = False
- obj.save()
- # raise
- pt_spider.send_text('计划任务:' + obj.job_id + '任务添加失败!原因:' + str(e))
- messages.error(request, obj.job_id + '任务添加失败!原因:' + str(e))
+ if obj.trigger == Trigger.cron:
+ if exist_job:
+ logger.info(obj.job_id + '任务已存在,修改中!')
+ exist_job.reschedule(trigger=CronTrigger.from_crontab(obj.expression_time))
+ logger.info(exist_job.id + '任务修改成功!')
+ else:
+ logger.info(obj.job_id + ' 任务添加中!')
+ exist_job = scheduler.add_job(
+ func,
+ trigger=CronTrigger.from_crontab(obj.expression_time),
+ id=obj.job_id,
+ max_instances=1,
+ replace_existing=obj.replace_existing,
+ misfire_grace_time=obj.misfire_grace_time,
+ jitter=obj.jitter
+ )
+ if obj.trigger == Trigger.interval:
+ time_delta = 1
+ time_str = obj.expression_time.split('*')
+ for i in time_str:
+ time_delta *= int(i)
+ if exist_job:
+ logger.info(obj.job_id + '任务已存在,修改中!')
+ exist_job.reschedule(trigger=obj.trigger, seconds=time_delta)
+ logger.info(exist_job.id + '任务修改成功!')
+ else:
+ logger.info(obj.job_id + ' 任务添加中!')
+ exist_job = scheduler.add_job(
+ func,
+ trigger=obj.trigger,
+ id=obj.job_id,
+ seconds=time_delta,
+ max_instances=1,
+ replace_existing=obj.replace_existing,
+ misfire_grace_time=obj.misfire_grace_time,
+ jitter=obj.jitter
+ )
+ logger.info('当前操作:{} 成功!'.format(exist_job))
+ if not obj.task_exec:
+ exist_job.pause()
+ else:
+ exist_job.resume()
+ logger.info(obj.job_id + ' 任务暂停:' + str(exist_job.pending))
+ logger.info('当前自动任务:')
+ logger.info(scheduler.get_jobs())
+ super().save_model(request, obj, form, change)
+ messages.success(request,
+ obj.job_id + ' 保存成功!' + ('' if obj.task_exec else '如需执行任务,请勾选开启任务!'))
+ except Exception as e:
+ obj.task_exec = False
+ obj.save()
+ raise
+ msg = obj.job_id + '任务添加失败!原因:' + str(e)
+ logger.error(msg)
+ pt_spider.send_text('计划任务:' + msg)
+ messages.error(request, msg)
def delete_model(self, request, obj):
print(obj)
diff --git a/auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py b/auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py
new file mode 100644
index 0000000..278f209
--- /dev/null
+++ b/auto_pt/migrations/0002_alter_notify_name_alter_taskjob_expression_time_and_more.py
@@ -0,0 +1,58 @@
+# Generated by Django 4.1 on 2022-10-11 09:58
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("auto_pt", "0001_initial"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="notify",
+ name="name",
+ field=models.CharField(
+ choices=[
+ ("wechat_work_push", "企业微信通知"),
+ ("wxpusher_push", "WxPusher通知"),
+ ("pushdeer_push", "PushDeer通知"),
+ ("bark_push", "Bark通知"),
+ ("iyuu_push", "爱语飞飞"),
+ ],
+ default="wechat_work_push",
+ max_length=64,
+ verbose_name="通知方式",
+ ),
+ ),
+ migrations.AlterField(
+ model_name="taskjob",
+ name="expression_time",
+ field=models.CharField(
+ help_text="在间隔任务表示间隔时长使用数字,单位:秒,corn任务中为五位corn表达式:“15 8 * * 2022”",
+ max_length=64,
+ verbose_name="时间表达式",
+ ),
+ ),
+ migrations.AlterField(
+ model_name="taskjob",
+ name="jitter",
+ field=models.IntegerField(
+ default=1200, help_text="增强时间随机性", verbose_name="时间浮动参数"
+ ),
+ ),
+ migrations.AlterField(
+ model_name="taskjob",
+ name="misfire_grace_time",
+ field=models.IntegerField(
+ default=600,
+ help_text="强制执行结束的时间, 为避免撞车导致任务丢失, 没执行完就别执行了",
+ verbose_name="任务运行时间",
+ ),
+ ),
+ migrations.AlterField(
+ model_name="taskjob",
+ name="task_exec",
+ field=models.BooleanField(default=True, verbose_name="开启任务"),
+ ),
+ ]
diff --git a/pt_site/views.py b/pt_site/views.py
index a5b3378..2b21f80 100644
--- a/pt_site/views.py
+++ b/pt_site/views.py
@@ -25,179 +25,182 @@ scheduler.add_jobstore(DjangoJobStore(), 'default')
pool = ThreadPoolExecutor(2)
pt_spider = PtSpider()
logger = logging.getLogger('ptools')
+
+
# Create your views here.
-try:
-
- def auto_sign_in():
- """自动签到"""
- start = time.time()
- # 获取本人所有站点
- queryset = MySite.objects.all()
- message_list = pt_spider.do_sign_in(pool, queryset)
- end = time.time()
- consuming = '> {} 任务运行成功!耗时:{}完成时间:{} \n'.format(
- '自动签到', end - start,
- time.strftime("%Y-%m-%d %H:%M:%S")
- )
-
- if message_list == 0:
- logger.info('已经全部签到咯!!')
- else:
- logger.info(message_list + consuming)
- pt_spider.send_text(message_list + consuming)
- logger.info('{} 任务运行成功!完成时间:{}'.format('自动签到', time.strftime("%Y-%m-%d %H:%M:%S")))
- def auto_get_status():
- """
- 更新个人数据
- """
- start = time.time()
- message_list = ''
- queryset = MySite.objects.all()
- site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support]
- results = pool.map(pt_spider.send_status_request, site_list)
- message_template = MessageTemplate.status_message_template
- for my_site, result in zip(site_list, results):
- if result.code == StatusCodeEnum.OK.code:
- res = pt_spider.parse_status_html(my_site, result.data)
- logger.info('自动更新个人数据: {}, {}'.format(my_site.site, res))
- if res.code == StatusCodeEnum.OK.code:
- status = res.data[0]
- message = message_template.format(
- my_site.my_level,
- status.my_sp,
- my_site.sp_hour,
- status.my_bonus,
- status.ratio,
- FileSizeConvert.parse_2_file_size(status.downloaded),
- FileSizeConvert.parse_2_file_size(status.uploaded),
- my_site.seed,
- my_site.leech,
- my_site.invitation,
- my_site.my_hr
- )
- logger.info('组装Message:{}'.format(message))
- message_list += ('> ' + my_site.site.name + ' 信息更新成功!' + message + ' \n')
- # pt_spider.send_text(my_site.site.name + ' 信息更新成功!' + message)
- logger.info(my_site.site.name + '信息更新成功!' + message)
- else:
- print(res)
- message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + res.msg + ' \n'
- message_list = message + message_list
- # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(res[0]))
- logger.warning(my_site.site.name + '信息更新失败!原因:' + res.msg)
- else:
- # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(result[1]))
- message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + result.msg + ' \n'
- message_list = message + message_list
- logger.warning(my_site.site.name + '信息更新失败!原因:' + result.msg)
- end = time.time()
- consuming = '> {} 任务运行成功!耗时:{} 完成时间:{} \n'.format(
- '自动更新个人数据', end - start,
- time.strftime("%Y-%m-%d %H:%M:%S")
- )
- logger.info(message_list + consuming)
- pt_spider.send_text(text=message_list + consuming)
+def auto_sign_in():
+ """自动签到"""
+ start = time.time()
+ # 获取本人所有站点
+ queryset = MySite.objects.all()
+ message_list = pt_spider.do_sign_in(pool, queryset)
+ end = time.time()
+ consuming = '> {} 任务运行成功!耗时:{}完成时间:{} \n'.format(
+ '自动签到', end - start,
+ time.strftime("%Y-%m-%d %H:%M:%S")
+ )
-
- def auto_update_torrents():
- """
- 拉取最新种子
- """
- start = time.time()
- message_list = ''
- queryset = MySite.objects.all()
- site_list = [my_site for my_site in queryset if my_site.site.get_torrent_support]
- results = pool.map(pt_spider.send_torrent_info_request, site_list)
- for my_site, result in zip(site_list, results):
- logger.info('获取种子:{}{}'.format(my_site.site.name, result))
- # print(result is tuple[int])
- if result.code == StatusCodeEnum.OK.code:
- res = pt_spider.get_torrent_info_list(my_site, result.data)
- # 通知推送
- if res.code == StatusCodeEnum.OK.code:
- message = '> {} 种子抓取成功!新增种子{}条,更新种子{}条! \n'.format(my_site.site.name, res.data[0],
- res.data[1])
- message_list += message
- else:
- message = '> ' + my_site.site.name + '抓取种子信息失败!原因:' + res.msg + ' \n'
- message_list = message + message_list
- # 日志
- logger.info(
- '{} 种子抓取成功!新增种子{}条,更新种子{}条! '.format(my_site.site.name, res.data[0], res.data[
- 1]) if res.code == StatusCodeEnum.OK.code else my_site.site.name + '抓取种子信息失败!原因:' + res.msg)
- else:
- # pt_spider.send_text(my_site.site.name + ' 抓取种子信息失败!原因:' + result[0])
- message = '> ' + my_site.site.name + ' 抓取种子信息失败!原因:' + result.msg + ' \n'
- message_list = message + message_list
- logger.info(my_site.site.name + '抓取种子信息失败!原因:' + result.msg)
- end = time.time()
- consuming = '> {} 任务运行成功!耗时:{} 当前时间:{} \n'.format(
- '拉取最新种子',
- end - start,
- time.strftime("%Y-%m-%d %H:%M:%S"))
+ if message_list == 0:
+ logger.info('已经全部签到咯!!')
+ else:
logger.info(message_list + consuming)
pt_spider.send_text(message_list + consuming)
+ logger.info('{} 任务运行成功!完成时间:{}'.format('自动签到', time.strftime("%Y-%m-%d %H:%M:%S")))
- def auto_remove_expire_torrents():
- """
- 删除过期种子
- """
- start = time.time()
- torrent_info_list = TorrentInfo.objects.all().filter(downloader__isnull=False)
- for torrent_info in torrent_info_list:
- expire_time = torrent_info.sale_expire
- if '无限期' in expire_time:
- # ToDo 先更新种子信息,然后再判断
+def auto_get_status():
+ """
+ 更新个人数据
+ """
+ start = time.time()
+ message_list = ''
+ queryset = MySite.objects.all()
+ site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support]
+ results = pool.map(pt_spider.send_status_request, site_list)
+ message_template = MessageTemplate.status_message_template
+ for my_site, result in zip(site_list, results):
+ if result.code == StatusCodeEnum.OK.code:
+ res = pt_spider.parse_status_html(my_site, result.data)
+ logger.info('自动更新个人数据: {}, {}'.format(my_site.site, res))
+ if res.code == StatusCodeEnum.OK.code:
+ status = res.data[0]
+ message = message_template.format(
+ my_site.my_level,
+ status.my_sp,
+ my_site.sp_hour,
+ status.my_bonus,
+ status.ratio,
+ FileSizeConvert.parse_2_file_size(status.downloaded),
+ FileSizeConvert.parse_2_file_size(status.uploaded),
+ my_site.seed,
+ my_site.leech,
+ my_site.invitation,
+ my_site.my_hr
+ )
+ logger.info('组装Message:{}'.format(message))
+ message_list += ('> ' + my_site.site.name + ' 信息更新成功!' + message + ' \n')
+ # pt_spider.send_text(my_site.site.name + ' 信息更新成功!' + message)
+ logger.info(my_site.site.name + '信息更新成功!' + message)
+ else:
+ print(res)
+ message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + res.msg + ' \n'
+ message_list = message + message_list
+ # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(res[0]))
+ logger.warning(my_site.site.name + '信息更新失败!原因:' + res.msg)
+ else:
+ # pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(result[1]))
+ message = '> ' + my_site.site.name + ' 信息更新失败!原因:' + result.msg + ' \n'
+ message_list = message + message_list
+ logger.warning(my_site.site.name + '信息更新失败!原因:' + result.msg)
+ end = time.time()
+ consuming = '> {} 任务运行成功!耗时:{} 完成时间:{} \n'.format(
+ '自动更新个人数据', end - start,
+ time.strftime("%Y-%m-%d %H:%M:%S")
+ )
+ logger.info(message_list + consuming)
+ pt_spider.send_text(text=message_list + consuming)
+
+
+def auto_update_torrents():
+ """
+ 拉取最新种子
+ """
+ start = time.time()
+ message_list = ''
+ queryset = MySite.objects.all()
+ site_list = [my_site for my_site in queryset if my_site.site.get_torrent_support]
+ results = pool.map(pt_spider.send_torrent_info_request, site_list)
+ for my_site, result in zip(site_list, results):
+ logger.info('获取种子:{}{}'.format(my_site.site.name, result))
+ # print(result is tuple[int])
+ if result.code == StatusCodeEnum.OK.code:
+ res = pt_spider.get_torrent_info_list(my_site, result.data)
+ # 通知推送
+ if res.code == StatusCodeEnum.OK.code:
+ message = '> {} 种子抓取成功!新增种子{}条,更新种子{}条! \n'.format(my_site.site.name, res.data[0],
+ res.data[1])
+ message_list += message
+ else:
+ message = '> ' + my_site.site.name + '抓取种子信息失败!原因:' + res.msg + ' \n'
+ message_list = message + message_list
+ # 日志
+ logger.info(
+ '{} 种子抓取成功!新增种子{}条,更新种子{}条! '.format(my_site.site.name, res.data[0], res.data[
+ 1]) if res.code == StatusCodeEnum.OK.code else my_site.site.name + '抓取种子信息失败!原因:' + res.msg)
+ else:
+ # pt_spider.send_text(my_site.site.name + ' 抓取种子信息失败!原因:' + result[0])
+ message = '> ' + my_site.site.name + ' 抓取种子信息失败!原因:' + result.msg + ' \n'
+ message_list = message + message_list
+ logger.info(my_site.site.name + '抓取种子信息失败!原因:' + result.msg)
+ end = time.time()
+ consuming = '> {} 任务运行成功!耗时:{} 当前时间:{} \n'.format(
+ '拉取最新种子',
+ end - start,
+ time.strftime("%Y-%m-%d %H:%M:%S"))
+ logger.info(message_list + consuming)
+ pt_spider.send_text(message_list + consuming)
+
+
+def auto_remove_expire_torrents():
+ """
+ 删除过期种子
+ """
+ start = time.time()
+ torrent_info_list = TorrentInfo.objects.all().filter(downloader__isnull=False)
+ for torrent_info in torrent_info_list:
+ expire_time = torrent_info.sale_expire
+ if '无限期' in expire_time:
+ # ToDo 先更新种子信息,然后再判断
+ continue
+ if expire_time.endswith(':'):
+ expire_time += '00'
+ time_now = datetime.datetime.now()
+ expire_time_parse = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S')
+
+ if time_now >= expire_time_parse:
+ if not torrent_info.downloader:
+ # 未推送到下载器,跳过或删除?
continue
- if expire_time.endswith(':'):
- expire_time += '00'
- time_now = datetime.datetime.now()
- expire_time_parse = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S')
-
- if time_now >= expire_time_parse:
- if not torrent_info.downloader:
- # 未推送到下载器,跳过或删除?
- continue
- if pt_spider.get_torrent_info_from_downloader(torrent_info).code == StatusCodeEnum.OK.code:
- # todo 设定任务规则:
- # 免费到期后,下载完毕的种子是删除还是保留?
- # 未下载完成的,是暂停还是删除?
- torrent_info.delete()
- end = time.time()
- pt_spider.send_text(
- '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
+ if pt_spider.get_torrent_info_from_downloader(torrent_info).code == StatusCodeEnum.OK.code:
+ # todo 设定任务规则:
+ # 免费到期后,下载完毕的种子是删除还是保留?
+ # 未下载完成的,是暂停还是删除?
+ torrent_info.delete()
+ end = time.time()
+ pt_spider.send_text(
+ '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
- def auto_push_to_downloader():
- """推送到下载器"""
- start = time.time()
- print('推送到下载器')
- end = time.time()
- pt_spider.send_text(
- '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
+def auto_push_to_downloader():
+ """推送到下载器"""
+ start = time.time()
+ print('推送到下载器')
+ end = time.time()
+ pt_spider.send_text(
+ '> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
- def auto_get_torrent_hash():
- """自动获取种子HASH"""
- start = time.time()
- print('自动获取种子HASH')
- time.sleep(5)
- end = time.time()
- pt_spider.send_text(
- '> {} 任务运行成功!耗时:{}{} \n'.format('获取种子HASH', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
+def auto_get_torrent_hash():
+ """自动获取种子HASH"""
+ start = time.time()
+ print('自动获取种子HASH')
+ time.sleep(5)
+ end = time.time()
+ pt_spider.send_text(
+ '> {} 任务运行成功!耗时:{}{} \n'.format('获取种子HASH', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
+try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 44444))
- scheduler.start()
- logger.info('启动后台任务')
+ logger.info('启动Django主线程')
except socket.error:
- logger.info("!!!scheduler started, DO NOTHING")
+ logger.info('启动后台任务')
+ scheduler.start()
except Exception as e:
- logger.info(e)
- logger.info('启动后台任务启动任务失败!')
+ logger.info('启动后台任务启动任务失败!{}'.format(e))
# 有错误就停止定时器
- # scheduler.shutdown()
+ pt_spider.send_text(text='启动后台任务启动任务失败!')
+ scheduler.shutdown()
diff --git a/ptools/settings.py b/ptools/settings.py
index 6a64287..f6d9112 100644
--- a/ptools/settings.py
+++ b/ptools/settings.py
@@ -233,6 +233,7 @@ DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# DATA_UPLOAD_MAX_MEMORY_SIZE:这个设置翻译过来就是:数据上传最大内存大小
DATA_UPLOAD_MAX_MEMORY_SIZE = 10485760
APSCHEDULER_DATETIME_FORMAT = 'Y-m-d H:i:s' # Default
+APSCHEDULER_RUN_NOW_TIMEOUT = 300 # 任务执行超时时间
# 自定义配置
SIMPLEUI_HOME_TITLE = 'PT一下你就知道'
SIMPLEUI_HOME_ICON = 'fa fa-optin-monster'