feature: 实现网关节点（并行、汇聚、条件分支、条件并行网关）

This commit is contained in:
charlesxie
2022-04-02 17:47:59 +08:00
parent 18eb77f850
commit bd2bfed32f
13 changed files with 109 additions and 44 deletions

View File

@@ -38,6 +38,14 @@ class Process(models.Model):
class BaseNode(models.Model):
START_NODE = 0
END_NODE = 1
JOB_NODE = 2
SUB_PROCESS_NODE = 3
CONDITION_NODE = 4
CONVERGE_NODE = 5
PARALLEL_NODE = 6
CONDITION_PARALLEL_NODE = 7
name = models.CharField("节点名称", max_length=255, blank=False, null=False)
uuid = models.CharField("UUID", max_length=255, unique=True)
description = models.CharField("节点描述", max_length=255, blank=True, null=True)
@@ -50,7 +58,7 @@ class BaseNode(models.Model):
fail_retry_count = models.IntegerField("失败重试次数", default=0)
fail_offset = models.IntegerField("失败重试间隔", default=0)
fail_offset_unit = models.CharField("重试间隔单位", choices=FAIL_OFFSET_UNIT_CHOICE, max_length=32)
# 0开始节点1结束节点2作业节点3其他作业流4分支5汇聚
# 0:开始节点 1:结束节点 2:作业节点 3:子作业流 4:分支 5:汇聚 6:并行 7:条件并行
node_type = models.IntegerField(default=2)
component_code = models.CharField("插件名称", max_length=255, blank=False, null=False)
is_skip_fail = models.BooleanField("忽略失败", default=False)

View File

