import time
import traceback
from datetime import datetime
import qbittorrentapi
import transmission_rpc
from django.contrib import admin, messages
from django.http import JsonResponse
from django.utils.html import format_html
from django_admin_inline_paginator.admin import TabularInlinePaginated
from simpleui.admin import AjaxAdmin
from transmission_rpc import Torrent
from pt_site.UtilityTool import MessageTemplate, FileSizeConvert
from pt_site.models import Site, Downloader, SignIn, UserLevelRule
from pt_site.models import TorrentInfo, SiteStatus, MySite
# Register your models here.
from pt_site.views import pool, pt_spider
from ptools.base import StatusCodeEnum, DownloaderCategory
# Admin branding: the page header and the browser-tab title share one text,
# the index page gets its own heading.
admin.site.site_header = admin.site.site_title = 'PT一下,你就晓嘚'
admin.site.index_title = '我在后台首页'
@admin.register(Site)
class SiteAdmin(admin.ModelAdmin):
    """Admin configuration for ``Site`` records.

    Besides the changelist/edit-form layout, this admin customises search:
    when the search is issued from anywhere other than the Site changelist
    itself, sites that already have a ``MySite`` row are hidden from the
    results.
    """

    # Changelist columns.
    list_display = (
        'name',
        'custom_url',
        'sign_in_support',
        'get_userinfo_support',
        'get_torrent_support',
        'search_support',
        'created_at',
        'updated_at',
    )
    actions_selection_counter = True
    # Sidebar filters.
    list_filter = (
        'name',
        'sign_in_support',
        'get_userinfo_support',
        'get_torrent_support',
        'search_support',
    )
    # Search box.
    search_fields = ('name',)
    # Columns that can be edited in place on the changelist.
    list_editable = (
        'sign_in_support',
        'get_userinfo_support',
        'get_torrent_support',
        'search_support',
    )

    def custom_url(self, obj):
        """Render the value of the '站点网址' changelist column."""
        # NOTE(review): the template has a single placeholder but two values
        # are passed, so only ``url + page_default`` is rendered and
        # ``obj.url`` is ignored. This looks like the link markup was lost
        # from the template -- confirm against the intended output.
        return format_html(
            '{}',
            obj.url + obj.page_default,
            obj.url
        )

    custom_url.short_description = '站点网址'

    def get_search_results(self, request, queryset, search_term):
        """Run the default search, hiding already-claimed sites off-changelist.

        Returns the ``(queryset, use_distinct)`` pair expected by Django.
        For every request path other than the Site changelist (presumably the
        autocomplete lookup for ``MySite.site`` -- verify against callers),
        sites referenced by at least one ``MySite`` row are excluded.
        """
        queryset, use_distinct = super().get_search_results(request, queryset, search_term)
        if request.META['PATH_INFO'] != '/admin/pt_site/site/':
            # One query for all claimed site ids instead of one COUNT query
            # per Site row. Materialised to a set so NULLs cannot poison a
            # NOT IN subquery.
            claimed_site_ids = set(MySite.objects.values_list('site', flat=True))
            queryset = queryset.exclude(pk__in=claimed_site_ids)
        return queryset, use_distinct

    formfield_overrides = {}

    # Edit-form field groups.
    fieldsets = (
        ['站点设置', {
            'fields': (
                ('name', 'nickname', 'url', 'logo'),
                ('limit_speed', 'sp_full', 'tracker',),
            ),
        }],
        ['功能支持', {
            'fields': (
                ('sign_in_support', 'get_userinfo_support',),
                ('get_torrent_support', 'search_support',),
            ),
        }],
        ['签到设置', {
            'classes': ('collapse',),
            'fields': (
                ('sign_in_captcha', 'sign_in_method',),
                ('sign_in_headers', 'sign_in_params',)
            ),
        }],
        ['站点主要页面', {
            'classes': ('collapse',),
            'fields': (
                'page_index',
                'page_default',
                'page_sign_in',
                'page_detail',
                'page_download',
                'page_user',
                'page_control_panel',
                'page_search',
                'page_message',
                'page_leeching',
                'page_uploaded',
                'page_seeding',
                'page_completed',
                'page_mybonus',
                'page_viewfilelist',
                'page_viewpeerlist',
            ),
        }],
        ['H&R设置', {
            'classes': ('collapse',),
            'fields': (
                'hr',
                ('hr_rate', 'hr_time',)
            ),
        }],
        ['站点信息规则', {
            'classes': ('collapse',),
            'fields': (
                'my_uid_rule',
                'my_passkey_rule',
                'my_level_rule',
                'ratio_rule',
                'my_sp_rule',
                'hour_sp_rule',
                'my_bonus_rule',
                'invitation_rule',
                'uploaded_rule',
                'downloaded_rule',
                'seed_rule',
                'leech_rule',
                'seed_vol_rule',
                'my_hr_rule',
                'mailbox_rule',
                'notice_rule',
                'message_title',
                'notice_title',
                'notice_content',
                'full_site_free',
                'time_join_rule',
                'latest_active_rule',
            ),
        }],
        ['种子获取规则', {
            'classes': ('collapse',),
            'fields': (
                'torrents_rule',
                'name_rule',
                'title_rule',
                'detail_url_rule',
                'category_rule',
                'poster_rule',
                'download_url_rule',
                'magnet_url_rule',
                'size_rule',
                'hr_rule',
                'sale_rule',
                'sale_expire_rule',
                'release_rule',
                'seeders_rule',
                'leechers_rule',
                'completers_rule',
                'record_count_rule',
                'hash_rule',
                'peer_speed_rule',
                'viewpeerlist_rule',
                'viewfilelist_rule',
            ),
        }]
    )
