Skip to content

feat!(datafusion): enable parallel file scanning with eager task bucketing#2298

Open
toutane wants to merge 32 commits into
apache:mainfrom
toutane:draft/partitioned-file-scanning-contribution
Open

feat!(datafusion): enable parallel file scanning with eager task bucketing#2298
toutane wants to merge 32 commits into
apache:mainfrom
toutane:draft/partitioned-file-scanning-contribution

Conversation

@toutane

@toutane toutane commented Mar 31, 2026

Copy link
Copy Markdown
Contributor

This PR has been split up, first part is #2671

Which issue does this PR close?

Motivation

Today the DataFusion scan node (IcebergTableScan) streams every file through a single partition and reports UnknownPartitioning. Two costs follow:

  • No parallelism. File reads aren't spread across DataFusion partitions, so we don't use the parallel execution it's built for.

  • Redundant shuffles. Because the node hides its partitioning, the optimizer can't tell the data is already physically partitioned by Iceberg, and inserts a RepartitionExec for joins/aggregations on partition columns -- re-shuffling already co-located rows.
    This PR is a scoped fix at the file grain (one FileScanTask = smallest unit). It doesn't preempt the sub-file / row-group direction tracked in EPIC: Support parallel scan in iceberg-datafusion #1604; it's the intermediate step toward it.

What changes are included in this PR?

Breaking change to IcebergTableProvider and IcebergTableScan. The scan node now has two modes:

  • Eager (new default). IcebergTableProvider::scan plans the tasks at planning time and buckets them into min(target_partitions, n_files) groups; each bucket is one output partition the scan streams in parallel. Tasks are computed once and shared across all partitions; the plan is reproducible; execute(partition) is pure I/O with no catalog round-trips. IcebergTableProvider::scan now does network I/O (catalog + metadata reads), which is unusual for a planning-phase method. An alternative design - planning lazily at execute time - would keep scan() cheap but requires one plan_files() call per partition (redundant). A future extension could expose this as an option for use cases where snapshot staleness matters more than plan reproducibility.

  • Lazy (legacy). Tasks are planned at execution time by IcebergStaticTableProvider, as before -- the fallback. Static snapshots do not benefit from eager planning because the task list is fixed by construction.

A single scan node carries both modes (as opposed to introducing new types as originally proposed), so there's one provider and one scan node to maintain as suggested by @timsaucer.

Output partitioning. When the table is identity-partitioned on the scanned columns, the node declares Partitioning::Hash(...) instead of UnknownPartitioning, bucketing tasks so the result matches DataFusion's repartition hashing (create_hashes + REPARTITION_RANDOM_STATE); the optimizer can then drop the redundant RepartitionExec. It falls back to UnknownPartitioning whenever that match isn't guaranteed: spec evolution, unsupported dtypes, null partition values, or non-identity transforms (bucket/truncate/temporal).

Key changes

  • Eager planning moves into IcebergTableProvider::scan. In eager mode plan_files() runs at planning time and the tasks are bucketed (bucketing::bucket_tasks) into min(target_partitions, n_files) groups stored on the scan node. In lazy mode (IcebergStaticTableProvider), plan_files() stays in execute().

  • Eager execute(partition) builds the Arrow reader directly. It streams its pre-assigned bucket through an ArrowReaderBuilder, reusing the scan's reader settings (concurrency limit, row-group filtering, row selection, batch size) -- no per-partition plan_files() or catalog round-trip. Buckets are held as Arc<[Arc<[FileScanTask]>]>, so execute hands out a pointer instead of cloning the task list.

  • New bucketing module (proposed by @timsaucer). Assigns tasks to buckets and decides the declared Partitioning: for a single-spec identity-partitioned table it hashes tasks on their partition values (create_hashes + REPARTITION_RANDOM_STATE) and declares Partitioning::Hash, otherwise UnknownPartitioning.

  • New IcebergTableScanBuilder. Replaces the new / new_with_tasks / new_inner constructors (and their too_many_arguments), validating buckets.len() == partition_count and returning Result so an inconsistent node can't be built.

  • with_new_children now errors. IcebergTableScan is a leaf node; non-empty children return DataFusionError::Internal instead of being silently dropped (matching IcebergCommitExec).

  • The builder's build is now pub. Kept public so downstream integrations with their own catalog / schema provider can construct an IcebergTableScan directly, as requested in feat(datafusion): export table provider constructor #2123.

