This commit is contained in:
charlesxie
2022-01-28 18:00:41 +08:00
parent 825f7f71b4
commit 5ef31b859b
14 changed files with 930 additions and 657 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
# Generated by Django 2.2.6 on 2022-01-28 07:05
# Generated by Django 2.2.6 on 2022-01-28 09:40
import datetime
from django.db import migrations, models
@@ -44,7 +44,7 @@ class Migration(migrations.Migration):
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255, verbose_name='节点名称')),
('uuid', models.CharField(max_length=255, verbose_name='UUID')),
('uuid', models.CharField(max_length=255, unique=True, verbose_name='UUID')),
('description', models.CharField(blank=True, max_length=255, null=True, verbose_name='节点描述')),
('show', models.BooleanField(default=True, verbose_name='是否显示')),
('top', models.IntegerField(default=300)),
@@ -61,5 +61,44 @@ class Migration(migrations.Migration):
('outputs', django_mysql.models.JSONField(default=dict, verbose_name='输出参数')),
('process', models.ForeignKey(db_constraint=False, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='nodes', to='flow.Process')),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='ProcessRun',
fields=[
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='flow.Process')),
('state', models.CharField(max_length=32, verbose_name='工作流状态')),
('root_id', models.CharField(max_length=255, verbose_name='根节点uuid')),
('process', models.ForeignKey(db_constraint=False, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='run', to='flow.Process')),
],
bases=('flow.process',),
),
migrations.CreateModel(
name='NodeRun',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255, verbose_name='节点名称')),
('uuid', models.CharField(max_length=255, unique=True, verbose_name='UUID')),
('description', models.CharField(blank=True, max_length=255, null=True, verbose_name='节点描述')),
('show', models.BooleanField(default=True, verbose_name='是否显示')),
('top', models.IntegerField(default=300)),
('left', models.IntegerField(default=300)),
('ico', models.CharField(blank=True, max_length=64, null=True, verbose_name='icon')),
('fail_retry_count', models.IntegerField(default=0, verbose_name='失败重试次数')),
('fail_offset', models.IntegerField(default=0, verbose_name='失败重试间隔')),
('fail_offset_unit', models.CharField(choices=[('seconds', ''), ('hours', ''), ('minutes', '')], max_length=32, verbose_name='重试间隔单位')),
('node_type', models.IntegerField(default=2)),
('component_code', models.CharField(max_length=255, verbose_name='插件名称')),
('is_skip_fail', models.BooleanField(default=False, verbose_name='忽略失败')),
('is_timeout_alarm', models.BooleanField(default=False, verbose_name='超时告警')),
('inputs', django_mysql.models.JSONField(default=dict, verbose_name='输入参数')),
('outputs', django_mysql.models.JSONField(default=dict, verbose_name='输出参数')),
('process_run', models.ForeignKey(db_constraint=False, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='nodes_run', to='flow.ProcessRun')),
],
options={
'abstract': False,
},
),
]

View File

@@ -1,18 +0,0 @@
# Generated by Django 2.2.6 on 2022-01-28 07:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('flow', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='node',
name='uuid',
field=models.CharField(max_length=255, unique=True, verbose_name='UUID'),
),
]

View File

@@ -27,9 +27,7 @@ class Process(models.Model):
update_by = models.CharField("修改人", max_length=64, null=True)
class Node(models.Model):
process = models.ForeignKey(Process, on_delete=models.SET_NULL, null=True, db_constraint=False,
related_name="nodes")
class BaseNode(models.Model):
name = models.CharField("节点名称", max_length=255, blank=False, null=False)
uuid = models.CharField("UUID", max_length=255, unique=True)
description = models.CharField("节点描述", max_length=255, blank=True, null=True)
@@ -50,3 +48,24 @@ class Node(models.Model):
inputs = JSONField("输入参数", default=dict)
outputs = JSONField("输出参数", default=dict)
class Meta:
abstract = True
class Node(BaseNode):
process = models.ForeignKey(Process, on_delete=models.SET_NULL, null=True, db_constraint=False,
related_name="nodes")
class ProcessRun(Process):
# new
process = models.ForeignKey(Process, on_delete=models.SET_NULL, null=True, db_constraint=False,
related_name="run")
state = models.CharField("工作流状态", max_length=32)
root_id = models.CharField("根节点uuid", max_length=255)
class NodeRun(BaseNode):
process_run = models.ForeignKey(ProcessRun, on_delete=models.SET_NULL, null=True, db_constraint=False,
related_name="nodes_run")

View File