@admin.register(UserLevelRule)
class UserLevelRuleAdmin(AjaxAdmin):
    """Changelist configuration for ``UserLevelRule`` entries."""

    # Changelist columns ('uploaded' and 'rights' are intentionally left out).
    list_display = (
        'site', 'level_id', 'level', 'days',
        'downloaded', 'bonus', 'score', 'ratio', 'torrents',
    )
    # Search by the related site's name or by level.
    search_fields = ('site__name', 'level')
class StatusInlines(TabularInlinePaginated):
    """Paginated inline of ``SiteStatus`` rows, newest first.

    Rows cannot be added or changed from the inline, and the delete
    checkbox is hidden via ``can_delete``.
    """

    model = SiteStatus
    per_page = 10
    ordering = ['-created_at']
    readonly_fields = ['created_at']
    fields = [
        'uploaded', 'downloaded', 'ratio',
        'seed', 'seed_vol', 'leech',
        'invitation', 'sp_hour', 'my_sp', 'my_bonus',
        'created_at',
    ]
    # Custom template (per the original note: it drops the foreign-key display).
    template = 'admin/pt_site/inline_status/tabular.html'
    can_delete = False

    def has_add_permission(self, request, obj=None):
        """Never offer the "add another" row."""
        return False

    def has_change_permission(self, request, obj=None):
        """Never allow editing existing rows."""
        return False
class SignInInlines(TabularInlinePaginated):
    """Collapsed, paginated inline of ``SignIn`` rows, newest first.

    Adding, changing and deleting are all refused, so the inline is
    display-only.
    """

    model = SignIn
    per_page = 15
    classes = ['collapse']
    ordering = ['-created_at']
    readonly_fields = ['created_at']
    fields = ['sign_in_today', 'sign_in_info', 'created_at']
    template = 'admin/pt_site/inline_status/tabular.html'
    can_delete = False

    def has_add_permission(self, request, obj=None):
        """Never offer the "add another" row."""
        return False

    def has_delete_permission(self, request, obj=None):
        """Never allow deleting rows."""
        return False

    def has_change_permission(self, request, obj=None):
        """Never allow editing existing rows."""
        return False
# Admin for MySite rows, built on simpleui's AjaxAdmin so actions can return JSON.
@admin.register(MySite)
class MySiteAdmin(AjaxAdmin): # instead of ModelAdmin
# Changelist columns; apart from 'sort_id' they are produced by the display
# methods of this class (site_name, userinfo, userdata, leeching_seeding,
# bonus, status_today, edit).
list_display = (
'sort_id',
# 'user_id',
# 'site',
'site_name',
# 'invitation',
# 'my_level',
# 'my_hr',
'userinfo',
'userdata',
'leeching_seeding',
'bonus',
# 'time_join',
'status_today',
'edit',
)
# The 'site' field uses an autocomplete widget on the edit form.
autocomplete_fields = ('site',)
search_fields = ('site__name', 'site__url')
# Only the 'edit' column links to the change form, which leaves 'sort_id'
# free to be edited in place.
list_display_links = ['edit']
list_editable = ['sort_id']
ordering = ('sort_id',)
# empty_value_display = '**'
# Status history and sign-in history are shown inline on the change form.
inlines = (
StatusInlines,
SignInInlines
)
# Changelist column '魔力': summary taken from the newest SiteStatus row
# (highest pk) of this site. Every value falls back to 0 when the site has no
# SiteStatus row yet; the third value is sp_hour / site.sp_full formatted as a
# percentage (0 when sp_full is 0).
# NOTE(review): the template below contains a raw line break inside a quoted
# literal, so it does not parse as written. It looks like inline markup was
# lost from this string -- restore the original template before relying on it.
def bonus(self, obj: MySite):
status_today = obj.sitestatus_set.order_by('-pk').first()
return format_html(
'时魔:{} / '
'魔力:{}
'
'满魔:{} '
'积分/HP:{}',
round(float(status_today.sp_hour), 3) if status_today else 0,
status_today.my_sp if status_today else 0,
'{:.2%}'.format(float(status_today.sp_hour) / obj.site.sp_full) if status_today and obj.site.sp_full != 0 else 0,
status_today.my_bonus if status_today else 0
)
bonus.short_description = '魔力'
# Changelist column '用户信息': the account's level, the invitation count from
# the newest SiteStatus row (0 when there is none) and the H&R value
# (0 when unset/falsy).
# NOTE(review): the template below contains a raw line break inside a quoted
# literal, so it does not parse as written. It looks like inline markup was
# lost from this string -- restore the original template before relying on it.
def userinfo(self, obj: MySite):
status = obj.sitestatus_set.order_by('-pk').first()
return format_html(
'等级:{} / '
'邀请:{}
'
'H&R:{}',
obj.my_level,
status.invitation if status else 0,
obj.my_hr if obj.my_hr else 0
)
userinfo.short_description = '用户信息'
# Changelist column '数据量': downloaded and uploaded totals from the newest
# SiteStatus row, passed through FileSizeConvert.parse_2_file_size
# (0 when the site has no SiteStatus row yet).
# NOTE(review): the template below spans two physical lines inside one quoted
# literal, so it does not parse as written. It looks like inline markup was
# lost from this string -- restore the original template before relying on it.
def userdata(self, obj: MySite):
status_today = obj.sitestatus_set.order_by('-pk').first()
return format_html(
'已下载:{}
已上传:{}',
FileSizeConvert.parse_2_file_size(status_today.downloaded) if status_today else 0,
FileSizeConvert.parse_2_file_size(status_today.uploaded) if status_today else 0
)
userdata.short_description = '数据量'
# Changelist column '下载/做种': seeding count, leeching count and seeding
# volume (via FileSizeConvert.parse_2_file_size) from the newest SiteStatus
# row; each value is 0 when the site has no SiteStatus row yet.
# NOTE(review): the template below contains a raw line break inside a quoted
# literal, so it does not parse as written. It looks like inline markup was
# lost from this string -- restore the original template before relying on it.
def leeching_seeding(self, obj: MySite):
status_today = obj.sitestatus_set.order_by('-pk').first()
return format_html(
'做种:{} / 下载:{}
'
' 做种体积:{}',
status_today.seed if status_today else 0,
status_today.leech if status_today else 0,
FileSizeConvert.parse_2_file_size(status_today.seed_vol) if status_today else 0,
)
leeching_seeding.short_description = '下载/做种'
def edit(self, obj: MySite):
    """Return the fixed text shown in the '操作' column for every row."""
    label = '编辑'
    return label


