mirror of
https://github.com/RobbieHan/sandboxMP.git
synced 2026-02-03 10:53:15 +08:00
119 lines
4.4 KiB
Python
119 lines
4.4 KiB
Python
# @Time : 2018/12/29 19:25
|
|
# @Author : RobbieHan
|
|
# @File : views_scan.py
|
|
|
|
import ast
|
|
import logging
|
|
from ruamel import yaml
|
|
|
|
from django.views.generic import View, TemplateView
|
|
from django.http import JsonResponse
|
|
from django.shortcuts import render, get_object_or_404
|
|
|
|
from celery_once import AlreadyQueued
|
|
|
|
from system.mixin import LoginRequiredMixin
|
|
from custom import BreadcrumbMixin, SandboxListView, SandboxDeleteView
|
|
from utils.sandbox_utils import ConfigFileMixin
|
|
from system.models import Menu
|
|
from .models import (DeviceScanInfo, ConnectionInfo, DeviceInfo,
|
|
ConnectionAbstract, DeviceAbstract)
|
|
from .tasks import scan_execution
|
|
|
|
error_logger = logging.getLogger('sandbox_error')
|
|
|
|
|
|
class ScanConfigView(LoginRequiredMixin, BreadcrumbMixin, ConfigFileMixin, View):
|
|
|
|
def get(self, request):
|
|
menu = Menu.get_menu_by_request_url(request.path_info)
|
|
template_name = 'cmdb/scan_config.html'
|
|
context = self.get_conf_content()
|
|
context.update(menu)
|
|
return render(request, template_name, context)
|
|
|
|
def post(self, request):
|
|
ret = dict(result=False)
|
|
config = dict()
|
|
hosts = request.POST
|
|
try:
|
|
config['net_address'] = ast.literal_eval(hosts['net_address'])
|
|
config['ssh_username'] = hosts['ssh_username']
|
|
config['ssh_port'] = hosts['ssh_port']
|
|
config['ssh_password'] = hosts['ssh_password']
|
|
config['ssh_private_key'] = hosts['ssh_private_key']
|
|
config['commands'] = ast.literal_eval(hosts['commands'])
|
|
config['auth_type'] = hosts['auth_type']
|
|
config['scan_type'] = hosts['scan_type']
|
|
config['email'] = hosts['email']
|
|
config['send_email'] = hosts['send_email']
|
|
data = dict(hosts=config)
|
|
config_file = self.get_config_file()
|
|
with open(config_file, 'w', encoding='utf-8') as f:
|
|
yaml.dump(data, f, Dumper=yaml.RoundTripDumper, indent=4)
|
|
ret['result'] = True
|
|
except Exception as e:
|
|
error_logger.error(e)
|
|
|
|
return JsonResponse(ret)
|
|
|
|
|
|
class DeviceScanView(LoginRequiredMixin, BreadcrumbMixin, TemplateView):
|
|
template_name = 'cmdb/device_scan.html'
|
|
|
|
|
|
class DeviceScanListView(SandboxListView):
|
|
model = DeviceScanInfo
|
|
fields = ['id', 'sys_hostname', 'hostname', 'mac_address', 'auth_type', 'status', 'os_type', 'device_type']
|
|
|
|
|
|
class DeviceScanDetailView(LoginRequiredMixin, View):
|
|
|
|
def get(self, request):
|
|
ret = Menu.get_menu_by_request_url(request.path_info)
|
|
if 'id' in request.GET and request.GET['id']:
|
|
device = get_object_or_404(DeviceScanInfo, pk=int(request.GET['id']))
|
|
ret['device'] = device
|
|
return render(request, 'cmdb/device_scan_detail.html', ret)
|
|
|
|
|
|
class DeviceScanDeleteView(SandboxDeleteView):
|
|
model = DeviceScanInfo
|
|
|
|
|
|
class DeviceScanExecView(LoginRequiredMixin, View):
|
|
|
|
def get(self, request):
|
|
ret = dict(status='fail')
|
|
try:
|
|
scan_execution.delay()
|
|
ret['status'] = 'success'
|
|
except AlreadyQueued:
|
|
ret['status'] = 'already_queued'
|
|
return JsonResponse(ret)
|
|
|
|
|
|
class DeviceScanInboundView(LoginRequiredMixin, View):
|
|
def post(self, request):
|
|
ret = dict(result=False)
|
|
login_succeed = list(DeviceScanInfo.objects.filter(status='succeed').values())
|
|
connection_fields = [field.name for field in ConnectionAbstract._meta.fields if field.name is not 'id']
|
|
device_fields = [field.name for field in DeviceAbstract._meta.fields if field.name is not 'id']
|
|
device_fields.append('hostname')
|
|
for host in login_succeed:
|
|
connection_defaults = {key: host[key] for key in host.keys() & connection_fields}
|
|
device_defaults = {key: host[key] for key in host.keys() & device_fields}
|
|
connection_info, _ = ConnectionInfo.objects.update_or_create(
|
|
hostname=host['hostname'],
|
|
defaults=connection_defaults
|
|
)
|
|
connection_id = int(getattr(connection_info, 'id'))
|
|
device_defaults['dev_connection'] = connection_id
|
|
device_defaults['changed_by_id'] = request.user.id
|
|
DeviceInfo.objects.update_or_create(
|
|
hostname=host['hostname'],
|
|
defaults=device_defaults
|
|
)
|
|
ret['result'] = True
|
|
return JsonResponse(ret)
|