Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
uuid = { version = "1.0", features = ["v4", "serde"] }

# duroxide integration
duroxide = "=0.1.29"
# Using git dependency on pinodeca/continue-parent-link branch which preserves the
# parent link when a sub-orchestration calls continue_as_new, unblocking the
# sub-orchestration approach for df.loop.
# Compatibility: duroxide-pg 0.1.34 has been verified to compile and run correctly
# against this branch (same public API as 0.1.29 — PR #31 is a runtime-only change).
# Once the branch is merged and a new duroxide release is published, revert both
# entries back to crates.io version pins as a compatible pair.
duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" }
duroxide-pg = "=0.1.34"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] }

Expand All @@ -61,6 +68,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[dev-dependencies]
pgrx-tests = "=0.16.1"

# Override the crates.io duroxide with the git branch for both the direct dependency
# and duroxide-pg's transitive dependency on duroxide.
[patch.crates-io]
duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" }

[profile.dev]
panic = "unwind"

Expand Down
9 changes: 9 additions & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,15 @@ SELECT df.start(
> )
> ```

### How Loops Execute

Each loop iteration advances via *continue-as-new*, which restarts the loop with fresh state while preserving durability. Where that restart happens depends on whether the loop is the **root** of the function:

- A **root loop** (the outermost node, e.g. `df.start(df.loop(...))` or the `@>` prefix) runs inline on the function's own orchestration. There is no surrounding work to preserve, so each iteration simply restarts the function.
- A **non-root loop** (a loop with prefix/suffix nodes, or one nested inside a `df.if()`, JOIN (`&`), or RACE (`|`) branch) runs as its own **child sub-orchestration**. Only the loop body restarts on each iteration — any work *before* the loop runs exactly once and is never re-executed, and a loop nested in a parallel branch gets its own durable instance.

This is transparent to your workflow; it only affects observability. The child sub-orchestration is an internal durable instance: it does **not** appear in `df.list_instances()` (which lists only the instances you started with `df.start()`). Instead, the loop node's status in `df.instance_nodes()` / `df.explain()` reflects the child's progress, so you observe the loop through its parent instance as usual.

### Stopping a Loop Externally

```sql
Expand Down
16 changes: 12 additions & 4 deletions docs/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,18 @@ Return columns:
**`status_details` JSON contract.** Written by the worker through the
`update-node-status` activity and stored verbatim in `df.nodes.status_details`:

- `execution_id` — the node's full segmented execution path, e.g.
`a1b2c3d4::1::7f9a0012::1`. Parse it positionally: the second `::`-token is the
root loop generation (used to detect superseded loop iterations), and the
trailing segments encode `JOIN`/`RACE` sub-orchestration lineage.
- `execution_id` — the node's full execution stamp, `{instance_path}::{generation}`,
e.g. `a1b2c3d4::1::7f9a0012::2`. The **last** `::`-token is the generation
(continue-as-new count) of the orchestration that transitioned the node, and the
preceding `{instance_path}` is that orchestration's instance id. `instance_path`
encodes sub-orchestration lineage: it starts with the root function instance id and
appends a `::{parent_generation}::{branch_or_loop_node_id}` segment for each nested
`JOIN`/`RACE` branch and each non-root `df.loop()` (which runs as its own child
sub-orchestration). Instance ids and node ids are 8-char hex and never contain `::`,
so the path is unambiguous. Supersession is evaluated **per scope**: a node is
superseded when a newer generation exists for its own `instance_path`, or when any
ancestor scope in its path has advanced to a newer generation. For a plain root-level
loop this reduces to the second `::`-token being the loop generation.

