Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2024"
[workspace.dependencies]
async-trait = "0.1.89"
code0-flow = { version = "0.0.33" }
tucana = { version = "0.0.72" }
tucana = { version = "0.0.73" }
tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] }
log = "0.4.27"
futures-lite = "2.6.0"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Taurus
The heart of the execution block - the runtime itself

- Executes flows and handles test executions
- Executes flows and transmits execution results
- Requests single node executions from Actions
- Serves the standard CodeZero library

Expand Down
226 changes: 216 additions & 10 deletions crates/taurus-core/src/runtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod emitter;
mod executor;
mod model;

use tucana::shared::{ExecutionFlow, NodeFunction, Value};
use tucana::shared::{ExecutionFlow, NodeExecutionResult, NodeFunction, Value};

use crate::handler::registry::FunctionStore;
use crate::runtime::execution::value_store::ValueStore;
Expand All @@ -29,6 +29,14 @@ pub struct ExecutionEngine {
handlers: FunctionStore,
}

/// Full result of one engine execution, including per-node results for reporting.
#[derive(Debug, Clone)]
pub struct EngineExecutionReport {
pub signal: Signal,
pub exit_reason: ExitReason,
pub node_execution_results: Vec<NodeExecutionResult>,
}

impl Default for ExecutionEngine {
fn default() -> Self {
Self::new()
Expand All @@ -51,13 +59,14 @@ impl ExecutionEngine {
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> (Signal, ExitReason) {
self.execute_flow_with_execution_id(
let report = self.execute_flow_with_execution_id_report(
ExecutionId::new_v4(),
flow,
remote,
respond_emitter,
Comment on lines +62 to 66
with_trace,
)
);
(report.signal, report.exit_reason)
}

/// Execute an `ExecutionFlow` with a caller-provided execution id.
Expand All @@ -69,7 +78,43 @@ impl ExecutionEngine {
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> (Signal, ExitReason) {
self.execute_graph_with_execution_id(
let report = self.execute_flow_with_execution_id_report(
execution_id,
flow,
remote,
respond_emitter,
with_trace,
);
(report.signal, report.exit_reason)
}

/// Execute an `ExecutionFlow` and return the final signal plus per-node execution results.
pub fn execute_flow_report(
&self,
flow: ExecutionFlow,
remote: Option<&dyn RemoteRuntime>,
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> EngineExecutionReport {
self.execute_flow_with_execution_id_report(
ExecutionId::new_v4(),
flow,
remote,
respond_emitter,
with_trace,
)
}

/// Execute an `ExecutionFlow` with a caller-provided execution id and return per-node results.
pub fn execute_flow_with_execution_id_report(
&self,
execution_id: ExecutionId,
flow: ExecutionFlow,
remote: Option<&dyn RemoteRuntime>,
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> EngineExecutionReport {
self.execute_graph_with_execution_id_report(
execution_id,
flow.starting_node_id,
flow.node_functions,
Expand All @@ -90,15 +135,16 @@ impl ExecutionEngine {
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> (Signal, ExitReason) {
self.execute_graph_with_execution_id(
let report = self.execute_graph_with_execution_id_report(
ExecutionId::new_v4(),
start_node_id,
node_functions,
flow_input,
remote,
respond_emitter,
with_trace,
)
);
(report.signal, report.exit_reason)
}

/// Execute a graph described by node list and start node with a caller-provided execution id.
Expand All @@ -112,6 +158,50 @@ impl ExecutionEngine {
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> (Signal, ExitReason) {
let report = self.execute_graph_with_execution_id_report(
execution_id,
start_node_id,
node_functions,
flow_input,
remote,
respond_emitter,
with_trace,
);
(report.signal, report.exit_reason)
}

/// Execute a graph and return the final signal plus per-node execution results.
pub fn execute_graph_report(
&self,
start_node_id: i64,
node_functions: Vec<NodeFunction>,
flow_input: Option<Value>,
remote: Option<&dyn RemoteRuntime>,
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> EngineExecutionReport {
self.execute_graph_with_execution_id_report(
ExecutionId::new_v4(),
start_node_id,
node_functions,
flow_input,
remote,
respond_emitter,
with_trace,
)
}

/// Execute a graph with a caller-provided execution id and return per-node results.
pub fn execute_graph_with_execution_id_report(
&self,
execution_id: ExecutionId,
start_node_id: i64,
node_functions: Vec<NodeFunction>,
flow_input: Option<Value>,
remote: Option<&dyn RemoteRuntime>,
respond_emitter: Option<&dyn RespondEmitter>,
with_trace: bool,
) -> EngineExecutionReport {
if let Some(emitter) = respond_emitter {
emitter.emit(execution_id, EmitType::StartingExec, null_value());
}
Expand All @@ -129,7 +219,11 @@ impl ExecutionEngine {
emitter.emit(execution_id, EmitType::FailedExec, runtime_error.as_value());
}
let signal = Signal::Failure(runtime_error);
return (signal, ExitReason::Failure);
return EngineExecutionReport {
signal,
exit_reason: ExitReason::Failure,
node_execution_results: Vec::new(),
};
}
};

Expand Down Expand Up @@ -160,7 +254,11 @@ impl ExecutionEngine {
}
}
let exit_reason = signal.exit_reason();
(signal, exit_reason)
EngineExecutionReport {
signal,
exit_reason,
node_execution_results: value_store.node_execution_results(),
}
}
}

Expand All @@ -171,8 +269,8 @@ mod tests {
use std::cell::RefCell;
use tucana::shared::{
InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, SubFlow,
SubFlowSetting, Value, node_value, reference_value, sub_flow::ExecutionReference,
value::Kind,
SubFlowSetting, Value, node_execution_result, node_value, reference_value,
sub_flow::ExecutionReference, value::Kind,
};

fn literal_param(database_id: i64, runtime_parameter_id: &str, value: Value) -> NodeParameter {
Expand Down Expand Up @@ -719,6 +817,114 @@ mod tests {
assert_eq!(expect_success(signal), int_value(42));
}

#[test]
fn execution_report_includes_literal_node_parameter_results() {
let engine = ExecutionEngine::new();
let add_node = node(
1,
"std::number::add",
vec![
literal_param(100, "lhs", int_value(1)),
literal_param(101, "rhs", int_value(2)),
],
None,
);

let report = engine.execute_graph_report(1, vec![add_node], None, None, None, false);

assert_eq!(report.exit_reason, ExitReason::Success);
assert_eq!(report.node_execution_results.len(), 1);

let node_result = &report.node_execution_results[0];
assert_eq!(node_result.node_id, 1);
assert_eq!(node_result.parameter_results.len(), 2);
assert_eq!(node_result.parameter_results[0].value, Some(int_value(1)));
assert_eq!(node_result.parameter_results[1].value, Some(int_value(2)));

match node_result.result.as_ref() {
Some(node_execution_result::Result::Success(value)) => {
assert_eq!(value, &int_value(3));
}
other => panic!("expected node success result, got {:?}", other),
}
}

#[test]
fn execution_report_includes_reference_node_parameter_results() {
let engine = ExecutionEngine::new();
let value_node = node(
1,
"std::control::value",
vec![literal_param(100, "value", int_value(7))],
Some(2),
);
let add_node = node(
2,
"std::number::add",
vec![
node_result_ref_param(200, "lhs", 1),
literal_param(201, "rhs", int_value(5)),
],
None,
);

let report =
engine.execute_graph_report(1, vec![value_node, add_node], None, None, None, false);

assert_eq!(report.exit_reason, ExitReason::Success);
assert_eq!(report.node_execution_results.len(), 2);

let node_result = &report.node_execution_results[1];
assert_eq!(node_result.node_id, 2);
assert_eq!(node_result.parameter_results.len(), 2);
assert_eq!(node_result.parameter_results[0].value, Some(int_value(7)));
assert_eq!(node_result.parameter_results[1].value, Some(int_value(5)));

match node_result.result.as_ref() {
Some(node_execution_result::Result::Success(value)) => {
assert_eq!(value, &int_value(12));
}
other => panic!("expected node success result, got {:?}", other),
}
}

#[test]
fn execution_report_includes_respond_node_parameter_results() {
let engine = ExecutionEngine::new();
let respond_node = node(
1,
"rest::control::respond",
vec![
literal_param(100, "http_status_code", int_value(200)),
literal_param(101, "headers", empty_struct_value()),
literal_param(102, "payload", string_value("hello")),
],
None,
);

let report = engine.execute_graph_report(1, vec![respond_node], None, None, None, false);

assert_eq!(report.exit_reason, ExitReason::Success);
assert_eq!(report.node_execution_results.len(), 1);

let node_result = &report.node_execution_results[0];
assert_eq!(node_result.node_id, 1);
assert_eq!(node_result.parameter_results.len(), 3);
assert_eq!(node_result.parameter_results[0].value, Some(int_value(200)));
assert_eq!(
node_result.parameter_results[1].value,
Some(empty_struct_value())
);
assert_eq!(
node_result.parameter_results[2].value,
Some(string_value("hello"))
);
assert!(matches!(
node_result.result,
Some(node_execution_result::Result::Success(_))
));
}

#[test]
fn emitter_emits_start_and_finish_for_successful_execution() {
let engine = ExecutionEngine::new();
Expand Down
Loading