GraphMindStudio节点管理算法
// 摘要:单个工作流的基本配置,包含工作流名称、节点JSON和链计数;描述节点在链中的位置信息,包括链索引、节点索引、键后缀等;提供工作流的创建、配置、连接、执行和分析的完整解决方案;描述两个工作流之间的连接关系,包含源/目标节点和键映射;用于创建链式节点的配置,包含基础配置、配置动作和链计数;完整的工作流链信息,包含起始节点、所有节点、键关系图等……(摘要在此处被截断)
///
/// 模块化工作流处理方法类
/// 提供工作流的创建、配置、连接、执行和分析的完整解决方案
/// 主要功能包括:
/// 1. 工作流配置管理(多种配置格式支持)
/// 2. 工作流连接与数据流管理
/// 3. 键映射和路径映射配置
/// 4. 批量处理和链式执行
/// 5. 数据流分析和关系图构建
///
#region 配置参数类
///
/// 模块参数配置类
/// 包含工作流处理所需的所有配置数据结构
/// 支持JSON配置、对象配置和字符串配置等多种配置格式
///
///
/// 工作流参数配置类
/// 主要工作流配置容器,支持从单个JSON配置构建完整工作流
/// 包含工作流列表、路径映射、批量配置、键映射等配置项
///
///
/// 工作流项配置类
/// 单个工作流的基本配置,包含工作流名称、节点JSON和链计数
///
///
/// 通用工作流配置类
/// 结构化的配置类,使用字典和对象存储配置,便于程序处理
///
///
/// 字符串工作流配置类
/// 所有配置项都以JSON字符串形式存储,便于序列化和存储
///
///
/// JSON工作流类
/// 包装JToken节点和链计数,便于JSON处理
///
///
/// 路径配置类
/// 定义目标路径前缀、值来源、后缀格式等批量处理配置
///
///
/// 批量配置类
/// 包含默认计数和路径配置列表,用于批量数据处理
///
#endregion
#region 工作流连接模块
///
/// 工作流连接配置类
/// 包含要连接的工作流列表和键映射配置
///
///
/// 键映射配置类
/// 存储自定义的键映射关系
///
///
/// 工作流连接结果类
/// 包含连接后的工作流、连接列表、键映射报告和状态信息
///
///
/// 工作流连接类
/// 描述两个工作流之间的连接关系,包含源/目标节点和键映射
///
///
/// 键映射类
/// 单个键映射关系,包含源键和目标键
///
///
/// 键映射报告类
/// 包含映射统计、所有映射、未映射键和摘要信息
///
///
/// 键类型枚举
/// 区分参数键和结果键
///
///
/// 键关系类
/// 描述键的源节点、目标节点和类型,标识是否为外部输入
///
///
/// 键关系图类
/// 存储完整的键关系图,包含节点输入输出键的映射关系
/// 提供按节点查询键关系的方法
///
///
/// 节点链设置类
/// 描述节点在链中的位置信息,包括链索引、节点索引、键后缀等
///
///
/// 链式节点配置类
/// 用于创建链式节点的配置,包含基础配置、配置动作和链计数
///
///
/// 数据流路径类
/// 描述数据在节点间的流动路径,包含节点列表、深度和完成状态
///
///
/// 数据流分析结果类
/// 包含起始节点、键关系列表、数据流路径和分析报告
///
///
/// 链信息类
/// 完整的工作流链信息,包含起始节点、所有节点、键关系图等
/// 提供节点查询、前后继节点查找、数据流分析等功能
///
#endregion
#region 核心工具模块
///
/// 简化节点处理类
/// 提供节点创建、链式节点生成、键关系计算等核心功能
/// 主要功能:
/// 1. 解析JSON中的键列表
/// 2. 创建链式节点配置
/// 3. 计算键关系图
/// 4. 应用链配置到节点
///
///
/// 工作流连接器类
/// 提供工作流连接、键映射、节点更新等连接功能
/// 包含三个子类:键映射服务、工作流连接工具、工作流连接协调器
///
///
/// 键映射服务类
/// 根据配置映射源键和目标键
///
///
/// 工作流连接工具类
/// 实现工作流连接的核心逻辑,包括验证、缓存构建、连接建立等
///
///
/// 工作流连接协调器类
/// 高层协调器,从配置创建工作流并执行连接
///
///
/// 数据流键提取器类
/// 从数据流分析结果中提取输入输出键
/// 提供按节点分组和整体提取功能
///
///
/// 动态批量配置器类
/// 根据批量配置动态处理字典数据
/// 支持固定值、路径值、路径键等多种值来源
///
///
/// 字符串工作流连接器类
/// 支持多种配置格式的工作流配置和执行
/// 自动转换配置格式并执行工作流连接
///
#endregion
public class ChainConnectWebCall
{
// Constants
// Chain identifier returned when no ChainInfo is available.
// (The original literal used typographic quotes, which is not a valid C# string literal.)
private const string DEFAULT_CHAIN_ID = "default_chain";
// Interval handed to CleanupManager.PerformCleanup for the "ExecutionStatus" category.
private static readonly TimeSpan EXECUTION_STATUS_CLEANUP_INTERVAL = TimeSpan.FromMinutes(5);
// Interval handed to CleanupManager.PerformCleanup for the "Observer" category.
private static readonly TimeSpan OBSERVER_CLEANUP_INTERVAL = TimeSpan.FromMinutes(10);
// Maximum age of the ChainInfo cached inside a WebCallOrReturnConfig before it is looked up again.
private static readonly TimeSpan CHAIN_INFO_CACHE_TIMEOUT = TimeSpan.FromSeconds(30);
// 基础数据模型
/// <summary>
/// Outcome of one node execution attempt: whether it ran, when, and what it produced or threw.
/// </summary>
public class NodeExecutionStatus
{
    /// <summary>True once the node has been run, regardless of outcome.</summary>
    public bool Executed { get; set; }

    /// <summary>True only when the node ran and no <see cref="Error"/> was recorded.</summary>
    public bool Succeeded
    {
        get
        {
            if (!Executed)
            {
                return false;
            }

            return Error == null;
        }
    }

    /// <summary>Time at which the status was recorded.</summary>
    public DateTime ExecutionTime { get; set; }

    /// <summary>Optional result object stored with the status.</summary>
    public object Result { get; set; }

    /// <summary>Exception recorded for a failed execution; null otherwise.</summary>
    public Exception Error { get; set; }