`inferred_status` and `inferred_status_from_ancestor_id` are **computed at read
time** and are not stored in `df.nodes.status_details`.
Expand Down
11 changes: 7 additions & 4 deletions sql/pg_durable--0.2.3--0.2.4.sql
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,13 @@ CREATE INDEX idx_instances_created_at ON df.instances(created_at DESC, id);
-- Upgrade ordering (in-flight instances): the worker's orchestration history
-- changed shape in this release -- update_node_status activity inputs gained an
-- execution_id field, and JOIN/RACE branch sub-orchestrations now use
-- deterministic composed instance ids instead of auto-generated ones. duroxide
-- replays by exact equality on recorded inputs/ids, so instances in flight across
-- the upgrade cannot resume; drain or recreate them before upgrading (the same
-- constraint documented for issue #129).
-- deterministic composed instance ids instead of auto-generated ones. Non-root
-- df.loop() nodes also run as their own child sub-orchestration (with a
-- deterministic composed instance id) that stamps the loop node directly, instead
-- of the parent stamping it inline. This adds no new DDL -- status_details already
-- covers the loop node's stamps. duroxide replays by exact equality on recorded
-- inputs/ids, so instances in flight across the upgrade cannot resume; drain or
-- recreate them before upgrading (the same constraint documented for issue #129).
-- ============================================================================
ALTER TABLE df.nodes ADD COLUMN status_details JSONB;

Expand Down
73 changes: 60 additions & 13 deletions src/node_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,48 @@ pub struct Inferred {
pub from_ancestor_id: Option<String>,
}

/// Parse the loop generation (the second "::"-token) from a node's
/// status_details `execution_id` stamp. None when never stamped / unparseable.
fn gen_of(status_details: Option<&str>) -> Option<i64> {
/// Parse a node's `execution_id` stamp into `(instance_id, generation)`.
///
/// The stamp is `{orchestration_instance_id}::{execution_id}`. The trailing token is the
/// generation written by the innermost orchestration (a loop advances it via
/// `continue_as_new`); everything before it is that orchestration's instance id (its
/// "scope"). Instance ids and node ids are 8-char hex, so splitting on the last "::" is
/// unambiguous. None when never stamped / unparseable.
fn stamp_of(status_details: Option<&str>) -> Option<(String, i64)> {
let sd = status_details?;
let v: serde_json::Value = serde_json::from_str(sd).ok()?;
v.get("execution_id")?
.as_str()?
.split("::")
.nth(1)?
.parse::<i64>()
.ok()
let eid = v.get("execution_id")?.as_str()?;
let (instance_id, generation) = eid.rsplit_once("::")?;
Some((instance_id.to_string(), generation.parse::<i64>().ok()?))
}

/// Whether a node stamped `(instance_id, gen)` belongs to a superseded generation.
///
/// A node is superseded when a newer generation exists in its own scope, or when any
/// ancestor scope along its spawn lineage has advanced past the generation that spawned
/// this scope. A spawned scope's instance id is
/// `{parent_instance}::{parent_gen}::{child_root_node_id}` (see `subtree_instance_id` in
/// the orchestration), so the spawning generation is the second-to-last "::"-token and the
/// parent instance id is everything before it. The recursion makes a non-root loop body
/// (older inner generation) and a parallel branch spawned by an older loop generation both
/// read as superseded, while the root case (a single-token instance id) reduces to a plain
/// per-scope generation compare.
fn is_superseded(instance_id: &str, gen: i64, scope_max: &HashMap<String, i64>) -> bool {
if let Some(&m) = scope_max.get(instance_id) {
if gen < m {
return true;
}
}
let tokens: Vec<&str> = instance_id.split("::").collect();
if tokens.len() < 3 {
// Root scope: no parent scope can supersede this one.
return false;
}
let parent_instance = tokens[..tokens.len() - 2].join("::");
match tokens[tokens.len() - 2].parse::<i64>() {
Ok(parent_gen) => is_superseded(&parent_instance, parent_gen, scope_max),
Err(_) => false,
}
}

fn is_terminal(status: Option<&str>) -> bool {
Expand Down Expand Up @@ -128,6 +159,20 @@ pub fn infer_statuses<N: NodeFacts>(
let mut out: HashMap<String, Inferred> = HashMap::new();
let mut visited: HashSet<String> = HashSet::new();

// Per-scope current generation: the highest trailing generation stamped for each
// orchestration instance id (the stamp with its last "::"-token removed). A node is
// superseded when a newer generation exists for its own scope or for any ancestor
// scope along its spawn lineage (see `is_superseded`).
let mut scope_max: HashMap<String, i64> = HashMap::new();
for n in nodes.values() {
if let Some((instance_id, generation)) = stamp_of(n.status_details()) {
let slot = scope_max.entry(instance_id).or_insert(i64::MIN);
if generation > *slot {
*slot = generation;
}
}
}

// Top-down walk from the root, carrying the nearest terminal ancestor and
// highest-generation ancestor seen so far.
if let Some(root) = root_node {
Expand All @@ -141,11 +186,13 @@ pub fn infer_statuses<N: NodeFacts>(
None => continue, // dangling reference (should not happen)
};

let gen_n = gen_of(n.status_details());
let gen_n = stamp_of(n.status_details()).map(|(_, g)| g);
let terminal = is_terminal(n.status());
let superseded = match (gen_n, &max_gen_anc) {
(Some(g), Some((ag, _))) => *ag > g,
_ => false,
let superseded = match stamp_of(n.status_details()) {
Some((instance_id, generation)) => {
is_superseded(&instance_id, generation, &scope_max)
}
None => false,
};

let (inferred, from_anc): (String, Option<String>) = if terminal {
Expand Down
Loading
Loading