edit.short_description = '操作'
# Changelist column '状态': shows whether the site was signed in and refreshed
# today, together with the last-update and join timestamps.
# NOTE(review): the template below contains raw line breaks inside quoted
# literals (so it does not parse as written) and exposes only two '{}'
# placeholders for the four values passed. It looks like inline markup,
# including placeholders, was lost -- restore the original template.
def status_today(self, obj: MySite):
# Truthy only when a SiteStatus row was updated today (or later) AND the
# MySite row itself was last updated today.
is_update = obj.sitestatus_set.filter(
updated_at__date__gte=datetime.today()
).first() and obj.updated_at.date() == datetime.today().date()
# First SignIn row created today (or later), if any.
signin_today = obj.signin_set.filter(created_at__date__gte=datetime.today()).first()
# Sites without sign-in support are reported as 'yes' (nothing to do).
if not obj.site.sign_in_support:
signin_str = 'yes'
else:
signin_str = 'yes' if signin_today and signin_today.sign_in_today else 'no'
return format_html(
'签到:
'
'更新:{}
'
'注册:{}',
signin_str,
datetime.strftime(obj.updated_at, '%Y-%m-%d %H:%M:%S') if obj else '',
'yes' if is_update else 'no',
datetime.strftime(obj.time_join, '%Y-%m-%d %H:%M:%S') if obj.time_join else ''
)
status_today.short_description = '状态'
# 签到过滤
class SignInFilter(admin.SimpleListFilter):
    """Sidebar filter splitting sites by whether today's sign-in is done.

    Sites whose ``site.sign_in_support`` is not enabled are counted as signed
    in, because there is nothing to sign.
    """

    title = '今日签到'
    parameter_name = 'sign_in_state'

    def lookups(self, request, model_admin):
        """Return the selectable options as ``(value, label)`` pairs."""
        return (
            (False, '未签到'),
            (True, '已签到'),
        )

    def queryset(self, request, queryset):
        """Filter ``queryset`` according to the selected option.

        ``self.value()`` comes from the query string and is therefore the
        *string* ``'True'`` or ``'False'`` (or ``None`` when the filter is not
        active). It must be compared as a string: ``bool('False')`` is
        ``True``, which previously made both options return the
        not-signed-in set.
        """
        choice = self.value()
        if choice is None:
            return queryset
        # MySite pks with a successful sign-in recorded today.
        done_pks = set(
            SignIn.objects.filter(
                created_at__date__gte=datetime.today(),
                sign_in_today=True,
            ).values_list('site', flat=True)
        )
        # Plus the sites that do not need to sign in at all.
        done_pks.update(
            queryset.exclude(site__sign_in_support=True).values_list('pk', flat=True)
        )
        if choice == 'True':
            return queryset.filter(pk__in=done_pks)
        if choice == 'False':
            return queryset.exclude(pk__in=done_pks)
        # Unknown value (hand-edited URL): leave the list unfiltered.
        return queryset
# 过滤未抓取个人数据站点
class UpdatedAtFilter(admin.SimpleListFilter):
    """Sidebar filter splitting sites by whether they were refreshed today."""

    title = '今日刷新'
    parameter_name = 'status_today'

    def lookups(self, request, model_admin):
        """Return the selectable options as ``(value, label)`` pairs."""
        return (
            (0, '未刷新'),
            (1, '已刷新'),
        )

    def queryset(self, request, queryset):
        """Filter ``queryset`` by the selected option.

        ``0`` keeps sites not updated today, ``1`` keeps sites updated today.
        A missing, non-numeric or unknown value leaves the list unfiltered
        instead of raising ``ValueError`` on a hand-edited URL.
        """
        raw = self.value()
        if raw is None:
            return queryset
        try:
            choice = int(raw)
        except ValueError:
            return queryset
        # Lazy queryset, used as a subquery by pk__in below.
        refreshed_today = MySite.objects.filter(updated_at__date__gte=datetime.today())
        if choice == 0:
            return queryset.exclude(pk__in=refreshed_today)
        if choice == 1:
            return queryset.filter(pk__in=refreshed_today)
        return queryset


# Sidebar filters of the MySite changelist.
list_filter = (SignInFilter, UpdatedAtFilter, 'my_level')
def site_name(self, obj: MySite):
    """Render the '站点' changelist column for a site.

    When ``obj.mail`` is non-zero the site name is wrapped in the badge
    template together with the mail count.

    All values go through ``format_html`` so they are HTML-escaped. The
    previous code pre-formatted the template with ``str.format`` and then
    called ``format_html`` without arguments, which escaped nothing and
    breaks on names containing braces.
    """
    # NOTE(review): both templates look like they lost their markup (the
    # first is empty, the second has placeholders only) -- restore them from
    # the original file; the call structure below does not depend on it.
    template = ''
    template_badge = '{}{}\n'
    site = obj.site
    if obj.mail == 0:
        return format_html(template, site.logo, site.url, site.name)
    # The badge is built first; being a safe string it is not escaped twice.
    badge = format_html(template_badge, site.name, obj.mail)
    return format_html(template, site.logo, site.url, badge)