    /// <summary>Time elapsed between <see cref="ExecutionTime"/> and the current UTC time.</summary>
    public TimeSpan Age
    {
        get { return DateTime.UtcNow.Subtract(ExecutionTime); }
    }
}
/// <summary>
/// Result record for a single web-call execution, as produced by WorkflowProcessor.
/// </summary>
public class ExecutionResult
{
    /// <summary>True when the call completed without throwing.</summary>
    public bool Success { get; set; }
    /// <summary>Call key of the executed configuration (WorkflowProcessor fills it from WebCallKey).</summary>
    public string ConfigKey { get; set; }
    /// <summary>Chain identifier of the executed configuration.</summary>
    public string ChainId { get; set; }
    /// <summary>Status snapshot read from NodeExecutionStatusManager; set on the success path only.</summary>
    public NodeExecutionStatus ExecutionStatus { get; set; }
    /// <summary>UTC time at which this result was created.</summary>
    public DateTime Timestamp { get; set; }
    /// <summary>Exception message for a failed batch item; left null on success.</summary>
    public string Error { get; set; }
    /// <summary>Elapsed execution time in milliseconds; only populated on the success path.</summary>
    public long? ExecutionDurationMs { get; set; }
}
/// <summary>
/// Aggregated outcome of a batch of web-call executions, with derived counters.
/// </summary>
public class BatchExecutionResult
{
    /// <summary>One entry per executed configuration.</summary>
    public List<ExecutionResult> AllResults { get; set; } = new List<ExecutionResult>();

    /// <summary>Call keys of configurations that executed successfully.</summary>
    public List<string> SuccessfulConfigs { get; set; } = new List<string>();

    /// <summary>Call keys of configurations whose execution failed.</summary>
    public List<string> FailedConfigs { get; set; } = new List<string>();

    /// <summary>Number of entries in <see cref="AllResults"/>.</summary>
    public int TotalCount
    {
        get { return AllResults.Count; }
    }

    /// <summary>Number of entries in <see cref="SuccessfulConfigs"/>.</summary>
    public int SuccessCount
    {
        get { return SuccessfulConfigs.Count; }
    }

    /// <summary>Number of entries in <see cref="FailedConfigs"/>.</summary>
    public int FailureCount
    {
        get { return FailedConfigs.Count; }
    }

    /// <summary>Fraction of successes over <see cref="TotalCount"/>; 0 when the batch is empty.</summary>
    public double SuccessRate
    {
        get
        {
            if (TotalCount > 0)
            {
                return (double)SuccessCount / TotalCount;
            }

            return 0;
        }
    }

    /// <summary>True when no failure was recorded (this includes an empty batch).</summary>
    public bool AllSucceeded
    {
        get { return FailureCount == 0; }
    }
}
/// <summary>
/// Exception raised when executing a workflow web call fails; optionally carries the
/// call key and chain identifier of the failing configuration.
/// </summary>
public class WorkflowExecutionException : Exception
{
    /// <summary>Call key of the failing configuration, or null when not supplied.</summary>
    public string ConfigKey { get; }

    /// <summary>Chain identifier of the failing configuration, or null when not supplied.</summary>
    public string ChainId { get; }

    /// <summary>Never assigned by any constructor of this class, so it always reads false.</summary>
    public bool ShouldHaveUpdatedNode { get; }

    /// <summary>Creates an exception with a message only; key and chain stay null.</summary>
    /// <param name="message">Human-readable failure description.</param>
    /// <param name="innerException">Underlying cause, if any.</param>
    public WorkflowExecutionException(string message, Exception innerException = null)
        : this(null, null, message, innerException)
    {
    }

    /// <summary>Creates an exception tagged with the failing configuration.</summary>
    /// <param name="configKey">Call key of the failing configuration.</param>
    /// <param name="chainId">Chain the configuration belongs to.</param>
    /// <param name="message">Human-readable failure description.</param>
    /// <param name="innerException">Underlying cause, if any.</param>
    public WorkflowExecutionException(string configKey, string chainId, string message, Exception innerException = null)
        : base(message, innerException)
    {
        this.ConfigKey = configKey;
        this.ChainId = chainId;
    }
}
// ==================== 通用清理管理器 ====================
/// <summary>
/// Process-wide store of category-scoped values with a last-access timestamp, plus a
/// throttled sweep that drops entries not touched within a given interval.
/// </summary>
private static class CleanupManager
{
    // Keyed by "{category}:{key}".
    private static readonly ConcurrentDictionary<string, (DateTime LastAccess, object Data)> _timedData = new();
    // Keyed by "LastCleanup:{category}"; time of the last sweep for that category.
    private static readonly ConcurrentDictionary<string, DateTime> _lastCleanupTimes = new();

    /// <summary>Stores (or overwrites) a value and stamps it with the current UTC time.</summary>
    public static void RegisterTimedData<T>(string category, string key, T data)
    {
        _timedData[ComposeKey(category, key)] = (DateTime.UtcNow, data);
    }

    /// <summary>
    /// Returns the stored value when present and of type <typeparamref name="T"/>;
    /// otherwise <c>default</c>. Does not refresh the access time.
    /// </summary>
    public static T GetTimedData<T>(string category, string key)
    {
        if (_timedData.TryGetValue(ComposeKey(category, key), out var entry) &&
            entry.Data is T typedData)
        {
            return typedData;
        }
        return default;
    }

    /// <summary>
    /// Removes entries of <paramref name="category"/> whose last access is older than
    /// <paramref name="cleanupInterval"/>. The sweep itself runs at most once per interval.
    /// </summary>
    public static void PerformCleanup(string category, TimeSpan cleanupInterval)
    {
        var lastCleanupKey = $"LastCleanup:{category}";
        var now = DateTime.UtcNow;
        var lastCleanup = _lastCleanupTimes.GetOrAdd(lastCleanupKey, DateTime.MinValue);
        if ((now - lastCleanup) <= cleanupInterval)
            return;

        // Keys are identifiers, not linguistic text: compare ordinally.
        // Prefix and "now" are loop-invariant, so compute them once.
        var prefix = $"{category}:";
        var expired = _timedData
            .Where(kvp => kvp.Key.StartsWith(prefix, StringComparison.Ordinal) &&
                          (now - kvp.Value.LastAccess) > cleanupInterval)
            .ToList();

        var timedDataCollection =
            (ICollection<KeyValuePair<string, (DateTime LastAccess, object Data)>>)_timedData;
        foreach (var entry in expired)
        {
            // Conditional removal: only drops the entry if it still holds the value seen in
            // the snapshot, so an entry refreshed in the meantime is not evicted.
            timedDataCollection.Remove(entry);
        }

        _lastCleanupTimes[lastCleanupKey] = DateTime.UtcNow;
    }

    /// <summary>Refreshes the access time of an existing entry; does nothing when it is absent.</summary>
    public static void UpdateAccessTime(string category, string key)
    {
        var composedKey = ComposeKey(category, key);
        if (_timedData.TryGetValue(composedKey, out var entry))
        {
            // TryUpdate instead of the indexer: if the entry was removed or replaced after the
            // read above, leave it alone rather than resurrecting stale data.
            _timedData.TryUpdate(composedKey, (DateTime.UtcNow, entry.Data), entry);
        }
    }

