修复因启动django子线程导致自动任务失效的bug,并优化自动任务

This commit is contained in:
ngfchl
2022-10-11 11:53:56 +08:00
parent 3e198854cf
commit 4891af1250
4 changed files with 285 additions and 205 deletions

View File

@@ -12,11 +12,13 @@ from pt_site import views as tasks
from pt_site.views import pt_spider
from ptools.base import Trigger
scheduler = tasks.scheduler
# Register your models here.
# @admin.register(Task)
@admin.register(Task)
class TaskAdmin(admin.ModelAdmin): # instead of ModelAdmin
# 显示字段
list_display = (
@@ -64,58 +66,74 @@ class TaskJobAdmin(admin.ModelAdmin): # instead of ModelAdmin
list_editable = ('task_exec',)
def save_model(self, request, obj: TaskJob, form, change):
obj.save()
# 从字符串获取function
func = getattr(tasks, obj.task.name)
# 检查任务是否存在,已存在就删除任务
exist_job = tasks.scheduler.get_job(obj.job_id)
if exist_job:
logger.info(exist_job.id + '任务已存在,将移除后重新添加!')
exist_job.remove()
logger.info(exist_job.id + '任务移除成功')
if not obj.task_exec:
logger.info(exist_job.id + '任务未开启,将只入库不执行!')
super().save_model(request, obj, form, change)
else:
try:
# 添加任务
logger.info(exist_job.id + ' 任务添加中!')
if obj.trigger == Trigger.cron:
new_job = tasks.scheduler.add_job(func,
trigger=CronTrigger.from_crontab(obj.expression_time),
id=obj.job_id,
replace_existing=obj.replace_existing,
misfire_grace_time=obj.misfire_grace_time,
jitter=obj.jitter, )
if obj.trigger == Trigger.interval:
time_delta = 1
time_str = obj.expression_time.split('*')
for i in time_str:
time_delta *= int(i)
new_job = tasks.scheduler.add_job(func,
trigger=obj.trigger,
id=obj.job_id,
seconds=time_delta,
replace_existing=obj.replace_existing,
misfire_grace_time=obj.misfire_grace_time,
jitter=obj.jitter, )
exist_job = scheduler.get_job(obj.job_id)
logger.info('当前任务:{} | {}'.format(obj.job_id, exist_job))
try:
if not obj.task_exec:
logger.info(obj.job_id + '任务未开启')
super().save_model(request, obj, form, change)
# else:
# 添加任务
# print(new_job.pending)
# pt_spider.send_text('计划任务:' + new_job.id + info)
# messages.success(request, new_job.id + info)
# 如果任务未启用,只保存,不入库,已存在任务就删除
logger.info(exist_job.id + ' 任务添加成功!')
logger.info(obj.job_id + ' 任务状态是否暂停:' + str(new_job.pending))
logger.info('当前存在的所有自动任务:')
logger.info(tasks.scheduler.get_jobs())
messages.success(request,
obj.job_id + ' 保存成功!' + ('如需执行任务,请勾选开启任务!' if obj.task_exec else ''))
except Exception as e:
obj.task_exec = False
obj.save()
# raise
pt_spider.send_text('计划任务:' + obj.job_id + '任务添加失败!原因:' + str(e))
messages.error(request, obj.job_id + '任务添加失败!原因:' + str(e))
if obj.trigger == Trigger.cron:
if exist_job:
logger.info(obj.job_id + '任务已存在,修改中!')
exist_job.reschedule(trigger=CronTrigger.from_crontab(obj.expression_time))
logger.info(exist_job.id + '任务修改成功!')
else:
logger.info(obj.job_id + ' 任务添加中!')
exist_job = scheduler.add_job(
func,
trigger=CronTrigger.from_crontab(obj.expression_time),
id=obj.job_id,
max_instances=1,
replace_existing=obj.replace_existing,
misfire_grace_time=obj.misfire_grace_time,
jitter=obj.jitter
)
if obj.trigger == Trigger.interval:
time_delta = 1
time_str = obj.expression_time.split('*')
for i in time_str:
time_delta *= int(i)
if exist_job:
logger.info(obj.job_id + '任务已存在,修改中!')
exist_job.reschedule(trigger=obj.trigger, seconds=time_delta)
logger.info(exist_job.id + '任务修改成功!')
else:
logger.info(obj.job_id + ' 任务添加中!')
exist_job = scheduler.add_job(
func,
trigger=obj.trigger,
id=obj.job_id,
seconds=time_delta,
max_instances=1,
replace_existing=obj.replace_existing,
misfire_grace_time=obj.misfire_grace_time,
jitter=obj.jitter
)
logger.info('当前操作:{} 成功!'.format(exist_job))
if not obj.task_exec:
exist_job.pause()
else:
exist_job.resume()
logger.info(obj.job_id + ' 任务暂停:' + str(exist_job.pending))
logger.info('当前自动任务:')
logger.info(scheduler.get_jobs())
super().save_model(request, obj, form, change)
messages.success(request,
obj.job_id + ' 保存成功!' + ('' if obj.task_exec else '如需执行任务,请勾选开启任务!'))
except Exception as e:
obj.task_exec = False
obj.save()
raise
msg = obj.job_id + '任务添加失败!原因:' + str(e)
logger.error(msg)
pt_spider.send_text('计划任务:' + msg)
messages.error(request, msg)
def delete_model(self, request, obj):
print(obj)

View File

@@ -0,0 +1,58 @@
# Generated by Django 4.1 on 2022-10-11 09:58
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("auto_pt", "0001_initial"),
]
operations = [
migrations.AlterField(
model_name="notify",
name="name",
field=models.CharField(
choices=[
("wechat_work_push", "企业微信通知"),
("wxpusher_push", "WxPusher通知"),
("pushdeer_push", "PushDeer通知"),
("bark_push", "Bark通知"),
("iyuu_push", "爱语飞飞"),
],
default="wechat_work_push",
max_length=64,
verbose_name="通知方式",
),
),
migrations.AlterField(
model_name="taskjob",
name="expression_time",
field=models.CharField(
help_text="在间隔任务表示间隔时长使用数字单位corn任务中为五位corn表达式“15 8 * * 2022”",
max_length=64,
verbose_name="时间表达式",
),
),
migrations.AlterField(
model_name="taskjob",
name="jitter",
field=models.IntegerField(
default=1200, help_text="增强时间随机性", verbose_name="时间浮动参数"
),
),
migrations.AlterField(
model_name="taskjob",
name="misfire_grace_time",
field=models.IntegerField(
default=600,
help_text="强制执行结束的时间, 为避免撞车导致任务丢失, 没执行完就别执行了",
verbose_name="任务运行时间",
),
),
migrations.AlterField(
model_name="taskjob",
name="task_exec",
field=models.BooleanField(default=True, verbose_name="开启任务"),
),
]