Known limitations

  • Limited type support for Partitioning::Hash. Each identity column is hashed via a single-element Arrow array, so only handled dtypes can declare Hash. The set has been broadened (timestamps now work); a few types such as Utf8View still fall back to UnknownPartitioning.

  • Spec evolution disables Partitioning::Hash. With more than one historical spec, the module returns UnknownPartitioning rather than risk a partition-tuple mismatch -- stricter than iceberg-java (see follow-ups). Covered by test_spec_evolution_falls_back_to_unknown_partitioning.

  • Bucketing is count-based, not size-aware. Distribution is hash % n_partitions, ignoring file_size_in_bytes; one large file among small ones can serialize the query on a single bucket. iceberg-java bin-packs by size (Plan file scan task according scan file size. #128).

  • Planning runs in scan() and may re-run. scan() does catalog + manifest I/O; if DataFusion calls it multiple times during optimization, planning repeats -- no task caching yet, whereas iceberg-java caches behind tasks() / taskGroups().

Follow-up work

  • Redundant FilterExec (IcebergTableProvider::supports_filters_pushdown marks every filter as Inexact, causing a redundant FilterExec above IcebergTableScan #2363) -- @timsaucer reports that supports_filters_pushdown returns Inexact for all filters, causing DataFusion to insert a FilterExec above IcebergTableScan even though the Arrow reader already applies the predicate via ArrowPredicateFn. Returning Exact for losslessly-converted filters would eliminate this redundant re-evaluation (only identity transforms are safe to mark Exact). He proposed a solution in earlier commits, but those have been reverted as out of scope for this PR.

  • Size-based bin-packing (Plan file scan task according scan file size. #128) -- distribute tasks by file_size_in_bytes (e.g. first-fit-decreasing) rather than by count; the size is already on FileScanTask.

  • Cache planned tasks -- key the bucketed tasks by (snapshot, projection, filter) so repeated scan() calls during optimization don't re-read manifests, mirroring iceberg-java's lazy caching.

  • Grouping key across specs (datafusion: relax identity grouping-key gate across partition-spec evolution #2658) -- instead of disabling Partitioning::Hash when multiple specs exist, intersect the identity fields common to all specs and declare Hash on those columns (iceberg-java's groupingKeyType / commonActiveFieldIds). The per-column intersection was tried and reverted as out of scope here.

  • Widen dtype coverage -- reuse create_primitive_array_single_element (arrow/value.rs, pub(crate)) for the bucketing arrays; it already maps ~20 literal types and handles nulls, so more partition types could declare Hash.

Are these changes tested?

Yes. Unit tests in table/mod.rs covering the new bucketed scan path. Additional tests are added for IcebergTableProvider to cover limit pushdown, insert behavior, and schema consistency, ensuring the refactor introduces no regressions on existing functionality. EXPLAIN SQL logic tests snapshots are updated to reflect the new buckets:[N] file_count:[M] display format and the correct input_partitions counts.

We plan to test these changes in our infrastructure by shadowing real-world queries.

@timsaucer timsaucer left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm no expert on Iceberg but I've worked a lot on DataFusion, particularly table providers. I wrote a blog on the datafusion site recently, but since you first put this PR up. In case it's in any way useful: https://datafusion.apache.org/blog/2026/03/31/writing-table-providers/

Overall I think the approach here is definitely reasonable. My comments are mostly around opportunities to squeeze out a little more performance based on having done something similar at my work.

self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(self)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this doesn't support children, I'd recommend an error if _children is not empty. Not a blocker for merge.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right thanks! Pushed a fix that returns a DataFusionError::Internal, matching the pattern used in IcebergCommitExec::with_new_children.

Side note: IcebergTableScan::with_new_children has the same issue. This could be the subject of another PR.

Comment thread crates/integrations/datafusion/src/table/mod.rs Outdated
&self,
filters: &[&Expr],
) -> DFResult<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do better than this? If we have partitioned scan and the filter is on the partitions I would expect to be able to get an exact pushdown. That would entirely remove a filter operation for cases where it matches, and I think that's a big win and common use case I've seen in other work.

@toutane toutane Apr 20, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right there's something to do here, I agree.

I'd prefer to tackle this in a follow-up PR: doing it correctly requires a per-filter conversion API (currently convert_filters_to_predicate collapses everything into a single combined predicate and silently drops non-convertible filters) and, in a partition-spec-aware check, only Identity-transformed partition columns can be safely marked Exact; bucket, truncate, year/month/etc. are lossy and must stay Inexact to avoid incorrect results.

Happy to open a tracking issue. However, if you think it's simple enough, I can go ahead and make the changes directly in the PR.

.map_err(to_datafusion_error)?
.try_collect::<Vec<_>>()
.await
.map_err(to_datafusion_error)?;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the number of output partitions will be the number of files, right? I'm wondering if there's an opportunity to do better than that. We're specifying that the output partitioning in the exec is unknown, but don't we have information about the partitioning we could utilize?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By better I mean could we be more performant if we were to go ahead and get the target partitions from the session and output in those number of partitions already with hashing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising this, please push back if any of the below is off.

For context, the long-term direction for this is tracked in the EPIC #1604 (row-group-based parallel scan with a GroupPruner that can split/merge FileScanTask below the file grain). What I was hoping to land with this PR is a more immediate, scoped optimization that stays within the current file-grain contract, so we don't preempt the design choices in #1604. The file-grouping step you're pointing at is essentially what #2220 describes as the intermediate improvement on the path toward #1604.

If you think it's appropriate, I'd be happy to pick up a short-term follow-up along these lines:

  1. Switch IcebergPartitionedScan from tasks: Vec<FileScanTask> to file_groups: Vec<Vec<FileScanTask>>, to follow the convention used by DataFusion's own FileScanConfig, each group = one DataFusion partition that streams its files sequentially through ArrowReaderBuilder::read.
  2. In IcebergPartitionedTableProvider::scan, read state.config().target_partitions() and group tasks into min(n_files, target_partitions) buckets.
  3. When n_files < target_partitions, parallelism is still capped at n_files. I think that's inherent to the file grain, but let me know if I'm missing something.

I'm happy to open the follow-up issue/PR myself, or defer to you if you'd rather frame it, whatever works best.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose I'd need to understand those conversations. I think I mentioned in one of the other comments on this PR, but I found the whole discussion difficult to track. Maybe I can find some time this weekend to look through that sized based partitioning they mention.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this PR targeting your branch. Let me know what you think!

toutane#1

The one issue I have is that I do not personally have access to any iceberg catalogs that I could use for benchmarking. My ability to test it is very limited right now.

@toutane toutane Apr 27, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Tim, thanks a lot for the proposal. It is really clean and smart.

I created an issue for the redundant FilterExec you were mentioning (#2363), so nice that you're addressed it here.

For the benchmark, we can do it in our infra by shadowing real traffic (our ultimate goal would be to distribute execution on multiple workers, based on the output partitioning). I will not be a standard benchmark but at least it will show if things are improving on real world queries.

What do you finally thing of merging this new provider/scan with the current one so that we only maintain one path as you suggested? If I understand correctly the current path is reachable by setting target_partitions to 1.

Last thing, I'll try to support partitioning based on Iceberg bucket transform, the tricky thing being that DataFusion and Iceberg aren't using the same hash function making the bucket hash incompatible with RepartitionExec.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I strongly believe you should be updating the existing table provider instead of creating a new one. I think it's just more work in the long run to keep to near identical bits of code.

I don't think you'll be able to use iceberg bucket transforms for the datafusion hashing output.

@mbutrovich

Copy link
Copy Markdown
Collaborator

Thanks for the PR, @toutane! One thing I noticed: IcebergPartitionedScan::execute() creates a bare ArrowReaderBuilder::new(file_io).build() with no configuration. The existing path through IcebergTableScan wires through row group filtering, row selection, concurrency limits, and batch size. Might be worth plumbing those through here too so users don't silently lose those optimizations when switching to the partitioned scan.

@toutane toutane marked this pull request as draft April 21, 2026 09:35
@toutane toutane force-pushed the draft/partitioned-file-scanning-contribution branch from 0a7af45 to fde61f6 Compare April 21, 2026 09:56
@timsaucer

Copy link
Copy Markdown
Member

More broadly, is adding in a second path really the best answer? It seems like now you're going to increase your maintenance load. Is there any reason not to have a single path and the fallback be that it's a partitioned scan of N=1?

I am going to spend a little more time trying to understand the issues. It's difficult because some of them are marked as unplanned or stale and some of the links do not have good descriptions. I suppose I'll need to look at the java source to get a better idea of what the long term goal is.

@toutane

toutane commented Apr 22, 2026

Copy link
Copy Markdown
Contributor Author

Hey Tim, I think you're absolutely right about consolidating everything into a single TableProvider long term.

The only reason I kept separate paths was to avoid introducing breaking changes. I am going to explore a design where partitioned file scan becomes the default behavior, with the current provider's logic as a fallback as you suggested.

On a related note, it could be worth thinking about the next step: exposing Partitioning::Hash as output-partitioned when the Iceberg data uses bucket partitioning. Do you think that fits naturally in the same path, or would a separate provider be a better fit?

@timsaucer

Copy link
Copy Markdown
Member

Hey Tim, I think you're absolutely right about consolidating everything into a single TableProvider long term.

The only reason I kept separate paths was to avoid introducing breaking changes. I am going to explore a design where partitioned file scan becomes the default behavior, with the current provider's logic as a fallback as you suggested.

On a related note, it could be worth thinking about the next step: exposing Partitioning::Hash as output-partitioned when the Iceberg data uses bucket partitioning. Do you think that fits naturally in the same path, or would a separate provider be a better fit?

I understand a desire to not introduce breaking changes. Is the concern that the API is changing or do you have implementation concerns? If it's just the API change, then I think a good upgrade documentation is often sufficient, especially since it looks like the change would be fairly straightforward for a downstream consumer. Please correct me if that's not correct.

If it's concern about the implementation, then I think the real solution is to make sure there's robust testing both in the repo and against some real life workloads to verify performance at different scales and partitioning structures.

With respect to the question about output partitioning, I think any time you can do that you should. Any time we can give more information about these kinds of things we're going to see performance gains, and sometimes significant gains.

@toutane toutane force-pushed the draft/partitioned-file-scanning-contribution branch from 0a2ba62 to 70bc487 Compare April 29, 2026 14:48
@toutane toutane changed the title feat(datafusion): enable parallel file-level scanning via one partition per file feat!(datafusion): enable parallel file scanning with eager task bucketing Apr 29, 2026
@toutane toutane force-pushed the draft/partitioned-file-scanning-contribution branch from 1f87e13 to 7ff1f6d Compare April 30, 2026 09:12
@toutane toutane marked this pull request as ready for review April 30, 2026 09:44
@toutane toutane requested a review from timsaucer April 30, 2026 09:44
@toutane toutane closed this May 7, 2026
@toutane toutane deleted the draft/partitioned-file-scanning-contribution branch May 7, 2026 09:21
@toutane toutane restored the draft/partitioned-file-scanning-contribution branch May 7, 2026 09:23
@toutane toutane reopened this May 7, 2026
toutane added a commit to DataDog/iceberg-rust that referenced this pull request May 11, 2026
…ning with eager task bucketing (apache#2298) (#18)

* feat(datafusion): add IcebergPartitionedTableProvider and IcebergPartitionedScan for parallel file scanning

(cherry picked from commit b9819b4)

* docs(datafusion): update comment in IcebergPartitionedScan

(cherry picked from commit c362b0f)

* Update crates/integrations/datafusion/src/table/mod.rs

Co-authored-by: Tim Saucer <timsaucer@gmail.com>
(cherry picked from commit ec1bd37)

* fix(datafusion): reject non-empty children in IcebergPartitionedScan::with_new_children

(cherry picked from commit 5e53cb8)

* fix(datafusion): use ArrowReaderBuilder existing configuration path

(cherry picked from commit cc6a833)

* format

(cherry picked from commit 75e521d)

* feat(datafusion): bucket FileScanTasks across target_partitions with identity-hash partitioning

Replace the one-task-per-partition layout in IcebergPartitionedScan with
N buckets sized from the session's target_partitions. When the table's
default spec exposes identity-transform columns and every task carries
the corresponding partition values, tasks are bucketed by hashing those
values via DataFusion's REPARTITION_RANDOM_STATE so the resulting
partitioning matches what RepartitionExec would produce. The scan then
declares Partitioning::Hash(exprs, N), letting downstream joins and
aggregates skip an extra repartition.

Hash declaration is conservative and only stands when:
  - the table has a single partition spec (no spec evolution)
  - every identity source column is present in the output projection
  - every column type is supported by literal_to_array
  - every task supplied a full identity key
Any miss collapses to UnknownPartitioning(N) while bucketing falls
back to a hash of data_file_path so partitions still distribute.

IcebergPartitionedScan now stores Vec<Vec<FileScanTask>> and execute(i)
streams every task in buckets[i] through to_arrow_with_tasks. Bucket
count is capped at min(target_partitions, num_files), and an empty
table still yields zero partitions to avoid out-of-bounds execute calls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit 9edd54a)

* feat(datafusion): mark identity-partition filters as Exact pushdown

`IcebergPartitionedTableProvider::supports_filters_pushdown` previously
returned `Inexact` for every filter, forcing DataFusion to re-evaluate
even filters that Iceberg's manifest-level pruning has fully resolved.
Per-filter the provider now returns `Exact` when both:
  - the iceberg conversion can represent the filter, so manifest pruning
    will remove every row that fails it, and
  - every leaf is a comparison or null check against an identity-
    partition column with a literal RHS.

Identity-partitioned column names are cached at `try_new` from the
table's default spec; tables with spec evolution (>1 historical specs)
fall back to an empty set so all filters stay `Inexact`. Supported
shapes: =, !=, <, <=, >, >=, IS NULL, IS NOT NULL, IN/NOT IN, plus
AND/OR/NOT compositions of the above. Every other shape is `Inexact`.

`convert_filter_to_predicate` is promoted to `pub(crate)` so the
provider can probe convertibility per filter without rebuilding the
whole AND-collapsed predicate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit add5e35)

* feat(datafusion): allow Exact pushdown across spec evolution via per-column intersection

Previously identity_partition_col_names returned an empty set whenever
the table had more than one historical partition spec, forcing every
filter back to Inexact under spec evolution. This was overly
conservative: Iceberg evaluates partition predicates against each
manifest's own spec, so a column that is identity-partitioned in every
spec is fully prunable across the entire table regardless of which spec
a given file was written under.

Replace the multi-spec gate with an intersection across every spec's
identity-source set. A column survives only if every spec includes it
with Transform::Identity; columns that appear with non-identity
transforms in some spec, or are missing from a spec entirely, are
dropped. The result remains an honest set of columns for which Exact
pushdown is provably safe across all surviving files.

Hash bucketing (compute_identity_cols) keeps its single-spec gate
because slot-order alignment with the table's default spec depends on
each task carrying its own spec id, which the native plan flow does
not yet do.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit e8771e4)

* Revert "feat(datafusion): allow Exact pushdown across spec evolution via per-column intersection"

This reverts commit b2613e3.

(cherry picked from commit 826f054)
(cherry picked from commit aba4523)

* Revert "feat(datafusion): mark identity-partition filters as Exact pushdown"

This reverts commit 6d0ed4c.

(cherry picked from commit 4381f00)
(cherry picked from commit 598c5de)

* refactor(datafusion): merge IcebergPartitionedTableProvider into IcebergTableProvider

IcebergPartitionedTableProvider and IcebergPartitionedScan were introduced
to enable parallel file scanning by bucketing FileScanTasks across DataFusion
partitions. However, maintaining two TableProvider implementations is
redundant: the new provider is strictly more capable, and its degenerate case
(target_partitions=1) reproduces the old single-partition behavior exactly.

This commit folds the partitioned provider into IcebergTableProvider and
the partitioned scan into IcebergTableScan, eliminating the parallel types.

Changes:
- IcebergTableProvider::scan() now eagerly calls plan_files() and distributes
FileScanTasks into buckets using the same identity-hash strategy
(REPARTITION_RANDOM_STATE + create_hashes) that was in
IcebergPartitionedTableProvider, enabling Partitioning::Hash declarations
that align with DataFusion's RepartitionExec.
- IcebergTableScan gains a new_with_tasks() constructor that accepts
pre-planned buckets and a caller-supplied Partitioning. execute(i) streams
the tasks in buckets[i] via TableScan::to_arrow_with_tasks, rebuilding
the TableScan per-partition to avoid serializing PlanContext Arc-shared
caches across workers.
- The original new() constructor and the to_arrow() lazy path are kept
unchanged for IcebergStaticTableProvider, which does not pre-plan tasks.
- Limit slicing (try_filter_map truncation) from the old IcebergTableScan
is preserved in both execution paths.
- Bucketing helpers (IdentityCol, compute_identity_cols, bucket_tasks,
identity_hash, fallback_hash, literal_to_array, is_supported_dtype) are
moved verbatim into a new private table/bucketing.rs module.
- Unit tests from partitioned.rs are migrated to table/mod.rs and updated
to use IcebergTableProvider and IcebergTableScan.
- integration_datafusion_test.rs: fix test_provider_plan_stream_schema to
call execute(0) instead of execute(1). The old call worked only because
the previous IcebergTableScan silently ignored the partition index.

(cherry picked from commit d2e5e04)
(cherry picked from commit 23f3d8f)

* refactor(datafusion): polish scan API and add bucketing tests

Review pass over the partitioned-scan branch ahead of upstream
contribution.

- Rename `TableScan::to_arrow_with_tasks` to `to_arrow_from_tasks` —
  `from` better signals that the tasks are the input source rather
  than a builder-style modifier.
- Restructure the doc with a `# Correctness` section that calls out
  the projection/filter contract while clarifying that reader-side
  configuration (concurrency, batch size, row-group filtering, row
  selection) is taken from `self`.
- Make `IcebergTableScan::new` and `new_with_tasks` `pub` (were
  `pub(crate)`) so external users can construct the node directly,
  matching the public visibility of the struct itself.
- Drop the `convert_filters_to_predicate` re-export from
  `physical_plan/mod.rs`: it was unused outside the module.

- Extract a private `new_inner` constructor on `IcebergTableScan` so
  `new` and `new_with_tasks` share a single source of truth for the
  `PlanProperties` / projection / predicate setup.
- Split `IcebergTableScan::execute` into a linear pipeline backed by
  three helpers: `build_table_scan` (synchronous scan-builder
  plumbing), `build_record_batch_stream` (async stream construction
  for the lazy/eager modes), and `apply_limit`.
- Trim the `IcebergTableScan` struct doc and field comments to match
  the rest of the file's style; drop the verbose `to_arrow_with_tasks`
  rationale (the `# Correctness` doc carries the load-bearing info).
- Tighten `DisplayAs::fmt_as`: remove the file-path enumeration (file
  count alone is enough for `EXPLAIN`) and factor the common prefix.
- Trim several narrating comments in `table/mod.rs` and the module
  doc that duplicated information already evident from the code.

- Add `test_identity_partitioned_declares_hash`: verifies the happy
  path where an identity-partitioned table with the partition column
  in the projection produces `Partitioning::Hash` referencing that
  column. This was the main missing coverage for the bucketing logic.
- Add `test_projection_without_partition_col_falls_back_to_unknown`:
  verifies the `compute_identity_cols → None` branch when the
  projection omits the partition source column.
- Add helpers (`make_partitioned_catalog_and_table_for_bucketing`,
  `append_partitioned_fake_data_files`) to build identity-partitioned
  fixtures without writing real Parquet files.

(cherry picked from commit b1f2d66)
(cherry picked from commit 616dcdc)

* test(sqllogictest): update EXPLAIN snapshots for eager bucketing output

IcebergTableProvider::scan now plans files eagerly and buckets them
across DataFusion partitions before returning the ExecutionPlan.
As a result, IcebergTableScan's DisplayAs output always includes
`buckets:[N] file_count:[M]` - even for unpartitioned tables where
N = 1.

Update the four .slt files whose EXPLAIN snapshots were missing this
suffix, and fix the like_predicate_pushdown snapshots that also had
a stale input_partitions count on RepartitionExec (the table now has
multiple files across multiple buckets).

(cherry picked from commit 6ae4a71)
(cherry picked from commit 581dde7)

* fix(datafusion): resolve conflicts

(cherry picked from commit 7ad9dcc)

* fix(datafusion): format

(cherry picked from commit 7ff1f6d)

---------

Co-authored-by: Tim Saucer <timsaucer@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@timsaucer timsaucer left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my perspective on the datafusion side this looks great! I appreciate the responsiveness to the earlier feedback

@mbutrovich mbutrovich left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tackling this @toutane! First round of feedback.

/// time and are not re-applied here. Reader-side configuration
/// (concurrency, batch size, row-group filtering, row selection) is
/// taken from `self` and may differ from the planning scan.
pub fn to_arrow_from_tasks(&self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now public on the core TableScan, and the only thing keeping it correct is the doc-comment contract that the caller's tasks were planned with the same projection/filter as self. Since the predicate is baked into the tasks and isn't re-derived here, passing tasks from a differently-filtered scan would silently return wrong rows with no error. Could we keep this pub(crate) (or behind the integration) until there's a concrete external need? If it does need to be public, is there a way to make the type system carry the invariant (e.g., consuming a wrapper that proves the tasks came from this scan) rather than relying on the doc?

Separately: as written, the body just copies five reader-config fields off self (file_io, concurrency_limit_data_files, row_group_filtering_enabled, row_selection_enabled, batch_size) onto an ArrowReaderBuilder and calls read(tasks); nothing in it needs the Table. And ArrowReaderBuilder is already public and Table-free, so the "execute pre-planned tasks" capability already exists for external callers: datafusion-comet's IcebergScanExec, which holds only a metadata_location and JVM-planned tasks (no Table), builds its reader that way today. So this method doesn't unlock anything new for downstream engines; its only value is reusing a TableScan's already-configured reader settings. Given that, does it earn pub on the core TableScan, or is pub(crate) enough? Callers without a TableScan are better served by ArrowReaderBuilder directly.

@toutane toutane Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your comment makes it clear that we don't need this function at all. As you pointed out, we can read the tasks directly from ArrowReaderBuilder without going through a TableScan. That means the only reason for IcebergTableScan to carry a Table (other than to obtain a FileIO instance) would be the "planning at execution" path.

You mentioned that the datafusion-comet implementation has already chosen to drop the Table from the scanning node, since the tasks are pre-planned. With that in mind, I'm wondering whether the same approach might be desirable for iceberg-datafusion, which would let us remove its "planning at execution" path. What do you think?

/// `lit`. The Arrow type must match what DataFusion will see for this column
/// at scan time, otherwise `create_hashes` would dispatch on a different type
/// and produce a hash that disagrees with DataFusion's row-wise hashing.
fn literal_to_array(lit: &Literal, dt: &DataType) -> Option<ArrayRef> {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this duplicates create_primitive_array_single_element in crates/iceberg/src/arrow/value.rs:627 (currently pub(crate)), which already maps a Literal to a single-element array for ~20 types including Timestamp, Decimal128, and Binary, and handles nulls. Could we promote that to pub (or re-export it) and call it here? That would drop ~60 lines and, combined with is_supported_dtype, widen the set of partition types that can declare Hash instead of falling back. The deliberate type gate can stay where it is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in b6369e2. Rather than call create_primitive_array_single_element per task (which would keep the per-file allocations flagged in your other comment), I extracted the shared literal to Arrow dispatch into a new pub PrimitiveLiteralArrayBuilder (arrow/value.rs), re-exported as iceberg::arrow::PrimitiveLiteralArrayBuilder:

  • create_primitive_array_single_element now delegates to it , so the type mapping lives in one place.
  • The duplicated literal_to_array in bucketing.rs is deleted; the hashing path builds through the same builder.
  • is_supported_dtype is widened so identity partition columns now declare Hash instead of falling back. New co-location test bucket_tasks_hashes_decimal_and_timestamp_identity_columns exercises the widened set.
  • The deliberate type gate stays in compute_identity_cols, exactly as you suggested.

Net lines went up rather than down (the shared builder + wider type coverage + tests), but the duplication itself is gone and the dispatch is now a single source of truth shared by both the single-element and batched-hashing paths.

/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
/// hash-repartition convention. Returns `None` if the task lacks partition
/// data or any required slot is null/unsupported.
fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Hash declaration is only sound if the array we build here hashes identically to the column DataFusion sees at scan time (same create_hashes + REPARTITION_RANDOM_STATE). That holds today because the reader emits Utf8/Int32/Date32 and we rebuild those exact types, but it's a quiet trap if it ever drifts. For example DataFusion hashes Utf8 via <&str>::hash_one but Utf8View via the view word (hash_array vs hash_generic_byte_view_array in datafusion/common/src/hash_utils.rs), so the same string hashes differently. If the reader (or a downstream coercion) ever produces Utf8View without repartitioning this side, equal keys would split across partitions and a join/aggregate that trusts the declared partitioning would return wrong results silently.

Two requests: (1) a comment here making the "array dtype must equal the reader's output dtype" invariant explicit, and (2) a test that actually exercises co-location: run create_hashes(REPARTITION_RANDOM_STATE) over a real array of the partition values and assert each row lands in the same bucket this code assigned. Right now the tests assert the shape of the partitioning but never that the bucketing agrees with RepartitionExec.

Separately, the hashing is structured the slow way: for every task it allocates a Vec<ArrayRef>, builds one single-element array per identity column, allocates a one-element hashes buffer, and makes its own create_hashes call. That is O(tasks * cols) tiny allocations plus one hash-kernel invocation per file, all on the planning path that gates every query. This should be a single pass: build one array of length n_tasks per identity column (one element per task), call create_hashes once to fill an n_tasks buffer, then take hash % n_partitions per task. Same result, but allocations drop from per-task to per-column and the kernel runs once instead of N times. Tables with tens of thousands of files are routine, so I would rather fix this now than ship a per-file hot loop in planning.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two requests: (1) a comment here making the "array dtype must equal the reader's output dtype" invariant explicit, and (2) a test that actually exercises co-location: run create_hashes(REPARTITION_RANDOM_STATE) over a real array of the partition values and assert each row lands in the same bucket this code assigned.

Both done in ace9b7e:

  • (1) The invariant is now stated where we declare Partitioning::Hash (table/mod.rs): the declaration is only sound if the arrays we build from partition literals hash identically to what the reader emits at scan time. Since DataFusion's hash dispatch is dtype-specific, any drift in the reader output dtype (e.g. Utf8 to Utf8View) must either materialize that exact dtype on the bucketing path or fall back to UnknownPartitioning. (Wording later refined in 971aa0f.)

  • (2) test_identity_partitioned_hash_buckets_match_datafusion_repartition runs create_hashes + REPARTITION_RANDOM_STATE over a real array of the partition values and asserts every row lands in the bucket bucket_tasks assigned -- i.e. agreement with RepartitionExec, not just the partitioning shape.

If the reader (or a downstream coercion) ever produces Utf8View without repartitioning this side, equal keys would split across partitions and a join/aggregate that trusts the declared partitioning would return wrong results silently.

This exact trap is locked by test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning
(2802af4): it wires an output schema with Utf8View as a deliberately unsupported dtype and asserts the scan declares UnknownPartitioning instead of Hash.

Separately, the hashing is structured the slow way: for every task it allocates a Vec<ArrayRef>, builds one single-element array per identity column, allocates a one-element hashes buffer, and makes its own create_hashes call. That is O(tasks * cols) tiny allocations plus one hash-kernel invocation per file [...]. This should be a single pass: build one array of length n_tasks per identity column [...] call create_hashes once [...] then take hash % n_partitions per task.

Done in b6369e2. identity_hash (per-task) is replaced by identity_hashes_for_tasks: one PrimitiveLiteralArrayBuilder per identity column with capacity = n_tasks, a single pass appending each task's partition literal, then one create_hashes call filling an n_tasks buffer; the bucket is hash % n_partitions per task. Allocations drop from per-task to per-column and the hash kernel runs once instead of N times. The per-file fallback (fallback_hash) is unchanged and still only applies to tasks missing a full identity key.

/// missing partition data fall back to hashing `data_file_path`, which still
/// distributes evenly but breaks the `Hash` contract — the second tuple
/// element flags whether every task supplied a full identity key.
pub(super) fn bucket_tasks(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasks are distributed purely by hash % n_partitions, with no awareness of file_size_in_bytes. A table with one large file plus many small ones would put very uneven work in one bucket and serialize the query on it. iceberg-java does size-based bin-packing here (TableScanUtil.planTaskGroups + BinPacking, weighted by file/delete size with a target split size), which is also what #128 is asking for. I'm fine with count-based as a first cut, but could we at least note the limitation in the module doc and file/track the size-based follow-up? The size field is already on FileScanTask, so first-fit-decreasing on bytes would be a fairly contained extension.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, documented the limitation in the bucketing module doc, and tracked the size-based bin-packing follow-up in #128.

output_schema: &ArrowSchema,
) -> Option<Vec<IdentityCol>> {
let metadata = table.metadata();
if metadata.partition_specs_iter().len() > 1 {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning None whenever there's more than one historical spec is safe but stricter than iceberg-java, which intersects the identity fields present across all specs (Partitioning.groupingKeyType / commonActiveFieldIds) and still reports a grouping key on the common columns. I see the history tried the per-column intersection and reverted it as out of scope, which is totally reasonable for this PR. Could we add a one-line comment pointing at that behavior so the conservatism reads as intentional, and link a follow-up issue?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. iceberg-java intersects the identity fields common to all specs and still emits a grouping key on them; we bail out instead because the bucketing path aligns slots to the default spec and FileScanTask doesn't yet carry its own spec id (related to e0d6add). Added a comment at the guard and filed #2658 to track it.

/// Partition `i` streams `buckets[i]`. The caller is responsible for
/// ensuring `partitioning` matches the bucketing. Used by
/// [`IcebergTableProvider`][crate::table::IcebergTableProvider].
#[allow(clippy::too_many_arguments)]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new_with_tasks takes raw buckets and a Partitioning with nothing enforcing buckets.len() == partitioning.partition_count(), and both this and new need #[allow(clippy::too_many_arguments)]. Could we collapse these into a small params struct (or builder) that validates the bucket/partitioning consistency and returns Result? That also removes the new/new_with_tasks/new_inner parameter repetition.

Related: new and new_with_tasks are now pub. Is that needed for the two in-crate providers, or could they stay pub(crate)? Public constructors that can build an inconsistent node are easy to misuse.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this could let a user build an inconsistent node.

I added a pub struct IcebergTableScanBuilder that throws an error if buckets.len() != partition_count. This removes the duplication between new, new_with_tasks, and new_inner.

I'm interested in keeping the ability to build an IcebergTableScan from outside, which is why new and new_with_tasks were previously pub. We currently rely on a catalog and schema provider other than the ones in iceberg-datafusion. This is also something other users have asked for, as mentioned in this proposal: #2123. For that reason, I've kept build as pub.

predicates: Option<Predicate>,
/// Optional limit on the number of rows to return
/// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
buckets: Option<Vec<Vec<FileScanTask>>>,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Option plus the buckets() accessor returning &[] via unwrap_or(&[]) could just be a Vec that's empty in lazy mode, which would drop a layer. That said, the Option does signal "lazy vs eager" intent, so I could go either way; your call.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is indeed misleading.

I think keeping Option is better, as it lets us distinguish the case where we've already planned but got 0 tasks from the case where we haven't planned yet. I changed the accessor accordingly, so it now returns an Option.

I also added a comment above the struct field to make explicit that None means lazy mode and Some means eager mode.

) -> DFResult<SendableRecordBatchStream> {
let fut = get_batch_stream(
let bucket = match &self.buckets {
Some(buckets) => Some(buckets.get(partition).cloned().ok_or_else(|| {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buckets.get(partition).cloned() deep-copies a whole Vec<FileScanTask> on every execute(partition), and each FileScanTask carries a predicate tree plus schema, so this is a real per-partition copy of all the planned task state, not a cheap clone. Storing the buckets as Arc<[Vec<FileScanTask>]> (or wrapping each bucket in an Arc) lets execute hand out a pointer and drop the copy entirely. Since execute runs per partition on the query hot path, I would make this an Arc rather than clone.

@toutane toutane Jun 16, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: buckets is now Option<Arc<[Arc<[FileScanTask]>]>>

/// Builds the `RecordBatch` stream for a single partition. When `bucket` is
/// `Some`, streams the pre-planned tasks via `to_arrow_from_tasks`; when
/// `None`, plans and reads the full scan via `to_arrow`.
async fn build_record_batch_stream(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both arms repeat .map_err(to_datafusion_error)? ... .map_err(to_datafusion_error). Could we produce the inner stream in the match and apply the error mapping once afterward to cut the duplication? (No behavior change intended.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, the stream is produced by the match and the error mapping is applied in only one location.

}

/// Identity-partitioned table whose source column is in the projection
/// must produce `Partitioning::Hash` referencing that column.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice fixtures (MemoryCatalog + synthesized data files, no Parquet needed). A few branches I don't see covered that seem worth locking down since they fail silently if they regress:

  • co-location correctness vs RepartitionExec (see the bucketing.rs:135 comment), the most important one;
  • spec evolution -> UnknownPartitioning;
  • an unsupported partition dtype (e.g., timestamp) -> UnknownPartitioning;
  • a null partition value -> fallback hashing / Unknown.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the missing provider-level coverage in crates/integrations/datafusion/src/table/mod.rs:

  • Co-location vs DataFusion repartition hash was already covered by test_identity_partitioned_hash_buckets_match_datafusion_repartition
  • Added test_spec_evolution_falls_back_to_unknown_partitioning`
  • Added test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning (timestamp is currently supported by this bucketing path, so I used Utf8View as the unsupported dtype.)
  • Added test_null_partition_value_falls_back_to_unknown_partitioning

Also kept the existing lower-level null fallback behavior covered in bucketing.rs

timsaucer and others added 20 commits June 16, 2026 11:56
…column intersection

Previously identity_partition_col_names returned an empty set whenever
the table had more than one historical partition spec, forcing every
filter back to Inexact under spec evolution. This was overly
conservative: Iceberg evaluates partition predicates against each
manifest's own spec, so a column that is identity-partitioned in every
spec is fully prunable across the entire table regardless of which spec
a given file was written under.

Replace the multi-spec gate with an intersection across every spec's
identity-source set. A column survives only if every spec includes it
with Transform::Identity; columns that appear with non-identity
transforms in some spec, or are missing from a spec entirely, are
dropped. The result remains an honest set of columns for which Exact
pushdown is provably safe across all surviving files.

Hash bucketing (compute_identity_cols) keeps its single-spec gate
because slot-order alignment with the table's default spec depends on
each task carrying its own spec id, which the native plan flow does
not yet do.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…via per-column intersection"

This reverts commit b2613e3.

(cherry picked from commit 826f054)
…shdown"

This reverts commit 6d0ed4c.

(cherry picked from commit 4381f00)
…ergTableProvider

IcebergPartitionedTableProvider and IcebergPartitionedScan were introduced
to enable parallel file scanning by bucketing FileScanTasks across DataFusion
partitions. However, maintaining two TableProvider implementations is
redundant: the new provider is strictly more capable, and its degenerate case
(target_partitions=1) reproduces the old single-partition behavior exactly.

This commit folds the partitioned provider into IcebergTableProvider and
the partitioned scan into IcebergTableScan, eliminating the parallel types.

Changes:
- IcebergTableProvider::scan() now eagerly calls plan_files() and distributes
FileScanTasks into buckets using the same identity-hash strategy
(REPARTITION_RANDOM_STATE + create_hashes) that was in
IcebergPartitionedTableProvider, enabling Partitioning::Hash declarations
that align with DataFusion's RepartitionExec.
- IcebergTableScan gains a new_with_tasks() constructor that accepts
pre-planned buckets and a caller-supplied Partitioning. execute(i) streams
the tasks in buckets[i] via TableScan::to_arrow_with_tasks, rebuilding
the TableScan per-partition to avoid serializing PlanContext Arc-shared
caches across workers.
- The original new() constructor and the to_arrow() lazy path are kept
unchanged for IcebergStaticTableProvider, which does not pre-plan tasks.
- Limit slicing (try_filter_map truncation) from the old IcebergTableScan
is preserved in both execution paths.
- Bucketing helpers (IdentityCol, compute_identity_cols, bucket_tasks,
identity_hash, fallback_hash, literal_to_array, is_supported_dtype) are
moved verbatim into a new private table/bucketing.rs module.
- Unit tests from partitioned.rs are migrated to table/mod.rs and updated
to use IcebergTableProvider and IcebergTableScan.
- integration_datafusion_test.rs: fix test_provider_plan_stream_schema to
call execute(0) instead of execute(1). The old call worked only because
the previous IcebergTableScan silently ignored the partition index.

(cherry picked from commit d2e5e04)
Review pass over the partitioned-scan branch ahead of upstream
contribution.

- Rename `TableScan::to_arrow_with_tasks` to `to_arrow_from_tasks` —
  `from` better signals that the tasks are the input source rather
  than a builder-style modifier.
- Restructure the doc with a `# Correctness` section that calls out
  the projection/filter contract while clarifying that reader-side
  configuration (concurrency, batch size, row-group filtering, row
  selection) is taken from `self`.
- Make `IcebergTableScan::new` and `new_with_tasks` `pub` (were
  `pub(crate)`) so external users can construct the node directly,
  matching the public visibility of the struct itself.
- Drop the `convert_filters_to_predicate` re-export from
  `physical_plan/mod.rs`: it was unused outside the module.

- Extract a private `new_inner` constructor on `IcebergTableScan` so
  `new` and `new_with_tasks` share a single source of truth for the
  `PlanProperties` / projection / predicate setup.
- Split `IcebergTableScan::execute` into a linear pipeline backed by
  three helpers: `build_table_scan` (synchronous scan-builder
  plumbing), `build_record_batch_stream` (async stream construction
  for the lazy/eager modes), and `apply_limit`.
- Trim the `IcebergTableScan` struct doc and field comments to match
  the rest of the file's style; drop the verbose `to_arrow_with_tasks`
  rationale (the `# Correctness` doc carries the load-bearing info).
- Tighten `DisplayAs::fmt_as`: remove the file-path enumeration (file
  count alone is enough for `EXPLAIN`) and factor the common prefix.
- Trim several narrating comments in `table/mod.rs` and the module
  doc that duplicated information already evident from the code.

- Add `test_identity_partitioned_declares_hash`: verifies the happy
  path where an identity-partitioned table with the partition column
  in the projection produces `Partitioning::Hash` referencing that
  column. This was the main missing coverage for the bucketing logic.
- Add `test_projection_without_partition_col_falls_back_to_unknown`:
  verifies the `compute_identity_cols → None` branch when the
  projection omits the partition source column.
- Add helpers (`make_partitioned_catalog_and_table_for_bucketing`,
  `append_partitioned_fake_data_files`) to build identity-partitioned
  fixtures without writing real Parquet files.

(cherry picked from commit b1f2d66)
IcebergTableProvider::scan now plans files eagerly and buckets them
across DataFusion partitions before returning the ExecutionPlan.
As a result, IcebergTableScan's DisplayAs output always includes
`buckets:[N] file_count:[M]` - even for unpartitioned tables where
N = 1.

Update the four .slt files whose EXPLAIN snapshots were missing this
suffix, and fix the like_predicate_pushdown snapshots that also had
a stale input_partitions count on RepartitionExec (the table now has
multiple files across multiple buckets).

(cherry picked from commit 6ae4a71)
@toutane toutane force-pushed the draft/partitioned-file-scanning-contribution branch from 803912c to 277ee9c Compare June 16, 2026 10:45
toutane added 3 commits June 16, 2026 12:58
Document that task bucketing distributes by file count, not
file_size_in_bytes, and track the size-based bin-packing follow-up
in apache#128.
Note that compute_identity_cols intentionally bails out on any partition
spec evolution rather than intersecting identity fields across specs like
iceberg-java does. Link the follow-up issue tracking that relaxation.

Closes review comment on apache#2298. Tracked in apache#2658.
@toutane

toutane commented Jun 16, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for tackling this @toutane! First round of feedback.

@mbutrovich thank you very much for this initial review and for your clear and insightful comments, which are essential for determining the direction of the implementation.

I’ve tried to address each of your comments. Unfortunately, the PR has doubled in size due to the addition of tests, the creation of a builder for the scan node, the refactoring of the bucketing.rs module, etc. Is this a blocker? If that's the case, one option might be to split the PR into two parts: the first part on task pre-planning and the second on leveraging the physical partitioning of the Iceberg table (e.g setting DataFusion's output-partitioning to Hash). What do you think?

Also, regarding your comment about adding a cache for tasks to avoid re-reading the metadata files if scan() is called multiple times, I completely agree that this optimization is important, but I think the PR is currently large enough that it justifies adding it at a later stage. I can open a tracking issue if you’re okay with that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Enable parallel file-level scanning for IcebergTableScan Datafusion Integration

3 participants