    // Builds the dictionary key used for a category/key pair.
    private static string ComposeKey(string category, string key) => $"{category}:{key}";
}
// ==================== 节点执行状态管理器 ====================
/// <summary>
/// Process-wide registry of the latest <see cref="NodeExecutionStatus"/> per node call key.
/// Each status is also mirrored into CleanupManager under the "ExecutionStatus" category.
/// </summary>
public static class NodeExecutionStatusManager
{
    private const string StatusCategory = "ExecutionStatus";

    // NOTE(review): entries here are only removed by ClearStatus/ClearAllStatus; the periodic
    // cleanup below affects the CleanupManager mirror only, so this map is never aged out.
    private static readonly ConcurrentDictionary<string, NodeExecutionStatus> _executionStatus = new();

    /// <summary>Records a new status for <paramref name="nodeCallKey"/>; ignored for a null/empty key.</summary>
    /// <param name="nodeCallKey">Call key of the node.</param>
    /// <param name="executed">Whether the node has been run.</param>
    /// <param name="result">Optional result object.</param>
    /// <param name="error">Exception for a failed run; null marks success.</param>
    public static void SetStatus(string nodeCallKey, bool executed, object result = null, Exception error = null)
    {
        if (string.IsNullOrEmpty(nodeCallKey)) return;

        // Keep the instance in a local: re-reading the dictionary after the write could throw
        // KeyNotFoundException (concurrent ClearStatus) or pick up another thread's status.
        var status = new NodeExecutionStatus
        {
            Executed = executed,
            ExecutionTime = DateTime.UtcNow,
            Result = result,
            Error = error
        };
        _executionStatus[nodeCallKey] = status;
        CleanupManager.RegisterTimedData(StatusCategory, nodeCallKey, status);
        PerformCleanup();
    }

    /// <summary>Records a successful execution.</summary>
    public static void MarkAsExecuted(string nodeCallKey, object result = null)
        => SetStatus(nodeCallKey, true, result);

    /// <summary>Records a failed execution; a null <paramref name="error"/> reads as success.</summary>
    public static void MarkAsFailed(string nodeCallKey, Exception error)
        => SetStatus(nodeCallKey, true, error: error);

    /// <summary>Returns the latest status for the key, or null when none is recorded.</summary>
    public static NodeExecutionStatus GetStatus(string nodeCallKey)
    {
        if (string.IsNullOrEmpty(nodeCallKey)) return null;
        PerformCleanup();
        return _executionStatus.TryGetValue(nodeCallKey, out var status) ?
            status :
            CleanupManager.GetTimedData<NodeExecutionStatus>(StatusCategory, nodeCallKey);
    }

    /// <summary>True when a status exists for the key and it is marked executed.</summary>
    public static bool IsExecuted(string nodeCallKey)
        => GetStatus(nodeCallKey)?.Executed ?? false;

    /// <summary>True when a status exists for the key and it succeeded.</summary>
    public static bool IsSucceeded(string nodeCallKey)
        => GetStatus(nodeCallKey)?.Succeeded ?? false;

    /// <summary>Forgets the status of one node; ignored for a null/empty key.</summary>
    public static void ClearStatus(string nodeCallKey)
    {
        if (!string.IsNullOrEmpty(nodeCallKey))
        {
            RemoveStatus(nodeCallKey);
        }
    }

    /// <summary>Forgets the status of every node recorded at the time of the call.</summary>
    public static void ClearAllStatus()
    {
        // Keys is a snapshot; each key is removed individually so its mirror is invalidated too.
        foreach (var nodeCallKey in _executionStatus.Keys)
        {
            RemoveStatus(nodeCallKey);
        }
    }

    // Removes the status and overwrites its CleanupManager mirror with null. Without this,
    // GetStatus would fall back to the mirror and keep returning a status that was cleared.
    private static void RemoveStatus(string nodeCallKey)
    {
        if (_executionStatus.TryRemove(nodeCallKey, out _))
        {
            CleanupManager.RegisterTimedData<NodeExecutionStatus>(StatusCategory, nodeCallKey, null);
        }
    }

    // Triggers the throttled sweep of the mirrored entries.
    private static void PerformCleanup()
    {
        CleanupManager.PerformCleanup(StatusCategory, EXECUTION_STATUS_CLEANUP_INTERVAL);
    }
}
// ==================== 统一观察者系统 ====================
/// <summary>
/// Keyed publish/subscribe hub: callbacks are registered under a string key and invoked
/// with a <typeparamref name="T"/> payload when that key is notified.
/// </summary>
public static class UnifiedObserver<T>
{
    // Each callback set is guarded by locking the set instance itself.
    private static readonly ConcurrentDictionary<string, HashSet<Action<T>>> _callbacks = new();
    // NOTE(review): written on register/notify but never read in this class — confirm intent.
    private static readonly ConcurrentDictionary<string, DateTime> _lastAccessTime = new();

    /// <summary>Adds a callback for <paramref name="key"/>; a null key or callback is ignored.</summary>
    public static void Register(string key, Action<T> callback)
    {
        if (key == null || callback == null) return;

        while (true)
        {
            var callbacks = _callbacks.GetOrAdd(key, k => new HashSet<Action<T>>());
            lock (callbacks)
            {
                // Unregister may have detached this set (it became empty) between GetOrAdd and
                // the lock; adding to a detached set would silently lose the registration.
                if (_callbacks.TryGetValue(key, out var current) && ReferenceEquals(current, callbacks))
                {
                    callbacks.Add(callback);
                    break;
                }
            }
        }

        UpdateAccessTime(key);
    }

