diff --git a/README.md b/README.md index b07bf1d..a52c80d 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ > 作者会在周末进行开发、更新。 -> 支持 k8s webssh 管理pod +> 支持 k8s webssh 管理 pod! ## 介绍 @@ -35,7 +35,7 @@ * 一期: 基础模板 (已完成) * 二期: k8s管理平台 (开发中) * node/service/pod 列表 (已完成) - * pod webssh (已完成) + * pod webssh (已完成, 通过调用 k8s api 进行执行命令) * 三期: mysql sql语句审核(开发中) * 引擎 goInception @@ -147,14 +147,6 @@ nohup python36 manage.py runserver 0.0.0.0:8001 >> /tmp/http.log 2>&1 & ## K8S Token = "eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkYXNoYm9hcmQtYWRtaW4tdG9rZW4tZGhobWMiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGFzaGJvYXJkLWFkbWluIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiOThkMDcwZWItODc1Yy0xMWU5LWE1MzgtMDAwYzI5N2I0ZmU3Iiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50Omt1YmUtc3lzdGVtOmRhc2hib2FyZC1hZG1pbiJ9.XDFpez2E84R_zlopt_uEHPvVGUtSavypyix6UcYJO3J4imHdJy7MEkfV-wltBA1H8x0TT2AW64rLlXaRJ8OkFWJ0myedfKdjnf7i0oLQ8j-7lw6rT3A0e2pKmpnOaBQfgzRm83-t2I5MMp3Iu9VNUiAbqQpjql4AKwRuJEEGCs99tKStUxzIsJKusmUHh9KAK4BAxySn9h16T2URZ7czLP4mty2crYWNV4KwSwFPthGhFPsl8mnet_hiV5k4me5a8frmXytOy64MmGW8w3TBgiM-7hBYSxt84QGGnyi84LU0EFgtLwBWEOTZeUKKQ6IkoAprMmNcSxX8WUJFlx_uJg" APISERVER = 'https://192.168.100.111:6443' - -## k8s webssh 有权限执行 kubectl exec -it 的主机 - -webssh_ip = "192.168.100.111" -webssh_port = "22" -webssh_username = "root" -webssh_password = "1qaz.2wsx" -webssh_name = "root@k8s-master" # 终端显示的名字 是为了判断终端退出用 ``` diff --git a/k8s/consumers.py b/k8s/consumers.py index 249efd6..a1c1712 100644 --- a/k8s/consumers.py +++ b/k8s/consumers.py @@ -1,99 +1,61 @@ from asgiref.sync import async_to_sync from channels.generic.websocket import WebsocketConsumer -import paramiko import threading import time -from seal import settings from channels.layers import get_channel_layer +from k8s.k8sApi.core import K8sApi channel_layer = get_channel_layer() -class MyThread(threading.Thread): - def __init__(self, chan): +class K8SStreamThread(threading.Thread): + + def __init__(self, ws, container_stream): threading.Thread.__init__(self) - self.chan = chan - self.number = 0 + self.ws = ws + self.stream = container_stream def run(self): - - while not self.chan.chan.exit_status_ready(): + # while not self.ws.exit_status_ready(): + while self.stream.is_open(): time.sleep(0.1) + + if not self.stream.is_open(): + self.ws.close() try: - data = self.chan.chan.recv(1024) - str_data = data.decode(encoding='utf-8') - if getattr(settings, 'webssh_name') in data.decode(encoding='utf-8'): - self.number += 1 - - if "kubectl exec -it" in str_data: - # 不返回内容 - pass - else: - if "rpc error" in str_data: - async_to_sync(self.chan.channel_layer.group_send)( - self.chan.scope['user'].username, - { - "type": "user.message", - "text": "连接错误,已断开连接! 此 pod 不支持sh 或者其他未知错误!\r" - }, - ) - self.chan.sshclient.close() - elif self.number > 1: - async_to_sync(self.chan.channel_layer.group_send)( - self.chan.scope['user'].username, - { - "type": "user.message", - "text": "程序退出,已断开连接!\r" - }, - ) - self.chan.sshclient.close() - else: - async_to_sync(self.chan.channel_layer.group_send)( - self.chan.scope['user'].username, - { - "type": "user.message", - "text": bytes.decode(data) - }, - ) - - except Exception as ex: - pass - self.chan.sshclient.close() - return False + if self.stream.peek_stdout(): + stdout = self.stream.read_stdout() + self.ws.send(stdout) + if self.stream.peek_stderr(): + stderr = self.stream.read_stderr() + self.ws.send(stderr) + except Exception as err: + self.ws.close() class EchoConsumer(WebsocketConsumer): def connect(self): - # 创建channels group, 命名为:用户名 (最好不要中文名字),并使用channel_layer写入到redis - + # 创建channels group, 命名为:用户名 (最好不要中文名字,这里会用名字 建立一个通道,通过这个通道进行通信),并使用channel_layer写入到redis async_to_sync(self.channel_layer.group_add)(self.scope['user'].username, self.channel_name) - - self.sshclient = paramiko.SSHClient() - self.sshclient.load_system_host_keys() - self.sshclient.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.sshclient.connect(getattr(settings, 'webssh_ip'), getattr(settings, 'webssh_port'), - getattr(settings, 'webssh_username'), getattr(settings, 'webssh_password')) - self.chan = self.sshclient.invoke_shell(term='xterm') - self.chan.settimeout(0) - t1 = MyThread(self) - t1.setDaemon(True) - t1.start() # 可以在这里根据 用户 要访问的pod 进行 权限控制 path = self.scope['path'].split('/') - cmd = f"kubectl exec -it {path[2]} -n {path[3]} sh \r" - self.chan.send(cmd) - + try: + k = K8sApi() + self.container_stream = k.terminal_start(namespace=path[3], pod_name=path[2], container="") + kub_stream = K8SStreamThread(self, self.container_stream) + kub_stream.start() + except Exception as err: + return self.accept() def receive(self, text_data): try: - self.chan.send(text_data) + self.container_stream.write_stdin(text_data) except Exception as ex: - pass - # print(str(ex)) + self.container_stream.write_stdin('exit\r') def user_message(self, event): self.send(text_data=event["text"]) diff --git a/k8s/k8sApi/core.py b/k8s/k8sApi/core.py index 1fd55ed..d0c0fbb 100644 --- a/k8s/k8sApi/core.py +++ b/k8s/k8sApi/core.py @@ -40,30 +40,25 @@ class K8sApi(object): ret_pod = client_v1.read_namespaced_pod(name, namespace) return ret_pod - def get_namespace_list(self): + def terminal_start(self, namespace, pod_name, container): + command = [ + "/bin/sh", + "-c", + 'TERM=xterm-256color; export TERM; [ -x /bin/bash ] ' + '&& ([ -x /usr/bin/script ] ' + '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) ' + '|| exec /bin/sh'] client_v1 = self.get_client() - ret_namespace = client_v1.list_namespace() - return ret_namespace + container_stream = stream( + client_v1.connect_get_namespaced_pod_exec, + name=pod_name, + namespace=namespace, + container=container, + command=command, + stderr=True, stdin=True, + stdout=True, tty=True, + _preload_content=False + ) + + return container_stream - # def test_pod_connect(self, podname, namespace, command, container=None): - # client_v1 = self.get_client() - # if stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command, - # container=container, - # stderr=True, stdin=False, - # stdout=True, tty=False): - # return True - # else: - # return False - # - # def get_pod_exec(self, podname, namespace, command, container=None): - # client_v1 = self.get_client() - # if container: - # rest = stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command, - # container=container, - # stderr=True, stdin=False, - # stdout=True, tty=False) - # else: - # rest = stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command, - # stderr=True, stdin=False, - # stdout=True, tty=False) - # return rest diff --git a/seal/settings.py b/seal/settings.py index 564cfcb..215234b 100644 --- a/seal/settings.py +++ b/seal/settings.py @@ -255,14 +255,6 @@ MIDDLEWARE_CLASSES = ('system.views.DisableCSRFCheck',) Token = "eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkYXNoYm9hcmQtYWRtaW4tdG9rZW4tZGhobWMiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGFzaGJvYXJkLWFkbWluIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiOThkMDcwZWItODc1Yy0xMWU5LWE1MzgtMDAwYzI5N2I0ZmU3Iiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50Omt1YmUtc3lzdGVtOmRhc2hib2FyZC1hZG1pbiJ9.XDFpez2E84R_zlopt_uEHPvVGUtSavypyix6UcYJO3J4imHdJy7MEkfV-wltBA1H8x0TT2AW64rLlXaRJ8OkFWJ0myedfKdjnf7i0oLQ8j-7lw6rT3A0e2pKmpnOaBQfgzRm83-t2I5MMp3Iu9VNUiAbqQpjql4AKwRuJEEGCs99tKStUxzIsJKusmUHh9KAK4BAxySn9h16T2URZ7czLP4mty2crYWNV4KwSwFPthGhFPsl8mnet_hiV5k4me5a8frmXytOy64MmGW8w3TBgiM-7hBYSxt84QGGnyi84LU0EFgtLwBWEOTZeUKKQ6IkoAprMmNcSxX8WUJFlx_uJg" APISERVER = 'https://192.168.100.111:6443' -## k8s webssh 有权限执行 kubectl exec -it 的主机 - -webssh_ip = "192.168.100.111" -webssh_port = "22" -webssh_username = "root" -webssh_password = "1qaz.2wsx" -webssh_name = "root@k8s-master" # 终端显示的名字 是为了判断终端退出用 - # django-channels配置 CHANNEL_LAYERS = { "default": { diff --git a/system/tests.py b/system/tests.py index 7ce503c..4ae0c8f 100644 --- a/system/tests.py +++ b/system/tests.py @@ -1,3 +1,41 @@ from django.test import TestCase # Create your tests here. +from datetime import timedelta, datetime + +import airflow +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy_operator import DummyOperator + +default_args = { + 'owner': 'jifeng.si', + 'depends_on_past': False, + # 'depends_on_past': True, + #'start_date': airflow.utils.dates.days_ago(2), + 'start_date': datetime(2018, 5, 2), + 'email': ['1219957063@qq.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG( + 'example_hello_world_dag', + default_args=default_args, + description='my first DAG', + schedule_interval='*/25 * * * *', + start_date=datetime(2018, 5, 28) +) + +dummy_operator = DummyOperator(task_id='dummy_task', dag=dag) + +hello_operator = BashOperator( + task_id='sleep_task', + depends_on_past=False, + bash_command='echo `date` >> /home/py/test.txt', + dag=dag +) + +dummy_operator >> hello_operator \ No newline at end of file