diff --git a/applications/flow/__pycache__/models.cpython-36.pyc b/applications/flow/__pycache__/models.cpython-36.pyc index ae59942..778c277 100644 Binary files a/applications/flow/__pycache__/models.cpython-36.pyc and b/applications/flow/__pycache__/models.cpython-36.pyc differ diff --git a/applications/flow/__pycache__/serializers.cpython-36.pyc b/applications/flow/__pycache__/serializers.cpython-36.pyc index 2a1ef16..b37489c 100644 Binary files a/applications/flow/__pycache__/serializers.cpython-36.pyc and b/applications/flow/__pycache__/serializers.cpython-36.pyc differ diff --git a/applications/flow/__pycache__/views.cpython-36.pyc b/applications/flow/__pycache__/views.cpython-36.pyc index 0de4588..d80d953 100644 Binary files a/applications/flow/__pycache__/views.cpython-36.pyc and b/applications/flow/__pycache__/views.cpython-36.pyc differ diff --git a/applications/flow/models.py b/applications/flow/models.py index bec3fc9..3574541 100644 --- a/applications/flow/models.py +++ b/applications/flow/models.py @@ -38,6 +38,14 @@ class Process(models.Model): class BaseNode(models.Model): + START_NODE = 0 + END_NODE = 1 + JOB_NODE = 2 + SUB_PROCESS_NODE = 3 + CONDITION_NODE = 4 + CONVERGE_NODE = 5 + PARALLEL_NODE = 6 + CONDITION_PARALLEL_NODE = 7 name = models.CharField("节点名称", max_length=255, blank=False, null=False) uuid = models.CharField("UUID", max_length=255, unique=True) description = models.CharField("节点描述", max_length=255, blank=True, null=True) @@ -50,7 +58,7 @@ class BaseNode(models.Model): fail_retry_count = models.IntegerField("失败重试次数", default=0) fail_offset = models.IntegerField("失败重试间隔", default=0) fail_offset_unit = models.CharField("重试间隔单位", choices=FAIL_OFFSET_UNIT_CHOICE, max_length=32) - # 0:开始节点,1:结束节点,2:作业节点,3:其他作业流4:分支,5:汇聚 + # 0:开始节点,1:结束节点,2:作业节点,3:其他作业流 4:分支,5:汇聚.6:并行 node_type = models.IntegerField(default=2) component_code = models.CharField("插件名称", max_length=255, blank=False, null=False) is_skip_fail = models.BooleanField("忽略失败", default=False) diff --git a/applications/flow/serializers.py b/applications/flow/serializers.py index 5ce6cf2..78bcec6 100644 --- a/applications/flow/serializers.py +++ b/applications/flow/serializers.py @@ -38,8 +38,8 @@ class ProcessViewSetsSerializer(serializers.Serializer): bulk_nodes = [] for node in node_map.values(): node_data = node["node_data"] - if isinstance(node_data["inputs"], dict): - node_inputs = node_data["inputs"] + if isinstance(node_data.get("inputs", {}), dict): + node_inputs = node_data.get("inputs", {}) else: node_inputs = json.loads(node_data["inputs"]) bulk_nodes.append(Node(process=process, @@ -83,8 +83,8 @@ class ProcessViewSetsSerializer(serializers.Serializer): for node in node_map.values(): node_data = node["node_data"] node_obj = node_dict.get(node["uuid"], None) - if isinstance(node_data["inputs"], dict): - node_inputs = node_data["inputs"] + if isinstance(node_data.get("inputs", {}), dict): + node_inputs = node_data.get("inputs", {}) else: node_inputs = json.loads(node_data["inputs"]) if node_obj: diff --git a/applications/flow/views.py b/applications/flow/views.py index 1989d49..8a53d23 100644 --- a/applications/flow/views.py +++ b/applications/flow/views.py @@ -15,7 +15,7 @@ from applications.flow.models import Process, Node, ProcessRun, NodeRun, NodeTem from applications.flow.serializers import ProcessViewSetsSerializer, ListProcessViewSetsSerializer, \ RetrieveProcessViewSetsSerializer, ExecuteProcessSerializer, ListProcessRunViewSetsSerializer, \ RetrieveProcessRunViewSetsSerializer, NodeTemplateSerializer -from applications.utils.dag_helper import DAG, instance_dag +from applications.utils.dag_helper import DAG, instance_dag, PipelineBuilder from component.drf.viewsets import GenericViewSet @@ -40,41 +40,27 @@ class ProcessViewSets(mixins.ListModelMixin, def execute(self, request, *args, **kwargs): validated_data = self.is_validated_data(request.data) process_id = validated_data["process_id"] - process = Process.objects.filter(id=process_id).first() - node_map = Node.objects.filter(process_id=process_id).in_bulk(field_name="uuid") - dag_obj = DAG() - dag_obj.from_dict(process.dag) - topological_sort = dag_obj.topological_sort() + p_builder = PipelineBuilder(process_id) + pipeline = p_builder.build() - start = pipeline_tree = EmptyStartEvent() - # 运行实例的uuid - process_run_uuid = {topological_sort[0]: start.id} - for pipeline_id in topological_sort[1:]: - if node_map[pipeline_id].node_type == 0: - act = EmptyStartEvent() - elif node_map[pipeline_id].node_type == 1: - act = EmptyEndEvent() - else: - act = ServiceActivity(component_code="http_request") - act.component.inputs.inputs = Var(type=Var.PLAIN, value=node_map[pipeline_id].inputs) - process_run_uuid[pipeline_id] = act.id - pipeline_tree = getattr(pipeline_tree, "extend")(act) - - pipeline_data = Data() - pipeline = builder.build_tree(start, data=pipeline_data) + process = p_builder.process + node_map = p_builder.node_map + process_run_uuid = p_builder.instance + # 执行 runtime = BambooDjangoRuntime() api.run_pipeline(runtime=runtime, pipeline=pipeline) - + # 保存执行后的实例数据 process_run_data = process.clone_data process_run_data["dag"] = instance_dag(process_run_data["dag"], process_run_uuid) process_run = ProcessRun.objects.create(process_id=process.id, root_id=pipeline["id"], **process_run_data) node_run_bulk = [] for pipeline_id, node in node_map.items(): _node = {k: v for k, v in node.__dict__.items() if k in NodeRun.field_names()} - _node["uuid"] = process_run_uuid[pipeline_id] + _node["uuid"] = process_run_uuid[pipeline_id].id node_run_bulk.append(NodeRun(process_run=process_run, **_node)) NodeRun.objects.bulk_create(node_run_bulk, batch_size=500) Process.objects.filter(id=process_id).update(total_run_count=F("total_run_count") + 1) + return Response({}) @@ -119,14 +105,14 @@ def flow(request): start = EmptyStartEvent() act = ServiceActivity(component_code="http_request") - act2 = ServiceActivity(component_code="fac_cal_comp") + act2 = ServiceActivity(component_code="http_request") act2.component.inputs.n = Var(type=Var.PLAIN, value=50) - act3 = ServiceActivity(component_code="fac_cal_comp") + act3 = ServiceActivity(component_code="http_request") act3.component.inputs.n = Var(type=Var.PLAIN, value=5) - act4 = ServiceActivity(component_code="fast_execute_job") - act5 = ServiceActivity(component_code="fast_execute_job") + act4 = ServiceActivity(component_code="http_request") + act5 = ServiceActivity(component_code="http_request") eg = ExclusiveGateway( conditions={ 0: '${exe_res} >= 0', @@ -139,8 +125,7 @@ def flow(request): end = EmptyEndEvent() - start.extend(act).extend(eg).connect(act2, act3).to(eg).converge(pg).connect(act4, act5).to(pg).converge(cg).extend( - end) + start.extend(act).extend(eg).connect(act2, act3).to(act2).extend(act4).extend(act5).to(eg).converge(end) # 全局变量 pipeline_data = Data() pipeline_data.inputs['${exe_res}'] = NodeOutput(type=Var.PLAIN, source_act=act.id, source_key='exe_res') diff --git a/applications/utils/__pycache__/dag_helper.cpython-36.pyc b/applications/utils/__pycache__/dag_helper.cpython-36.pyc index d7669bf..c1cb944 100644 Binary files a/applications/utils/__pycache__/dag_helper.cpython-36.pyc and b/applications/utils/__pycache__/dag_helper.cpython-36.pyc differ diff --git a/applications/utils/dag_helper.py b/applications/utils/dag_helper.py index 357bcc8..a862588 100644 --- a/applications/utils/dag_helper.py +++ b/applications/utils/dag_helper.py @@ -1,6 +1,10 @@ from collections import OrderedDict, defaultdict from copy import copy, deepcopy +from applications.flow.models import Process, Node +from bamboo_engine.builder import EmptyStartEvent, EmptyEndEvent, ExclusiveGateway, ServiceActivity, Var, builder, Data, \ + ParallelGateway, ConvergeGateway, ConditionalParallelGateway + class DAG(object): """ Directed acyclic graph implementation. """ @@ -195,10 +199,73 @@ def instance_dag(dag_dict, process_run_uuid): new_dag_dict = defaultdict(list) for k, v_list in dag_dict.items(): for v in v_list: - new_dag_dict[process_run_uuid[k]].append(process_run_uuid[v]) + new_dag_dict[process_run_uuid[k].id].append(process_run_uuid[v].id) return dict(new_dag_dict) +class PipelineBuilder: + def __init__(self, process_id): + self.process_id = process_id + self.process = Process.objects.filter(id=process_id).first() + self.node_map = Node.objects.filter(process_id=process_id).in_bulk(field_name="uuid") + self.dag_obj = self.setup_dag() + self.instance = self.setup_instance() + + def setup_instance(self): + pipeline_instance = {} + for p_id, node in self.node_map.items(): + if node.node_type == Node.START_NODE: + pipeline_instance[p_id] = EmptyStartEvent() + elif node.node_type == Node.END_NODE: + pipeline_instance[p_id] = EmptyEndEvent() + elif node.node_type == Node.CONDITION_NODE: + pipeline_instance[p_id] = ExclusiveGateway( + conditions={ + 0: '1==0', + 1: '0==0' + }, + name='act_2 or act_3' + ) + elif node.node_type == Node.PARALLEL_NODE: + pipeline_instance[p_id] = ParallelGateway() + elif node.node_type == Node.CONVERGE_NODE: + pipeline_instance[p_id] = ConvergeGateway() + elif node.node_type == Node.CONDITION_PARALLEL_NODE: + pipeline_instance[p_id] = ConditionalParallelGateway( + conditions={ + 0: '1==0', + 1: '1==1', + 2: '2==2' + }, + name='[act_2] or [act_3 and act_4]' + ) + else: + act = ServiceActivity(component_code="http_request") + act.component.inputs.inputs = Var(type=Var.PLAIN, value=node.inputs) + pipeline_instance[p_id] = act + return pipeline_instance + + def setup_dag(self): + dag_obj = DAG() + dag_obj.from_dict(self.process.dag) + return dag_obj + + def get_inst(self, p_id): + return self.instance.get(p_id) + + def get_inst_list(self, p_ids): + return [self.instance.get(p_id) for p_id in p_ids] + + def build(self): + start = self.dag_obj.ind_nodes()[0] + for _in, out_list in self.dag_obj.graph.items(): + for _out in out_list: + self.get_inst(_in).extend(self.get_inst(_out)) + pipeline_data = Data() + pipeline = builder.build_tree(self.get_inst(start), data=pipeline_data) + return pipeline + + if __name__ == '__main__': dag = DAG() dag.add_node("a") diff --git a/dj_flow/__pycache__/urls.cpython-36.pyc b/dj_flow/__pycache__/urls.cpython-36.pyc index 30ae917..118159b 100644 Binary files a/dj_flow/__pycache__/urls.cpython-36.pyc and b/dj_flow/__pycache__/urls.cpython-36.pyc differ diff --git a/dj_flow/urls.py b/dj_flow/urls.py index 699f0cc..5256afc 100644 --- a/dj_flow/urls.py +++ b/dj_flow/urls.py @@ -16,6 +16,7 @@ Including another URLconf from django.contrib import admin from django.urls import path, include from applications.flow.urls import flow_router, node_router +from applications.flow.views import flow from dj_flow.views import index urlpatterns = [ @@ -23,5 +24,6 @@ urlpatterns = [ path('', index), path("process/", include(flow_router.urls)), path("node/", include(node_router.urls)), + path("tt/", flow), ] diff --git a/web/src/views/job_flow_mgmt/single_job_flow.vue b/web/src/views/job_flow_mgmt/single_job_flow.vue index 38b6539..e82f75f 100644 --- a/web/src/views/job_flow_mgmt/single_job_flow.vue +++ b/web/src/views/job_flow_mgmt/single_job_flow.vue @@ -297,9 +297,11 @@ }, // 处理渲染,true为详情方式渲染,false为编辑或新增方式渲染 handleRender(detail) { + console.log('handlerender') this.mainLoading = true const _this = this setTimeout(() => { + console.log('handlerender2222') const data = { edges: _this.jobFlowFrom.pipeline_tree.lines.map(line => { const item = { @@ -326,7 +328,7 @@ lineWidth: 1, r: 24 } - } else if (node.type === 4 || node.type === 5) { + } else if (node.type === '4' || node.type === 5) { style = { fill: '#fff', stroke: '#DCDEE5', @@ -354,7 +356,7 @@ x: node.left, y: node.top, nodeType: node.type, - type: (node.type === 0 || node.type === 1 || node.type === 4) ? 'circle-node' : 'rect-node', + type: (node.type === 0 || node.type === 1 || node.type === '4') ? 'circle-node' : 'rect-node', labelCfg: { style: { textAlign: (node.type === 0 || node.type === 1) ? 'center' : 'left' diff --git a/web/src/views/job_flow_mgmt/single_job_flow/preFlowCanvas.vue b/web/src/views/job_flow_mgmt/single_job_flow/preFlowCanvas.vue index baa7c2d..8de4ae4 100644 --- a/web/src/views/job_flow_mgmt/single_job_flow/preFlowCanvas.vue +++ b/web/src/views/job_flow_mgmt/single_job_flow/preFlowCanvas.vue @@ -99,6 +99,7 @@ }) }, handleRender(detail) { + console.log('pre render') this.mainLoading = true const _this = this setTimeout(() => { diff --git a/web/src/views/job_flow_mgmt/single_job_flow/taskMake.vue b/web/src/views/job_flow_mgmt/single_job_flow/taskMake.vue index 5f75e19..7029962 100644 --- a/web/src/views/job_flow_mgmt/single_job_flow/taskMake.vue +++ b/web/src/views/job_flow_mgmt/single_job_flow/taskMake.vue @@ -84,24 +84,24 @@ 'id': 46, 'creator': 'product', 'name': '并行网关', - 'type': 4, - 'nodeType': 4, + 'type': 6, + 'nodeType': 6, 'icon': 'e6d9' }, { 'id': 47, 'creator': 'product', 'name': '汇聚网关', - 'type': 4, - 'nodeType': 4, + 'type': 5, + 'nodeType': 5, 'icon': 'e6d9' }, { 'id': 48, 'creator': 'product', 'name': '条件并行网关', - 'type': 4, - 'nodeType': 4, + 'type': 7, + 'nodeType': 7, 'icon': 'e6d9' } ]