    /// <summary>Removes a callback; the key is dropped once its last callback is gone.</summary>
    public static void Unregister(string key, Action<T> callback)
    {
        if (key == null || callback == null) return;
        if (_callbacks.TryGetValue(key, out var callbacks))
        {
            lock (callbacks)
            {
                callbacks.Remove(callback);
                if (callbacks.Count == 0)
                {
                    // Conditional removal: only drop the mapping if it still points at this
                    // set, so a newer set created after Clear(key) is left intact.
                    var removed = ((ICollection<KeyValuePair<string, HashSet<Action<T>>>>)_callbacks)
                        .Remove(new KeyValuePair<string, HashSet<Action<T>>>(key, callbacks));
                    if (removed)
                    {
                        _lastAccessTime.TryRemove(key, out _);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Invokes every callback registered for <paramref name="key"/>. A failing callback is
    /// logged through DebugPrint and does not stop the remaining ones.
    /// </summary>
    public static void Notify(string key, T parameter)
    {
        // A null key would make the dictionary lookup throw; treat it like Register/Unregister do.
        if (key == null) return;
        if (!_callbacks.TryGetValue(key, out var callbacks) || callbacks == null)
            return;

        // Copy under the lock, invoke outside it, so callbacks may (un)register freely.
        List<Action<T>> callbacksToExecute;
        lock (callbacks)
        {
            callbacksToExecute = callbacks.ToList();
        }

        foreach (var callback in callbacksToExecute)
        {
            try
            {
                callback?.Invoke(parameter);
            }
            catch (Exception ex)
            {
                DebugPrint($"Callback invocation failed for key '{key}': {ex.Message}");
            }
        }

        // Callbacks may have unregistered themselves; do not re-create the timestamp of a key
        // that no longer has any callbacks.
        if (_callbacks.ContainsKey(key))
        {
            UpdateAccessTime(key);
        }
    }

    /// <summary>Drops all callbacks of one key; ignored for a null/empty key.</summary>
    public static void Clear(string key)
    {
        if (!string.IsNullOrEmpty(key))
        {
            _callbacks.TryRemove(key, out _);
            _lastAccessTime.TryRemove(key, out _);
        }
    }

    /// <summary>Drops every callback of every key.</summary>
    public static void ClearAll()
    {
        _callbacks.Clear();
        _lastAccessTime.Clear();
    }

    // Stamps the key and triggers the throttled "Observer" sweep in CleanupManager.
    private static void UpdateAccessTime(string key)
    {
        _lastAccessTime[key] = DateTime.UtcNow;
        CleanupManager.PerformCleanup("Observer", OBSERVER_CLEANUP_INTERVAL);
    }
}
// ==================== 链信息管理器 ====================
/// <summary>
/// Tracks which node call keys belong to which chain, and derives chain identifiers
/// from a chain's start and end node call keys.
/// </summary>
public static class ChainInfoManager
{
    private static readonly ConcurrentDictionary<string, HashSet<string>> _chainNodeGroups = new();
    private static readonly ConcurrentDictionary<string, string> _configToChainMap = new();
    private static readonly ConcurrentDictionary<string, DateTime> _chainUpdateTimes = new();
    // Start/end hash -> identifier handed out by GetChainId, so repeated lookups agree.
    private static readonly ConcurrentDictionary<string, string> _stableChainIds = new();

    /// <summary>
    /// Creates a fresh identifier "{MD5(start_end)}_{yyyyMMddHHmmss}" stamped with the current
    /// UTC time. Returns the default chain ID for a null chain.
    /// </summary>
    public static string GenerateChainId(ChainInfo chainInfo)
    {
        if (chainInfo == null) return DEFAULT_CHAIN_ID;
        return StampChainHash(ComputeChainHash(chainInfo));
    }

    /// <summary>
    /// Returns the identifier of a chain. The same start/end pair always yields the same
    /// identifier within the process. Previously each call embedded the current second, so
    /// two lookups for one chain disagreed across a second boundary and broke every equality
    /// check and dictionary keyed on the ID.
    /// </summary>
    public static string GetChainId(ChainInfo chainInfo)
    {
        if (chainInfo == null) return DEFAULT_CHAIN_ID;
        return _stableChainIds.GetOrAdd(ComputeChainHash(chainInfo), StampChainHash);
    }

    /// <summary>Records the call keys of a chain under its identifier; no-op for null or key-less chains.</summary>
    public static void RegisterChainMapping(ChainInfo chainInfo)
    {
        if (chainInfo == null) return;
        var chainId = GetChainId(chainInfo);
        var configKeys = ExtractConfigKeys(chainInfo);
        if (configKeys.Count == 0) return;
        UpdateChainMapping(chainId, configKeys);
        _chainUpdateTimes[chainId] = DateTime.UtcNow;
    }

    // Hex MD5 digest of "{start}_{end}". Used only as a compact fingerprint, not for security.
    private static string ComputeChainHash(ChainInfo chainInfo)
    {
        var startKey = chainInfo.GetStartNodeCallkey() ?? "unknown_start";
        var endKey = chainInfo.GetEndNodeCallkey() ?? "unknown_end";
        // HashAlgorithm is IDisposable; the original never disposed the instance.
        using (var md5 = System.Security.Cryptography.MD5.Create())
        {
            var digest = md5.ComputeHash(System.Text.Encoding.UTF8.GetBytes($"{startKey}_{endKey}"));
            return BitConverter.ToString(digest).Replace("-", "");
        }
    }

    // Appends the UTC timestamp; invariant culture keeps the digits independent of the
    // current culture's calendar.
    private static string StampChainHash(string hash)
    {
        var stamp = DateTime.UtcNow.ToString("yyyyMMddHHmmss", System.Globalization.CultureInfo.InvariantCulture);
        return $"{hash}_{stamp}";
    }

    // Distinct, non-empty call keys of the chain; empty list when the chain exposes none.
    private static List<string> ExtractConfigKeys(ChainInfo chainInfo)
    {
        return chainInfo.GetAllNodeCallkeys()?
            .Where(k => !string.IsNullOrEmpty(k))
            .Distinct()
            .ToList() ?? new List<string>();
    }

    // Replaces the node set of a chain and repoints every listed key at it.
    private static void UpdateChainMapping(string chainId, List<string> configKeys)
    {
        var nodeSet = new HashSet<string>(configKeys);
        if (_chainNodeGroups.TryGetValue(chainId, out var oldNodes) && oldNodes != null)
        {
            foreach (var oldNode in oldNodes.Where(oldNode => !nodeSet.Contains(oldNode)))
            {
                RemoveConfigMapping(oldNode, chainId);
            }
        }
        _chainNodeGroups[chainId] = nodeSet;
        foreach (var configKey in configKeys)
        {
            _configToChainMap[configKey] = chainId;
        }
    }

    // Drops the key -> chain entry only while it still points at this chain, so a key that
    // has since been claimed by another chain keeps its newer mapping.
    private static void RemoveConfigMapping(string configKey, string chainId)
    {
        ((ICollection<KeyValuePair<string, string>>)_configToChainMap)
            .Remove(new KeyValuePair<string, string>(configKey, chainId));
    }

    /// <summary>Chain identifier recorded for a call key, or null when unknown.</summary>
    public static string GetChainIdForConfig(string configKey)
    {
        return !string.IsNullOrEmpty(configKey) &&
               _configToChainMap.TryGetValue(configKey, out var chainId) ? chainId : null;
    }

    /// <summary>Forgets a chain and the key mappings that point at it.</summary>
    public static void RemoveChainMapping(string chainId)
    {
        if (string.IsNullOrEmpty(chainId)) return;
        if (_chainNodeGroups.TryRemove(chainId, out var nodes) && nodes != null)
        {
            foreach (var node in nodes)
            {
                RemoveConfigMapping(node, chainId);
            }
        }
        _chainUpdateTimes.TryRemove(chainId, out _);
    }

    /// <summary>Copy of the call keys recorded for a chain; empty when the chain is unknown.</summary>
    public static List<string> GetConfigKeysInChain(string chainId)
    {
        return !string.IsNullOrEmpty(chainId) &&
               _chainNodeGroups.TryGetValue(chainId, out var keys) ?
            keys.ToList() : new List<string>();
    }

    /// <summary>True when the call key is recorded as part of the given chain.</summary>
    public static bool IsConfigInChain(string chainId, string configKey)
    {
        return !string.IsNullOrEmpty(chainId) &&
               !string.IsNullOrEmpty(configKey) &&
               _chainNodeGroups.TryGetValue(chainId, out var keys) &&
               keys.Contains(configKey);
    }

    /// <summary>True when the call key is mapped to any chain.</summary>
    public static bool IsConfigInAnyChain(string configKey)
    {
        return !string.IsNullOrEmpty(configKey) && _configToChainMap.ContainsKey(configKey);
    }

    /// <summary>
    /// Time of the last RegisterChainMapping for the chain; DateTime.MinValue when unknown.
    /// A null/empty ID now also yields MinValue instead of throwing from the dictionary lookup.
    /// </summary>
    public static DateTime GetChainUpdateTime(string chainId)
    {
        return !string.IsNullOrEmpty(chainId) &&
               _chainUpdateTimes.TryGetValue(chainId, out var time) ? time : DateTime.MinValue;
    }
}
// ==================== WebCall配置类 ====================
/// <summary>
/// Per-call-key configuration mirroring one node of a chain. It holds the executable
/// <see cref="NodeFunc"/> and the node's display properties, caches the chain's ChainInfo
/// for a short time, and listens for chain updates through UnifiedObserver&lt;ChainInfo&gt;.
/// Mutable state is guarded by a private lock that is re-entered by nested calls.
/// </summary>
public class WebCallOrReturnConfig : IDisposable
{
    /// <summary>Call key this configuration represents; never null.</summary>
    public string WebCallKey { get; }
    /// <summary>Delegate that runs the node, or null when none could be built.</summary>
    public Func<Task> NodeFunc { get; private set; }
    /// <summary>Copied from the node's basicConfig.DirectString.</summary>
    public string DirectString { get; private set; }
    /// <summary>Copied from the node's basicConfig.SwitchReturn.</summary>
    public string SwitchReturn { get; private set; }
    /// <summary>Converted from the node's basicConfig.Show via StringToBool; true until first update.</summary>
    public bool Show { get; private set; } = true;
    /// <summary>Identifier of the chain this configuration currently listens to; may be null.</summary>
    public string ChainId { get; private set; }
    /// <summary>UTC time of the last successful update from a ChainInfo.</summary>
    public DateTime LastRefreshTime { get; private set; }
    private readonly object _syncLock = new();
    private ChainInfo _cachedChainInfo;
    private DateTime _chainInfoCacheTime = DateTime.MinValue;
    private bool _isDisposed;
    // Kept in a field so the exact same delegate instance is used to register and unregister.
    private readonly Action<ChainInfo> _chainUpdateCallback;
    /// <summary>
    /// Creates the configuration, subscribes to updates of its chain (when a chain ID is
    /// known) and performs an initial refresh.
    /// </summary>
    /// <param name="webCallKey">Call key of the node.</param>
    /// <param name="chainId">Chain identifier; when null it is looked up in ChainInfoManager.</param>
    /// <exception cref="ArgumentNullException"><paramref name="webCallKey"/> is null.</exception>
    public WebCallOrReturnConfig(string webCallKey, string chainId = null)
    {
        WebCallKey = webCallKey ?? throw new ArgumentNullException(nameof(webCallKey));
        ChainId = chainId ?? ChainInfoManager.GetChainIdForConfig(webCallKey);
        _chainUpdateCallback = OnChainInfoUpdated;
        RegisterChainUpdateListener();
        Refresh();
    }
    // Subscribes to the current chain; skipped while ChainId is null/empty.
    private void RegisterChainUpdateListener()
    {
        if (!string.IsNullOrEmpty(ChainId))
        {
            UnifiedObserver<ChainInfo>.Register(ChainId, _chainUpdateCallback);
        }
    }
    // Unsubscribes from the current chain; skipped while ChainId is null/empty.
    private void UnregisterChainUpdateListener()
    {
        if (!string.IsNullOrEmpty(ChainId))
        {
            UnifiedObserver<ChainInfo>.Unregister(ChainId, _chainUpdateCallback);
        }
    }
    // Observer callback: applies an updated ChainInfo, or re-homes this config when the
    // update shows it no longer belongs to its current chain.
    private void OnChainInfoUpdated(ChainInfo chainInfo)
    {
        if (_isDisposed || chainInfo == null) return;
        lock (_syncLock)
        {
            var updatedChainId = ChainInfoManager.GetChainId(chainInfo);
            if (updatedChainId == ChainId)
            {
                if (ChainInfoManager.IsConfigInChain(ChainId, WebCallKey))
                {
                    UpdateFromChainInfo(chainInfo);
                }
                else
                {
                    TryFindAndUpdateCorrectChain();
                }
            }
            // NOTE(review): this requires the key to be in no chain's mapping and yet in the
            // updated chain's node set; ChainInfoManager populates both together, so the
            // branch looks close to unreachable — confirm the intended condition.
            else if (!ChainInfoManager.IsConfigInAnyChain(WebCallKey) &&
            ChainInfoManager.IsConfigInChain(updatedChainId, WebCallKey))
            {
                UpdateChainIdInternal(updatedChainId, chainInfo);
            }
        }
    }
    // Looks for the chain that now contains this key; switches to it, or drops the cached
    // ChainInfo when no chain can be found.
    private void TryFindAndUpdateCorrectChain()
    {
        var correctChainId = ChainInfoManager.GetChainIdForConfig(WebCallKey) ??
        WebCallManager.FindChainIdForConfig(WebCallKey);
        if (!string.IsNullOrEmpty(correctChainId) && correctChainId != ChainId)
        {
            UpdateChainIdInternal(correctChainId);
        }
        else if (string.IsNullOrEmpty(correctChainId))
        {
            _cachedChainInfo = null;
            _chainInfoCacheTime = DateTime.MinValue;
        }
    }
    // Copies this node's settings out of the ChainInfo and rebuilds NodeFunc. Does nothing
    // when the chain has no node list or no node with this call key.
    private void UpdateFromChainInfo(ChainInfo chainInfo)
    {
        if (chainInfo?.AllNodes == null) return;
        var nodeConfig = FindNodeConfig(chainInfo.AllNodes, WebCallKey);
        if (nodeConfig == null) return;
        UpdateConfigProperties(nodeConfig);
        UpdateNodeFunction(nodeConfig);
        _cachedChainInfo = chainInfo;
        _chainInfoCacheTime = DateTime.UtcNow;
        LastRefreshTime = DateTime.UtcNow;
        // Also registered when ChainId is null/empty, in which case the entry key is "ChainInfo:".
        CleanupManager.RegisterTimedData("ChainInfo", ChainId, chainInfo);
    }
    // First NodeConfig whose basicConfig.Callkey equals callKey, or null.
    private NodeConfig FindNodeConfig(IEnumerable<object> nodes, string callKey)
    {
        return nodes.OfType<NodeConfig>()
        .FirstOrDefault(node => node.basicConfig?.Callkey == callKey);
    }
    // Copies the display-related settings; left untouched when the node has no basicConfig.
    private void UpdateConfigProperties(NodeConfig nodeConfig)
    {
        if (nodeConfig.basicConfig == null) return;
        DirectString = nodeConfig.basicConfig.DirectString;
        SwitchReturn = nodeConfig.basicConfig.SwitchReturn;
        Show = StringToBool(nodeConfig.basicConfig.Show);
    }
    // Rebuilds NodeFunc from the node's JSON form; any failure is logged and leaves NodeFunc null.
    private void UpdateNodeFunction(NodeConfig nodeConfig)
    {
        try
        {
            var jsonConfig = NodeConfig.CreateNode(nodeConfig);
            if (!string.IsNullOrEmpty(jsonConfig))
            {
                var node = JsonConvert.DeserializeObject<UniversalNode>(jsonConfig);
                NodeFunc = node != null
                ? (async () => await NodeProcess.ProcessInputAndAddAICallCell(node))
                : null;
            }
            else
            {
                NodeFunc = null;
            }
        }
        catch (Exception ex)
        {
            DebugPrint($"Failed to create node function for {WebCallKey}: {ex.Message}");
            NodeFunc = null;
        }
    }
    /// <summary>
    /// Returns the chain's ChainInfo, served from the cache while it is younger than
    /// CHAIN_INFO_CACHE_TIMEOUT and looked up again otherwise. Returns null once disposed
    /// or when no chain can be found.
    /// </summary>
    public ChainInfo GetLatestChainInfo()
    {
        if (_isDisposed) return null;
        lock (_syncLock)
        {
            if (_cachedChainInfo != null &&
            (DateTime.UtcNow - _chainInfoCacheTime) < CHAIN_INFO_CACHE_TIMEOUT)
            {
                return _cachedChainInfo;
            }
            return FindAndCacheChainInfo();
        }
    }
    // Lookup order: active workflow for ChainId, then the CleanupManager copy, then a search
    // by call key. The result (possibly null) is cached with the current time.
    private ChainInfo FindAndCacheChainInfo()
    {
        ChainInfo chainInfo = null;
        if (!string.IsNullOrEmpty(ChainId))
        {
            chainInfo = WebCallManager.GetWorkflow(ChainId) ??
            CleanupManager.GetTimedData<ChainInfo>("ChainInfo", ChainId);
        }
        if (chainInfo == null)
        {
            chainInfo = WebCallManager.FindChainInfoForConfig(WebCallKey);
        }
        _cachedChainInfo = chainInfo;
        _chainInfoCacheTime = DateTime.UtcNow;
        return chainInfo;
    }
    /// <summary>
    /// Re-reads this node's settings from the latest ChainInfo; clears the cache when no
    /// chain is found. No-op once disposed.
    /// </summary>
    public void Refresh()
    {
        if (_isDisposed) return;
        lock (_syncLock)
        {
            // Re-enters _syncLock; C# monitors allow the owning thread to do so.
            var chainInfo = GetLatestChainInfo();
            if (chainInfo != null)
            {
                UpdateFromChainInfo(chainInfo);
            }
            else
            {
                _cachedChainInfo = null;
                _chainInfoCacheTime = DateTime.MinValue;
            }
        }
    }
    /// <summary>Moves this configuration to another chain; ignored for a null/empty or unchanged ID.</summary>
    public void UpdateChainId(string newChainId)
    => UpdateChainIdInternal(newChainId);
    // Re-subscribes under the new chain ID and applies that chain's ChainInfo when available.
    private void UpdateChainIdInternal(string newChainId, ChainInfo chainInfo = null)
    {
        if (_isDisposed || string.IsNullOrEmpty(newChainId) || newChainId == ChainId)
        return;
        lock (_syncLock)
        {
            UnregisterChainUpdateListener();
            ChainId = newChainId;
            RegisterChainUpdateListener();
            var targetChainInfo = chainInfo ?? WebCallManager.GetWorkflow(ChainId);
            if (targetChainInfo != null)
            {
                UpdateFromChainInfo(targetChainInfo);
            }
            // NOTE(review): this discards the ChainInfo that UpdateFromChainInfo may have just
            // cached, forcing a fresh lookup on the next GetLatestChainInfo — confirm intent.
            _cachedChainInfo = null;
        }
    }
    /// <summary>True while the configuration has a non-empty call key and is not disposed.</summary>
    public bool IsValid() => !string.IsNullOrEmpty(WebCallKey) && !_isDisposed;
    /// <summary>Unsubscribes from chain updates and releases the delegate and cached ChainInfo.</summary>
    public void Dispose()
    {
        if (_isDisposed) return;
        lock (_syncLock)
        {
            UnregisterChainUpdateListener();
            NodeFunc = null;
            _cachedChainInfo = null;
            _isDisposed = true;
        }
    }
}
// ==================== WebCall管理器 ====================
/// <summary>
/// Process-wide registry of <see cref="WebCallOrReturnConfig"/> instances (by call key) and
/// of active workflows (ChainInfo by chain ID).
/// </summary>
public static class WebCallManager
{
    /// <summary>All configurations keyed by call key. Public for existing callers.</summary>
    public static readonly ConcurrentDictionary<string, WebCallOrReturnConfig> _configs = new();
    private static readonly ConcurrentDictionary<string, ChainInfo> _activeWorkflows = new();
    private static readonly ConcurrentDictionary<string, DateTime> _workflowUpdateTimes = new();

    /// <summary>Returns the configuration for a key.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null or empty.</exception>
    /// <exception cref="KeyNotFoundException">No configuration is stored for the key.</exception>
    public static WebCallOrReturnConfig GetConfig(string key)
    {
        if (string.IsNullOrEmpty(key))
            throw new ArgumentNullException(nameof(key));
        if (!_configs.TryGetValue(key, out var config) || config == null)
            throw new KeyNotFoundException($"WebCall config not found: {key}");
        return config;
    }

    /// <summary>Non-throwing lookup; a stored null counts as missing.</summary>
    public static bool TryGetConfig(string key, out WebCallOrReturnConfig config)
    {
        config = null;
        return !string.IsNullOrEmpty(key) && _configs.TryGetValue(key, out config) && config != null;
    }

    /// <summary>
    /// Returns the configuration for <paramref name="callKey"/>, refreshing an existing one or
    /// creating a new one. Returns null for a null/empty key.
    /// </summary>
    /// <param name="callKey">Call key of the node.</param>
    /// <param name="chainInfo">Chain used to derive the chain ID of a newly created configuration.</param>
    public static WebCallOrReturnConfig CreateOrUpdateConfig(string callKey, ChainInfo chainInfo = null)
    {
        if (string.IsNullOrEmpty(callKey)) return null;
        if (TryGetConfig(callKey, out var existingConfig))
        {
            existingConfig.Refresh();
            return existingConfig;
        }

        string chainId = chainInfo != null ?
            ChainInfoManager.GetChainId(chainInfo) :
            ChainInfoManager.GetChainIdForConfig(callKey);
        var created = new WebCallOrReturnConfig(callKey, chainId);

        // Atomic insert. With a plain indexer write, two concurrent callers each stored their
        // own instance and the overwritten one stayed subscribed to chain updates forever.
        // A stored null is still replaced, matching TryGetConfig's "null counts as missing".
        var stored = _configs.AddOrUpdate(callKey, created, (_, current) => current ?? created);
        if (!ReferenceEquals(stored, created))
        {
            created.Dispose();
            stored.Refresh();
        }
        return stored;
    }

    /// <summary>
    /// Finds the active workflow containing the call key: first through the key -> chain
    /// mapping, then by scanning the nodes of every active workflow. Null when not found.
    /// </summary>
    public static ChainInfo FindChainInfoForConfig(string configKey)
    {
        if (string.IsNullOrEmpty(configKey)) return null;
        if (ChainInfoManager.GetChainIdForConfig(configKey) is string chainId &&
            _activeWorkflows.TryGetValue(chainId, out var workflow))
        {
            return workflow;
        }
        return _activeWorkflows.Values
            .FirstOrDefault(w => w?.AllNodes?
                .Any(n =>
                    n is NodeConfig nodeConfig &&
                    nodeConfig.basicConfig?.Callkey == configKey) ?? false);
    }

    /// <summary>Chain ID of the workflow containing the call key, or null when not found.</summary>
    public static string FindChainIdForConfig(string configKey)
    {
        var chainInfo = FindChainInfoForConfig(configKey);
        return chainInfo != null ? ChainInfoManager.GetChainId(chainInfo) : null;
    }

    /// <summary>Active workflow stored under the chain ID, or null.</summary>
    public static ChainInfo GetWorkflow(string chainId)
    {
        return !string.IsNullOrEmpty(chainId) &&
               _activeWorkflows.TryGetValue(chainId, out var workflow) ?
            workflow : null;
    }

    /// <summary>
    /// Stores a workflow, records its node mapping, notifies observers of the chain and
    /// mirrors it into CleanupManager. Ignored for a null/empty ID or a null workflow.
    /// </summary>
    public static void SetWorkflow(string chainId, ChainInfo workflow)
    {
        if (string.IsNullOrEmpty(chainId) || workflow == null) return;
        _activeWorkflows[chainId] = workflow;
        _workflowUpdateTimes[chainId] = DateTime.UtcNow;
        // NOTE(review): RegisterChainMapping derives its own ID from the workflow instead of
        // using the chainId argument — confirm callers always pass the matching ID.
        ChainInfoManager.RegisterChainMapping(workflow);
        UnifiedObserver<ChainInfo>.Notify(chainId, workflow);
        CleanupManager.RegisterTimedData("Workflow", chainId, workflow);
    }

    /// <summary>Existing configurations whose call keys are recorded for the chain.</summary>
    public static List<WebCallOrReturnConfig> GetConfigsInWorkflow(string chainId)
    {
        var result = new List<WebCallOrReturnConfig>();
        if (string.IsNullOrEmpty(chainId)) return result;
        var configKeys = ChainInfoManager.GetConfigKeysInChain(chainId);
        foreach (var key in configKeys)
        {
            if (TryGetConfig(key, out var config))
                result.Add(config);
        }
        return result;
    }

    /// <summary>Removes a workflow together with its mapping, update time and observers.</summary>
    public static void RemoveWorkflow(string chainId)
    {
        if (string.IsNullOrEmpty(chainId)) return;
        if (_activeWorkflows.TryRemove(chainId, out _))
        {
            ChainInfoManager.RemoveChainMapping(chainId);
            _workflowUpdateTimes.TryRemove(chainId, out _);
            UnifiedObserver<ChainInfo>.Clear(chainId);
        }
    }

    /// <summary>Disposes every configuration and clears all registries and execution statuses.</summary>
    public static void ClearAll()
    {
        foreach (var config in _configs.Values)
        {
            config?.Dispose();
        }
        _configs.Clear();

        // Mirror RemoveWorkflow: otherwise ChainInfoManager kept reporting cleared chains
        // (IsConfigInAnyChain / GetChainIdForConfig) after a full reset.
        foreach (var chainId in _activeWorkflows.Keys)
        {
            ChainInfoManager.RemoveChainMapping(chainId);
        }
        _activeWorkflows.Clear();
        _workflowUpdateTimes.Clear();
        NodeExecutionStatusManager.ClearAllStatus();
        UnifiedObserver<ChainInfo>.ClearAll();
    }
}
// ==================== 配置创建器 ====================
/// <summary>
/// Builds the per-node configurations of a workflow.
/// </summary>
public static class AIConfigCreator
{
    /// <summary>
    /// Registers the workflow under its chain ID, then creates or refreshes one
    /// configuration for every NodeConfig that has a non-empty call key.
    /// Does nothing when the chain or its node list is null.
    /// </summary>
    /// <param name="chainInfo">Workflow whose nodes should get configurations.</param>
    public static void CreateConfigsForWorkflow(ChainInfo chainInfo)
    {
        if (chainInfo?.AllNodes == null)
        {
            return;
        }

        // The workflow must be registered before the configs are built from it.
        WebCallManager.SetWorkflow(ChainInfoManager.GetChainId(chainInfo), chainInfo);

        string CallKeyOf(NodeConfig nodeConfig) => nodeConfig.basicConfig?.Callkey;

        var usableCallKeys = chainInfo.AllNodes
            .OfType<NodeConfig>()
            .Select(CallKeyOf)
            .Where(candidate => !string.IsNullOrEmpty(candidate));

        foreach (var usableCallKey in usableCallKeys)
        {
            WebCallManager.CreateOrUpdateConfig(usableCallKey, chainInfo);
        }
    }
}
// ==================== 共享上下文 ====================
/// <summary>
/// Handle on one workflow stored in WebCallManager. Re-publishes updates of that workflow
/// through <see cref="OnChainInfoUpdated"/> and follows the workflow when its chain ID changes.
/// </summary>
public class SharedChainInfoContext : IDisposable
{
    private string _chainId;
    private readonly Action<ChainInfo> _updateCallback;
    private bool _isDisposed;

    /// <summary>Raised with the new ChainInfo whenever the tracked chain is notified.</summary>
    public event Action<ChainInfo> OnChainInfoUpdated;

    /// <summary>Stores the initial workflow and subscribes to updates of its chain.</summary>
    /// <param name="initialChainInfo">Workflow to track.</param>
    /// <exception cref="ArgumentNullException"><paramref name="initialChainInfo"/> is null.</exception>
    public SharedChainInfoContext(ChainInfo initialChainInfo)
    {
        if (initialChainInfo == null)
        {
            throw new ArgumentNullException(nameof(initialChainInfo));
        }

        _chainId = ChainInfoManager.GetChainId(initialChainInfo);
        // Stored before subscribing, so this first SetWorkflow does not reach our own callback.
        WebCallManager.SetWorkflow(_chainId, initialChainInfo);
        _updateCallback = RaiseChainInfoUpdated;
        UnifiedObserver<ChainInfo>.Register(_chainId, _updateCallback);
    }

    /// <summary>
    /// Gets the workflow currently stored for the tracked chain. Setting it stores the new
    /// workflow and, when its chain ID differs, moves the subscription to the new ID first.
    /// The setter ignores null and is a no-op once disposed.
    /// </summary>
    public ChainInfo CurrentChainInfo
    {
        get
        {
            return WebCallManager.GetWorkflow(_chainId);
        }
        set
        {
            if (_isDisposed || value == null)
            {
                return;
            }

            var incomingChainId = ChainInfoManager.GetChainId(value);
            if (incomingChainId != _chainId)
            {
                MoveSubscription(incomingChainId);
            }

            WebCallManager.SetWorkflow(_chainId, value);
        }
    }

    /// <summary>Unsubscribes from the tracked chain; later calls do nothing.</summary>
    public void Dispose()
    {
        if (!_isDisposed)
        {
            UnifiedObserver<ChainInfo>.Unregister(_chainId, _updateCallback);
            _isDisposed = true;
        }
    }

    // Forwards an observer notification to the public event.
    private void RaiseChainInfoUpdated(ChainInfo updatedChainInfo)
    {
        OnChainInfoUpdated?.Invoke(updatedChainInfo);
    }

    // Drops the subscription under the old chain ID and re-subscribes under the new one.
    private void MoveSubscription(string targetChainId)
    {
        UnifiedObserver<ChainInfo>.Unregister(_chainId, _updateCallback);
        _chainId = targetChainId;
        UnifiedObserver<ChainInfo>.Register(_chainId, _updateCallback);
    }
}
// ==================== 工作流处理器 ====================
/// <summary>
/// Executes web-call configurations, one at a time or as a parallel batch, and records
/// the outcome in NodeExecutionStatusManager.
/// </summary>
public static class WorkflowProcessor
{
    /// <summary>
    /// Runs the configuration's NodeFunc and records success or failure for its call key.
    /// </summary>
    /// <param name="config">Configuration to execute.</param>
    /// <returns>A successful <see cref="ExecutionResult"/> including the elapsed milliseconds.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    /// <exception cref="WorkflowExecutionException">
    /// NodeFunc is missing or threw; the original exception is the inner exception.
    /// </exception>
    public static async Task<ExecutionResult> ExecuteWebCallAndUpdate(WebCallOrReturnConfig config)
    {
        if (config == null)
            throw new ArgumentNullException(nameof(config));

        // Stopwatch is monotonic; subtracting two DateTime.UtcNow readings is skewed by
        // system clock adjustments.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            // Read once: chain updates may set NodeFunc to null between a check and the call.
            var nodeFunc = config.NodeFunc;
            if (nodeFunc == null)
            {
                throw new InvalidOperationException($"No NodeFunc for config: {config.WebCallKey}");
            }

            await nodeFunc();
            NodeExecutionStatusManager.MarkAsExecuted(config.WebCallKey);
            stopwatch.Stop();

            return new ExecutionResult
            {
                Success = true,
                ConfigKey = config.WebCallKey,
                ChainId = config.ChainId,
                ExecutionStatus = NodeExecutionStatusManager.GetStatus(config.WebCallKey),
                Timestamp = DateTime.UtcNow,
                ExecutionDurationMs = stopwatch.ElapsedMilliseconds
            };
        }
        catch (Exception ex)
        {
            NodeExecutionStatusManager.MarkAsFailed(config.WebCallKey, ex);
            throw new WorkflowExecutionException(
                config.WebCallKey,
                config.ChainId,
                $"Failed to execute web call '{config.WebCallKey}'",
                ex);
        }
    }

    /// <summary>
    /// Executes every non-null configuration in parallel and collects the outcomes.
    /// Individual failures do not fault the batch; they appear as failed results.
    /// </summary>
    /// <param name="configs">Configurations to execute; null elements are skipped.</param>
    /// <returns>Batch result with one entry per executed configuration, in input order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="configs"/> is null.</exception>
    public static async Task<BatchExecutionResult> ExecuteBatchWebCalls(List<WebCallOrReturnConfig> configs)
    {
        if (configs == null)
            throw new ArgumentNullException(nameof(configs));

        var tasks = configs
            .Where(c => c != null)
            .Select(c => Task.Run(() => ExecuteCapturingFailureAsync(c)))
            .ToList();
        var results = await Task.WhenAll(tasks);

        // Fill the lists here, on a single thread. The original appended to these List<string>
        // instances from inside the parallel tasks, which is a data race on List<T>.
        var batch = new BatchExecutionResult();
        foreach (var executionResult in results)
        {
            batch.AllResults.Add(executionResult);
            var bucket = executionResult.Success ? batch.SuccessfulConfigs : batch.FailedConfigs;
            bucket.Add(executionResult.ConfigKey);
        }
        return batch;
    }

    // Runs one configuration and converts any exception into a failed ExecutionResult.
    private static async Task<ExecutionResult> ExecuteCapturingFailureAsync(WebCallOrReturnConfig config)
    {
        try
        {
            return await ExecuteWebCallAndUpdate(config);
        }
        catch (Exception ex)
        {
            return new ExecutionResult
            {
                Success = false,
                ConfigKey = config.WebCallKey,
                ChainId = config.ChainId,
                Error = ex.Message,
                Timestamp = DateTime.UtcNow
            };
        }
    }
}
}
鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。
更多推荐



所有评论(0)