@@ -38,8 +38,8 @@ class ProcessViewSetsSerializer(serializers.Serializer):
bulk_nodes = []
for node in node_map.values():
node_data = node["node_data"]
if isinstance(node_data["inputs"], dict):
node_inputs = node_data["inputs"]
if isinstance(node_data.get("inputs", {}), dict):
node_inputs = node_data.get("inputs", {})
else:
node_inputs = json.loads(node_data["inputs"])
bulk_nodes.append(Node(process=process,
@@ -83,8 +83,8 @@ class ProcessViewSetsSerializer(serializers.Serializer):
for node in node_map.values():
node_data = node["node_data"]
node_obj = node_dict.get(node["uuid"], None)
if isinstance(node_data["inputs"], dict):
node_inputs = node_data["inputs"]
if isinstance(node_data.get("inputs", {}), dict):
node_inputs = node_data.get("inputs", {})
else:
node_inputs = json.loads(node_data["inputs"])
if node_obj:

View File

@@ -15,7 +15,7 @@ from applications.flow.models import Process, Node, ProcessRun, NodeRun, NodeTem
from applications.flow.serializers import ProcessViewSetsSerializer, ListProcessViewSetsSerializer, \
RetrieveProcessViewSetsSerializer, ExecuteProcessSerializer, ListProcessRunViewSetsSerializer, \
RetrieveProcessRunViewSetsSerializer, NodeTemplateSerializer
from applications.utils.dag_helper import DAG, instance_dag
from applications.utils.dag_helper import DAG, instance_dag, PipelineBuilder
from component.drf.viewsets import GenericViewSet
@@ -40,41 +40,27 @@ class ProcessViewSets(mixins.ListModelMixin,
def execute(self, request, *args, **kwargs):
validated_data = self.is_validated_data(request.data)
process_id = validated_data["process_id"]
process = Process.objects.filter(id=process_id).first()
node_map = Node.objects.filter(process_id=process_id).in_bulk(field_name="uuid")
dag_obj = DAG()
dag_obj.from_dict(process.dag)
topological_sort = dag_obj.topological_sort()
p_builder = PipelineBuilder(process_id)
pipeline = p_builder.build()
start = pipeline_tree = EmptyStartEvent()
# 运行实例的uuid
process_run_uuid = {topological_sort[0]: start.id}
for pipeline_id in topological_sort[1:]:
if node_map[pipeline_id].node_type == 0:
act = EmptyStartEvent()
elif node_map[pipeline_id].node_type == 1:
act = EmptyEndEvent()
else:
act = ServiceActivity(component_code="http_request")
act.component.inputs.inputs = Var(type=Var.PLAIN, value=node_map[pipeline_id].inputs)
process_run_uuid[pipeline_id] = act.id
pipeline_tree = getattr(pipeline_tree, "extend")(act)
pipeline_data = Data()
pipeline = builder.build_tree(start, data=pipeline_data)
process = p_builder.process
node_map = p_builder.node_map
process_run_uuid = p_builder.instance
# 执行
runtime = BambooDjangoRuntime()
api.run_pipeline(runtime=runtime, pipeline=pipeline)
# 保存执行后的实例数据
process_run_data = process.clone_data
process_run_data["dag"] = instance_dag(process_run_data["dag"], process_run_uuid)
process_run = ProcessRun.objects.create(process_id=process.id, root_id=pipeline["id"], **process_run_data)
node_run_bulk = []
for pipeline_id, node in node_map.items():
_node = {k: v for k, v in node.__dict__.items() if k in NodeRun.field_names()}
_node["uuid"] = process_run_uuid[pipeline_id]
_node["uuid"] = process_run_uuid[pipeline_id].id
node_run_bulk.append(NodeRun(process_run=process_run, **_node))
NodeRun.objects.bulk_create(node_run_bulk, batch_size=500)
Process.objects.filter(id=process_id).update(total_run_count=F("total_run_count") + 1)
return Response({})
@@ -119,14 +105,14 @@ def flow(request):
start = EmptyStartEvent()
act = ServiceActivity(component_code="http_request")
act2 = ServiceActivity(component_code="fac_cal_comp")
act2 = ServiceActivity(component_code="http_request")
act2.component.inputs.n = Var(type=Var.PLAIN, value=50)
act3 = ServiceActivity(component_code="fac_cal_comp")
act3 = ServiceActivity(component_code="http_request")
act3.component.inputs.n = Var(type=Var.PLAIN, value=5)
act4 = ServiceActivity(component_code="fast_execute_job")
act5 = ServiceActivity(component_code="fast_execute_job")
act4 = ServiceActivity(component_code="http_request")
act5 = ServiceActivity(component_code="http_request")
eg = ExclusiveGateway(
conditions={
0: '${exe_res} >= 0',
@@ -139,8 +125,7 @@ def flow(request):
end = EmptyEndEvent()
start.extend(act).extend(eg).connect(act2, act3).to(eg).converge(pg).connect(act4, act5).to(pg).converge(cg).extend(
end)
start.extend(act).extend(eg).connect(act2, act3).to(act2).extend(act4).extend(act5).to(eg).converge(end)
# 全局变量
pipeline_data = Data()
pipeline_data.inputs['${exe_res}'] = NodeOutput(type=Var.PLAIN, source_act=act.id, source_key='exe_res')

View File

@@ -1,6 +1,10 @@
from collections import OrderedDict, defaultdict
from copy import copy, deepcopy
from applications.flow.models import Process, Node
from bamboo_engine.builder import EmptyStartEvent, EmptyEndEvent, ExclusiveGateway, ServiceActivity, Var, builder, Data, \
ParallelGateway, ConvergeGateway, ConditionalParallelGateway
class DAG(object):
""" Directed acyclic graph implementation. """
@@ -195,10 +199,73 @@ def instance_dag(dag_dict, process_run_uuid):
new_dag_dict = defaultdict(list)
for k, v_list in dag_dict.items():
for v in v_list:
new_dag_dict[process_run_uuid[k]].append(process_run_uuid[v])
new_dag_dict[process_run_uuid[k].id].append(process_run_uuid[v].id)
return dict(new_dag_dict)
class PipelineBuilder:
    """Assemble a bamboo-engine pipeline tree from a persisted Process.

    Loads the Process row, its Node rows (keyed by uuid) and the stored DAG,
    instantiates one bamboo-engine builder object per node, then wires the
    objects together following the DAG's edges in build().
    """

    def __init__(self, process_id):
        # Primary key of the Process whose pipeline we are building.
        self.process_id = process_id
        # NOTE(review): .first() returns None for an unknown id; setup_dag()
        # would then fail on self.process.dag — confirm callers validate the id.
        self.process = Process.objects.filter(id=process_id).first()
        # uuid -> Node, for O(1) lookup while instantiating pipeline objects.
        self.node_map = Node.objects.filter(process_id=process_id).in_bulk(field_name="uuid")
        self.dag_obj = self.setup_dag()
        # uuid -> bamboo-engine builder instance for that node.
        self.instance = self.setup_instance()

    def setup_instance(self):
        """Create one bamboo-engine builder object per node of the process.

        Returns:
            dict: node uuid -> builder instance (event, gateway or activity).
        """
        pipeline_instance = {}
        for p_id, node in self.node_map.items():
            if node.node_type == Node.START_NODE:
                pipeline_instance[p_id] = EmptyStartEvent()
            elif node.node_type == Node.END_NODE:
                pipeline_instance[p_id] = EmptyEndEvent()
            elif node.node_type == Node.CONDITION_NODE:
                # NOTE(review): branch conditions are hard-coded placeholders
                # ('1==0' always false, '0==0' always true) — presumably real
                # expressions will come from node data later; confirm.
                pipeline_instance[p_id] = ExclusiveGateway(
                    conditions={
                        0: '1==0',
                        1: '0==0'
                    },
                    name='act_2 or act_3'
                )
            elif node.node_type == Node.PARALLEL_NODE:
                pipeline_instance[p_id] = ParallelGateway()
            elif node.node_type == Node.CONVERGE_NODE:
                pipeline_instance[p_id] = ConvergeGateway()
            elif node.node_type == Node.CONDITION_PARALLEL_NODE:
                # NOTE(review): placeholder conditions here as well.
                pipeline_instance[p_id] = ConditionalParallelGateway(
                    conditions={
                        0: '1==0',
                        1: '1==1',
                        2: '2==2'
                    },
                    name='[act_2] or [act_3 and act_4]'
                )
            else:
                # Every remaining type (JOB_NODE, SUB_PROCESS_NODE, ...) becomes
                # a service activity; the node's stored inputs are forwarded as
                # a plain pipeline variable.
                act = ServiceActivity(component_code="http_request")
                act.component.inputs.inputs = Var(type=Var.PLAIN, value=node.inputs)
                pipeline_instance[p_id] = act
        return pipeline_instance

    def setup_dag(self):
        """Deserialize the Process's stored DAG dict into a DAG object."""
        dag_obj = DAG()
        dag_obj.from_dict(self.process.dag)
        return dag_obj

    def get_inst(self, p_id):
        """Return the builder instance for one node uuid (None if unknown)."""
        return self.instance.get(p_id)

    def get_inst_list(self, p_ids):
        """Return the builder instances for several node uuids, in order."""
        return [self.instance.get(p_id) for p_id in p_ids]

    def build(self):
        """Wire node instances along the DAG edges and build the pipeline tree.

        Returns:
            The pipeline description produced by builder.build_tree, rooted at
            the DAG's first node without incoming edges.
        """
        # The start event is the first node with no incoming edges.
        start = self.dag_obj.ind_nodes()[0]
        for _in, out_list in self.dag_obj.graph.items():
            for _out in out_list:
                # extend() chains _in -> _out in the bamboo-engine flow.
                self.get_inst(_in).extend(self.get_inst(_out))
        pipeline_data = Data()
        pipeline = builder.build_tree(self.get_inst(start), data=pipeline_data)
        return pipeline
if __name__ == '__main__':
dag = DAG()
dag.add_node("a")

View File

@@ -16,6 +16,7 @@ Including another URLconf
from django.contrib import admin
from django.urls import path, include
from applications.flow.urls import flow_router, node_router
from applications.flow.views import flow
from dj_flow.views import index
urlpatterns = [
@@ -23,5 +24,6 @@ urlpatterns = [
path('', index),
path("process/", include(flow_router.urls)),
path("node/", include(node_router.urls)),
path("tt/", flow),
]

View File

@@ -297,9 +297,11 @@
},
// 处理渲染：true 为详情方式渲染，false 为编辑或新增方式渲染
handleRender(detail) {
console.log('handlerender')
this.mainLoading = true
const _this = this
setTimeout(() => {
console.log('handlerender2222')
const data = {
edges: _this.jobFlowFrom.pipeline_tree.lines.map(line => {
const item = {
@@ -326,7 +328,7 @@
lineWidth: 1,
r: 24
}
} else if (node.type === 4 || node.type === 5) {
} else if (node.type === '4' || node.type === 5) {
style = {
fill: '#fff',
stroke: '#DCDEE5',
@@ -354,7 +356,7 @@
x: node.left,
y: node.top,
nodeType: node.type,
type: (node.type === 0 || node.type === 1 || node.type === 4) ? 'circle-node' : 'rect-node',
type: (node.type === 0 || node.type === 1 || node.type === '4') ? 'circle-node' : 'rect-node',
labelCfg: {
style: {
textAlign: (node.type === 0 || node.type === 1) ? 'center' : 'left'

View File

@@ -99,6 +99,7 @@
})
},
handleRender(detail) {
console.log('pre render')
this.mainLoading = true
const _this = this
setTimeout(() => {

View File

@@ -84,24 +84,24 @@
'id': 46,
'creator': 'product',
'name': '并行网关',
'type': 4,
'nodeType': 4,
'type': 6,
'nodeType': 6,
'icon': 'e6d9'
},
{
'id': 47,
'creator': 'product',
'name': '汇聚网关',
'type': 4,
'nodeType': 4,
'type': 5,
'nodeType': 5,
'icon': 'e6d9'
},
{
'id': 48,
'creator': 'product',
'name': '条件并行网关',
'type': 4,
'nodeType': 4,
'type': 7,
'nodeType': 7,
'icon': 'e6d9'
}
]