# Plain text needs no format_html(); calling it without arguments is deprecated.
site_name.short_description = '站点'
"""
def sign_in_state(self, obj: MySite):
signin_today = obj.signin_set.filter(created_at__date__gte=datetime.today()).first()
if not obj.site.sign_in_support:
sign_template = '无需'
else:
sign_template = '
'.format(
'yes' if signin_today and signin_today.sign_in_today else 'no'
)
return format_html(sign_template)
sign_in_state.short_description = format_html('签到')
"""
# def get_changeform_initial_data(self, request):
# print(request)
# return super(MySiteAdmin, self).get_changeform_initial_data(request)
# 过滤字段
# list_filter = ('site', 'support')
# Action buttons offered on the changelist.
# NOTE(review): 'sign_in_celery' has no method of that name in the visible
# class body -- confirm it is provided elsewhere.
actions = ['sign_in', 'get_status', 'get_torrents', 'sign_in_celery', 'change_user_agent']
# Also show the action bar below the list.
actions_on_bottom = True
# 尝试获取分类,但是各站差异太大
# def get_class(self, request, queryset):
# for my_site in queryset:
# response = pt_spider.send_torrent_info_request(my_site)
# if response.code == 0:
# res = response.data
# classes = pt_spider.parse(res, '//*[@id="ksearchboxmain"]//td/a/img[contains(@class,"c_")]/@class')
# title = pt_spider.parse(res, '//*[@id="ksearchboxmain"]//td/a/img[contains(@class,"c_")]/@title')
# alt = pt_spider.parse(res, '//*[@id="ksearchboxmain"]//td/a/img[contains(@class,"c_")]/@alt')
# print(my_site.site.name)
# for i, j, k in zip(classes, title, alt):
# print('class', i)
# print('title', j)
# print('alt', k)
# # for i in classes:
# # print('class', i)
# # for i in title:
# # print('title', i)
# # for i in alt:
# # print('alt', i)
# messages.success(request, my_site.site.name)
# messages.error(request, my_site.site.name)
def sign_in(self, request, queryset):
    """Admin action: sign in to every selected site that still needs it.

    A site is attempted only when sign-in is enabled for the account, a
    cookie is stored, the site supports sign-in, and no successful sign-in
    has been recorded today. Work is fanned out through ``pool.map`` and one
    admin message is emitted per site.
    """
    start = time.time()
    today = datetime.today()
    pending = [
        my_site for my_site in queryset
        if my_site.sign_in
        and my_site.cookie
        and my_site.site.sign_in_support
        # exists() stops at the first row instead of counting all of them.
        and not my_site.signin_set.filter(
            created_at__date__gte=today,
            sign_in_today=True,
        ).exists()
    ]
    if not pending:
        messages.success(request, '已签到或无需签到!')
        # Nothing to do: do not dispatch an empty job to the pool.
        return
    results = pool.map(pt_spider.sign_in, pending)
    for my_site, result in zip(pending, results):
        print(my_site, result.code)
        if result.code == StatusCodeEnum.OK.code:
            messages.success(request, '{}:{}'.format(my_site.site.name, result.msg))
        else:
            messages.error(request, '{} 签到失败!原因:{}'.format(my_site.site.name, result.msg))
    print('耗时:', time.time() - start)


# Button text, consistent with the Django admin.
sign_in.short_description = '签到'
# Icon name (element-ui / Font Awesome).
sign_in.icon = 'el-icon-star-on'
# element-ui button type.
sign_in.type = 'success'
def change_user_agent(self, request, queryset):
    """Ajax action: set one User-Agent on all selected ``MySite`` rows.

    Reads ``_selected`` (comma-separated primary keys) and ``user_agent``
    from the POST body and returns a ``JsonResponse`` whose ``status`` is
    ``'success'`` or ``'error'``.

    A blank ``user_agent`` is rejected so that a malformed request cannot
    wipe the stored value on every selected row. The POST body is no longer
    printed, since it contains the CSRF token.
    """
    post = request.POST
    selected = post.get('_selected')
    if not selected:
        return JsonResponse(data={
            'status': 'error',
            'msg': '请先选中数据!'
        })
    user_agent = (post.get('user_agent') or '').strip()
    if not user_agent:
        return JsonResponse(data={
            'status': 'error',
            'msg': '请填写User-Agent!'
        })
    res = MySite.objects.filter(pk__in=selected.split(',')).update(user_agent=user_agent)
    return JsonResponse(data={
        'status': 'success',
        'msg': f'{res}个站点成功更换UA!'
    })