@@ -1,7 +1,7 @@
from django.db import transaction
from rest_framework import serializers
from applications.flow.models import Process, Node
from applications.flow.models import Process, Node, ProcessRun
class ProcessViewSetsSerializer(serializers.Serializer):
@@ -53,6 +53,12 @@ class ListProcessViewSetsSerializer(serializers.ModelSerializer):
fields = "__all__"
class ListProcessRunViewSetsSerializer(serializers.ModelSerializer):
class Meta:
model = ProcessRun
fields = "__all__"
class RetrieveProcessViewSetsSerializer(serializers.ModelSerializer):
pipeline_tree = serializers.SerializerMethodField()
@@ -96,5 +102,48 @@ class RetrieveProcessViewSetsSerializer(serializers.ModelSerializer):
fields = ("id", "name", "description", "category", "run_type", "pipeline_tree")
class RetrieveProcessRunViewSetsSerializer(serializers.ModelSerializer):
pipeline_tree = serializers.SerializerMethodField()
# category = serializers.SerializerMethodField()
#
# def get_category(self, obj):
# return obj.category.all()
def get_pipeline_tree(self, obj):
lines = []
nodes = []
for _from, to_list in obj.dag.items():
for _to in to_list:
lines.append({
"from": _from,
"to": _to
})
node_list = Node.objects.filter(process_id=obj.id).values()
for node in node_list:
nodes.append({"show": node["show"],
"top": node["top"],
"left": node["left"],
"ico": node["ico"],
"type": node["node_type"],
"name": node["name"],
"node_data": {
"inputs": node["inputs"],
"run_mark": 0,
"node_name": node["name"],
"description": node["description"],
"fail_retry_count": node["fail_retry_count"],
"fail_offset": node["fail_offset"],
"fail_offset_unit": node["fail_offset_unit"],
"is_skip_fail": node["is_skip_fail"],
"is_timeout_alarm": node["is_timeout_alarm"]},
"uuid": node["uuid"]})
return {"lines": lines, "nodes": nodes}
class Meta:
model = ProcessRun
fields = ("id", "name", "description", "category", "run_type", "pipeline_tree")
class ExecuteProcessSerializer(serializers.Serializer):
process_id = serializers.IntegerField(required=True)

View File

@@ -4,3 +4,4 @@ from . import views
flow_router = DefaultRouter()
flow_router.register(r"", viewset=views.ProcessViewSets, base_name="flow")
flow_router.register(r"run", viewset=views.ProcessRunViewSets, base_name="run")

View File

@@ -6,9 +6,10 @@ from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.response import Response
from applications.flow.models import Process, Node
from applications.flow.models import Process, Node, ProcessRun
from applications.flow.serializers import ProcessViewSetsSerializer, ListProcessViewSetsSerializer, \
RetrieveProcessViewSetsSerializer, ExecuteProcessSerializer
RetrieveProcessViewSetsSerializer, ExecuteProcessSerializer, ListProcessRunViewSetsSerializer, \
RetrieveProcessRunViewSetsSerializer
from applications.utils.dag_helper import DAG
from component.drf.viewsets import GenericViewSet
@@ -17,7 +18,6 @@ class ProcessViewSets(mixins.ListModelMixin,
mixins.CreateModelMixin,
mixins.RetrieveModelMixin,
GenericViewSet):
serializer_class = ProcessViewSetsSerializer
queryset = Process.objects.order_by("-update_time")
def get_serializer_class(self):
@@ -51,12 +51,25 @@ class ProcessViewSets(mixins.ListModelMixin,
pipeline_data = Data()
pipeline = builder.build_tree(start, data=pipeline_data)
print(pipeline)
runtime = BambooDjangoRuntime()
api.run_pipeline(runtime=runtime, pipeline=pipeline)
return Response({})
class ProcessRunViewSets(mixins.ListModelMixin,
mixins.RetrieveModelMixin,
GenericViewSet):
queryset = ProcessRun.objects.order_by("-update_time")
def get_serializer_class(self):
if self.action == "list":
return ListProcessRunViewSetsSerializer
elif self.action == "retrieve":
return RetrieveProcessRunViewSetsSerializer
elif self.action == "execute":
return ExecuteProcessSerializer
# Create your views here.
def flow(request):
# 使用 builder 构造出流程描述结构

View File

@@ -4,7 +4,7 @@ from pipeline.core.flow.activity import Service, StaticIntervalGenerator
from pipeline.component_framework.component import Component
import json
import eventlet
import time
requests = eventlet.import_patched('requests')
@@ -13,6 +13,7 @@ class HttpRequestService(Service):
def execute(self, data, parent_data):
print("执行了")
time.sleep(5)
return True

View File

@@ -3,22 +3,22 @@ import {GET, POST, PUT, DELETE, reUrl} from '../../axiosconfig/axiosconfig'
export default {
// 作业台
list: function(params) {
return GET(reUrl + '/process_run/', params)
return GET(reUrl + '/process/run/', params)
},
create: function(params) {
return POST(reUrl + '/process_run/', params)
return POST(reUrl + '/process/run/', params)
},
retrieve: function(id, params) {
return GET(reUrl + '/process_run/' + JSON.stringify(id) + '/', params)
return GET(reUrl + '/process/run/' + JSON.stringify(id) + '/', params)
},
update: function(id, params) {
return PUT(reUrl + '/process_run/' + JSON.stringify(id) + '/', params)
return PUT(reUrl + '/process/run/' + JSON.stringify(id) + '/', params)
},
delete: function(id) {
return DELETE(reUrl + '/process_run/' + JSON.stringify(id) + '/')
return DELETE(reUrl + '/process/run/' + JSON.stringify(id) + '/')
},
control: function(params) {
return POST(reUrl + '/process_run/control/', params)
return POST(reUrl + '/process/run/control/', params)
},
process_snapshot: function(params) {
return GET(reUrl + '/process_snapshot/', params)