Files
seal/sql/views.py
2019-07-09 17:39:08 +08:00

207 lines
7.3 KiB
Python

import logging
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.views.generic import ListView, View, CreateView, UpdateView, DetailView
from django.shortcuts import render, HttpResponse
from sql.form import DatabaseForm
from sql.models import database
import json
from django.db.models import Q
import subprocess
from sql.handle import con_database
import pymysql
import prettytable as pt
tb = pt.PrettyTable()
logger = logging.getLogger('sql')
class SqlDdl(LoginRequiredMixin, CreateView):
model = database
form_class = DatabaseForm
template_name = 'sql/sql-ddl.html'
class SqlDdlQuery(LoginRequiredMixin, View):
model = database
def get(self, request, pk):
region = request.GET.get("region")
name = request.GET.get("name")
data_base = request.GET.get("data_base")
table = request.GET.get("table")
ret = {"data": []}
if pk == "name":
obj = database.objects.filter(region=region)
for i in obj:
ret['data'].append(i.name)
elif pk == "databases":
obj = database.objects.get(name=name)
try:
with con_database.SQLgo(
ip=obj.address,
user=obj.username,
password=obj.get_password(),
port=obj.port
) as f:
ret['data'] = f.baseItems(sql='show databases')
except Exception as e:
print(e)
elif pk == "tables":
obj = database.objects.get(name=name)
try:
with con_database.SQLgo(
ip=obj.address,
user=obj.username,
password=obj.get_password(),
port=obj.port,
db=data_base
) as f:
ret['data'] = f.baseItems(sql='show tables')
except Exception as e:
print(e)
elif pk == "structure":
obj = database.objects.get(name=name)
try:
with con_database.SQLgo(
ip=obj.address,
user=obj.username,
password=obj.get_password(),
port=obj.port,
db=data_base
) as f:
field = f.gen_alter(table_name=table) # 表结构详情
idx = f.index(table_name=table) # 索引
ret['data'] = {'idx': idx, 'field': field}
except Exception as e:
print(e)
else:
pass
print(ret)
return HttpResponse(json.dumps(ret))
def post(self, request, pk):
ret = {"data": []}
name = request.GET.get("name")
backup = request.GET.get("backup")
# 语法检查
if pk == "advice":
sql = request.POST.get("sql")
sql = sql.replace("\"", "'")
sql_cmd = f'echo "{sql}" | ./sql/bin/soar '
print(sql_cmd)
cmd = subprocess.Popen(sql_cmd, shell=True, stdout=subprocess.PIPE)
cmd = cmd.communicate()
cmd = cmd[0].decode().rstrip()
ret['data'] = cmd
# SQL检测
elif pk == "sql_test":
sql_post = request.POST.get("sql")
sql_post = sql_post.replace("\"", "'")
obj = database.objects.get(name=name)
sql = f'''/*--user={obj.username};--password={obj.get_password()};--host={obj.address};--check=1;--port={obj.port};--execute=0;--backup={backup};*/
inception_magic_start;
{sql_post}
inception_magic_commit;'''
conn = pymysql.connect(host='127.0.0.1', user='', passwd='',
db='', port=4000, charset="utf8mb4")
cur = conn.cursor()
cur.execute(sql)
result = cur.fetchall()
cur.close()
conn.close()
tb.field_names = [i[0] for i in cur.description]
for row in result:
tb.add_row(row)
print(tb)
for i in result:
ret['data'].append({
"order_id":i[0],
"stage": i[1],
"error_level": i[2],
"stage_status": i[3],
"error_message": i[4],
"sql": i[5],
"affected_rows": i[6],
})
ret['error'] = 0
for j in ret['data']:
if j['error_level'] != 0:
ret['error'] = 1
# sql执行
elif pk == "sql_exe":
sql_post = request.POST.get("sql")
sql_post = sql_post.replace("\"", "'")
obj = database.objects.get(name=name)
# 先检查
sql1 = f'''/*--user={obj.username};--password={obj.get_password()};--host={obj.address};--check=1;--port={obj.port};--execute=0;--backup={backup};*/
inception_magic_start;
{sql_post}
inception_magic_commit;'''
conn = pymysql.connect(host='127.0.0.1', user='', passwd='',
db='', port=4000, charset="utf8mb4")
cur = conn.cursor()
cur.execute(sql1)
result = cur.fetchall()
cur.close()
conn.close()
tb.field_names = [i[0] for i in cur.description]
for row in result:
tb.add_row(row)
print(tb)
for i in result:
ret['data'].append({
"order_id": i[0],
"stage": i[1],
"error_level": i[2],
"stage_status": i[3],
"error_message": i[4],
"sql": i[5],
"affected_rows": i[6],
})
for j in ret['data']:
if j['error_level'] != 0:
ret['error'] = "检查未通过,禁止执行"
return HttpResponse(json.dumps(ret))
# 检查通过 执行
sql2 = f'''/*--user={obj.username};--password={obj.get_password()};--host={obj.address};--check=0;--port={obj.port};--execute=1;--backup={backup};*/
inception_magic_start;
{sql_post}
inception_magic_commit;'''
conn = pymysql.connect(host='127.0.0.1', user='', passwd='',
db='', port=4000, charset="utf8mb4")
cur = conn.cursor()
cur.execute(sql2)
result = cur.fetchall()
cur.close()
conn.close()
tb.field_names = [i[0] for i in cur.description]
for row in result:
tb.add_row(row)
print(tb)
ret = {"data": []}
for i in result:
ret['data'].append({
"order_id":i[0],
"stage": i[1],
"error_level": i[2],
"stage_status": i[3],
"error_message": i[4],
"sql": i[5],
"affected_rows": i[6],
})
else:
pass
print(ret)
return HttpResponse(json.dumps(ret))