# Button text, consistent with the Django admin.
change_user_agent.short_description = '批量更换UA'
# Icon name (element-ui / Font Awesome).
change_user_agent.icon = 'el-icon-eleme'
# element-ui button type.
change_user_agent.type = 'info'
# Dialog shown before the action is submitted.
change_user_agent.layer = {
    # Dialog title.
    'title': '批量更换User-Agent',
    # Hint text.
    'tips': '请填写您获取Cookie时使用的浏览器的User-Agent',
    # Confirm button text.
    'confirm_button': '确认提交',
    # Cancel button text.
    'cancel_button': '取消',
    # Dialog width.
    'width': '40%',
    # Label width of the form (element-ui label-width).
    'labelWidth': "0",
    'params': [{
        # Input type of the field.
        'type': 'input',
        # Key under which the value arrives in the POST body.
        'key': 'user_agent',
        # 'label': 'UserAgent',
        # Reject empty input on the client side.
        'require': True
    }]
}
def get_status(self, request, queryset):
    """Admin action: fetch and store personal statistics for selected sites.

    Only sites with ``get_userinfo_support`` are processed. Pages are fetched
    through ``pool.map`` and parsed with ``pt_spider.parse_status_html``.
    One admin message is emitted per site: a formatted summary on success,
    otherwise the failure reason.

    Two of the three failure branches previously used a format string
    without a ``{}`` for the reason, so the reason was silently dropped.
    """
    start = time.time()
    site_list = [my_site for my_site in queryset if my_site.site.get_userinfo_support]
    results = pool.map(pt_spider.send_status_request, site_list)
    message_template = MessageTemplate.status_message_template
    failure_template = '{} 信息更新失败!原因:{}'
    for my_site, result in zip(site_list, results):
        if result.code != StatusCodeEnum.OK.code:
            # The request itself failed.
            messages.error(request, failure_template.format(my_site.site.name, result.msg))
            continue
        res = pt_spider.parse_status_html(my_site, result.data)
        # res.data is only inspected when parsing reported success.
        if res.code != StatusCodeEnum.OK.code or not isinstance(res.data[0], SiteStatus):
            messages.error(request, failure_template.format(my_site.site.name, res.msg))
            continue
        site_status = res.data[0]
        # res.data[1] selects between the "fetched" and "updated" wording.
        message = '{}: {}'.format(
            my_site.site.name,
            '信息获取成功!' if res.data[1] else '信息更新成功!')
        message += message_template.format(
            my_site.site.name,
            my_site.my_level,
            site_status.my_sp,
            site_status.sp_hour,
            site_status.my_bonus,
            site_status.ratio,
            FileSizeConvert.parse_2_file_size(site_status.seed_vol),
            FileSizeConvert.parse_2_file_size(site_status.uploaded),
            FileSizeConvert.parse_2_file_size(site_status.downloaded),
            site_status.seed,
            site_status.leech,
            site_status.invitation,
            my_site.my_hr
        )
        messages.success(request, message=message)
    print('耗时:', time.time() - start)


get_status.short_description = '更新数据'
# Icon name (element-ui / Font Awesome).
get_status.icon = 'el-icon-refresh'
# element-ui button type.
get_status.type = 'primary'
def get_torrents(self, request, queryset):
    """Admin action: pull the torrent list of every selected supporting site.

    Sites without ``get_torrent_support`` are skipped. Requests are fanned
    out through ``pool.map``; each site then gets one admin message with
    either the new/updated counts or the failure reason.
    """
    started_at = time.time()
    targets = [entry for entry in queryset if entry.site.get_torrent_support]
    responses = pool.map(pt_spider.send_torrent_info_request, targets)
    ok_code = StatusCodeEnum.OK.code
    for my_site, response in zip(targets, responses):
        if response.code != ok_code:
            # The request itself failed.
            messages.error(request, '{} 抓取种子信息失败!原因:{}'.format(my_site.site.name, response.msg))
            continue
        parsed = pt_spider.get_torrent_info_list(my_site, response.data)
        if parsed.code != ok_code:
            # The page was fetched but could not be parsed.
            messages.error(
                request,
                '{} 解析种子信息失败!原因:{}'.format(my_site.site.name, parsed.msg)
            )
            continue
        messages.success(
            request,
            '{} 种子抓取成功!新增种子{}条,更新种子{}条:'.format(my_site.site.name, parsed.data[0], parsed.data[1])
        )
    finished_at = time.time()
    print('耗时:', finished_at - started_at)


