v0.4.2 优化k8s webssh 连接

This commit is contained in:
何全
2019-06-18 10:29:09 +08:00
parent fa84943f18
commit e7be025c86
5 changed files with 89 additions and 110 deletions

View File

@@ -18,7 +18,7 @@
> 作者会在周末进行开发、更新。
> 支持 k8s webssh 管理pod
> 支持 k8s webssh 管理 pod
## 介绍
@@ -35,7 +35,7 @@
* 一期: 基础模板 (已完成)
* 二期: k8s管理平台 (开发中)
* node/service/pod 列表 (已完成)
* pod webssh (已完成)
* pod webssh (已完成 通过调用 k8s api 进行执行命令)
* 三期: mysql sql语句审核(开发中)
* 引擎 goInception
@@ -147,14 +147,6 @@ nohup python36 manage.py runserver 0.0.0.0:8001 >> /tmp/http.log 2>&1 &
## K8S
Token = "eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkYXNoYm9hcmQtYWRtaW4tdG9rZW4tZGhobWMiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGFzaGJvYXJkLWFkbWluIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiOThkMDcwZWItODc1Yy0xMWU5LWE1MzgtMDAwYzI5N2I0ZmU3Iiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50Omt1YmUtc3lzdGVtOmRhc2hib2FyZC1hZG1pbiJ9.XDFpez2E84R_zlopt_uEHPvVGUtSavypyix6UcYJO3J4imHdJy7MEkfV-wltBA1H8x0TT2AW64rLlXaRJ8OkFWJ0myedfKdjnf7i0oLQ8j-7lw6rT3A0e2pKmpnOaBQfgzRm83-t2I5MMp3Iu9VNUiAbqQpjql4AKwRuJEEGCs99tKStUxzIsJKusmUHh9KAK4BAxySn9h16T2URZ7czLP4mty2crYWNV4KwSwFPthGhFPsl8mnet_hiV5k4me5a8frmXytOy64MmGW8w3TBgiM-7hBYSxt84QGGnyi84LU0EFgtLwBWEOTZeUKKQ6IkoAprMmNcSxX8WUJFlx_uJg"
APISERVER = 'https://192.168.100.111:6443'
## k8s webssh 有权限执行 kubectl exec -it 的主机
webssh_ip = "192.168.100.111"
webssh_port = "22"
webssh_username = "root"
webssh_password = "1qaz.2wsx"
webssh_name = "root@k8s-master" # 终端显示的名字 是为了判断终端退出用
```

View File

@@ -1,99 +1,61 @@
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
import paramiko
import threading
import time
from seal import settings
from channels.layers import get_channel_layer
from k8s.k8sApi.core import K8sApi
channel_layer = get_channel_layer()
class MyThread(threading.Thread):
def __init__(self, chan):
class K8SStreamThread(threading.Thread):
def __init__(self, ws, container_stream):
threading.Thread.__init__(self)
self.chan = chan
self.number = 0
self.ws = ws
self.stream = container_stream
def run(self):
while not self.chan.chan.exit_status_ready():
# while not self.ws.exit_status_ready():
while self.stream.is_open():
time.sleep(0.1)
if not self.stream.is_open():
self.ws.close()
try:
data = self.chan.chan.recv(1024)
str_data = data.decode(encoding='utf-8')
if getattr(settings, 'webssh_name') in data.decode(encoding='utf-8'):
self.number += 1
if "kubectl exec -it" in str_data:
# 不返回内容
pass
else:
if "rpc error" in str_data:
async_to_sync(self.chan.channel_layer.group_send)(
self.chan.scope['user'].username,
{
"type": "user.message",
"text": "连接错误,已断开连接! 此 pod 不支持sh 或者其他未知错误!\r"
},
)
self.chan.sshclient.close()
elif self.number > 1:
async_to_sync(self.chan.channel_layer.group_send)(
self.chan.scope['user'].username,
{
"type": "user.message",
"text": "程序退出,已断开连接!\r"
},
)
self.chan.sshclient.close()
else:
async_to_sync(self.chan.channel_layer.group_send)(
self.chan.scope['user'].username,
{
"type": "user.message",
"text": bytes.decode(data)
},
)
except Exception as ex:
pass
self.chan.sshclient.close()
return False
if self.stream.peek_stdout():
stdout = self.stream.read_stdout()
self.ws.send(stdout)
if self.stream.peek_stderr():
stderr = self.stream.read_stderr()
self.ws.send(stderr)
except Exception as err:
self.ws.close()
class EchoConsumer(WebsocketConsumer):
def connect(self):
# 创建channels group 命名为:用户名 (最好不要中文名字)并使用channel_layer写入到redis
# 创建channels group 命名为:用户名 (最好不要中文名字,这里会用名字 建立一个通道,通过这个通道进行通信)并使用channel_layer写入到redis
async_to_sync(self.channel_layer.group_add)(self.scope['user'].username, self.channel_name)
self.sshclient = paramiko.SSHClient()
self.sshclient.load_system_host_keys()
self.sshclient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.sshclient.connect(getattr(settings, 'webssh_ip'), getattr(settings, 'webssh_port'),
getattr(settings, 'webssh_username'), getattr(settings, 'webssh_password'))
self.chan = self.sshclient.invoke_shell(term='xterm')
self.chan.settimeout(0)
t1 = MyThread(self)
t1.setDaemon(True)
t1.start()
# 可以在这里根据 用户 要访问的pod 进行 权限控制
path = self.scope['path'].split('/')
cmd = f"kubectl exec -it {path[2]} -n {path[3]} sh \r"
self.chan.send(cmd)
try:
k = K8sApi()
self.container_stream = k.terminal_start(namespace=path[3], pod_name=path[2], container="")
kub_stream = K8SStreamThread(self, self.container_stream)
kub_stream.start()
except Exception as err:
return
self.accept()
def receive(self, text_data):
try:
self.chan.send(text_data)
self.container_stream.write_stdin(text_data)
except Exception as ex:
pass
# print(str(ex))
self.container_stream.write_stdin('exit\r')
def user_message(self, event):
self.send(text_data=event["text"])

View File

@@ -40,30 +40,25 @@ class K8sApi(object):
ret_pod = client_v1.read_namespaced_pod(name, namespace)
return ret_pod
def get_namespace_list(self):
def terminal_start(self, namespace, pod_name, container):
command = [
"/bin/sh",
"-c",
'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
'&& ([ -x /usr/bin/script ] '
'&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
'|| exec /bin/sh']
client_v1 = self.get_client()
ret_namespace = client_v1.list_namespace()
return ret_namespace
container_stream = stream(
client_v1.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=namespace,
container=container,
command=command,
stderr=True, stdin=True,
stdout=True, tty=True,
_preload_content=False
)
return container_stream
# def test_pod_connect(self, podname, namespace, command, container=None):
# client_v1 = self.get_client()
# if stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command,
# container=container,
# stderr=True, stdin=False,
# stdout=True, tty=False):
# return True
# else:
# return False
#
# def get_pod_exec(self, podname, namespace, command, container=None):
# client_v1 = self.get_client()
# if container:
# rest = stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command,
# container=container,
# stderr=True, stdin=False,
# stdout=True, tty=False)
# else:
# rest = stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command,
# stderr=True, stdin=False,
# stdout=True, tty=False)
# return rest

View File

@@ -255,14 +255,6 @@ MIDDLEWARE_CLASSES = ('system.views.DisableCSRFCheck',)
Token = "eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkYXNoYm9hcmQtYWRtaW4tdG9rZW4tZGhobWMiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGFzaGJvYXJkLWFkbWluIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiOThkMDcwZWItODc1Yy0xMWU5LWE1MzgtMDAwYzI5N2I0ZmU3Iiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50Omt1YmUtc3lzdGVtOmRhc2hib2FyZC1hZG1pbiJ9.XDFpez2E84R_zlopt_uEHPvVGUtSavypyix6UcYJO3J4imHdJy7MEkfV-wltBA1H8x0TT2AW64rLlXaRJ8OkFWJ0myedfKdjnf7i0oLQ8j-7lw6rT3A0e2pKmpnOaBQfgzRm83-t2I5MMp3Iu9VNUiAbqQpjql4AKwRuJEEGCs99tKStUxzIsJKusmUHh9KAK4BAxySn9h16T2URZ7czLP4mty2crYWNV4KwSwFPthGhFPsl8mnet_hiV5k4me5a8frmXytOy64MmGW8w3TBgiM-7hBYSxt84QGGnyi84LU0EFgtLwBWEOTZeUKKQ6IkoAprMmNcSxX8WUJFlx_uJg"
APISERVER = 'https://192.168.100.111:6443'
## k8s webssh 有权限执行 kubectl exec -it 的主机
webssh_ip = "192.168.100.111"
webssh_port = "22"
webssh_username = "root"
webssh_password = "1qaz.2wsx"
webssh_name = "root@k8s-master" # 终端显示的名字 是为了判断终端退出用
# django-channels配置
CHANNEL_LAYERS = {
"default": {

View File

@@ -1,3 +1,41 @@
from django.test import TestCase
# Create your tests here.
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'jifeng.si',
'depends_on_past': False,
# 'depends_on_past': True,
#'start_date': airflow.utils.dates.days_ago(2),
'start_date': datetime(2018, 5, 2),
'email': ['1219957063@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'example_hello_world_dag',
default_args=default_args,
description='my first DAG',
schedule_interval='*/25 * * * *',
start_date=datetime(2018, 5, 28)
)
dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)
hello_operator = BashOperator(
task_id='sleep_task',
depends_on_past=False,
bash_command='echo `date` >> /home/py/test.txt',
dag=dag
)
dummy_operator >> hello_operator