From a2156c7a34c00c55a9a53512ddd0c4ee406d262c Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 4 Jun 2026 16:33:44 +0200 Subject: [PATCH 1/6] deps: updated tucana to 0.0.73 --- Cargo.lock | 14 +++++++------- Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1fd6e24..93104bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1072,7 +1072,7 @@ dependencies = [ "taurus-provider", "tokio", "tonic", - "tucana 0.0.72", + "tucana 0.0.73", ] [[package]] @@ -1855,7 +1855,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana 0.0.72", + "tucana 0.0.73", ] [[package]] @@ -1868,7 +1868,7 @@ dependencies = [ "log", "rand 0.10.1", "serde_json", - "tucana 0.0.72", + "tucana 0.0.73", "ureq", "uuid", ] @@ -1889,7 +1889,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana 0.0.72", + "tucana 0.0.73", ] [[package]] @@ -1914,7 +1914,7 @@ dependencies = [ "serde", "serde_json", "taurus-core", - "tucana 0.0.72", + "tucana 0.0.73", ] [[package]] @@ -2262,9 +2262,9 @@ dependencies = [ [[package]] name = "tucana" -version = "0.0.72" +version = "0.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5513f2fc08c899ee75f2f4717a7c4af53269a80e76b2f1a331b3455544b1ef" +checksum = "cb8c3882c50dad78a4ec128e9c0d80a681c0203e0ca55169361c49b31ad9161f" dependencies = [ "pbjson", "pbjson-build", diff --git a/Cargo.toml b/Cargo.toml index 4659d1e..9c3be3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ edition = "2024" [workspace.dependencies] async-trait = "0.1.89" code0-flow = { version = "0.0.33" } -tucana = { version = "0.0.72" } +tucana = { version = "0.0.73" } tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] } log = "0.4.27" futures-lite = "2.6.0" From ddbc240e1998edddf4382ab722c64ac50a18299d Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 5 Jun 2026 14:24:16 +0200 Subject: [PATCH 2/6] docs: updated to latest taurus tasks --- README.md | 2 +- docs/dev.md | 7 +++++-- docs/errors.md | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0ef7fca..8ac7e92 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Taurus The heart of the execution block - the runtime itself -- Executes flows and handles test executions +- Executes flows and transmits execution results - Requests single node executions from Actions - Serves the standard CodeZero library diff --git a/docs/dev.md b/docs/dev.md index 9290963..47e9c4f 100644 --- a/docs/dev.md +++ b/docs/dev.md @@ -13,7 +13,7 @@ Taurus is the execution runtime in the CodeZero execution block. - Executes flow graphs via `taurus-core::runtime::engine::ExecutionEngine` - Emits lifecycle events to NATS (`runtime.emitter.`) - Delegates remote nodes to external services over NATS (`action..`) -- Reports runtime status and usage to Aquila in dynamic mode +- Reports runtime status, runtime usage, and execution results to Aquila in dynamic mode ## Workspace Layout @@ -50,7 +50,7 @@ graph TD Core --> Remote Remote -->|action..| NATS NATS --> Service - Taurus -->|runtime status + usage| Aquila + Taurus -->|runtime status + usage + execution result| Aquila ``` ### Execution details @@ -61,6 +61,7 @@ graph TD 4. Local nodes run handlers from the built-in function registry. 5. Non-local `definition_source` values are executed remotely via `RemoteRuntime`. 6. Lifecycle events are emitted as `starting`, `ongoing`, `finished`, or `failed`. +7. The completed `ExecutionResult` is transmitted through the Aquila execution gRPC API in dynamic mode. ## Runtime Modes @@ -73,6 +74,7 @@ Taurus mode is controlled by `MODE`. - Sends definitions to Aquila (retry loop until success) - Starts runtime status reporting (including heartbeat) - Sends runtime usage updates after each flow run +- Sends execution result updates after each flow run ### `static` @@ -82,6 +84,7 @@ Taurus mode is controlled by `MODE`. - No definition push - No runtime status updates - No runtime usage updates +- No execution result updates ## Environment Variables diff --git a/docs/errors.md b/docs/errors.md index 1b2a1ca..0d70622 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -39,7 +39,7 @@ This document is the canonical catalog for runtime error codes emitted by Taurus | `T-CORE-000304` | App Error Mapping | Serialization/deserialization failure mapped into runtime error format. | Encoding/decoding/parsing failure surfaced as `Error::Serialization`. | `types/errors/error.rs` | | `T-CORE-000399` | App Error Mapping | Internal application failure mapped into runtime error format. | Catch-all non-domain internal failure surfaced as `Error::Internal`. | `types/errors/error.rs` | | `T-CORE-999999` | Runtime Error Fallback | Default fallback runtime error code when no explicit mapping is provided. | `RuntimeError::default()` used as defensive fallback. | `types/errors/runtime_error.rs` | -| `T-TAURUS-000001` | Taurus App | Test execution request payload could not be decoded as an execution flow. | Malformed or schema-incompatible payload published to the test execution NATS subject. | `taurus/src/app/worker.rs` | +| `T-TAURUS-000001` | Taurus App | Execution request payload could not be decoded as an execution flow. | Malformed or schema-incompatible payload published to the execution NATS subject. | `taurus/src/app/worker.rs` | | `T-PROV-000001` | Provider Remote Runtime | Remote request to NATS did not yield a valid response message. | NATS request failed or timed out while waiting for remote runtime answer. | `taurus-provider/providers/remote/nats_remote_runtime.rs` | | `T-PROV-000002` | Provider Remote Runtime | Remote runtime response could not be decoded into expected protobuf structure. | Received payload is malformed, truncated, or schema-incompatible for `ExecutionResult`. | `taurus-provider/providers/remote/nats_remote_runtime.rs` | | `T-PROV-000003` | Provider Remote Runtime | Remote runtime response decoded, but contained no concrete result field. | `ExecutionResult` exists but `result` is `None` (protocol contract violation). | `taurus-provider/providers/remote/nats_remote_runtime.rs` | From c1b2ff1732405a027a50eb5da88f5fae4e86edf1 Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 5 Jun 2026 14:24:39 +0200 Subject: [PATCH 3/6] feat: added queueing into nats as an manual option --- crates/taurus-manual/src/main.rs | 59 ++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/crates/taurus-manual/src/main.rs b/crates/taurus-manual/src/main.rs index 795f7e7..4087aa2 100644 --- a/crates/taurus-manual/src/main.rs +++ b/crates/taurus-manual/src/main.rs @@ -1,13 +1,15 @@ use std::path::Path; -use clap::{Parser, arg, command}; +use clap::Parser; use log::error; use log::info; +use prost::Message; use serde::Deserialize; -use taurus_core::runtime::engine::ExecutionEngine; +use taurus_core::runtime::engine::{ExecutionEngine, ExecutionId}; use taurus_core::types::signal::Signal; use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; +use tucana::shared::ExecutionFlow; use tucana::shared::ValidationFlow; use tucana::shared::helper::value::from_json_value; use tucana::shared::helper::value::to_json_value; @@ -128,6 +130,10 @@ struct Args { /// Path value #[arg(short, long)] path: String, + + /// Queue the selected flow on a running Taurus instance instead of executing locally + #[arg(long, default_value_t = false)] + queue_execution: bool, } #[tokio::main] @@ -159,6 +165,12 @@ async fn main() { panic!("Failed to connect to NATS server: {}", err); } }; + + if args.queue_execution { + queue_execution(&client, &case, flow_input).await; + return; + } + let remote = NATSRemoteRuntime::new(client.clone()); let emitter = NATSRespondEmitter::new(client); let engine = ExecutionEngine::new(); @@ -194,3 +206,46 @@ async fn main() { } } } + +async fn queue_execution( + client: &async_nats::Client, + case: &Case, + input_value: Option, +) { + let execution_id = ExecutionId::new_v4(); + let execution_flow = ExecutionFlow { + flow_id: case.flow.flow_id, + project_id: case.flow.project_id, + starting_node_id: case.flow.starting_node_id, + node_functions: case.flow.node_functions.clone(), + input_value, + }; + let execution_topic = format!("execution.{}", execution_id); + + info!( + "Queueing execution of flow {} with execution id {}", + execution_flow.flow_id, execution_id + ); + + if let Err(err) = client + .publish( + execution_topic.clone(), + execution_flow.encode_to_vec().into(), + ) + .await + { + panic!( + "Failed to publish flow {} to execution topic '{}': {}", + case.flow.flow_id, execution_topic, err + ); + } + + if let Err(err) = client.flush().await { + panic!( + "Failed to flush execution request on '{}': {}", + execution_topic, err + ); + } + + println!("{}", execution_id); +} From a12c1685e987dfcf9ff328bcd658e448532ad8cb Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 5 Jun 2026 14:25:10 +0200 Subject: [PATCH 4/6] feat: made execution save all results --- crates/taurus-core/src/runtime/engine.rs | 226 +++++++++++++++++- .../src/runtime/engine/executor.rs | 160 ++++++++++--- .../src/runtime/execution/value_store.rs | 33 ++- 3 files changed, 375 insertions(+), 44 deletions(-) diff --git a/crates/taurus-core/src/runtime/engine.rs b/crates/taurus-core/src/runtime/engine.rs index 9a388e8..529a75b 100644 --- a/crates/taurus-core/src/runtime/engine.rs +++ b/crates/taurus-core/src/runtime/engine.rs @@ -8,7 +8,7 @@ mod emitter; mod executor; mod model; -use tucana::shared::{ExecutionFlow, NodeFunction, Value}; +use tucana::shared::{ExecutionFlow, NodeExecutionResult, NodeFunction, Value}; use crate::handler::registry::FunctionStore; use crate::runtime::execution::value_store::ValueStore; @@ -29,6 +29,14 @@ pub struct ExecutionEngine { handlers: FunctionStore, } +/// Full result of one engine execution, including per-node results for reporting. +#[derive(Debug, Clone)] +pub struct EngineExecutionReport { + pub signal: Signal, + pub exit_reason: ExitReason, + pub node_execution_results: Vec, +} + impl Default for ExecutionEngine { fn default() -> Self { Self::new() @@ -51,13 +59,14 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - self.execute_flow_with_execution_id( + let report = self.execute_flow_with_execution_id_report( ExecutionId::new_v4(), flow, remote, respond_emitter, with_trace, - ) + ); + (report.signal, report.exit_reason) } /// Execute an `ExecutionFlow` with a caller-provided execution id. @@ -69,7 +78,43 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - self.execute_graph_with_execution_id( + let report = self.execute_flow_with_execution_id_report( + execution_id, + flow, + remote, + respond_emitter, + with_trace, + ); + (report.signal, report.exit_reason) + } + + /// Execute an `ExecutionFlow` and return the final signal plus per-node execution results. + pub fn execute_flow_report( + &self, + flow: ExecutionFlow, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { + self.execute_flow_with_execution_id_report( + ExecutionId::new_v4(), + flow, + remote, + respond_emitter, + with_trace, + ) + } + + /// Execute an `ExecutionFlow` with a caller-provided execution id and return per-node results. + pub fn execute_flow_with_execution_id_report( + &self, + execution_id: ExecutionId, + flow: ExecutionFlow, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { + self.execute_graph_with_execution_id_report( execution_id, flow.starting_node_id, flow.node_functions, @@ -90,7 +135,7 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - self.execute_graph_with_execution_id( + let report = self.execute_graph_with_execution_id_report( ExecutionId::new_v4(), start_node_id, node_functions, @@ -98,7 +143,8 @@ impl ExecutionEngine { remote, respond_emitter, with_trace, - ) + ); + (report.signal, report.exit_reason) } /// Execute a graph described by node list and start node with a caller-provided execution id. @@ -112,6 +158,50 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { + let report = self.execute_graph_with_execution_id_report( + execution_id, + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + ); + (report.signal, report.exit_reason) + } + + /// Execute a graph and return the final signal plus per-node execution results. + pub fn execute_graph_report( + &self, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { + self.execute_graph_with_execution_id_report( + ExecutionId::new_v4(), + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + ) + } + + /// Execute a graph with a caller-provided execution id and return per-node results. + pub fn execute_graph_with_execution_id_report( + &self, + execution_id: ExecutionId, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { if let Some(emitter) = respond_emitter { emitter.emit(execution_id, EmitType::StartingExec, null_value()); } @@ -129,7 +219,11 @@ impl ExecutionEngine { emitter.emit(execution_id, EmitType::FailedExec, runtime_error.as_value()); } let signal = Signal::Failure(runtime_error); - return (signal, ExitReason::Failure); + return EngineExecutionReport { + signal, + exit_reason: ExitReason::Failure, + node_execution_results: Vec::new(), + }; } }; @@ -160,7 +254,11 @@ impl ExecutionEngine { } } let exit_reason = signal.exit_reason(); - (signal, exit_reason) + EngineExecutionReport { + signal, + exit_reason, + node_execution_results: value_store.node_execution_results(), + } } } @@ -171,8 +269,8 @@ mod tests { use std::cell::RefCell; use tucana::shared::{ InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, SubFlow, - SubFlowSetting, Value, node_value, reference_value, sub_flow::ExecutionReference, - value::Kind, + SubFlowSetting, Value, node_execution_result, node_value, reference_value, + sub_flow::ExecutionReference, value::Kind, }; fn literal_param(database_id: i64, runtime_parameter_id: &str, value: Value) -> NodeParameter { @@ -719,6 +817,114 @@ mod tests { assert_eq!(expect_success(signal), int_value(42)); } + #[test] + fn execution_report_includes_literal_node_parameter_results() { + let engine = ExecutionEngine::new(); + let add_node = node( + 1, + "std::number::add", + vec![ + literal_param(100, "lhs", int_value(1)), + literal_param(101, "rhs", int_value(2)), + ], + None, + ); + + let report = engine.execute_graph_report(1, vec![add_node], None, None, None, false); + + assert_eq!(report.exit_reason, ExitReason::Success); + assert_eq!(report.node_execution_results.len(), 1); + + let node_result = &report.node_execution_results[0]; + assert_eq!(node_result.node_id, 1); + assert_eq!(node_result.parameter_results.len(), 2); + assert_eq!(node_result.parameter_results[0].value, Some(int_value(1))); + assert_eq!(node_result.parameter_results[1].value, Some(int_value(2))); + + match node_result.result.as_ref() { + Some(node_execution_result::Result::Success(value)) => { + assert_eq!(value, &int_value(3)); + } + other => panic!("expected node success result, got {:?}", other), + } + } + + #[test] + fn execution_report_includes_reference_node_parameter_results() { + let engine = ExecutionEngine::new(); + let value_node = node( + 1, + "std::control::value", + vec![literal_param(100, "value", int_value(7))], + Some(2), + ); + let add_node = node( + 2, + "std::number::add", + vec![ + node_result_ref_param(200, "lhs", 1), + literal_param(201, "rhs", int_value(5)), + ], + None, + ); + + let report = + engine.execute_graph_report(1, vec![value_node, add_node], None, None, None, false); + + assert_eq!(report.exit_reason, ExitReason::Success); + assert_eq!(report.node_execution_results.len(), 2); + + let node_result = &report.node_execution_results[1]; + assert_eq!(node_result.node_id, 2); + assert_eq!(node_result.parameter_results.len(), 2); + assert_eq!(node_result.parameter_results[0].value, Some(int_value(7))); + assert_eq!(node_result.parameter_results[1].value, Some(int_value(5))); + + match node_result.result.as_ref() { + Some(node_execution_result::Result::Success(value)) => { + assert_eq!(value, &int_value(12)); + } + other => panic!("expected node success result, got {:?}", other), + } + } + + #[test] + fn execution_report_includes_respond_node_parameter_results() { + let engine = ExecutionEngine::new(); + let respond_node = node( + 1, + "rest::control::respond", + vec![ + literal_param(100, "http_status_code", int_value(200)), + literal_param(101, "headers", empty_struct_value()), + literal_param(102, "payload", string_value("hello")), + ], + None, + ); + + let report = engine.execute_graph_report(1, vec![respond_node], None, None, None, false); + + assert_eq!(report.exit_reason, ExitReason::Success); + assert_eq!(report.node_execution_results.len(), 1); + + let node_result = &report.node_execution_results[0]; + assert_eq!(node_result.node_id, 1); + assert_eq!(node_result.parameter_results.len(), 3); + assert_eq!(node_result.parameter_results[0].value, Some(int_value(200))); + assert_eq!( + node_result.parameter_results[1].value, + Some(empty_struct_value()) + ); + assert_eq!( + node_result.parameter_results[2].value, + Some(string_value("hello")) + ); + assert!(matches!( + node_result.result, + Some(node_execution_result::Result::Success(_)) + )); + } + #[test] fn emitter_emits_start_and_finish_for_successful_execution() { let engine = ExecutionEngine::new(); diff --git a/crates/taurus-core/src/runtime/engine/executor.rs b/crates/taurus-core/src/runtime/engine/executor.rs index fc28e44..6e36680 100644 --- a/crates/taurus-core/src/runtime/engine/executor.rs +++ b/crates/taurus-core/src/runtime/engine/executor.rs @@ -9,8 +9,8 @@ use tucana::shared::node_execution_result::Result as TucanaNodeResult; use tucana::shared::reference_value::Target; use tucana::shared::value::Kind; use tucana::shared::{ - InputType, NodeExecutionResult as TucanaNodeExecutionResult, ReferenceValue, Struct, - SubFlowSetting, Value, + InputType, NodeExecutionResult as TucanaNodeExecutionResult, NodeParameterNodeExecutionResult, + ReferenceValue, Struct, SubFlowSetting, Value, }; use uuid::Uuid; @@ -65,6 +65,12 @@ struct ExecutionResult { struct NodeResult { signal: Signal, frame_id: Option, + parameter_results: Vec, +} + +struct ExecutedNode { + signal: Signal, + parameter_results: Vec, } struct EngineExecutor<'a> { @@ -104,11 +110,11 @@ impl<'a> EngineExecutor<'a> { match result.signal { // Only `Success` keeps walking through the current linear chain. - Signal::Success(_) => match next_idx { + Signal::Success(value) => match next_idx { Some(next) => current_idx = next, None => { return ExecutionResult { - signal: result.signal, + signal: Signal::Success(value), root_frame: call_root_frame, }; } @@ -119,7 +125,11 @@ impl<'a> EngineExecutor<'a> { emitter.emit(self.execution_id, EmitType::OngoingExec, value.clone()); } - value_store.insert_success(node_id, value.clone()); + value_store.insert_success_with_parameters( + node_id, + value.clone(), + result.parameter_results, + ); match next_idx { Some(next) => current_idx = next, None => { @@ -231,18 +241,31 @@ impl<'a> EngineExecutor<'a> { value_store.set_current_node_id(node.id); let frame_id = self.trace_enter(node, value_store); - let signal = match &node.execution_target { + let result = match &node.execution_target { NodeExecutionTarget::Local => { - let signal = self.execute_local_node(node, value_store, frame_id); - self.commit_result(node.id, signal, value_store) - } - NodeExecutionTarget::Remote { service } => { - self.execute_remote_node(node, service, value_store, frame_id) + let executed = self.execute_local_node(node, value_store, frame_id); + let parameter_results = executed.parameter_results; + let signal = self.commit_result( + node.id, + executed.signal, + parameter_results.clone(), + value_store, + ); + NodeResult { + signal, + frame_id, + parameter_results, + } } + NodeExecutionTarget::Remote { service } => NodeResult { + signal: self.execute_remote_node(node, service, value_store, frame_id), + frame_id, + parameter_results: Vec::new(), + }, }; - self.trace_exit(frame_id, &signal, value_store); + self.trace_exit(frame_id, &result.signal, value_store); - NodeResult { signal, frame_id } + result } fn execute_local_node( @@ -250,27 +273,40 @@ impl<'a> EngineExecutor<'a> { node: &CompiledNode, value_store: &mut ValueStore, frame_id: Option, - ) -> Signal { + ) -> ExecutedNode { let entry = match self.handlers.get(node.handler_id.as_str()) { Some(entry) => entry, None => { - return Signal::Failure(RuntimeError::new( - "T-CORE-000002", - "FunctionNotFound", - format!("Function {} not found", node.handler_id), - )); + return ExecutedNode { + signal: Signal::Failure(RuntimeError::new( + "T-CORE-000002", + "FunctionNotFound", + format!("Function {} not found", node.handler_id), + )), + parameter_results: Vec::new(), + }; } }; let mut args = match self.build_args(node, value_store, frame_id) { Ok(args) => args, - Err(err) => return Signal::Failure(err), + Err(err) => { + return ExecutedNode { + signal: Signal::Failure(err), + parameter_results: Vec::new(), + }; + } }; if let Some(signal) = self.force_eager_args(entry, &mut args, value_store, frame_id) { - return signal; + return ExecutedNode { + signal, + parameter_results: parameter_results_from_args(&args), + }; } + let parameter_results = parameter_results_from_args(&args); + // Handler-owned runtime calls (for lazy args / callbacks) re-enter the same executor. let mut run = |thunk: &Thunk, store: &mut ValueStore| { self.trace_mark_thunk_executed(frame_id, thunk); @@ -282,7 +318,10 @@ impl<'a> EngineExecutor<'a> { child_result.signal }; - (entry.handler)(&args, value_store, &mut run) + ExecutedNode { + signal: (entry.handler)(&args, value_store, &mut run), + parameter_results, + } } fn execute_remote_node( @@ -302,6 +341,7 @@ impl<'a> EngineExecutor<'a> { "RemoteRuntimeNotConfigured", "Remote runtime not configured", )), + Vec::new(), value_store, ); } @@ -309,25 +349,49 @@ impl<'a> EngineExecutor<'a> { let mut args = match self.build_args(node, value_store, frame_id) { Ok(args) => args, - Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store), + Err(err) => { + return self.commit_result(node.id, Signal::Failure(err), Vec::new(), value_store); + } }; let values = match self.resolve_remote_args(&mut args, value_store, frame_id) { Ok(values) => values, - Err(signal) => return self.commit_result(node.id, signal, value_store), + Err(signal) => { + return self.commit_result( + node.id, + signal, + parameter_results_from_args(&args), + value_store, + ); + } }; + let parameter_results = parameter_results_from_values(&values); let request = match self.build_remote_request(node, values) { Ok(request) => request, - Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store), + Err(err) => { + return self.commit_result( + node.id, + Signal::Failure(err), + parameter_results, + value_store, + ); + } }; match block_on(remote_runtime.execute_remote(RemoteExecution { target_service: service.to_string(), request, })) { - Ok(result) => self.commit_remote_result(node.id, result, value_store), - Err(err) => self.commit_result(node.id, Signal::Failure(err), value_store), + Ok(result) => { + self.commit_remote_result(node.id, result, parameter_results, value_store) + } + Err(err) => self.commit_result( + node.id, + Signal::Failure(err), + parameter_results, + value_store, + ), } } @@ -609,14 +673,24 @@ impl<'a> EngineExecutor<'a> { }) } - fn commit_result(&self, node_id: i64, signal: Signal, value_store: &mut ValueStore) -> Signal { + fn commit_result( + &self, + node_id: i64, + signal: Signal, + parameter_results: Vec, + value_store: &mut ValueStore, + ) -> Signal { match signal { Signal::Success(value) => { - value_store.insert_success(node_id, value.clone()); + value_store.insert_success_with_parameters( + node_id, + value.clone(), + parameter_results, + ); Signal::Success(value) } Signal::Failure(err) => { - value_store.insert_error(node_id, err.clone()); + value_store.insert_error_with_parameters(node_id, err.clone(), parameter_results); Signal::Failure(err) } // Control signals are transient and should not be cached as node outputs. @@ -627,9 +701,13 @@ impl<'a> EngineExecutor<'a> { fn commit_remote_result( &self, node_id: i64, - result: TucanaNodeExecutionResult, + mut result: TucanaNodeExecutionResult, + parameter_results: Vec, value_store: &mut ValueStore, ) -> Signal { + if result.parameter_results.is_empty() { + result.parameter_results = parameter_results; + } value_store.insert_node_result(node_id, result.clone()); match result.result { Some(TucanaNodeResult::Success(value)) => Signal::Success(value), @@ -724,6 +802,26 @@ impl<'a> EngineExecutor<'a> { } } +fn parameter_results_from_args(args: &[Argument]) -> Vec { + args.iter() + .map(|arg| NodeParameterNodeExecutionResult { + value: match arg { + Argument::Eval(value) => Some(value.clone()), + Argument::Thunk(_) => None, + }, + }) + .collect() +} + +fn parameter_results_from_values(values: &[Value]) -> Vec { + values + .iter() + .map(|value| NodeParameterNodeExecutionResult { + value: Some(value.clone()), + }) + .collect() +} + fn compiled_thunk_to_argument(thunk: &CompiledThunk) -> Thunk { match thunk { CompiledThunk::Node(node_id) => Thunk::Node(*node_id), diff --git a/crates/taurus-core/src/runtime/execution/value_store.rs b/crates/taurus-core/src/runtime/execution/value_store.rs index 1da8f68..348de26 100644 --- a/crates/taurus-core/src/runtime/execution/value_store.rs +++ b/crates/taurus-core/src/runtime/execution/value_store.rs @@ -3,7 +3,10 @@ use std::collections::HashMap; use tucana::shared::node_execution_result::Result as TucanaNodeResult; -use tucana::shared::{InputType, NodeExecutionResult, ReferenceValue, Value, value::Kind}; +use tucana::shared::{ + InputType, NodeExecutionResult, NodeParameterNodeExecutionResult, ReferenceValue, Value, + value::Kind, +}; use crate::runtime::execution::trace::{StoreInputSlotEntry, StoreResultEntry, StoreSnapshot}; use crate::time::now_unix_ms; @@ -143,6 +146,15 @@ impl ValueStore { } pub fn insert_success(&mut self, id: i64, value: Value) { + self.insert_success_with_parameters(id, value, Vec::new()); + } + + pub fn insert_success_with_parameters( + &mut self, + id: i64, + value: Value, + parameter_results: Vec, + ) { let ts = now_unix_ms(); self.insert_node_result( id, @@ -150,13 +162,22 @@ impl ValueStore { node_id: id, started_at: ts, finished_at: ts, - parameter_results: Vec::new(), + parameter_results, result: Some(TucanaNodeResult::Success(value)), }, ); } pub fn insert_error(&mut self, id: i64, runtime_error: RuntimeError) { + self.insert_error_with_parameters(id, runtime_error, Vec::new()); + } + + pub fn insert_error_with_parameters( + &mut self, + id: i64, + runtime_error: RuntimeError, + parameter_results: Vec, + ) { let ts = now_unix_ms(); self.insert_node_result( id, @@ -164,7 +185,7 @@ impl ValueStore { node_id: id, started_at: ts, finished_at: ts, - parameter_results: Vec::new(), + parameter_results, result: Some(TucanaNodeResult::Error(runtime_error.as_tucana_error())), }, ); @@ -175,6 +196,12 @@ impl ValueStore { self.results.insert(id, result); } + pub fn node_execution_results(&self) -> Vec { + let mut results: Vec<_> = self.results.values().cloned().collect(); + results.sort_by_key(|result| result.node_id); + results + } + pub fn push_runtime_trace_label(&mut self, label: String) { self.runtime_trace_labels.push(label); } From 428ba8f119b7eb6ebaee1a4fb09e6e76e07e2fb5 Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 5 Jun 2026 14:25:30 +0200 Subject: [PATCH 5/6] feat: transmitting every execution instead of the test_execution path --- crates/taurus/src/app/mod.rs | 22 +- crates/taurus/src/app/worker.rs | 350 +++++------------- crates/taurus/src/client/mod.rs | 1 + crates/taurus/src/client/runtime_execution.rs | 53 +++ 4 files changed, 175 insertions(+), 251 deletions(-) create mode 100644 crates/taurus/src/client/runtime_execution.rs diff --git a/crates/taurus/src/app/mod.rs b/crates/taurus/src/app/mod.rs index fe3712e..4fabc48 100644 --- a/crates/taurus/src/app/mod.rs +++ b/crates/taurus/src/app/mod.rs @@ -13,6 +13,7 @@ use tokio::task::JoinHandle; use tokio::time::sleep; use tonic_health::pb::health_server::HealthServer; +use crate::client::runtime_execution::TaurusRuntimeExecutionService; use crate::client::runtime_status::TaurusRuntimeStatusService; use crate::client::runtime_usage::TaurusRuntimeUsageService; use crate::config::Config; @@ -26,8 +27,12 @@ pub async fn run() { let client = connect_nats(&config).await; let mut health_task = spawn_health_task(&config); - let (runtime_status_service, runtime_usage_service, mut runtime_status_heartbeat_task) = - setup_dynamic_services_if_needed(&config).await; + let ( + runtime_status_service, + runtime_execution_service, + runtime_usage_service, + mut runtime_status_heartbeat_task, + ) = setup_dynamic_services_if_needed(&config).await; let nats_remote = NATSRemoteRuntime::new(client.clone()); let runtime_emitter = NATSRespondEmitter::new(client.clone()); @@ -36,6 +41,7 @@ pub async fn run() { engine, nats_remote, runtime_emitter, + runtime_execution_service, runtime_usage_service, ); @@ -103,15 +109,24 @@ async fn setup_dynamic_services_if_needed( config: &Config, ) -> ( Option>, + Option, Option, Option>, ) { if config.mode != DYNAMIC { - return (None, None, None); + return (None, None, None, None); } push_definitions_until_success(config).await; + let runtime_execution_service = Some( + TaurusRuntimeExecutionService::from_url( + config.aquila_url.clone(), + config.aquila_token.clone(), + ) + .await, + ); + let runtime_usage_service = Some( TaurusRuntimeUsageService::from_url(config.aquila_url.clone(), config.aquila_token.clone()) .await, @@ -168,6 +183,7 @@ async fn setup_dynamic_services_if_needed( ( runtime_status_service, + runtime_execution_service, runtime_usage_service, runtime_status_heartbeat_task, ) diff --git a/crates/taurus/src/app/worker.rs b/crates/taurus/src/app/worker.rs index a953a86..fef777d 100644 --- a/crates/taurus/src/app/worker.rs +++ b/crates/taurus/src/app/worker.rs @@ -10,8 +10,9 @@ use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; use tokio::task::JoinHandle; use tucana::shared::execution_result; -use tucana::shared::{ExecutionFlow, ExecutionResult, RuntimeUsage, Value}; +use tucana::shared::{ExecutionFlow, ExecutionResult, NodeExecutionResult, RuntimeUsage, Value}; +use crate::client::runtime_execution::TaurusRuntimeExecutionService; use crate::client::runtime_usage::TaurusRuntimeUsageService; pub fn spawn_worker( @@ -19,6 +20,7 @@ pub fn spawn_worker( engine: ExecutionEngine, nats_remote: NATSRemoteRuntime, runtime_emitter: NATSRespondEmitter, + mut runtime_execution_service: Option, runtime_usage_service: Option, ) -> JoinHandle<()> { tokio::spawn(async move { @@ -36,24 +38,9 @@ pub fn spawn_worker( } }; - let mut test_execution_subscription = match client - .queue_subscribe(String::from("test_execution.*"), "taurus".into()) - .await - { - Ok(subscription) => { - log::info!("Subscribed to 'test_execution.*'"); - subscription - } - Err(err) => { - log::error!("Failed to subscribe to 'test_execution.*': {:?}", err); - return; - } - }; - let mut execution_closed = false; - let mut test_execution_closed = false; - while !(execution_closed && test_execution_closed) { + while !execution_closed { tokio::select! { message = execution_subscription.next(), if !execution_closed => { match message { @@ -63,6 +50,7 @@ pub fn spawn_worker( &engine, &nats_remote, &runtime_emitter, + runtime_execution_service.as_mut(), runtime_usage_service.as_ref(), ).await; } @@ -72,23 +60,6 @@ pub fn spawn_worker( } } } - message = test_execution_subscription.next(), if !test_execution_closed => { - match message { - Some(message) => { - process_test_execution_message( - &client, - message, - &engine, - &nats_remote, - runtime_usage_service.as_ref(), - ).await; - } - None => { - test_execution_closed = true; - log::warn!("Subscription 'test_execution.*' ended"); - } - } - } } } @@ -101,6 +72,7 @@ async fn process_execution_message( engine: &ExecutionEngine, nats_remote: &NATSRemoteRuntime, runtime_emitter: &NATSRespondEmitter, + mut runtime_execution_service: Option<&mut TaurusRuntimeExecutionService>, runtime_usage_service: Option<&TaurusRuntimeUsageService>, ) { let requested_execution_id = parse_execution_id_from_subject(&message.subject, "execution") @@ -122,6 +94,11 @@ async fn process_execution_message( err, &message.payload ); + if let Some(execution_service) = runtime_execution_service.as_mut() { + execution_service + .update_runtime_execution(build_decode_error_result(requested_execution_id)) + .await; + } return; } }; @@ -144,68 +121,27 @@ async fn process_execution_message( flow_id ); - if let Some(usage_service) = runtime_usage_service { - usage_service - .update_runtime_usage(run_result.runtime_usage) - .await; - } -} + let execution_result = build_execution_result( + run_result.execution_id, + run_result.flow_id, + run_result.started_at, + run_result.finished_at, + run_result.input.clone(), + run_result.node_execution_results, + run_result.signal.clone(), + ); -async fn process_test_execution_message( - client: &async_nats::Client, - message: async_nats::Message, - engine: &ExecutionEngine, - nats_remote: &NATSRemoteRuntime, - runtime_usage_service: Option<&TaurusRuntimeUsageService>, -) { - if message.reply.is_none() { - log::warn!( - "Received test execution request without reply subject on '{}'; ignoring request", - message.subject - ); - return; + if let Some(execution_service) = runtime_execution_service.as_mut() { + execution_service + .update_runtime_execution(execution_result) + .await; } - let requested_execution_id = - match parse_execution_id_from_subject(&message.subject, "test_execution") { - Some(res) => res, - None => { - log::error!("Failed to extract execution uuid from {}", &message.subject); - return; - } - }; - - let flow: ExecutionFlow = match ExecutionFlow::decode(&*message.payload) { - Ok(flow) => flow, - Err(err) => { - log::error!( - "Failed to deserialize test execution flow: {:?}, payload: {:?}", - err, - &message.payload - ); - let result = build_decode_error_result(requested_execution_id); - respond_to_test_execution_request(client, &message, result).await; - return; - } - }; - - let run_result = execute_flow(requested_execution_id, flow, engine, nats_remote, None); - if let Some(usage_service) = runtime_usage_service { usage_service - .update_runtime_usage(run_result.runtime_usage.clone()) + .update_runtime_usage(run_result.runtime_usage) .await; } - - let execution_result = build_execution_result( - run_result.execution_id, - run_result.flow_id, - run_result.started_at, - run_result.finished_at, - run_result.input, - run_result.signal, - ); - respond_to_test_execution_request(client, &message, execution_result).await; } #[derive(Clone)] @@ -216,6 +152,7 @@ struct FlowRunResult { finished_at: i64, input: Option, signal: Signal, + node_execution_results: Vec, runtime_usage: RuntimeUsage, } @@ -230,7 +167,7 @@ fn execute_flow( let start = Instant::now(); let flow_id = flow.flow_id; let input = flow.input_value.clone(); - let (signal, _reason) = engine.execute_flow_with_execution_id( + let report = engine.execute_flow_with_execution_id_report( execution_id, flow, Some(nats_remote), @@ -246,7 +183,8 @@ fn execute_flow( started_at, finished_at, input, - signal, + signal: report.signal, + node_execution_results: report.node_execution_results, runtime_usage: RuntimeUsage { flow_id, duration: duration_millis, @@ -274,6 +212,7 @@ fn build_execution_result( started_at: i64, finished_at: i64, input: Option, + node_execution_results: Vec, signal: Signal, ) -> ExecutionResult { let result = match signal { @@ -292,7 +231,7 @@ fn build_execution_result( started_at, finished_at, input, - node_execution_results: Vec::new(), + node_execution_results, result, } } @@ -302,7 +241,7 @@ fn build_decode_error_result(execution_id: ExecutionId) -> ExecutionResult { let runtime_error = RuntimeError::new( "T-TAURUS-000001", "ExecutionFlowDecodeError", - "Failed to decode test execution flow payload", + "Failed to decode execution flow payload", ); ExecutionResult { @@ -318,42 +257,11 @@ fn build_decode_error_result(execution_id: ExecutionId) -> ExecutionResult { } } -async fn respond_to_test_execution_request( - client: &async_nats::Client, - message: &async_nats::Message, - result: ExecutionResult, -) { - let Some(reply_subject) = message.reply.as_ref() else { - log::warn!( - "Received test execution request without reply subject on '{}'; cannot return ExecutionResult", - message.subject - ); - return; - }; - - if let Err(err) = client - .publish(reply_subject.clone(), result.encode_to_vec().into()) - .await - { - log::error!( - "Failed to publish test execution response on '{}': {:?}", - reply_subject, - err - ); - } -} - #[cfg(test)] mod tests { use super::*; - use std::sync::Mutex; - use std::time::Duration; - use prost::Message; use serde::Deserialize; - use taurus_core::runtime::engine::ExecutionEngine; - use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; - use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; use tucana::shared::{ ValidationFlow, execution_result, helper::value::{from_json_value, to_json_value}, @@ -371,140 +279,86 @@ mod tests { flow: ValidationFlow, } - static NATS_TEST_LOCK: Mutex<()> = Mutex::new(()); - #[test] - #[ignore = "requires a running NATS server at NATS_URL or nats://127.0.0.1:4222"] - fn test_execution_request_returns_execution_result_over_nats() { - let _lock = NATS_TEST_LOCK - .lock() - .expect("NATS test lock should not be poisoned"); - runtime().block_on(async { - let client = connect_test_nats().await; - let worker = spawn_test_worker(client.clone()); - - let execution_id = ExecutionId::new_v4(); - let fixture = load_fixture("flows/01_return_object.json"); - let expected_result = fixture.inputs[0].expected_result.clone(); - let flow = execution_flow_from_fixture(fixture); - let response = request_execution_result(&client, execution_id, flow.encode_to_vec()) - .await - .expect("test execution request should receive an ExecutionResult response"); - - worker.abort(); - - assert_eq!(response.execution_identifier, execution_id.to_string()); - assert_eq!(response.flow_id, flow.flow_id); - assert!(response.started_at > 0); - assert!(response.finished_at >= response.started_at); - assert_eq!(response.input, None); - assert!(response.node_execution_results.is_empty()); - - match response.result { - Some(execution_result::Result::Success(value)) => { - assert_eq!(to_json_value(value), expected_result); - } - other => panic!("expected successful test execution result, got {:?}", other), + fn build_execution_result_preserves_success_payload() { + let execution_id = ExecutionId::new_v4(); + let fixture = load_fixture("flows/01_return_object.json"); + let expected_result = fixture.inputs[0].expected_result.clone(); + let flow = execution_flow_from_fixture(fixture); + let success = from_json_value(expected_result.clone()); + + let response = build_execution_result( + execution_id, + flow.flow_id, + 1, + 2, + flow.input_value, + Vec::new(), + Signal::Success(success), + ); + + assert_eq!(response.execution_identifier, execution_id.to_string()); + assert_eq!(response.flow_id, flow.flow_id); + assert_eq!(response.started_at, 1); + assert_eq!(response.finished_at, 2); + assert!(response.node_execution_results.is_empty()); + + match response.result { + Some(execution_result::Result::Success(value)) => { + assert_eq!(to_json_value(value), expected_result); } - }); + other => panic!("expected successful execution result, got {:?}", other), + } } #[test] - #[ignore = "requires a running NATS server at NATS_URL or nats://127.0.0.1:4222"] - fn test_execution_request_returns_decode_error_over_nats() { - let _lock = NATS_TEST_LOCK - .lock() - .expect("NATS test lock should not be poisoned"); - runtime().block_on(async { - let client = connect_test_nats().await; - let worker = spawn_test_worker(client.clone()); - - let execution_id = ExecutionId::new_v4(); - let response = - request_execution_result(&client, execution_id, b"not protobuf".to_vec()) - .await - .expect("malformed test execution request should receive an error response"); - - worker.abort(); - - assert_eq!(response.execution_identifier, execution_id.to_string()); - assert_eq!(response.flow_id, 0); - assert!(response.started_at > 0); - assert!(response.finished_at >= response.started_at); - assert_eq!(response.input, None); - assert!(response.node_execution_results.is_empty()); - - match response.result { - Some(execution_result::Result::Error(error)) => { - assert_eq!(error.code, "T-TAURUS-000001"); - assert_eq!(error.category, "ExecutionFlowDecodeError"); - assert_eq!( - error.message, - "Failed to decode test execution flow payload" - ); - } - other => panic!("expected decode error result, got {:?}", other), + fn build_decode_error_result_uses_execution_payload_message() { + let execution_id = ExecutionId::new_v4(); + let response = build_decode_error_result(execution_id); + + assert_eq!(response.execution_identifier, execution_id.to_string()); + assert_eq!(response.flow_id, 0); + assert!(response.started_at > 0); + assert!(response.finished_at >= response.started_at); + assert_eq!(response.input, None); + assert!(response.node_execution_results.is_empty()); + + match response.result { + Some(execution_result::Result::Error(error)) => { + assert_eq!(error.code, "T-TAURUS-000001"); + assert_eq!(error.category, "ExecutionFlowDecodeError"); + assert_eq!(error.message, "Failed to decode execution flow payload"); } - }); - } - - fn runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .expect("test runtime should build") - } - - async fn connect_test_nats() -> async_nats::Client { - let nats_url = - std::env::var("NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_string()); - async_nats::connect(&nats_url) - .await - .unwrap_or_else(|err| panic!("failed to connect to NATS at {nats_url}: {err}")) + other => panic!("expected decode error result, got {:?}", other), + } } - fn spawn_test_worker(client: async_nats::Client) -> tokio::task::JoinHandle<()> { - let engine = ExecutionEngine::new(); - let nats_remote = NATSRemoteRuntime::new(client.clone()); - let runtime_emitter = NATSRespondEmitter::new(client.clone()); - spawn_worker(client, engine, nats_remote, runtime_emitter, None) - } + #[test] + fn build_execution_result_preserves_node_execution_results() { + let execution_id = ExecutionId::new_v4(); + let node_result = NodeExecutionResult { + node_id: 42, + started_at: 1, + finished_at: 2, + parameter_results: vec![tucana::shared::NodeParameterNodeExecutionResult { + value: Some(from_json_value(serde_json::json!("parameter-value"))), + }], + result: Some(tucana::shared::node_execution_result::Result::Success( + from_json_value(serde_json::json!("node-output")), + )), + }; - async fn request_execution_result( - client: &async_nats::Client, - execution_id: ExecutionId, - payload: Vec, - ) -> Result { - let subject = format!("test_execution.{execution_id}"); - - for attempt in 1..=10 { - match tokio::time::timeout( - Duration::from_secs(2), - client.request(subject.clone(), payload.clone().into()), - ) - .await - { - Ok(Ok(message)) => { - return ExecutionResult::decode(&*message.payload) - .map_err(|err| format!("failed to decode ExecutionResult: {err}")); - } - Ok(Err(err)) if attempt < 10 => { - log::debug!( - "test execution request attempt {} failed before subscription was ready: {:?}", - attempt, - err - ); - tokio::time::sleep(Duration::from_millis(100)).await; - } - Ok(Err(err)) => return Err(format!("NATS request failed: {err}")), - Err(_) if attempt < 10 => { - tokio::time::sleep(Duration::from_millis(100)).await; - } - Err(_) => return Err("timed out waiting for test execution response".to_string()), - } - } + let response = build_execution_result( + execution_id, + 10, + 1, + 2, + None, + vec![node_result.clone()], + Signal::Success(from_json_value(serde_json::json!("flow-output"))), + ); - Err("test execution request did not complete".to_string()) + assert_eq!(response.node_execution_results, vec![node_result]); } fn load_fixture(path: &str) -> FlowFixture { diff --git a/crates/taurus/src/client/mod.rs b/crates/taurus/src/client/mod.rs index 940ca04..8a5cec6 100644 --- a/crates/taurus/src/client/mod.rs +++ b/crates/taurus/src/client/mod.rs @@ -1,2 +1,3 @@ +pub mod runtime_execution; pub mod runtime_status; pub mod runtime_usage; diff --git a/crates/taurus/src/client/runtime_execution.rs b/crates/taurus/src/client/runtime_execution.rs new file mode 100644 index 0000000..0e91136 --- /dev/null +++ b/crates/taurus/src/client/runtime_execution.rs @@ -0,0 +1,53 @@ +use code0_flow::flow_service::{ + auth::get_authorization_metadata, retry::create_channel_with_retry, +}; +use tonic::{Extensions, Request, transport::Channel}; +use tucana::{ + aquila::{ExecutionRequest, execution_service_client::ExecutionServiceClient}, + shared::ExecutionResult, +}; + +pub struct TaurusRuntimeExecutionService { + client: ExecutionServiceClient, + aquila_token: String, +} + +impl TaurusRuntimeExecutionService { + pub async fn from_url(aquila_url: String, aquila_token: String) -> Self { + let channel = create_channel_with_retry("Aquila", aquila_url).await; + let client = ExecutionServiceClient::new(channel); + + TaurusRuntimeExecutionService { + client, + aquila_token, + } + } + + pub async fn update_runtime_execution(&mut self, runtime_execution: ExecutionResult) { + log::info!("Updating the current Runtime Execution!"); + log::debug!( + "ExecutionResult payload sent to execution service: {:?}", + runtime_execution + ); + + let request = Request::from_parts( + get_authorization_metadata(&self.aquila_token), + Extensions::new(), + ExecutionRequest { + execution_result: Some(runtime_execution), + }, + ); + + match self.client.update(request).await { + Ok(response) => { + log::info!( + "Transmitted Execution Result (success: {})", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update RuntimeExecution: {:?}", err); + } + } + } +} From a733528959c8d01a23185d04dafd2468b18e5e05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Sat, 6 Jun 2026 17:03:11 +0200 Subject: [PATCH 6/6] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- crates/taurus/src/client/runtime_execution.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/taurus/src/client/runtime_execution.rs b/crates/taurus/src/client/runtime_execution.rs index 0e91136..a21b7dd 100644 --- a/crates/taurus/src/client/runtime_execution.rs +++ b/crates/taurus/src/client/runtime_execution.rs @@ -24,10 +24,11 @@ impl TaurusRuntimeExecutionService { } pub async fn update_runtime_execution(&mut self, runtime_execution: ExecutionResult) { - log::info!("Updating the current Runtime Execution!"); - log::debug!( - "ExecutionResult payload sent to execution service: {:?}", - runtime_execution + log::info!( + "Transmitting execution result to Aquila (execution_id={}, flow_id={}, node_results={})", + runtime_execution.execution_identifier.as_str(), + runtime_execution.flow_id, + runtime_execution.node_execution_results.len() ); let request = Request::from_parts(