View File

@@ -25,179 +25,182 @@ scheduler.add_jobstore(DjangoJobStore(), 'default')
pool = ThreadPoolExecutor(2)
pt_spider = PtSpider()
logger = logging.getLogger('ptools')
# Create your views here.
try:
def auto_sign_in():
"""自动签到"""
start = time.time()
# 获取本人所有站点
queryset = MySite.objects.all()
message_list = pt_spider.do_sign_in(pool, queryset)
end = time.time()
consuming = '> <font color="blue">{} 任务运行成功!耗时:{}完成时间:{} </font>\n'.format(
'自动签到', end - start,
time.strftime("%Y-%m-%d %H:%M:%S")
)
if message_list == 0:
logger.info('已经全部签到咯!!')
else:
logger.info(message_list + consuming)
pt_spider.send_text(message_list + consuming)
logger.info('{} 任务运行成功!完成时间:{}'.format('自动签到', time.strftime("%Y-%m-%d %H:%M:%S")))
def auto_get_status():
"""
更新个人数据
"""
start = time.time()
message_list = ''
queryset = MySite.objects.all()
site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support]
results = pool.map(pt_spider.send_status_request, site_list)
message_template = MessageTemplate.status_message_template
for my_site, result in zip(site_list, results):
if result.code == StatusCodeEnum.OK.code:
res = pt_spider.parse_status_html(my_site, result.data)
logger.info('自动更新个人数据: {}, {}'.format(my_site.site, res))
if res.code == StatusCodeEnum.OK.code:
status = res.data[0]
message = message_template.format(
my_site.my_level,
status.my_sp,
my_site.sp_hour,
status.my_bonus,
status.ratio,
FileSizeConvert.parse_2_file_size(status.downloaded),
FileSizeConvert.parse_2_file_size(status.uploaded),
my_site.seed,
my_site.leech,
my_site.invitation,
my_site.my_hr
)
logger.info('组装Message{}'.format(message))
message_list += ('> ' + my_site.site.name + ' 信息更新成功!' + message + ' \n')
# pt_spider.send_text(my_site.site.name + ' 信息更新成功!' + message)
logger.info(my_site.site.name + '信息更新成功!' + message)
else:
print(res)
message = '> <font color="red">' + my_site.site.name + ' 信息更新失败!原因:' + res.msg + '</font> \n'
message_list = message + message_list
# pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(res[0]))
logger.warning(my_site.site.name + '信息更新失败!原因:' + res.msg)
else:
# pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(result[1]))
message = '> <font color="red">' + my_site.site.name + ' 信息更新失败!原因:' + result.msg + '</font> \n'
message_list = message + message_list
logger.warning(my_site.site.name + '信息更新失败!原因:' + result.msg)
end = time.time()
consuming = '> <font color="blue">{} 任务运行成功!耗时:{} 完成时间:{} </font> \n'.format(
'自动更新个人数据', end - start,
time.strftime("%Y-%m-%d %H:%M:%S")
)
logger.info(message_list + consuming)
pt_spider.send_text(text=message_list + consuming)
def auto_sign_in():
"""自动签到"""
start = time.time()
# 获取本人所有站点
queryset = MySite.objects.all()
message_list = pt_spider.do_sign_in(pool, queryset)
end = time.time()
consuming = '> <font color="blue">{} 任务运行成功!耗时:{}完成时间:{} </font>\n'.format(
'自动签到', end - start,
time.strftime("%Y-%m-%d %H:%M:%S")
)
def auto_update_torrents():
"""
拉取最新种子
"""
start = time.time()
message_list = ''
queryset = MySite.objects.all()
site_list = [my_site for my_site in queryset if my_site.site.get_torrent_support]
results = pool.map(pt_spider.send_torrent_info_request, site_list)
for my_site, result in zip(site_list, results):
logger.info('获取种子:{}{}'.format(my_site.site.name, result))
# print(result is tuple[int])
if result.code == StatusCodeEnum.OK.code:
res = pt_spider.get_torrent_info_list(my_site, result.data)
# 通知推送
if res.code == StatusCodeEnum.OK.code:
message = '> {} 种子抓取成功!新增种子{}条,更新种子{}条! \n'.format(my_site.site.name, res.data[0],
res.data[1])
message_list += message
else:
message = '> <font color="red">' + my_site.site.name + '抓取种子信息失败!原因:' + res.msg + '</font> \n'
message_list = message + message_list
# 日志
logger.info(
'{} 种子抓取成功!新增种子{}条,更新种子{}条! '.format(my_site.site.name, res.data[0], res.data[
1]) if res.code == StatusCodeEnum.OK.code else my_site.site.name + '抓取种子信息失败!原因:' + res.msg)
else:
# pt_spider.send_text(my_site.site.name + ' 抓取种子信息失败!原因:' + result[0])
message = '> <font color="red">' + my_site.site.name + ' 抓取种子信息失败!原因:' + result.msg + '</font> \n'
message_list = message + message_list
logger.info(my_site.site.name + '抓取种子信息失败!原因:' + result.msg)
end = time.time()
consuming = '> {} 任务运行成功!耗时:{} 当前时间:{} \n'.format(
'拉取最新种子',
end - start,
time.strftime("%Y-%m-%d %H:%M:%S"))
if message_list == 0:
logger.info('已经全部签到咯!!')
else:
logger.info(message_list + consuming)
pt_spider.send_text(message_list + consuming)
logger.info('{} 任务运行成功!完成时间:{}'.format('自动签到', time.strftime("%Y-%m-%d %H:%M:%S")))
def auto_remove_expire_torrents():
"""
删除过期种子
"""
start = time.time()
torrent_info_list = TorrentInfo.objects.all().filter(downloader__isnull=False)
for torrent_info in torrent_info_list:
expire_time = torrent_info.sale_expire
if '无限期' in expire_time:
# ToDo 先更新种子信息,然后再判断
def auto_get_status():
"""
更新个人数据
"""
start = time.time()
message_list = ''
queryset = MySite.objects.all()
site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support]
results = pool.map(pt_spider.send_status_request, site_list)
message_template = MessageTemplate.status_message_template
for my_site, result in zip(site_list, results):
if result.code == StatusCodeEnum.OK.code:
res = pt_spider.parse_status_html(my_site, result.data)
logger.info('自动更新个人数据: {}, {}'.format(my_site.site, res))
if res.code == StatusCodeEnum.OK.code:
status = res.data[0]
message = message_template.format(
my_site.my_level,
status.my_sp,
my_site.sp_hour,
status.my_bonus,
status.ratio,
FileSizeConvert.parse_2_file_size(status.downloaded),
FileSizeConvert.parse_2_file_size(status.uploaded),
my_site.seed,
my_site.leech,
my_site.invitation,
my_site.my_hr
)
logger.info('组装Message{}'.format(message))
message_list += ('> ' + my_site.site.name + ' 信息更新成功!' + message + ' \n')
# pt_spider.send_text(my_site.site.name + ' 信息更新成功!' + message)
logger.info(my_site.site.name + '信息更新成功!' + message)
else:
print(res)
message = '> <font color="red">' + my_site.site.name + ' 信息更新失败!原因:' + res.msg + '</font> \n'
message_list = message + message_list
# pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(res[0]))
logger.warning(my_site.site.name + '信息更新失败!原因:' + res.msg)
else:
# pt_spider.send_text(my_site.site.name + ' 信息更新失败!原因:' + str(result[1]))
message = '> <font color="red">' + my_site.site.name + ' 信息更新失败!原因:' + result.msg + '</font> \n'
message_list = message + message_list
logger.warning(my_site.site.name + '信息更新失败!原因:' + result.msg)
end = time.time()
consuming = '> <font color="blue">{} 任务运行成功!耗时:{} 完成时间:{} </font> \n'.format(
'自动更新个人数据', end - start,
time.strftime("%Y-%m-%d %H:%M:%S")
)
logger.info(message_list + consuming)
pt_spider.send_text(text=message_list + consuming)
def auto_update_torrents():
"""
拉取最新种子
"""
start = time.time()
message_list = ''
queryset = MySite.objects.all()
site_list = [my_site for my_site in queryset if my_site.site.get_torrent_support]
results = pool.map(pt_spider.send_torrent_info_request, site_list)
for my_site, result in zip(site_list, results):
logger.info('获取种子:{}{}'.format(my_site.site.name, result))
# print(result is tuple[int])
if result.code == StatusCodeEnum.OK.code:
res = pt_spider.get_torrent_info_list(my_site, result.data)
# 通知推送
if res.code == StatusCodeEnum.OK.code:
message = '> {} 种子抓取成功!新增种子{}条,更新种子{}条! \n'.format(my_site.site.name, res.data[0],
res.data[1])
message_list += message
else:
message = '> <font color="red">' + my_site.site.name + '抓取种子信息失败!原因:' + res.msg + '</font> \n'
message_list = message + message_list
# 日志
logger.info(
'{} 种子抓取成功!新增种子{}条,更新种子{}条! '.format(my_site.site.name, res.data[0], res.data[
1]) if res.code == StatusCodeEnum.OK.code else my_site.site.name + '抓取种子信息失败!原因:' + res.msg)
else:
# pt_spider.send_text(my_site.site.name + ' 抓取种子信息失败!原因:' + result[0])
message = '> <font color="red">' + my_site.site.name + ' 抓取种子信息失败!原因:' + result.msg + '</font> \n'
message_list = message + message_list
logger.info(my_site.site.name + '抓取种子信息失败!原因:' + result.msg)
end = time.time()
consuming = '> {} 任务运行成功!耗时:{} 当前时间:{} \n'.format(
'拉取最新种子',
end - start,
time.strftime("%Y-%m-%d %H:%M:%S"))
logger.info(message_list + consuming)
pt_spider.send_text(message_list + consuming)
def auto_remove_expire_torrents():
"""
删除过期种子
"""
start = time.time()
torrent_info_list = TorrentInfo.objects.all().filter(downloader__isnull=False)
for torrent_info in torrent_info_list:
expire_time = torrent_info.sale_expire
if '无限期' in expire_time:
# ToDo 先更新种子信息,然后再判断
continue
if expire_time.endswith(':'):
expire_time += '00'
time_now = datetime.datetime.now()
expire_time_parse = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S')
if time_now >= expire_time_parse:
if not torrent_info.downloader:
# 未推送到下载器,跳过或删除?
continue
if expire_time.endswith(':'):
expire_time += '00'
time_now = datetime.datetime.now()
expire_time_parse = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S')
if time_now >= expire_time_parse:
if not torrent_info.downloader:
# 未推送到下载器,跳过或删除?
continue
if pt_spider.get_torrent_info_from_downloader(torrent_info).code == StatusCodeEnum.OK.code:
# todo 设定任务规则:
# 免费到期后,下载完毕的种子是删除还是保留?
# 未下载完成的,是暂停还是删除?
torrent_info.delete()
end = time.time()
pt_spider.send_text(
'> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
if pt_spider.get_torrent_info_from_downloader(torrent_info).code == StatusCodeEnum.OK.code:
# todo 设定任务规则:
# 免费到期后,下载完毕的种子是删除还是保留?
# 未下载完成的,是暂停还是删除?
torrent_info.delete()
end = time.time()
pt_spider.send_text(
'> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
def auto_push_to_downloader():
"""推送到下载器"""
start = time.time()
print('推送到下载器')
end = time.time()
pt_spider.send_text(
'> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
def auto_push_to_downloader():
"""推送到下载器"""
start = time.time()
print('推送到下载器')
end = time.time()
pt_spider.send_text(
'> {} 任务运行成功!耗时:{}{} \n'.format('签到', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
def auto_get_torrent_hash():
"""自动获取种子HASH"""
start = time.time()
print('自动获取种子HASH')
time.sleep(5)
end = time.time()
pt_spider.send_text(
'> {} 任务运行成功!耗时:{}{} \n'.format('获取种子HASH', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
def auto_get_torrent_hash():
"""自动获取种子HASH"""
start = time.time()
print('自动获取种子HASH')
time.sleep(5)
end = time.time()
pt_spider.send_text(
'> {} 任务运行成功!耗时:{}{} \n'.format('获取种子HASH', end - start, time.strftime("%Y-%m-%d %H:%M:%S")))
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 44444))
scheduler.start()
logger.info('启动后台任务')
logger.info('启动Django主线程')
except socket.error:
logger.info("!!!scheduler started, DO NOTHING")
logger.info('启动后台任务')
scheduler.start()
except Exception as e:
logger.info(e)
logger.info('启动后台任务启动任务失败!')
logger.info('启动后台任务启动任务失败!{}'.format(e))
# 有错误就停止定时器
# scheduler.shutdown()
pt_spider.send_text(text='启动后台任务启动任务失败!')
scheduler.shutdown()

View File

@@ -233,6 +233,7 @@ DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# DATA_UPLOAD_MAX_MEMORY_SIZE这个设置翻译过来就是数据上传最大内存大小
DATA_UPLOAD_MAX_MEMORY_SIZE = 10485760
APSCHEDULER_DATETIME_FORMAT = 'Y-m-d H:i:s' # Default
APSCHEDULER_RUN_NOW_TIMEOUT = 300 # 任务执行超时时间
# 自定义配置
SIMPLEUI_HOME_TITLE = 'PT一下你就知道'
SIMPLEUI_HOME_ICON = 'fa fa-optin-monster'