# Button text, consistent with the Django admin.
get_torrents.short_description = '拉取促销种子'
# Icon name (element-ui / Font Awesome).
get_torrents.icon = 'el-icon-download'
# element-ui button type.
get_torrents.type = 'warning'
# Edit-form layout of MySite: a single collapsible group holding the account
# settings, identifiers, join time, User-Agent and cookie.
fieldsets = (
['用户信息', {
'classes': ('collapse',), # CSS
'fields': (
('site', 'sign_in', 'get_info', 'hr', 'search'),
('user_id', 'passkey',),
('time_join',),
'user_agent',
'cookie',
),
}],
)
"""
@admin.register(SiteStatus)
class SiteStatusAdmin(ImportExportModelAdmin):
formats = (base_formats.XLS, base_formats.CSV, base_formats.JSON)
list_display = ['site',
'upload', 'download', 'ratio',
'my_sp', 'my_bonus',
'seed_vol',
'updated_at']
list_filter = ['site', 'updated_at']
list_display_links = None
ordering = ['site__sort_id']
autocomplete_fields = ('site',)
# 自定义更新时间,提醒今日是否更新
def upload(self, obj: SiteStatus):
return FileSizeConvert.parse_2_file_size(obj.uploaded)
def download(self, obj: SiteStatus):
return FileSizeConvert.parse_2_file_size(obj.downloaded)
upload.short_description = '上传量'
download.short_description = '下载量'
# 禁止添加按钮
def has_add_permission(self, request):
return False
# 禁止删除按钮
# def has_delete_permission(self, request, obj=None):
# return False
# 禁止修改按钮
def has_change_permission(self, request, obj=None):
return False
# def changelist_view(self, request, extra_context=None):
# default_filter = False
# try:
# ref = request.META['HTTP_REFERER']
# pinfo = request.META['PATH_INFO']
# qstr = ref.split(pinfo)
# # request.META['QUERY_STRING'] = 'update_date=' + str(datetime.now().date())
# # print(request.GET, len(qstr))
# # print(pinfo, qstr, ref)
# # print(qstr[1].split('='))
# # 没有参数时使用默认过滤器
# if len(qstr[1].split('=')) <= 1:
# default_filter = True
# # print(request.META['QUERY_STRING'])
# except:
# default_filter = True
# if default_filter:
# q = request.GET.copy()
# # 添加查询参数,默认为只查询当天数据
# q['updated_at'] = str(datetime.now().date())
# # print(q)
# request.GET = q
# # print(request.GET)
# request.META['QUERY_STRING'] = request.GET.urlencode()
# # print(request.META)
#
# return super(SiteStatusAdmin, self).changelist_view(request, extra_context=extra_context)
"""
@admin.register(Downloader)
class DownloaderAdmin(AjaxAdmin):
    """Admin for ``Downloader`` rows with a built-in connection test.

    The connection is tested automatically after every save and on demand
    through the '测试连接' action.
    """

    # Changelist columns.
    list_display = ('name', 'category', 'reserved_space', 'created_at', 'updated_at')
    # Sidebar filters.
    list_filter = ('name', 'category')
    # Search box.
    search_fields = ('name', 'category')
    # Custom action buttons.
    actions = ['test_button']

    def save_model(self, request, obj, form, change):
        """Save the downloader, then report whether it is reachable."""
        obj.save()
        self.test_connect(request, obj)

    def test_button(self, request, queryset):
        """Admin action: test the connection of every selected downloader."""
        for downloader in queryset:
            self.test_connect(request, downloader)

    @staticmethod
    def test_connect(request, downloader):
        """Try to connect to ``downloader`` and report the outcome as a message.

        A success message is added when the connection works; any exception
        raised while connecting is reported as an error message that now
        includes the downloader name and the exception text (the previous
        format string had no placeholders, so both were dropped).
        """
        # NOTE(review): categories are compared with the literals 'Tr'/'Qb'
        # here while other code in this file uses DownloaderCategory --
        # confirm the two agree.
        try:
            conn = False
            if downloader.category == 'Tr':
                conn = transmission_rpc.Client(host=downloader.host, port=downloader.port,
                                               username=downloader.username, password=downloader.password)
            if downloader.category == 'Qb':
                qb_client = qbittorrentapi.Client(host=downloader.host, port=downloader.port,
                                                  username=downloader.username, password=downloader.password)
                qb_client.auth_log_in()
                conn = qb_client.is_logged_in
            if conn:
                messages.success(request, '{} 连接成功!'.format(downloader.name))
        except Exception as e:
            # Deliberately broad: this is a user-facing connectivity probe and
            # every failure should end up as a message, not a 500 page.
            messages.error(
                request,
                '{} 连接失败!请确认下载器信息填写正确:{}'.format(downloader.name, e)
            )

    # Button text, consistent with the Django admin.
    test_button.short_description = '测试连接'
    # Icon name (element-ui / Font Awesome).
    test_button.icon = 'fas fa-audio-description'
    # element-ui button type.
    test_button.type = 'success'
# 给按钮追加自定义的颜色
# test_button.style = 'color:white;'
# 模型保存后的操作
# @receiver(post_save, sender=Downloader)
# def post_save_downloader(sender, **kwargs):
# print(kwargs['signal'].__attr__)
# print(sender.test_connect(kwargs['instance']))
def get_downloader():
    """Return all downloaders as ``{'key': id, 'label': name}`` dicts.

    Any error raised while reading them results in an empty list.
    """
    try:
        options = []
        for downloader in Downloader.objects.all():
            options.append({'key': downloader.id, 'label': downloader.name})
        return options
    except Exception:
        return []
# Admin for TorrentInfo rows, built on simpleui's AjaxAdmin so actions can return JSON.
@admin.register(TorrentInfo)
class TorrentInfoAdmin(AjaxAdmin): # instead of ModelAdmin
# Changelist columns; 'name_href', 'title_href', 'file_size' and 'd_progress'
# are display methods of this class, the rest are model fields.
list_display = (
'name_href',
'title_href',
'site',
'state',
'hr',
# 'category',
'file_size',
'sale_status',
'seeders',
'leechers',
'completers',
'downloader',
'd_progress',
# 'add_a', # column with a torrent download link
'sale_expire',
# 'updated_at'
)
# list_display_links = ['name_href']
def file_size(self, torrent_info: TorrentInfo):
    """Return the torrent's ``size`` run through ``FileSizeConvert.parse_2_file_size``."""
    raw_size = torrent_info.size
    return FileSizeConvert.parse_2_file_size(raw_size)


file_size.short_description = '文件大小'
# Sidebar filters
list_filter = ('site', 'title', 'category', 'sale_status',)
# Search box fields
search_fields = ('name', 'category')
# Rows per changelist page
list_per_page = 50
def has_add_permission(self, request):
    """Refuse manual creation of rows for every request."""
    return False
def has_change_permission(self, request, obj=None):
    """Refuse editing of rows for every request and object."""
    return False
def add_a(self, obj):
    """Render the download-link column for a torrent."""
    # NOTE(review): the template has no placeholder, so the magnet URL passed
    # alongside it is never rendered -- it looks like the link markup was
    # lost from this literal; confirm against the intended output.
    magnet = obj.magnet_url
    return format_html('下载种子', magnet)
