GraphMindStudio工作流框架
本文展示了一个预制工作流的使用示例,通过C#类WorkFlows实现了工作流的构建和管理。核心功能包括: 提供BuildBaseVersionDictionary方法构建基础版本字典,支持链式信息和覆盖控制; 包含两种字典构建方式:JToken原始格式和字符串格式; 预定义了两个工作流模板:图像生成和代码生成,采用JSON格式描述工作流节点; 每个工作流节点包含符号标识、标签和功能管道配置; 支持
系统整体架构
这是一个工作流编排和执行引擎,实现了以下核心功能:
- 工作流定义与存储系统
JSON配置化工作流:使用JSON字符串定义复杂的工作流节点和连接关系
多版本管理:支持基础版本、连接版本、规则版本等多种工作流变体
预制模板库:内置图像生成、代码生成、数据分析等常用工作流模板
动态工作流组合:支持运行时动态合并多个工作流
- 节点化执行引擎
通用节点模型:统一的数据结构和处理流程
配置驱动执行:通过配置对象控制节点行为,无需硬编码逻辑
异步流水线处理:支持多节点并行和顺序执行
- 动态工作流连接系统
运行时工作流拼接:能够在执行过程中动态连接不同工作流
键值映射机制:解决不同工作流间参数传递问题
工作流链管理:支持复杂的多级工作流连接关系
- 智能节点编排框架
自动节点发现:基于符号、标签等元数据的动态节点查询
依赖关系解析:自动处理节点间的输入输出依赖
执行图构建:动态构建和维护执行流程图
核心功能模块详解
模块1:工作流定义与解析
csharp
// 功能:将JSON字符串转换为可执行的工作流节点
// 特点:
// 1. JSON Schema定义节点结构
// 2. 支持函数管道配置
// 3. 内置连接器配置
模块2:节点生命周期管理
csharp
// 四个核心阶段:
// 1. GetNode_ - 输入数据收集和准备
// 2. ExcuteNode_ - 核心业务逻辑执行
// 3. AddNode_ - 执行结果处理和节点注册
// 4. NextNode_ - 后续节点连接和调度
模块3:数据流转系统
csharp
// 数据流向:
// 全局字典(dic) → 节点输入(InputValues) → 函数执行 →
// 节点输出(OutputValues) → 后续节点输入 → 全局字典
模块4:工作流连接器
csharp
// 连接功能:
// 1. 工作流间的参数映射
// 2. 批量配置处理
// 3. 键值转换规则
// 4. 连接路径配置
主要业务场景
场景1:AI服务集成流水线
图像生成:参数合并 → Web API调用 → 图片下载
代码生成:多阶段代码处理 → 动态执行 → 编译检查
数据分析:数据处理 → AI分析 → 数据库存储
场景2:复杂业务流程编排
csharp
// 示例:数据分析+存储流程
- 数据预处理节点 (数据清洗和转换)
- AI分析节点 (调用外部AI服务)
- 数据库操作节点 (存储结果)
- 连接多个子工作流 (动态扩展)
场景3:动态代码执行环境
代码生成 → 动态编译 → 执行验证 → 结果处理
支持多轮代码迭代和优化
技术特性
- 声明式配置
所有工作流和节点都通过配置定义
无需编写大量过程代码
- 热插拔架构
工作流可以动态添加、移除、替换
节点功能可以随时扩展
- 可观测性
内置调试信息收集
执行流程可视化
错误追踪和日志记录
- 数据驱动执行
执行流程由输入数据驱动
支持条件分支和动态路由
- 容错与恢复
异常处理和重试机制
部分失败时的状态管理
应用价值
对企业:
降低开发成本:通过配置而非编码实现复杂流程
提高可维护性:清晰的工作流定义,易于修改和扩展
促进知识复用:预制工作流模板可跨项目重用
增强灵活性:业务流程可快速调整响应变化
对开发者:
抽象复杂性:隐藏底层实现细节,专注业务逻辑
标准化接口:统一的节点接口,降低集成难度
调试友好:详细的执行日志和状态跟踪
实际应用示例
示例1:智能客服系统
text
用户问题 → 意图识别(工作流A) → 答案生成(工作流B) →
答案验证(工作流C) → 返回结果
示例2:数据ETL流程
text
数据抽取 → 数据清洗 → 数据转换 →
AI分析增强 → 结果存储 → 报告生成
示例3:多模态内容生成
text
文本输入 → 文本分析 → 图像生成 →
内容组合 → 质量检查 → 发布
技术优势
解耦性:节点间松耦合,易于测试和替换
可扩展性:新增节点类型无需修改核心框架
可配置性:业务流程完全通过配置控制
可视化潜力:JSON配置天然支持可视化编辑
领域特定语言:为工作流编排设计的DSL
潜在改进方向
可视化编辑器:基于JSON配置的图形化工作流设计器
版本控制:工作流配置的版本管理和回滚
性能优化:节点执行并发和缓存机制
监控告警:执行性能监控和异常告警
权限控制:工作流执行的权限和配额管理
总结
这个系统实现了一个企业级的工作流编排平台,它:
✅ 将复杂业务流程转化为可配置的节点网络
✅ 支持动态的工作流组合和调整
✅ 提供了强大的数据流转和转换能力
✅ 内置了完善的调试和监控机制
✅ 适用于需要灵活、可配置业务流程的各种场景
特别适合需要频繁调整业务流程、集成多种外部服务、需要可视化流程管理的企业应用场景。
系统概述
本预制工作流系统是一个基于节点的工作流执行引擎,支持动态流程组合、节点连接和异步执行。系统通过JSON配置定义工作流,通过通用节点框架执行具体操作。
核心组件
- 工作流定义类 (WorkFlows)
1.1 基础版本 (BaseVersion)
包含预定义的工作流模板,支持多种数据格式:
csharp
// JToken格式的工作流字典
public static Dictionary<string, JToken> baseVersion_JToken
// 字符串格式的工作流字典
public static Dictionary<string, string> baseVersion_String
// 连接版本的工作流
public static Dictionary<string, string> connectVersion
// 连接规则版本
public static Dictionary<string, string> connectRuleVersion
1.2 预定义工作流示例
图像生成工作流
json
[
{
“Symbol”: “MerageNode”,
“Label”: “TEST_1_Label”,
“funcPipelineConfig”: {
“FuncName”: “MergePrefix”,
“ParameterKeys”: “MERGE1_, Names_”,
“ResultKeys”: “MERGE1_Result”
}
},
// … 更多节点
]
代码生成工作流
支持多阶段代码处理
包含动态执行和编译功能
支持代码分段处理和合并
数据分析工作流
数据预处理
Web服务调用
数据库存储测试连接
- 通用节点框架 (UniversalNode)
2.1 核心数据结构
csharp
public class UniversalNode
{
public List Label { get; set; } // 节点标签
public string Symbol { get; set; } // 节点标识符
public List GetConfig { get; set; } // 输入配置
public ExecuteConfig ExeConfig { get; set; } // 执行配置
public AddNodeConfig AddConfig { get; set; } // 节点添加配置
public NextNodeConfig NextConfig { get; set; } // 后续节点配置
}
2.2 配置类说明
GetInputStrConfig - 输入字符串获取配置
csharp
public class GetInputStrConfig
{
public string Getstring { get; set; } // 获取字符串的路径
public List Label { get; set; } // 标签过滤
public string Symbol { get; set; } // 符号过滤
public List QuerySteps { get; set; } // 查询步骤
}
ExecuteConfig - 执行配置
csharp
public class ExecuteConfig
{
public List ParameterKeys { get; set; } // 参数键列表
public List ResultKeys { get; set; } // 结果键列表
public FuncPipelineParameters FuncPipeline { get; set; } // 函数管道
}
AddNodeConfig - 节点添加配置
csharp
public class AddNodeConfig
{
public string AddKey { get; set; } // 添加键
public List Label { get; set; } // 节点标签
public string Symbol { get; set; } // 节点符号
public bool AutoCreateRoot { get; set; } // 自动创建根节点
public bool AddAICallCellInvoke { get; set; } // 添加AI调用单元格
public List BackEdge { get; set; } // 后向边
public List ProcessedBackEdge { get; set; } // 处理后向边
// … 其他属性
}
NextNodeConfig - 后续节点配置
csharp
public class NextNodeConfig
{
public bool HandleNextNodeRun { get; set; } // 是否处理下一节点
public string NextNodeKey { get; set; } // 下一节点键
public string NodeKey { get; set; } // 当前节点键
public bool UpdateChain { get; set; } // 更新链标志
public List ConnectChainInfoConfig { get; set; } // 连接链配置
public List MapKeyConfig { get; set; } // 映射键配置
}
3. 节点处理系统 (NodeProcess)
3.1 核心处理流程
csharp
public static async Task ProcessInputAndAddAICallCell(UniversalNode uNode)
{
// 1. 节点获取 (GetNode_)
// 2. 函数运行 (ExcuteNode_)
// 3. 节点添加 (AddNode_)
// 4. 后续节点处理 (NextNode_)
}
3.1.1 节点获取 (GetNode_)
处理输入配置
收集输入数据
执行动态节点查询
3.1.2 函数运行 (ExcuteNode_)
初始化函数映射
设置函数参数
执行管道运行
处理输出结果
3.1.3 节点添加 (AddNode_)
处理输入键值
处理输出数据
初始化节点配置
创建处理节点
3.1.4 后续节点处理 (NextNode_)
更新工作流配置
检查节点连接
处理工作流执行
- 使用方法
4.1 定义工作流
csharp
// 使用预定义工作流
var workflowDict = WorkFlows.BaseVersion.BuildBaseVersionDictionary();
// 添加自定义工作流链
var customChain = new List<(string, JToken)>
{
(“自定义工作流”, JToken.Parse(WorkFlows.BaseVersion.代码生成))
};
var fullDict = WorkFlows.BaseVersion.BuildBaseVersionDictionary(
chainInfos: customChain,
allowOverwrite: true
);
4.2 创建和执行节点
csharp
var universalNode = new UniversalNode
{
Symbol = “DataProcessNode”,
Label = new List { “数据处理” },
ExeConfig = new ExecuteConfig
{
ParameterKeys = new List { “InputData”, “ProcessingRule” },
ResultKeys = new List { “ProcessedResult” },
FuncPipeline = new FuncPipelineParameters
{
Calls = new List
{
new FuncCall { FuncName = “DataProcessingFunc” }
}
}
},
AddConfig = new AddNodeConfig
{
AddKey = “ProcessResult”,
AutoCreateRoot = true,
AddAICallCellInvoke = true
}
};
// 执行节点
await NodeProcess.ProcessInputAndAddAICallCell(universalNode);
4.3 工作流连接
json
{
“Count”: 1,
“Workflows”: [
{“WorkflowKey”: “数据分析”},
{“WorkflowKey”: “数据库储存测试”}
],
“PathMappings”: {
“ChainedNodeConfig.BaseNode”: “BaseNode”,
“AI.WebCallKey”: “MerageNode_c0”
},
“KeyMapping”: {
“DataInitial_Result.1”: “Value1.1”
}
}
5. 扩展功能
5.1 动态节点查询
csharp
var queryResult = DynamicNodeQuery(
root: GetAICallFrame(),
symbol: “目标符号”,
querySteps: new List { “步骤1”, “步骤2” },
label: new List { “标签过滤” }
);
5.2 工作流更新
csharp
// 更新现有工作流链
var updatedNode = UpdateWorkflowIfNeeded(originalNode);
5.3 批量配置处理
支持通过批处理配置一次性设置多个参数映射。
- 调试和日志
系统内置调试功能:
csharp
// 创建调试模板
var processDebug = DynamicDebugHelper.CreateProcessTemplate(“节点处理流程”);
// 调试输出
DebugPrint(uNode.Symbol, false, “uNode.Symbol”);
DebugPrint(GetNode_debug, false, “GetNode_debug”);
7. 最佳实践
工作流设计
保持节点功能单一
合理使用标签和符号
设计清晰的输入输出接口
错误处理
验证输入参数非空
检查字典键是否存在
使用try-catch包装异步操作
性能优化
合理使用缓存
避免不必要的节点创建
优化查询步骤
可维护性
使用有意义的标签和符号
保持配置结构清晰
添加必要的注释和文档
- 注意事项
确保工作流JSON格式正确
注意节点间的依赖关系
合理设置超时和重试机制
监控内存使用,避免泄漏
public class WorkFlows
{
public static class BaseVersion
{
public static Dictionary<string, JToken> BuildBaseVersionDictionary(
List<(string, JToken)> chainInfos = null,
bool allowOverwrite = false)
{
return CreateWorkflowDictionary<JToken>(
additionalItems: chainInfos,
baseDictionary: baseVersion_JToken,
//baseDictionary: baseVersion,
overwriteDuplicates: allowOverwrite
);
}
private static Dictionary<string, T> BuildDictionary<T>(Func<Func<JToken>, T> converter)
{
return new Dictionary<string, T>
{
["数据分析"] = converter(() => 数据分析),
["数据库储存测试"] = converter(() => 数据库储存测试)
};
}
public static Dictionary<string, JToken> baseVersion_JToken =>
BuildDictionary<JToken>(method => method());
public static Dictionary<string, string> baseVersion_String =>
BuildDictionary<string>(method => method().ToString());
public static Dictionary<string, string> connectVersion = new()
{
{ "自定义连接工作流_", 自定义连接工作流_ }
};
public static Dictionary<string, string> connectRuleVersion = new()
{
{ "自定义连接工作流键位映射_", 自定义连接工作流键位映射_ }
};
public static string 图像生成 = @"[
{
""Symbol"": ""MerageNode"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""MergePrefix"",
""ParameterKeys"": ""\""MERGE1_\"", \""Names_\"""",
""ResultKeys"": ""\""MERGE1_Result\""""
}
},
{
""Symbol"": ""ImageInitial"",
""Label"": ""\""TEST_2_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""WebInvoke"",
""ParameterKeys"": ""\""WebSet_\"", \""MERGE1_Result\"", \""ExtractMode_Code\"""",
""ResultKeys"": ""\""ImageInitial_Result\""""
}
},
{
""Symbol"": ""DonloadNode"",
""Label"": ""\""TEST_3_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""DonloadPhoto"",
""ParameterKeys"": ""\""ImageInitial_Result\"", \""Directory_\"", \""Names_\"""",
""ResultKeys"": ""\""TEST_1_Result\""""
}
}
]";
public static string 代码生成 = @"[
{
""Symbol"": ""MerageNode0"",
""Label"": ""\""TEST_4_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""MergePrefix"",
""ParameterKeys"": ""\""MERGE0_\"", \""Names_\"""",
""ResultKeys"": ""\""MERGE0_Result0_\""""
}
},
{
""Symbol"": ""CodeInitial0"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""WebInvoke"",
""ParameterKeys"": ""\""WebSet_\"", \""MERGE0_Result0_\"", \""ExtractMode_Code\"""",
""ResultKeys"": ""\""CodeInitial_Result0\""""
}
},
{
""Symbol"": ""MerageNode2"",
""Label"": ""\""TEST_4_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""MergePrefix"",
""ParameterKeys"": ""\""MERGE2_\"", \""CodeInitial_Result0\"""",
""ResultKeys"": ""\""MERGE2_Result\""""
}
},
{
""Symbol"": ""CodeInitial3"",
""Label"": ""\""TEST_3_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""WebInvoke"",
""ParameterKeys"": ""\""WebSet_\"", \""MERGE2_Result\"", \""ExtractMode_Code\"""",
""ResultKeys"": ""\""CodeInitial3_Result\""""
}
},
{
""Symbol"": ""MerageNode"",
""Label"": ""\""TEST_2_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""MergePrefix"",
""ParameterKeys"": ""\""CodeProcess\"",\""CodeInitial_Result0\"", \""CodeInitial3_Result\"""",
""ResultKeys"": ""\""MERGE1_Result\""""
}
},
{
""Symbol"": ""CodeInitial2"",
""Label"": ""\""TEST_3_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""WebInvoke"",
""ParameterKeys"": ""\""WebSet_\"", \""MERGE1_Result\"", \""ExtractMode_Code\"""",
""ResultKeys"": ""\""CodeInitial2_Result\""""
}
},
{
""Symbol"": ""DynamicExcute"",
""Label"": ""\""TEST_5_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""DynamicExcute"",
""ParameterKeys"": ""\""CodeInitial2_Result\"""",
""ResultKeys"": ""\""DynamicExcute_Result\""""
}
},
{
""Symbol"": ""CmopileFunction"",
""Label"": ""\""TEST_6_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""CmopileFunction"",
""ParameterKeys"": ""\""DynamicExcute_Result\"""",
""ResultKeys"": ""\""CmopileFunction_Result\""""
}
},
//{
// ""Symbol"": ""MerageNode4"",
// ""Label"": ""\""TEST_4_Label\"""",
// ""funcPipelineConfig"": {
// ""FuncName"": ""MergePrefix"",
// ""ParameterKeys"": ""\""MERGE4_\"", \""MERGE3_Result\"""",
// ""ResultKeys"": ""\""MERGE4_Result\""""
// }
//},
//{
// ""Symbol"": ""CodeInitial4"",
// ""Label"": ""\""TEST_3_Label\"""",
// ""funcPipelineConfig"": {
// ""FuncName"": ""WebInvoke"",
// ""ParameterKeys"": ""\""WebSet_\"", \""MERGE4_Result\"", \""ExtractMode_Code\"""",
// ""ResultKeys"": ""\""CodeInitial4_Result\""""
// }
//},
//{
// ""Symbol"": ""PipelineRun"",
// ""Label"": ""\""TEST_3_Label\"""",
// ""funcPipelineConfig"": {
// ""FuncName"": ""PipelineRun_"",
// ""ParameterKeys"": ""\""CodeInitial3_Result\"""",
// ""ResultKeys"": ""\""PipelineRun_Result\""""
// }
//}
]";
public static string 数据分析 = @"
[
{
""Symbol"": ""MerageNode"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""MergePrefix"",
""ParameterKeys"": ""\""DataProcess\"", \""Inputs_\"""",
""ResultKeys"": ""\""MERGE1_Result\""""
}
},
{
""Symbol"": ""DataInitial"",
""Label"": ""\""TEST_2_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""WebInvoke"",
""ParameterKeys"": ""\""WebSet_\"", \""MERGE1_Result\"", \""ExtractMode_Code\"""",
""ResultKeys"": ""\""DataInitial_Result\""""
},
""nextNodeConfig"":{
""MapKeyConfig"": ""\""自定义连接工作流键位映射_\"""",
""ConnectChainInfoConfig"": ""\""数据库储存测试\"""",
""UpdateChain"": ""true"",
}
},
]
";
public static string 数据库储存测试 = @"
[
{
""Symbol"": ""ToListDictionary"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""ToListDictionary"",
""ParameterKeys"": ""\""Key1\"", \""Value1\"",\""Key2\"", \""Value2\"""",
""ResultKeys"": ""\""ToListDictionary_Result\""""
}
},
{
""Symbol"": ""DataValueUpdate"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""DataValueUpdate"",
""ParameterKeys"": ""\""TableName\"", \""MainKeyName\"", \""ToListDictionary_Result\"""",
""ResultKeys"": ""\""DataValueUpdate_Result\""""
}
},
{
""Symbol"": ""DataValueRead"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""DataValueRead"",
""ParameterKeys"": ""\""TableName\"", \""MainKeyName\"",\""Value1\"""",
""ResultKeys"": ""\""DataValueRead_Result\""""
}
},
]
";
public static string 编译测试 = @"[
{
""Symbol"": ""CodeInitial"",
""Label"": ""\""TEST_1_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""WebInvoke"",
""ParameterKeys"": ""\""WebSet_\"", \""Names_\"", \""ExtractMode_Code\"", \""TRUE\"""",
""ResultKeys"": ""\""CodeInitial_Result\""""
}
},
{
""Symbol"": ""DynamicExcute"",
""Label"": ""\""TEST_5_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""DynamicExcute"",
""ParameterKeys"": ""\""CodeInitial_Result\"""",
""ResultKeys"": ""\""DynamicExcute_Result\""""
}
},
{
""Symbol"": ""CompileCheck"",
""Label"": ""\""TEST_5_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""CompileCheck"",
""ParameterKeys"": ""\""DynamicExcute_Result\"""",
""ResultKeys"": ""\""CompileCheck_Result\""""
}
},
{
""Symbol"": ""CmopileFunction"",
""Label"": ""\""TEST_6_Label\"""",
""funcPipelineConfig"": {
""FuncName"": ""CmopileFunction"",
""ParameterKeys"": ""\""DynamicExcute_Result\"""",
""ResultKeys"": ""\""CmopileFunction_Result\""""
}
}
]";
}
public static class ConnectVersion
{
public static string 自定义连接工作流_ = @"{
""Count"": 1,
""Workflows"": [
{""WorkflowKey"": ""数据分析""},
{""WorkflowKey"": ""数据库储存测试""}
],
""PathMappings"": {
""ChainedNodeConfig.BaseNode"": ""BaseNode"",
""AI.WebCallKey"": ""MerageNode_c0""
},
""BatchConfig"": {
""defaultCount"": 1,
""pathConfigurations"": [
{
""targetPrefix"": ""Inputs_"",
""value"": ""AI.AIInput.RequestCode"",
""valueSource"": ""path"",
},
{""targetPrefix"": ""DataProcess"", ""value"": ""DataProcess"",},
{""targetPrefix"": ""WebSet_"", ""value"": ""DeepSeek_R1MConfig"",},
{""targetPrefix"": ""ExtractMode_Code"", ""value"": ""code"",},
{""targetPrefix"": ""code"", ""value"": ""ExtractMode_Code"",},
{""targetPrefix"": ""DeepSeek_R1MConfig"",""value"": ""WebSet_"",},
{""targetPrefix"": ""Key1"", ""value"": ""TESTKey"",},
{""targetPrefix"": ""Value1"", ""value"": ""TESTValue1_"",},
{""targetPrefix"": ""Key2"", ""value"": ""TESTKey2"",},
{""targetPrefix"": ""Value2"", ""value"": ""TESTValue2_"",},
{""targetPrefix"": ""TableName"", ""value"": ""TESTTableName1"",},
{""targetPrefix"": ""MainKeyName"", ""value"": ""TESTKey"",}
]
},
""KeyMapping"": {
""DataInitial_Result._1_"": ""Value1._1_""
}
}";
}
public static class ConnectRule
{
public static string 自定义连接工作流键位映射_ = @"
{
""DataInitial_Result._1_"": ""Value1._1_""
}
";
}
}
UniversalNode运行节点以及运行框架。
#region _通用
public class UniversalNode
{
public List<string> Label { get; set; }
public string Symbol { get; set; }
public List<GetInputStrConfig> GetConfig { get; set; } = null;
public ExecuteConfig ExeConfig { get; set; } = new();
public AddNodeConfig AddConfig { get; set; } = new();
public NextNodeConfig NextConfig { get; set; } = new();
}
public class GetInputStrConfig
{
public string Getstring { get; set; }
public List<string> Label { get; set; }
public string Symbol { get; set; }
public List<string> QuerySteps { get; set; }
}
public class ExecuteConfig
{
public List<string> ParameterKeys { get; set; }
public List<string> ResultKeys { get; set; }
public FuncPipelineParameters FuncPipeline { get; set; } = new();
}
public class AddNodeConfig
{
public string AddKey { get; set; } = null;
public List<string> Label { get; set; } = null;
public string Symbol { get; set; } = null;
public bool AutoCreateRoot { get; set; } = true;
public bool AddAICallCellInvoke { get; set; } = true;
public List<string> BackEdge { get; set; } = new();
public List<string> ProcessedBackEdge { get; set; } = new();
public IEnumerable<string> InputDatas { get; set; }
public IEnumerable<string> OutputResult { get; set; }
public List<string> SearchSet { get; set; }
public string AddParameters { get; set; }
public List<string> Record { get; set; } = new();
}
public class NextNodeConfig
{
public bool HandleNextNodeRun { get; set; } = false;
public string NextNodeKey { get; set; }
public string NodeKey { get; set; }
public bool UpdateChain { get; set; } = false;
public List<string> ConnectChainInfoConfig { get; set; }
public List<string> MapKeyConfig { get; set; }
}
public static class NodeProcess
{
/// <summary>
/// 处理输入数据并添加AI调用单元格节点到图中
/// 该函数负责执行完整的节点处理流程,包括:收集输入数据、执行处理函数、配置节点参数、添加节点到图结构
/// </summary>
/// <param name="uNode">通用节点对象,包含节点的配置信息、处理函数和连接参数</param>
/// <returns>处理结果的字符串表示,通常是输出值中的第一个有效结果</returns>
public static async Task ProcessInputAndAddAICallCell(UniversalNode uNode)
{
//DebugPrint("ProcessInputAndAddAICallCellRun");
//DebugPrint(GetAICallFrame(), true, "GetAICallFrame()");
if (uNode == null)
throw new ArgumentNullException(nameof(uNode));
var debug = DynamicDebugHelper.InvokeDebugHandler($"{uNode.Symbol}debug");
var GetNode_debug = DynamicDebugHelper.InvokeDebugHandler($"{uNode.Symbol}GetNode_debug");
var ExcuteNode_debug = DynamicDebugHelper.InvokeDebugHandler($"{uNode.Symbol}ExcuteNode_debug");
var AddNode_debug = DynamicDebugHelper.InvokeDebugHandler($"{uNode.Symbol}AddNode_debug");
var NextNode_debug = DynamicDebugHelper.InvokeDebugHandler($"{uNode.Symbol}AddNode_debug");
Dictionary<string, object> InputValues = new();
Dictionary<string, object> OutputValues = new();
// 创建流程调试模板
var processDebug = DynamicDebugHelper.CreateProcessTemplate("节点处理流程");
string result = null;
AICallFrame.CallNode nextNode = new();
try
{
debug = await FunctionExecutionFramework.CreateFlow("添加单元格节点流程", debug)
.Add("节点获取", async () =>
{
(GetNode_debug, InputValues) = await GetNode_(uNode, GetNode_debug, InputValues);
return null;
})
.Add("函数运行", async () =>
{
(ExcuteNode_debug, OutputValues) = await ExcuteNode_(uNode, ExcuteNode_debug, OutputValues);
return null;
})
.Add("节点添加", async () =>
{
(AddNode_debug, nextNode) = await AddNode_(uNode, AddNode_debug, InputValues, OutputValues);
return null;
})
.Add("节点添加", async () =>
{
NextNode_debug = await NextNode_(uNode, AddNode_debug);
return null;
})
.ExecuteAsync();
// 使用原有的结果提取逻辑
result = OutputValues.Values
.Select(v => v?.ToString())
.FirstOrDefault(str => !string.IsNullOrEmpty(str)) ?? string.Empty;
// 更新全局字典 - 使用原有的GetPathSetValue方法
if (!string.IsNullOrEmpty(result))
{
GetPathSetValue(dic, "AI.AIInput.RequestCode", result);
}
}
finally
{
nextNode.Record.Add(debug);
nextNode.Record.Add(GetNode_debug);
nextNode.Record.Add(ExcuteNode_debug);
nextNode.Record.Add(AddNode_debug);
nextNode.Record.Add(NextNode_debug);
//bool debugbool = true;
bool debugbool = nextNode.AreAllListCollectionsEmpty();
if (debugbool)
{
// 使用原有的DebugPrint方法
DebugPrint(uNode.Symbol, false, "uNode.Symbol");
DebugPrint(GetNode_debug, false, "GetNode_debug");
DebugPrint(ExcuteNode_debug, false, "ExcuteNode_debug");
DebugPrint(AddNode_debug, false, "AddNode_debug");
DebugPrint(NextNode_debug, false, "NextNode_debug");
DebugPrint(debug, false, "debug");
DynamicDebugHelper.PrintDebugInfo(debug, "ProcessInputAndAddAICallCell");
}
if (nextNode == null)
throw new InvalidOperationException("执行框架未添加成功");
// 使用原有的GetPathSetValue方法
GetPathSetValue(dic, "AI.AICallFrame", nextNode);
}
}
#region 通用处理包装器
/// <summary>
/// 通用的异步处理包装方法,处理返回(string, T)的方法
/// </summary>
private static async Task<(string DebugInfo, T Result)> ProcessNodeStepAsync<T>(
Func<Task<(string, T)>> operation, string operationName, string debug, T defaultValue = default)
{
try
{
var (debugInfo, result) = await operation();
return (debugInfo, result);
}
catch (Exception ex)
{
string errorDebug = DynamicDebugHelper.InvokeDebugHandler("LogException", debug, operationName, ex);
return (errorDebug, defaultValue);
}
}
/// <summary>
/// 通用的异步处理包装方法,处理返回单个值的方法
/// </summary>
private static async Task<string> ProcessNodeStepSingleAsync(
Func<Task<string>> operation, string operationName, string debug, string defaultValue = "")
{
try
{
return await operation();
}
catch (Exception ex)
{
return DynamicDebugHelper.InvokeDebugHandler("LogException", debug, operationName, ex);
}
}
#endregion
#region 节点处理方法
private static async Task<(string debug, Dictionary<string, object> InputValues)> GetNode_(
UniversalNode uNode, string debug, Dictionary<string, object> InputValues)
{
return await ProcessNodeStepAsync(
() => ProcessGetConfig(uNode, debug, InputValues),
"节点配置获取",
debug,
InputValues);
}
private static async Task<(string debug, Dictionary<string, object> OutputValues)> ExcuteNode_(
UniversalNode uNode, string debug, Dictionary<string, object> OutputValues)
{
return await ProcessNodeStepAsync(
() => ExecuteNodeCore(uNode, debug, OutputValues),
"节点函数执行",
debug,
OutputValues);
}
private static async Task<(string debug, AICallFrame.CallNode)> AddNode_(
UniversalNode uNode, string debug, Dictionary<string, object> InputValues, Dictionary<string, object> OutputValues)
{
return await ProcessNodeStepAsync(
() => AddNodeProcess.AddCellNode(uNode, debug, InputValues, OutputValues),
"节点添加",
debug,
new AICallFrame.CallNode());
}
private static async Task<string> NextNode_(UniversalNode uNode, string debug)
{
return await ProcessNodeStepSingleAsync(
() => NextNodeProcess.NextCellNode(uNode, debug),
"后续节点处理",
debug,
debug);
}
#endregion
#region 核心处理逻辑
private static async Task<(string debug, Dictionary<string, object> InputValues)> ProcessGetConfig(
UniversalNode uNode, string debug, Dictionary<string, object> InputValues)
{
if (uNode.GetConfig?.Count > 0)
{
for (int i = 0; i < uNode.GetConfig.Count; i++)
{
var config = uNode.GetConfig[i] ?? throw new ArgumentNullException(
$"uNode.GetConfig[{i}]", $"GetConfig 元素 {i} 为 null");
var result = await GetInputStrProcess.GetInputStr(InputValues, config, debug);
InputValues = result.InputValues ?? throw new ArgumentNullException("InputValues是空的");
debug = result.debug;
}
}
return (debug, InputValues);
}
private static async Task<(string debug, Dictionary<string, object> OutputValues)> ExecuteNodeCore(
UniversalNode uNode, string debug, Dictionary<string, object> OutputValues)
{
var (funcOutput, newDebug) = await ExecuteProcess.ExecuteProcessFuncAsync(uNode, debug);
return (newDebug, funcOutput);
}
#endregion
}
public static class AddNodeProcess
{
public static async Task<(string, AICallFrame.CallNode)> AddCellNode(
UniversalNode uNode,
string debug,
Dictionary<string, object> InputValues,
Dictionary<string, object> OutputValues)
{
AICallFrame.CallNode nextNode = new();
debug = await FunctionExecutionFramework.CreateFlow("Universal节点处理流程", debug)
.Add("处理输入数据", () => ProcessTargetKey(uNode, InputValues))
.Add("处理输出数据", () => ProcessOutputValues(uNode, OutputValues))
.Add("初始化节点配置", () => SetNodeConfigDefaults(uNode))
.Add("创建处理节点", () =>
{
var result = HandleAICall(uNode.AddConfig);
nextNode = result.Item2;
return result.Item1;
})
.ExecuteAsync();
return (debug, nextNode);
}
private static (object, AICallFrame.CallNode) HandleAICall(AddNodeConfig config)
{
if (!config.AddAICallCellInvoke)
return ("跳过处理节点创建", null);
// 验证输入配置
if (config.BackEdge?.Count == 0)
throw new InvalidOperationException("缺少输入数据连接");
// 构建新节点配置
var newNodeConfig = new AddNodeConfig
{
InputDatas = new List<string>() { config.BackEdge[0] },
OutputResult = config.ProcessedBackEdge,
Label = config.Label,
Symbol = config.Symbol,
SearchSet = config.SearchSet ?? FindNodeByInputsSet,
AddParameters = config.AddParameters
};
// 获取或创建执行框架根节点
var root = GetAICallFrame();
if (root == null && config.AutoCreateRoot)
{
root = new AICallFrame.CallNode(config);
GetPathSetValue(dic, "AI.AICallFrame", root);
}
// 添加新节点到框架
var node = AddNode(root, newNodeConfig);
return ("处理节点创建成功", node);
}
private static object ProcessTargetKey(UniversalNode uNode, Dictionary<string, object> inputValues)
{
var targetKey = uNode.AddConfig?.AddKey;
if (string.IsNullOrEmpty(targetKey))
return "未处理BackEdge";
if (!inputValues.TryGetValue(targetKey, out var targetValue))
throw new InvalidOperationException($"InputValues中不存在targetKey: {targetKey} InputValues.keys=[{string.Join("\n", inputValues.Keys)}]");
// 尝试添加到参数
if (AddToArgsStrict(uNode.AddConfig.BackEdge, targetValue, out string diastring))
{
return $"已添加的输入键值: {targetKey}";
}
else
{
// 构建详细的错误日志
var log = $@"输入键值添加失败: {diastring}
目标键: {targetKey}
目标值类型: {targetValue?.GetType().Name ?? "null"}
目标值内容: {PrintCollection(targetValue, "InputValues[targetKey]", true)}
所有输入值: {PrintCollection(inputValues, "InputValues", true)}";
throw new InvalidOperationException(log);
}
}
private static object ProcessOutputValues(UniversalNode uNode, Dictionary<string, object> OutputValues)
{
if (uNode.AddConfig == null) return "节点配置为空,跳过输出处理";
var processedValues = OutputValues.Values
.Select(value => value?.ToString())
.Where(valueStr => !string.IsNullOrEmpty(valueStr))
.ToList();
if (processedValues.Count == 0 || processedValues == null)
throw new InvalidOperationException("输出值为空");
foreach (var valueStr in processedValues)
{
if (string.IsNullOrEmpty(valueStr))
throw new InvalidOperationException($"OutputValues中valueStr为空");
uNode.AddConfig.ProcessedBackEdge.Add(valueStr);
}
return $"处理输出数据: {processedValues.Count} 项";
}
private static object SetNodeConfigDefaults(UniversalNode uNode)
{
if (uNode.AddConfig == null) return "节点配置为空,跳过初始化";
uNode.AddConfig.Label ??= uNode.Label;
uNode.AddConfig.Symbol ??= uNode.Symbol;
return "节点配置初始化完成";
}
}
public static class NextNodeProcess
{
public static async Task<string> NextCellNode(
UniversalNode uNode,
string debug)
{
// 创建本地字典(根据你的实际需求)
var dic = new Dictionary<string, object>();
debug = await FunctionExecutionFramework.CreateFlow("Universal节点处理流程", debug)
.Add("更新工作流配置", () =>
{
string result = null;
(uNode, result) = UpdateWorkflowIfNeeded(uNode);
return result;
})
.Add("检查节点连接配置", () => ValidateNextNodeConfig(uNode.NextConfig))
.Add("处理节点连接", async () => await ProcessWorkflowAndExecuteNode(uNode.NextConfig))
.ExecuteAsync();
return debug;
}
private static string ValidateNextNodeConfig(NextNodeConfig nextNodeConfig)
{
if (nextNodeConfig == null)
return "跳过下一节点处理(配置为空)";
if (nextNodeConfig.HandleNextNodeRun != true)
return "跳过下一节点处理(未启用执行)";
if (string.IsNullOrEmpty(nextNodeConfig.NextNodeKey))
throw new ArgumentException("NextNodeKey不能为空");
return "节点连接配置验证通过";
}
private static (UniversalNode, string) UpdateWorkflowIfNeeded(UniversalNode uNode)
{
var nextNodeConfig = uNode.NextConfig;
DebugPrint(nextNodeConfig, true, "nextNodeConfig");
// 前置条件检查
if (nextNodeConfig?.UpdateChain != true)
return (uNode, "跳过工作流更新");
if (nextNodeConfig.NodeKey == null)
return (uNode, "NodeKey为Null");
var config = GetConfig(nextNodeConfig.NodeKey);
var chainInfo = config.GetLatestChainInfo();
if (chainInfo == null)
throw new InvalidOperationException($"未找到节点{nextNodeConfig.NodeKey}对应的工作流链信息");
var chainId = ChainConnectWebCall.ChainInfoManager.GetChainId(chainInfo);
var dictionary = CreateWorkflowDictionary<string>(baseDictionary: connectRuleVersion);
var workflowDictionary = CreateWorkflowDictionary<string>(baseDictionary: baseVersion_String);
if (dictionary == null || workflowDictionary == null)
throw new InvalidOperationException("工作流字典创建失败");
if (!dictionary.TryGetValue(nextNodeConfig.MapKeyConfig[0], out var mapKeyConfigValue))
throw new KeyNotFoundException($"字典中找不到MapKey: {nextNodeConfig.MapKeyConfig[0]}");
if (!workflowDictionary.TryGetValue(nextNodeConfig.ConnectChainInfoConfig[0], out var chainConfigValue))
throw new KeyNotFoundException($"工作流字典中找不到ChainKey: {nextNodeConfig.ConnectChainInfoConfig[0]}");
Dictionary<string, string> keyMapping = Deserialize<Dictionary<string, string>>(mapKeyConfigValue);
var newChainInfo = CreateChainInfo(chainConfigValue, 1, chainConfigValue);
DebugPrint(chainConfigValue, false, "ChainConfig");
DebugPrint(keyMapping, false, "KeyMapping");
List<ChainInfo> workflows = new()
{
chainInfo,
newChainInfo
};
var ConnectChainInfo = ConnectAndRunWorkflows(workflows, keyMapping);
SetWorkflow(chainId, ConnectChainInfo);
uNode = config.Refresh(uNode, nextNodeConfig.NodeKey);
DebugPrint(uNode.NextConfig, true, "Resfreshed");
DebugPrint(ConnectChainInfo, true, "ConnectChainInfo");
return (uNode, $"工作流更新成功: {nextNodeConfig.NextNodeKey}");
}
private static async Task<string> ProcessWorkflowAndExecuteNode(
NextNodeConfig nextNodeConfig)
{
var config = ChainConnectWebCall.WebCallManager.GetConfig(nextNodeConfig.NextNodeKey);
// 检查工作流是否存在
var chainInfo = config.GetLatestChainInfo();
if (chainInfo == null)
throw new InvalidOperationException($"未找到节点{nextNodeConfig.NextNodeKey}对应的工作流");
if (chainInfo == null)
throw new ArgumentNullException(nameof(chainInfo));
if (config == null)
throw new ArgumentNullException(nameof(config));
try
{
await ChainConnectWebCall.WorkflowProcessor.ExecuteWebCallAndUpdate(config);
}
catch (Exception ex)
{
throw new InvalidOperationException($"处理节点{config.WebCallKey}失败: {ex.Message}", ex);
}
return $"处理下一节点成功: {nextNodeConfig.NextNodeKey}";
}
}
public static class GetInputStrProcess
{
public static async Task<(Dictionary<string, object> InputValues, string debug)> GetInputStr(
Dictionary<string, object> InputValues,
GetInputStrConfig config,
string debug)
{
if (config == null)
return (InputValues, debug);
var resultDebug = await FunctionExecutionFramework.CreateFlow("获取输入字符串", debug)
.AddWhen(!string.IsNullOrEmpty(config.Symbol) && config.QuerySteps?.Count >= 1,
"动态节点查询",
() => ExecuteDynamicNodeQuery(config, InputValues))
.AddWhen(!string.IsNullOrEmpty(config.Getstring),
"输入集合添加",
() => ExecutePathGetString(InputValues, config))
.ExecuteAsync();
return (InputValues, resultDebug);
}
/// <summary>
/// 执行动态节点查询操作
/// </summary>
private static object ExecuteDynamicNodeQuery(GetInputStrConfig config, Dictionary<string, object> inputValuesRef)
{
List<string> searched = new();
var root = GetAICallFrame();
if (root == null)
throw new InvalidOperationException("执行框架未初始化");
var queryResult = DynamicNodeQuery<string>(root, symbol: config.Symbol, querySteps: config.QuerySteps, label: config.Label);
bool success = AddToArgsStrict(searched, queryResult, out string diastring);
// 检查查询结果是否为空
if (!success || searched.Count == 0)
{
var errorMessage = BuildQueryErrorMessage(
root, success, searched.Count, diastring, config, queryResult
);
throw new InvalidOperationException(errorMessage);
}
// 更新输入值引用
if (!inputValuesRef.ContainsKey(config.Symbol))
{
inputValuesRef.Add(config.Symbol, searched);
}
//DebugPrint(inputValuesRef, false, "inputValuesRef");
return new
{
查询符号 = config.Symbol,
查询步骤 = config.QuerySteps,
结果 = searched,
结果数量 = searched.Count
};
}
/// <summary>
/// 构建动态节点查询的错误信息
/// </summary>
private static string BuildQueryErrorMessage(AICallFrame.CallNode root, bool success, int resultCount, string diagnostic,
GetInputStrConfig config, object queryResult)
{
var errorType = !success ? "查询执行失败" :
resultCount == 0 ? "查询结果为空" : "未知错误";
return $@"
动态节点查询失败:
错误类型: {errorType}
查询符号: {config.Symbol}
查询步骤: {PrintCollection(config.QuerySteps, "config.QuerySteps")}
查询标签: {PrintCollection(config.Label, "config.Label")}
诊断信息: {diagnostic ?? "无"}
结果数量: {resultCount}
节点信息: {PrintCollection(root)}
";
}
/// <summary>
/// 执行路径获取字符串操作
/// </summary>
private static object ExecutePathGetString(Dictionary<string, object> inputValuesRef, GetInputStrConfig config)
{
var pathValue = GetPathGetEntireString(dic, config.Getstring);
if (!inputValuesRef.ContainsKey(config.Getstring))
{
if (AddToArgs(inputValuesRef, config.Getstring, pathValue))
{
//DebugPrint(pathValue,false, "pathValue");
//DebugPrint(config.Getstring, false, "config.Getstring");
return new { 添加输入 = config.Getstring };
}
else
{
var log = $"输入集合添加失败:value:{config.Getstring}key:{pathValue}";
DebugPrint(log);
throw new InvalidOperationException(log);
}
}
return null;
}
}
public static class ExecuteProcess
{
public static async Task<(Dictionary<string, object> OutputValues, string debug)> ExecuteProcessFuncAsync(
UniversalNode uNode,
string debug)
{
Dictionary<string, object> OutputValues = new Dictionary<string, object>();
object output = null;
var resultDebug = await FunctionExecutionFramework.CreateFlow("执行处理函数", debug)
.Add("初始化函数映射",
() =>
{
uNode.ExeConfig.FuncPipeline.FunctionMap = getfunc();
return new { 状态 = "函数映射已初始化" };
})
.AddWhen(uNode.ExeConfig.FuncPipeline.Calls != null, "函数参数赋值", () => { return FuncParameterSet(uNode); })
.Add("执行管道运行", async () =>
{
output = await FuncExecute(uNode, output);
return "运行完成";
})
.Add("处理输出结果",
() =>
{
OutputValues = ProcessOutputToDictionary(output);
if (OutputValues == null)
throw new InvalidOperationException($"OutputValues结果为空");
return new { 输出条目数 = OutputValues.Count, 输出结果 = PrintCollection(OutputValues.Values) };
})
.ExecuteAsync();
return (OutputValues, resultDebug);
}
static object FuncParameterSet(UniversalNode uNode)
{
// 参数验证
if (uNode?.ExeConfig.ParameterKeys == null || uNode.ExeConfig.ParameterKeys.Count == 0)
throw new InvalidOperationException("ParameterKeys 为空或未初始化");
if (dic == null)
throw new InvalidOperationException("数据字典 dic 为空");
// 初始化参数列表
var parameterlist = new List<object>();
// 构建参数列表
foreach (var parameterKey in uNode.ExeConfig.ParameterKeys)
{
var parameterValue = GetPathGetValue<object>(dic, parameterKey);
if (parameterValue == null)
throw new InvalidOperationException($"字典中无法获取参数值: {parameterKey}");
parameterlist.Add(parameterValue);
}
// 验证函数管道
if (uNode.ExeConfig.FuncPipeline?.Calls?.Count == 0)
throw new InvalidOperationException("FuncPipeline.Calls 为空");
var firstCall = uNode.ExeConfig.FuncPipeline.Calls[0];
if (firstCall == null)
throw new InvalidOperationException("FuncPipeline.Calls 第一个元素为null");
// 设置参数
firstCall.Parameters = parameterlist;
return string.Join(",", uNode.ExeConfig.ParameterKeys);
}
static async Task<object> FuncExecute(UniversalNode uNode, object output)
{
output = await PipelineRunAsync(uNode.ExeConfig.FuncPipeline);
List<object> outputlist = (List<object>)output;
if (uNode.ExeConfig.ResultKeys.Count == 0 || uNode.ExeConfig.ResultKeys == null)
throw new InvalidOperationException($"uNode.ResultKeys为空");
for (int i = 0; i < uNode.ExeConfig.ResultKeys.Count; i++)
{
if (string.IsNullOrEmpty(uNode.ExeConfig.ResultKeys[i]))
throw new InvalidOperationException($"uNode.ResultKeys[i]为空");
else
GetPathSetValue(dic, uNode.ExeConfig.ResultKeys[i], outputlist[i]);
}
if (outputlist == null)
throw new InvalidOperationException($"PipelineRun运行结果为空");
//GetPathSetValue<object>(dic);
return output;
}
/// <summary>
/// 处理输出并转换为字典格式
/// </summary>
private static Dictionary<string, object> ProcessOutputToDictionary(object output)
{
var outputValues = new Dictionary<string, object>();
if (output is IEnumerable enumerable && output is not string)
{
int index = 1;
foreach (var item in enumerable)
{
outputValues.Add(index.ToString(), item?.ToString());
index++;
}
}
else
{
outputValues.Add("1", output?.ToString());
}
return outputValues;
}
}
#endregion
鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。
更多推荐



所有评论(0)