Files
sandboxMP/apps/cmdb/views_scan.py
RobbieHan a36f8d74f6 cabinet
2019-01-25 14:34:42 +08:00

119 lines
4.4 KiB
Python

# @Time : 2018/12/29 19:25
# @Author : RobbieHan
# @File : views_scan.py
import ast
import logging

from celery_once import AlreadyQueued
from django.http import Http404, JsonResponse
from django.shortcuts import render, get_object_or_404
from django.views.generic import View, TemplateView
from ruamel import yaml

from custom import BreadcrumbMixin, SandboxListView, SandboxDeleteView
from system.mixin import LoginRequiredMixin
from system.models import Menu
from utils.sandbox_utils import ConfigFileMixin

from .models import (DeviceScanInfo, ConnectionInfo, DeviceInfo,
                     ConnectionAbstract, DeviceAbstract)
from .tasks import scan_execution
# Logger registered under the name 'sandbox_error'; ScanConfigView.post
# reports to it when saving the scan configuration fails.
error_logger = logging.getLogger('sandbox_error')
class ScanConfigView(LoginRequiredMixin, BreadcrumbMixin, ConfigFileMixin, View):
    """Show the device-scan configuration page and save submitted settings.

    ``get`` renders ``cmdb/scan_config.html``; ``post`` rewrites the scan
    configuration file from the submitted form and answers with a JSON
    object of the form ``{"result": <bool>}``.
    """

    def get(self, request):
        """Render the configuration page.

        The template context is the value returned by
        ``self.get_conf_content()`` updated with the menu data looked up for
        the requested URL.
        """
        menu = Menu.get_menu_by_request_url(request.path_info)
        template_name = 'cmdb/scan_config.html'
        context = self.get_conf_content()
        context.update(menu)
        return render(request, template_name, context)

    def post(self, request):
        """Write the posted scan settings to the config file as YAML.

        ``net_address`` and ``commands`` are parsed with ``ast.literal_eval``
        (Python literals only, nothing is executed); every other field is
        stored as the raw submitted string. The data is dumped under a single
        top-level ``hosts`` key.

        Returns:
            JsonResponse: ``{"result": True}`` when the file was written,
            ``{"result": False}`` when anything failed (missing form field,
            unparsable literal, I/O or YAML error).
        """
        ret = dict(result=False)
        hosts = request.POST
        try:
            # Entries are evaluated top to bottom, so the first missing or
            # malformed field is the one that raises; the insertion order is
            # also the key order of the dumped YAML.
            config = {
                'net_address': ast.literal_eval(hosts['net_address']),
                'ssh_username': hosts['ssh_username'],
                'ssh_port': hosts['ssh_port'],
                'ssh_password': hosts['ssh_password'],
                'ssh_private_key': hosts['ssh_private_key'],
                'commands': ast.literal_eval(hosts['commands']),
                'auth_type': hosts['auth_type'],
                'scan_type': hosts['scan_type'],
                'email': hosts['email'],
                'send_email': hosts['send_email'],
            }
            data = dict(hosts=config)
            config_file = self.get_config_file()
            with open(config_file, 'w', encoding='utf-8') as f:
                yaml.dump(data, f, Dumper=yaml.RoundTripDumper, indent=4)
            ret['result'] = True
        except Exception as e:
            # Deliberately broad: the client is only told result=False, so the
            # log is the sole record of what went wrong. `exception` keeps the
            # traceback, which a plain `error(e)` would drop.
            error_logger.exception(e)
        return JsonResponse(ret)
class DeviceScanView(LoginRequiredMixin, BreadcrumbMixin, TemplateView):
    """Template-only view for the device-scan page."""

    # Template handed to TemplateView for rendering.
    template_name = 'cmdb/device_scan.html'
class DeviceScanListView(SandboxListView):
    """List view configuration for ``DeviceScanInfo`` records.

    Only declares the model and the field names passed on to
    ``SandboxListView``; all behavior lives in that base class.
    """

    model = DeviceScanInfo
    fields = [
        'id',
        'sys_hostname',
        'hostname',
        'mac_address',
        'auth_type',
        'status',
        'os_type',
        'device_type',
    ]