def name_href(self, obj: TorrentInfo):
    """Render the torrent-name column."""
    # NOTE(review): one placeholder, two values -- only the full name is
    # rendered and the shortened form is ignored. It looks like markup was
    # lost from this template; confirm against the intended output.
    full_name = obj.name
    shortened = full_name[0:25] + ' ...'
    return format_html('{}', full_name, shortened)
def title_href(self, obj: TorrentInfo):
    """Render the title column."""
    # NOTE(review): one placeholder, three values -- only the detail URL is
    # rendered; the title and its shortened form are ignored. It looks like
    # markup was lost from this template; confirm against the intended output.
    title = obj.title
    shortened = title[0:20] + ' ...'
    return format_html('{}', obj.detail_url, title, shortened)
def d_progress(self, obj: TorrentInfo):
    """Render the download-progress column for a torrent.

    Returns ``0`` when the torrent has no downloader or no usable hash
    (shorter than 32 characters). For Transmission downloaders the torrent
    is looked up by hash: while in progress (strictly between 0 and 100) the
    download speed in MB/s is shown, otherwise the percentage.

    For any other downloader type the progress stays at ``0``. The final
    return previously read ``torrent.progress``, where ``torrent`` is unbound
    unless the Transmission branch ran, so such rows raised instead of
    rendering ``0%``.
    """
    if not obj.downloader or not obj.hash_string or len(obj.hash_string) < 32:
        return 0
    progress = 0
    speed = 0
    if obj.downloader.category == DownloaderCategory.Transmission:
        tr_client = transmission_rpc.Client(
            host=obj.downloader.host,
            port=obj.downloader.port,
            username=obj.downloader.username,
            password=obj.downloader.password
        )
        torrent = tr_client.get_torrent(obj.hash_string)
        progress = torrent.progress
        # rateDownload is divided by 1024 twice to express it in MB/s.
        speed = round(torrent.rateDownload / 1024 / 1024, 2)
    # DownloaderCategory.qBittorrent: progress lookup is not implemented.
    if 0 < progress < 100:
        return format_html('{} MB/s', speed)
    return format_html('{}%', progress)
# Column headers for the display methods above.
# name_href.short_description = '种子名称'
# NOTE(review): this header is built with format_html around a template that
# consists of a placeholder and whitespace only -- it looks like markup was
# lost from the literal; confirm against the intended output.
name_href.short_description = format_html(
"""
{}
""", '种子名称'
)
title_href.short_description = '标题'
d_progress.short_description = '下载进度'
add_a.short_description = '下载链接'
# Custom action buttons
actions = ['to_download', 'update_state']
# A list comprehension could fetch the downloaders here instead:
# downloader_list = [{'key': i.id, 'label': i.name} for i in Downloader.objects.all()]
def update_state(self, request, queryset):
    """Admin action: query Transmission for each selected torrent.

    For every selected row attached to a Transmission downloader the torrent
    is fetched and its download rate (MB/s) is printed; nothing is stored.

    Rows without a downloader, or attached to another downloader type, are
    skipped. Previously they raised ``AttributeError`` or pointed a
    Transmission client at a non-Transmission service.
    """
    for obj in queryset:
        downloader = obj.downloader
        if not downloader or downloader.category != DownloaderCategory.Transmission:
            continue
        tr_client = transmission_rpc.Client(
            host=downloader.host,
            port=downloader.port,
            username=downloader.username,
            password=downloader.password
        )
        # NOTE(review): this looks the torrent up by int(obj.hash) whereas
        # the rest of the file identifies torrents by obj.hash_string --
        # confirm which field is intended.
        torrent = tr_client.get_torrent(int(obj.hash))
        print(round(torrent.rateDownload / 1024 / 1024, 2))
