mirror of
https://github.com/hequan2017/seal.git
synced 2026-02-04 02:33:26 +08:00
207 lines
7.3 KiB
Python
207 lines
7.3 KiB
Python
import logging
|
|
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
|
|
from django.views.generic import ListView, View, CreateView, UpdateView, DetailView
|
|
from django.shortcuts import render, HttpResponse
|
|
from sql.form import DatabaseForm
|
|
from sql.models import database
|
|
import json
|
|
from django.db.models import Q
|
|
import subprocess
|
|
from sql.handle import con_database
|
|
import pymysql
|
|
import prettytable as pt
|
|
tb = pt.PrettyTable()
|
|
logger = logging.getLogger('sql')
|
|
|
|
|
|
class SqlDdl(LoginRequiredMixin, CreateView):
|
|
model = database
|
|
form_class = DatabaseForm
|
|
template_name = 'sql/sql-ddl.html'
|
|
|
|
|
|
class SqlDdlQuery(LoginRequiredMixin, View):
|
|
model = database
|
|
|
|
def get(self, request, pk):
|
|
region = request.GET.get("region")
|
|
name = request.GET.get("name")
|
|
data_base = request.GET.get("data_base")
|
|
table = request.GET.get("table")
|
|
ret = {"data": []}
|
|
if pk == "name":
|
|
obj = database.objects.filter(region=region)
|
|
for i in obj:
|
|
ret['data'].append(i.name)
|
|
|
|
elif pk == "databases":
|
|
obj = database.objects.get(name=name)
|
|
try:
|
|
with con_database.SQLgo(
|
|
ip=obj.address,
|
|
user=obj.username,
|
|
password=obj.get_password(),
|
|
port=obj.port
|
|
) as f:
|
|
ret['data'] = f.baseItems(sql='show databases')
|
|
except Exception as e:
|
|
print(e)
|
|
elif pk == "tables":
|
|
obj = database.objects.get(name=name)
|
|
try:
|
|
with con_database.SQLgo(
|
|
ip=obj.address,
|
|
user=obj.username,
|
|
password=obj.get_password(),
|
|
port=obj.port,
|
|
db=data_base
|
|
) as f:
|
|
ret['data'] = f.baseItems(sql='show tables')
|
|
except Exception as e:
|
|
print(e)
|
|
elif pk == "structure":
|
|
obj = database.objects.get(name=name)
|
|
try:
|
|
with con_database.SQLgo(
|
|
ip=obj.address,
|
|
user=obj.username,
|
|
password=obj.get_password(),
|
|
port=obj.port,
|
|
db=data_base
|
|
) as f:
|
|
field = f.gen_alter(table_name=table) # 表结构详情
|
|
idx = f.index(table_name=table) # 索引
|
|
|
|
ret['data'] = {'idx': idx, 'field': field}
|
|
except Exception as e:
|
|
print(e)
|
|
else:
|
|
pass
|
|
print(ret)
|
|
return HttpResponse(json.dumps(ret))
|
|
|
|
def post(self, request, pk):
|
|
ret = {"data": []}
|
|
name = request.GET.get("name")
|
|
backup = request.GET.get("backup")
|
|
# 语法检查
|
|
if pk == "advice":
|
|
sql = request.POST.get("sql")
|
|
sql = sql.replace("\"", "'")
|
|
|
|
sql_cmd = f'echo "{sql}" | ./sql/bin/soar '
|
|
print(sql_cmd)
|
|
cmd = subprocess.Popen(sql_cmd, shell=True, stdout=subprocess.PIPE)
|
|
cmd = cmd.communicate()
|
|
cmd = cmd[0].decode().rstrip()
|
|
ret['data'] = cmd
|
|
# SQL检测
|
|
elif pk == "sql_test":
|
|
sql_post = request.POST.get("sql")
|
|
sql_post = sql_post.replace("\"", "'")
|
|
obj = database.objects.get(name=name)
|
|
sql = f'''/*--user={obj.username};--password={obj.get_password()};--host={obj.address};--check=1;--port={obj.port};--execute=0;--backup={backup};*/
|
|
inception_magic_start;
|
|
{sql_post}
|
|
inception_magic_commit;'''
|
|
|
|
conn = pymysql.connect(host='127.0.0.1', user='', passwd='',
|
|
db='', port=4000, charset="utf8mb4")
|
|
cur = conn.cursor()
|
|
cur.execute(sql)
|
|
result = cur.fetchall()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
tb.field_names = [i[0] for i in cur.description]
|
|
for row in result:
|
|
tb.add_row(row)
|
|
print(tb)
|
|
|
|
for i in result:
|
|
ret['data'].append({
|
|
"order_id":i[0],
|
|
"stage": i[1],
|
|
"error_level": i[2],
|
|
"stage_status": i[3],
|
|
"error_message": i[4],
|
|
"sql": i[5],
|
|
"affected_rows": i[6],
|
|
})
|
|
ret['error'] = 0
|
|
for j in ret['data']:
|
|
if j['error_level'] != 0:
|
|
ret['error'] = 1
|
|
|
|
# sql执行
|
|
elif pk == "sql_exe":
|
|
sql_post = request.POST.get("sql")
|
|
sql_post = sql_post.replace("\"", "'")
|
|
obj = database.objects.get(name=name)
|
|
# 先检查
|
|
sql1 = f'''/*--user={obj.username};--password={obj.get_password()};--host={obj.address};--check=1;--port={obj.port};--execute=0;--backup={backup};*/
|
|
inception_magic_start;
|
|
{sql_post}
|
|
inception_magic_commit;'''
|
|
conn = pymysql.connect(host='127.0.0.1', user='', passwd='',
|
|
db='', port=4000, charset="utf8mb4")
|
|
cur = conn.cursor()
|
|
cur.execute(sql1)
|
|
result = cur.fetchall()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
tb.field_names = [i[0] for i in cur.description]
|
|
for row in result:
|
|
tb.add_row(row)
|
|
print(tb)
|
|
|
|
for i in result:
|
|
ret['data'].append({
|
|
"order_id": i[0],
|
|
"stage": i[1],
|
|
"error_level": i[2],
|
|
"stage_status": i[3],
|
|
"error_message": i[4],
|
|
"sql": i[5],
|
|
"affected_rows": i[6],
|
|
})
|
|
|
|
for j in ret['data']:
|
|
if j['error_level'] != 0:
|
|
ret['error'] = "检查未通过,禁止执行"
|
|
return HttpResponse(json.dumps(ret))
|
|
# 检查通过 执行
|
|
sql2 = f'''/*--user={obj.username};--password={obj.get_password()};--host={obj.address};--check=0;--port={obj.port};--execute=1;--backup={backup};*/
|
|
inception_magic_start;
|
|
{sql_post}
|
|
inception_magic_commit;'''
|
|
|
|
conn = pymysql.connect(host='127.0.0.1', user='', passwd='',
|
|
db='', port=4000, charset="utf8mb4")
|
|
cur = conn.cursor()
|
|
cur.execute(sql2)
|
|
result = cur.fetchall()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
tb.field_names = [i[0] for i in cur.description]
|
|
for row in result:
|
|
tb.add_row(row)
|
|
print(tb)
|
|
ret = {"data": []}
|
|
for i in result:
|
|
ret['data'].append({
|
|
"order_id":i[0],
|
|
"stage": i[1],
|
|
"error_level": i[2],
|
|
"stage_status": i[3],
|
|
"error_message": i[4],
|
|
"sql": i[5],
|
|
"affected_rows": i[6],
|
|
})
|
|
else:
|
|
pass
|
|
print(ret)
|
|
return HttpResponse(json.dumps(ret))
|