class DeviceScanDetailView(LoginRequiredMixin, View):
    """Render the detail page of a single scanned device."""

    def get(self, request):
        """Render ``cmdb/device_scan_detail.html``.

        When the query string carries a non-empty ``id``, the matching
        ``DeviceScanInfo`` row is added to the context as ``device``.

        Raises:
            Http404: if ``id`` is not an integer or no such record exists.
        """
        ret = Menu.get_menu_by_request_url(request.path_info)
        device_id = request.GET.get('id')
        if device_id:
            try:
                pk = int(device_id)
            except ValueError:
                # A malformed id is a client error; answer 404 rather than
                # letting the ValueError surface as a 500.
                raise Http404('Invalid device id.') from None
            ret['device'] = get_object_or_404(DeviceScanInfo, pk=pk)
        return render(request, 'cmdb/device_scan_detail.html', ret)
class DeviceScanDeleteView(SandboxDeleteView):
    """Delete view configuration bound to ``DeviceScanInfo``.

    Only names the model; the deletion logic is inherited from
    ``SandboxDeleteView``.
    """

    model = DeviceScanInfo
class DeviceScanExecView(LoginRequiredMixin, View):
    """Trigger the ``scan_execution`` task and report the outcome as JSON."""

    def get(self, request):
        """Call ``scan_execution.delay()`` and return ``{"status": ...}``.

        The status is ``'success'`` when the call returned normally and
        ``'already_queued'`` when it raised ``AlreadyQueued``. Any other
        exception propagates to the caller.
        """
        try:
            scan_execution.delay()
        except AlreadyQueued:
            outcome = 'already_queued'
        else:
            outcome = 'success'
        return JsonResponse({'status': outcome})
class DeviceScanInboundView(LoginRequiredMixin, View):
    """Copy successfully scanned devices into the connection/device tables."""

    def post(self, request):
        """Upsert a ``ConnectionInfo`` and a ``DeviceInfo`` per scanned host.

        Every ``DeviceScanInfo`` row with ``status='succeed'`` is processed.
        Both target records are matched on ``hostname``; their defaults are
        the scan values whose keys match the field names of
        ``ConnectionAbstract`` / ``DeviceAbstract`` (``id`` excluded). The
        device record additionally gets the connection's id and the id of the
        requesting user.

        Returns:
            JsonResponse: ``{"result": True}`` once all hosts are processed.
            Database errors are not caught here and propagate.
        """
        ret = dict(result=False)
        login_succeed = list(DeviceScanInfo.objects.filter(status='succeed').values())

        # Compare names with `!=`: `is not` tests object identity, which for
        # strings depends on interpreter interning and is not a value check.
        connection_fields = [
            field.name for field in ConnectionAbstract._meta.fields
            if field.name != 'id'
        ]
        device_fields = [
            field.name for field in DeviceAbstract._meta.fields
            if field.name != 'id'
        ]
        device_fields.append('hostname')

        for host in login_succeed:
            # Keep only the scan columns that exist on the target model.
            connection_defaults = {key: host[key] for key in host.keys() & connection_fields}
            device_defaults = {key: host[key] for key in host.keys() & device_fields}

            connection_info, _ = ConnectionInfo.objects.update_or_create(
                hostname=host['hostname'],
                defaults=connection_defaults
            )

            # Link the device to the connection record just created/updated.
            device_defaults['dev_connection'] = int(connection_info.id)
            device_defaults['changed_by_id'] = request.user.id
            DeviceInfo.objects.update_or_create(
                hostname=host['hostname'],
                defaults=device_defaults
            )

        # NOTE(review): the flag is set once after the loop, so a run with no
        # succeeded hosts still reports success — confirm this is intended.
        ret['result'] = True
        return JsonResponse(ret)