def to_download(self, request, queryset):
    """Ajax action: push the selected torrents to a downloader.

    Reads ``_selected``, ``downloader`` (a ``Downloader`` id) and
    ``save_path`` from the POST body. Before pushing, the downloader's free
    space is compared with the total size of the selection and with the
    downloader's ``reserved_space`` (in GiB). Returns a ``JsonResponse``
    whose ``status`` is ``'success'`` or ``'error'``.

    Changes from the previous version:
    * a missing or unknown downloader id yields an error response instead of
      an ``AttributeError`` on ``None``;
    * an unsupported downloader category yields an error response instead of
      returning ``None``;
    * the POST body and site cookies are no longer printed, and a
      commented-out line containing a credential was removed.
    """
    post = request.POST
    if not post.get('_selected'):
        return JsonResponse(data={
            'status': 'error',
            'msg': '请先选中数据!'
        })
    downloader_id = post.get('downloader')
    downloader = Downloader.objects.filter(id=downloader_id).first() if downloader_id else None
    if downloader is None:
        return JsonResponse(data={
            'status': 'error',
            'msg': '请选择有效的下载器!'
        })
    save_path = post.get('save_path')
    # Total size of the selected torrents, in bytes.
    total_size = 0
    for torrent_info in queryset:
        total_size += torrent_info.size
    print(FileSizeConvert.parse_2_file_size(total_size))
    reserved_bytes = downloader.reserved_space * 1024 * 1024 * 1024

    if downloader.category == DownloaderCategory.Transmission:
        try:
            tr_client = transmission_rpc.Client(host=downloader.host,
                                                port=downloader.port,
                                                username=downloader.username,
                                                password=downloader.password)
            # Refuse to push when the disk cannot hold the selection or is
            # already at/below the reserved amount.
            free_space = tr_client.free_space(save_path)
            print(FileSizeConvert.parse_2_file_size(free_space))
            if free_space < total_size:
                return JsonResponse(data={
                    'status': 'error',
                    'msg': '{}磁盘空间已不足,下载文件总大小{}请及时清理!'.format(
                        downloader.name,
                        FileSizeConvert.parse_2_file_size(total_size)
                    )
                })
            if free_space <= reserved_bytes:
                print('空间不足!')
                return JsonResponse(data={
                    'status': 'error',
                    'msg': '{}磁盘剩余空间{}已低于保留值{},请及时清理!'.format(
                        downloader.name,
                        FileSizeConvert.parse_2_file_size(free_space),
                        downloader.reserved_space
                    )
                })
            for torrent_info in queryset:
                print(torrent_info.magnet_url)
                res = tr_client.add_torrent(
                    torrent=torrent_info.magnet_url,
                    download_dir=save_path,
                    cookies=torrent_info.site.mysite.cookie
                )
                print(res)
                if isinstance(res, Torrent):
                    print(res.id)
                    # Remember which downloader holds the torrent and its hash.
                    torrent_info.hash_string = res.hashString
                    torrent_info.state = True
                    torrent_info.downloader = downloader
                    torrent_info.save()
            return JsonResponse(data={
                'status': 'success',
                'msg': '推送结束!'
            })
        except Exception as e:
            # Deliberately broad: any client/network failure is reported to
            # the dialog rather than surfacing as a 500 page.
            print(traceback.format_exc(limit=3))
            return JsonResponse(data={
                'status': 'error',
                'msg': '{}!'.format(str(e))
            })

    if downloader.category == DownloaderCategory.qBittorrent:
        qb_client = qbittorrentapi.Client(
            host=downloader.host,
            port=downloader.port,
            username=downloader.username,
            password=downloader.password,
        )
        try:
            print('开始登录下载器', downloader.name)
            qb_client.auth_log_in()
            # Same two free-space checks as for Transmission.
            # NOTE(review): this branch rejects free_space == total_size
            # while the Transmission branch accepts it -- confirm which is
            # intended.
            free_space = qb_client.sync.maindata().server_state.free_space_on_disk
            print('下载器剩余空间:', FileSizeConvert.parse_2_file_size(free_space))
            print('种子总大小:', FileSizeConvert.parse_2_file_size(total_size))
            if free_space <= total_size:
                return JsonResponse(data={
                    'status': 'error',
                    'msg': '{}磁盘空间已不足,下载文件总大小{}请及时清理!'.format(
                        downloader.name,
                        FileSizeConvert.parse_2_file_size(total_size)
                    )
                })
            if free_space <= reserved_bytes:
                return JsonResponse(data={
                    'status': 'error',
                    'msg': '{}磁盘剩余空间{}已低于保留值{},请及时清理!'.format(
                        downloader.name,
                        FileSizeConvert.parse_2_file_size(free_space),
                        downloader.reserved_space
                    )
                })
            for torrent_info in queryset:
                print('创建种子标签', torrent_info.detail_url)
                # NOTE(review): the target directory is passed as
                # ``download_path`` together with automatic torrent
                # management -- check the qbittorrent-api ``torrents_add``
                # docs to confirm this places files where the user asked.
                res = qb_client.torrents.add(
                    urls=torrent_info.magnet_url,
                    cookie=torrent_info.site.mysite.cookie,
                    download_path=save_path,
                    use_auto_torrent_management=True,
                )
                print(res)
            return JsonResponse(data={
                'status': 'success',
                'msg': '推送结束!请自行检查是否推送成功!'
            })
        except Exception as e:
            # Deliberately broad, see the Transmission branch.
            return JsonResponse(data={
                'status': 'error',
                'msg': '{}!'.format(str(e))
            })

    # Neither Transmission nor qBittorrent.
    return JsonResponse(data={
        'status': 'error',
        'msg': '不支持的下载器类型!'
    })
# Button text, consistent with the Django admin
to_download.short_description = '推送到下载器'
update_state.short_description = '更新种子'
# Icon names, see element-ui icons and https://fontawesome.com
to_download.icon = 'el-icon-upload'
update_state.icon = 'el-icon-refresh'
# element-ui button types, see https://element.eleme.cn/#/zh-CN/component/button
to_download.type = 'warning'
update_state.type = 'success'
# Extra custom colour for the button
# test_button.style = 'color:white;'
# The layer configuration can be dynamic and return a different config per request
# The queryset here would be filtered by the search criteria
# def async_get_layer_config(self, request, queryset):
# """
# This method takes only the request argument, no other inputs
# """
# # Simulate slow business logic
# time.sleep(2)
# The returned fields can depend on the request's user; the config is fetched on every click
# Dialog shown before 'to_download' is submitted.
# NOTE: get_downloader() below is called once, when this dict literal is
# evaluated, so the option list reflects the downloaders present at that time.
to_download.layer = {
# Input configuration of the dialog
# Dialog title
'title': '推送到下载器',
# Hint text
'tips': '请自行检查是否种子是否推送成功!!',
# Confirm button text
'confirm_button': '确认提交',
# Cancel button text
'cancel_button': '取消',
# Dialog width, 50% by default
'width': '40%',
# Label width of the form, maps to element-ui label-width, 80px by default
'labelWidth': "80px",
'params': [{
'type': 'select',
'key': 'downloader',
'label': '下载器',
'width': '200px',
# size maps to the element-ui size: medium / small / mini
'size': 'small',
# value can specify a default
'value': '',
'require': True,
# Options are the available downloaders
'options': get_downloader()
}, {
# type maps to the native input attribute of el-input, 'input' by default
'type': 'input',
# key is the name under which the value arrives in the POST body
'key': 'save_path',
# Displayed label
'label': '保存路径',
'value': '/downloads',
'width': '200px',
# Reject empty input, False by default
'require': True
}, ]
}