
feat(dataframe): expose the executed physical plan with per-operator metrics #96

@LantaoJin


Is your feature request related to a problem or challenge?

Today there are two ways to inspect the physical plan of a DataFrame, and neither is suitable for programmatic consumption:

  1. df.explain(false, false) / df.explain(true, false): return a DataFrame of text rows describing the logical and physical plans (verbose=true adds the intermediate plans produced by each optimiser pass). Pre-execution. No metrics.
  2. df.explain(true, true): runs the plan, then returns a DataFrame of text rows that include per-operator metrics rendered as strings (output_rows=12345, elapsed_compute=4.2ms, etc.). Post-execution. Metrics are present, but only as text.

Both surface the plan as text rows. To answer "did this query produce more output rows than the last run?" or "which operator spilled?" today, callers have to run df.explain(true, true).collect() and parse the strings. That is brittle against upstream wording changes, ergonomically painful, and loses the metric values' types.
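For illustration, this is roughly what the string-scraping looks like today. A minimal sketch, not code from any existing project: the ExplainScraper class and its key=value regex are hypothetical and assume the metrics text keeps the comma-separated output_rows=12345, elapsed_compute=4.2ms shape. Any change in upstream wording or units breaks it, and every value comes back as a String that needs ad hoc conversion.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical scraper for the metrics text emitted by df.explain(true, true).
final class ExplainScraper {
    // A key=value pair whose value stops at a comma or a square bracket.
    private static final Pattern PAIR = Pattern.compile("(\\w+)=([^,\\[\\]]+)");

    // Extracts every key=value pair; all values stay untyped Strings.
    static Map<String, String> scrape(String metricsText) {
        Map<String, String> out = new LinkedHashMap<>();
        Matcher m = PAIR.matcher(metricsText);
        while (m.find()) {
            out.put(m.group(1), m.group(2).trim());
        }
        return out;
    }

    // Ad hoc unit handling; other units (for example µs) are not recognised and throw.
    static long durationToNanos(String text) {
        if (text.endsWith("ns")) {
            return Long.parseLong(text.substring(0, text.length() - 2));
        }
        if (text.endsWith("ms")) {
            return Math.round(Double.parseDouble(text.substring(0, text.length() - 2)) * 1_000_000);
        }
        if (text.endsWith("s")) {
            return Math.round(Double.parseDouble(text.substring(0, text.length() - 1)) * 1_000_000_000);
        }
        throw new IllegalArgumentException("unrecognised duration: " + text);
    }
}
```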

DataFusion's Rust API already exposes the underlying structure: Arc<dyn ExecutionPlan> is a tree whose nodes have typed metrics() -> Option<MetricsSet> accessors, with values typed as Count, Time, Gauge or Timestamp. None of that survives the trip into the existing EXPLAIN text. The gap is purely on the Java surface, both before and after execution.

Describe the solution you'd like

A new DataFrame.executedPlan() returning a small immutable POJO tree, modelled on Spark's df.queryExecution.executedPlan:

import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

record ExecutedPlan(
    String name,                       // "AggregateExec" / "DataSourceExec" / etc.
    String displayDetails,             // single-line rendering, e.g. "filter=x > 1"
    List<ExecutedPlan> children,
    OperatorMetrics metrics) {
    ExecutedPlan {
        children = List.copyOf(children);  // defensive copy keeps the tree immutable
    }
}

record OperatorMetrics(
    OptionalLong outputRows,           // OutputRows summed across partitions; absent if the operator doesn't track this
    OptionalLong elapsedComputeNanos,  // ElapsedCompute summed across partitions
    OptionalLong outputBytes,
    OptionalLong outputBatches,
    OptionalLong spillCount,
    OptionalLong spilledBytes,
    OptionalLong spilledRows,
    OptionalLong currentMemoryUsage,   // peak / latest Gauge value
    Map<String, Long> customCounters) {  // any named MetricValue::Count { name, .. } the operator emits
    OperatorMetrics {
        customCounters = Map.copyOf(customCounters);  // defensive copy keeps the map immutable
    }
}
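One way the native-to-Java conversion could fill OperatorMetrics, sketched under assumptions: RawMetric is a hypothetical flat view of one upstream metric value from one partition, the name strings (output_rows, elapsed_compute, mem_used, ...) are assumed to match upstream's metric names, counters are summed across partitions, and the memory gauge is folded with max. Names outside the well-known set land in customCounters; this sketch does not separate named Time or Gauge values from counters.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical flat view of one upstream metric value reported by one partition.
record RawMetric(String name, int partition, long value) { }

record OperatorMetrics(
        OptionalLong outputRows,
        OptionalLong elapsedComputeNanos,
        OptionalLong outputBytes,
        OptionalLong outputBatches,
        OptionalLong spillCount,
        OptionalLong spilledBytes,
        OptionalLong spilledRows,
        OptionalLong currentMemoryUsage,
        Map<String, Long> customCounters) { }

final class MetricsFolder {
    // Assumed upstream name of the memory gauge; it is folded with max, everything else is summed.
    private static final String MEM_USED = "mem_used";

    static OperatorMetrics fold(List<RawMetric> raw) {
        Map<String, Long> totals = new HashMap<>();
        for (RawMetric m : raw) {
            if (m.name().equals(MEM_USED)) {
                totals.merge(m.name(), m.value(), Long::max);
            } else {
                totals.merge(m.name(), m.value(), Long::sum);
            }
        }
        // Java evaluates arguments left to right, so by the time Map.copyOf runs,
        // take() has removed every well-known name and only custom counters remain.
        return new OperatorMetrics(
                take(totals, "output_rows"),
                take(totals, "elapsed_compute"),
                take(totals, "output_bytes"),
                take(totals, "output_batches"),
                take(totals, "spill_count"),
                take(totals, "spilled_bytes"),
                take(totals, "spilled_rows"),
                take(totals, MEM_USED),
                Map.copyOf(totals));
    }

    // Removes a well-known entry and wraps it; a name the operator never reported becomes empty.
    private static OptionalLong take(Map<String, Long> totals, String name) {
        Long value = totals.remove(name);
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```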

executedPlan() is lazy: the call itself does not execute the query. It plans the DataFrame (running the optimiser if that has not happened yet) and returns an immutable snapshot of the physical plan tree. Called before collect() / executeStream(), it returns the structure with zero-valued metrics; called after, it returns the same structure with populated metrics. This matches Spark's shape: df.queryExecution.executedPlan is always available, and each node's metrics map fills in as the plan runs.
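The snapshot semantics can be illustrated in plain Java. A minimal sketch, assuming a hypothetical LiveNode that stands in for a native operator whose counter is bumped during execution: each snapshot() call copies the current state of the subtree into immutable records, so a snapshot taken before execution keeps its zeros while a later snapshot of the same tree sees the populated values.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a native operator whose metrics change while the plan runs.
final class LiveNode {
    final String name;
    final List<LiveNode> children;
    final AtomicLong outputRows = new AtomicLong();

    LiveNode(String name, List<LiveNode> children) {
        this.name = name;
        this.children = List.copyOf(children);
    }

    // Recursively copies the current state of this subtree into immutable records.
    PlanSnapshot snapshot() {
        return new PlanSnapshot(name, children.stream().map(LiveNode::snapshot).toList(), outputRows.get());
    }
}

// Immutable view handed to callers; later updates to the live tree do not affect it.
record PlanSnapshot(String name, List<PlanSnapshot> children, long outputRows) { }
```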

To make "same plan, before and after" work end to end, the native side stashes the planned Arc<dyn ExecutionPlan> on the DataFrame handle. collect() and executeStream() reuse that stashed plan if present instead of planning afresh on each call. After execution, a second executedPlan() call therefore reads metrics from the same plan instance that ran, not from a freshly re-planned tree.
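The "plan once, reuse everywhere" behaviour amounts to memoising the planned plan on the handle. A sketch under assumptions: CachedPlanHandle and its execute/inspect methods are hypothetical names, and a generic P stands in for the native Arc<dyn ExecutionPlan>. Only planning is synchronised, so a long-running execution does not hold the lock.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical handle: plans lazily, at most once, and hands the same plan to execution and inspection.
final class CachedPlanHandle<P> {
    private final Supplier<P> planner;
    private P plan; // guarded by "this"

    CachedPlanHandle(Supplier<P> planner) {
        this.planner = planner;
    }

    // Plans on first use; later calls return the same instance.
    synchronized P plan() {
        if (plan == null) {
            plan = planner.get();
        }
        return plan;
    }

    // Stand-in for collect() / executeStream(): runs the stashed plan rather than re-planning.
    void execute(Consumer<P> runner) {
        runner.accept(plan());
    }

    // Stand-in for executedPlan(): snapshots whatever state the stashed plan currently holds.
    <S> S inspect(Function<P, S> snapshotter) {
        return snapshotter.apply(plan());
    }
}
```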

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;

// Operator names are illustrative; the exact tree depends on the optimiser.
try (DataFrame df = ctx.sql("SELECT count(*) FROM events WHERE ts > '2026-01-01'")) {
    ExecutedPlan before = df.executedPlan();              // structure, zero metrics
    System.out.println(before.name());                    // e.g. "AggregateExec"
    System.out.println(before.children().get(0).name());  // e.g. "DataSourceExec"

    try (BufferAllocator alloc = new RootAllocator();
         ArrowReader r = df.collect(alloc)) {
        while (r.loadNextBatch()) { /* consume r.getVectorSchemaRoot() */ }
    }

    ExecutedPlan after = df.executedPlan();               // same tree, populated metrics
    long rows = after.children().get(0).metrics().outputRows().orElse(-1L);
}
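With typed metrics, the "which operator spilled?" question from the motivation becomes a short tree walk instead of string parsing. A sketch using trimmed stand-ins (Plan, Metrics) for the proposed ExecutedPlan and OperatorMetrics records, keeping only the fields the helper reads; the helper name spilledOperators is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Trimmed stand-ins for the proposed records, keeping only the fields used below.
record Metrics(OptionalLong outputRows, OptionalLong spillCount) { }
record Plan(String name, List<Plan> children, Metrics metrics) { }

final class PlanQueries {
    // Depth-first walk returning the name of every operator that reports at least one spill.
    static List<String> spilledOperators(Plan root) {
        List<String> out = new ArrayList<>();
        collect(root, out);
        return out;
    }

    private static void collect(Plan node, List<String> out) {
        // Operators that do not track spills report an empty OptionalLong, treated as zero.
        if (node.metrics().spillCount().orElse(0L) > 0) {
            out.add(node.name());
        }
        for (Plan child : node.children()) {
            collect(child, out);
        }
    }
}
```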

Describe alternatives you've considered

Parse the text from df.explain(true, true). Cheapest to implement (no native API changes), but brittle to upstream wording, and avoiding string-scraping is the whole motivation here.

Faithful 1:1 mirror of MetricValue variants. A Java sealed type with one variant per upstream variant. More expressive, but pins the Java API to upstream's variant set, so every DataFusion bump risks an API break. A fixed set of fields plus the customCounters map keeps the Java contract stable: the named getters cover the well-known variants and the map covers the remaining named counters.

Bundle into the existing explain text output. Add structured metrics columns to the EXPLAIN-output DataFrame. Doesn't help the parsing problem; would need a separate type to carry typed values anyway.

Eager getter that runs the query if not already run. df.executedPlan() would internally trigger materialisation when called before collect(). That is surprising (a getter doing real work) and conflicts with the documented "non-consuming" pattern of the other introspection methods (schema, explain, cache, describe).


Labels: enhancement (New feature or request)