Skip to content

feat: add datafusion.execution.enable_file_stream_work_stealing config#23294

Merged
andygrove merged 4 commits into
apache:mainfrom
andygrove:repro-filestream-work-stealing-23293
Jul 2, 2026
Merged

feat: add datafusion.execution.enable_file_stream_work_stealing config#23294
andygrove merged 4 commits into
apache:mainfrom
andygrove:repro-filestream-work-stealing-23293

Conversation

@andygrove

@andygrove andygrove commented Jul 2, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

Rationale for this change

FileStream sibling work-stealing (WorkSource::Shared, added in #21351 and extended by #23285) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next unopened file (or byte-range morsel). This assumes all output partitions of a scan are polled concurrently in one process.

Executors that run each output partition as an isolated task in a separate process — Ballista and datafusion-distributed — never poll the sibling partitions. The single polled partition drains the whole shared queue and reads files belonging to other partitions, so every isolated task reads the entire input and the scan output is inflated by the partition count. This is a correctness bug for those executors, not just a performance one.

The existing escape hatches (preserve_order, partitioned_by_file_group) are plan-level flags on FileScanConfig, not something a distributed executor can set centrally through the session config, and a plain repartitioned scan does not set partitioned_by_file_group. There is no session-level off switch, unlike datafusion.optimizer.enable_dynamic_filter_pushdown, which exists precisely so consumers that cannot support runtime cross-partition state can disable it.

What changes are included in this PR?

  • Add datafusion.execution.enable_file_stream_work_stealing (default true). When false, FileScanConfig::create_sibling_state returns None, so each partition falls back to WorkSource::Local and reads only its own file group.
  • Thread &ConfigOptions into DataSource::create_sibling_state so the flag is read from the session config at execute time. As a session config value it round-trips through datafusion-proto with no proto schema change.
  • Regenerate configs.md and add the setting to information_schema.slt.
  • Turn the previously #[ignore]d reproduction test into a passing regression test that drives only partition 0 (as an isolated task does) and asserts both behaviors: with the default (stealing on) partition 0 also reads partition 1's file, and with the flag off it reads only its own.

Are these changes tested?

Yes. isolated_partition_respects_work_stealing_config in datafusion/datasource/src/file_stream/mod.rs covers both the default (shared-queue) behavior and the flag-off behavior. The existing sibling work-stealing tests continue to pass with the default. information_schema sqllogictests pass with the new setting listed.

Are there any user-facing changes?

A new session config, datafusion.execution.enable_file_stream_work_stealing (default true), so existing behavior is unchanged. DataSource::create_sibling_state gains a &ConfigOptions parameter (an API change for anyone implementing the DataSource trait directly).

Add an ignored regression test for apache#23293. FileStream sibling
work-stealing seeds one shared work queue from every file group and
relies on all output partitions being polled concurrently in one
process. An executor that runs each partition as an isolated task polls
only one partition, which then drains the whole queue and reads files
belonging to other partitions, inflating scan output by the partition
count.

The test builds and drives only partition 0 and asserts it reads solely
its own file. It fails on main by design, so it is #[ignore]d with its
assertion intact as a caught regression to triage.
@github-actions github-actions Bot added the datasource Changes to the datasource crate label Jul 2, 2026
@andygrove andygrove marked this pull request as draft July 2, 2026 14:14
@andygrove

Copy link
Copy Markdown
Member Author

I will update this PR to include the fix

FileStream sibling work-stealing (WorkSource::Shared) seeds one shared work
queue from every file group and lets whichever output partition goes idle
first steal the next file or byte-range morsel. This assumes all output
partitions of a scan are polled concurrently in one process. Executors that
run each output partition as an isolated task in a separate process (Ballista,
datafusion-distributed) never poll the sibling partitions, so the single polled
partition drains the whole queue and reads files belonging to other partitions,
inflating the scan output by the partition count. That is a correctness bug for
those executors, and the existing escape hatches (preserve_order,
partitioned_by_file_group) are plan-level flags, not a session config they can
set centrally.

Add datafusion.execution.enable_file_stream_work_stealing (default true),
checked in FileScanConfig::create_sibling_state: when false it returns None so
each partition falls back to reading only its own file group. This mirrors the
enable_dynamic_filter_pushdown precedent and round-trips through datafusion-proto
as a config value. Thread ConfigOptions into DataSource::create_sibling_state so
the flag is read from the session config at execute time.

Turn the #[ignore]'d reproduction test into a passing regression test that
drives only partition 0 and checks both behaviors: with the default (stealing
on) partition 0 also reads partition 1's file, and with the flag off it reads
only its own.

Closes apache#23293.
@github-actions github-actions Bot added documentation Improvements or additions to documentation sqllogictest SQL Logic Tests (.slt) common Related to common crate labels Jul 2, 2026
@andygrove andygrove changed the title test: reproduce isolated-per-task FileStream work-stealing scan bug (#23293) feat: add datafusion.execution.enable_file_stream_work_stealing config Jul 2, 2026
@andygrove andygrove marked this pull request as ready for review July 2, 2026 14:32
@andygrove andygrove added the api change Changes the API exposed to users of the crate label Jul 2, 2026
@alamb

alamb commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

We should probably also backport htis to 54.1.0

…param

Drop the redundant bool from the test's drive_partition0 helper (assert the
shared-queue state directly via create_sibling_state) and document the config
parameter on DataSource::create_sibling_state.

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good to me - thanks @andygrove

i have some suggestions on documetation and tests, but that could be done as a follow on PR

Comment thread datafusion/common/src/config.rs Outdated
/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// When `true` (the default), sibling partition streams of a single file

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this focues a lot on the internal details here. I would recommend describing this flag in terms of its user visible behavior. Maybe something like

        /// When `true` (the default), DataFusion's built in FileStream 
        /// tries to dynamically rebalance data between partitions during query
        /// execution. 
        ///
        /// Executors that depend on the plan time partition assignments, such 
        /// as Ballista should set this to `false` to avoid runtime reassignment. 
        pub enable_file_stream_work_stealing: bool, default = true

Comment thread datafusion/common/src/config.rs
Comment thread datafusion/datasource/src/file_stream/mod.rs
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion-common v54.0.0 (current)
       Built [  33.684s] (current)
     Parsing datafusion-common v54.0.0 (current)
      Parsed [   0.059s] (current)
    Building datafusion-common v54.0.0 (baseline)
       Built [  32.846s] (baseline)
     Parsing datafusion-common v54.0.0 (baseline)
      Parsed [   0.058s] (baseline)
    Checking datafusion-common v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.624s] 223 checks: 222 pass, 1 fail, 0 warn, 30 skip

--- failure constructible_struct_adds_field: externally-constructible struct adds field ---

Description:
A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
        ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/constructible_struct_adds_field.ron

Failed in:
  field ExecutionOptions.enable_file_stream_work_stealing in /home/runner/work/datafusion/datafusion/datafusion/common/src/config.rs:723

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  68.399s] datafusion-common
    Building datafusion-datasource v54.0.0 (current)
       Built [  36.654s] (current)
     Parsing datafusion-datasource v54.0.0 (current)
      Parsed [   0.031s] (current)
    Building datafusion-datasource v54.0.0 (baseline)
       Built [  36.615s] (baseline)
     Parsing datafusion-datasource v54.0.0 (baseline)
      Parsed [   0.032s] (baseline)
    Checking datafusion-datasource v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.275s] 223 checks: 222 pass, 1 fail, 0 warn, 30 skip

--- failure trait_method_parameter_count_changed: pub trait method parameter count changed ---

Description:
A trait method now takes a different number of parameters.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#trait-item-signature
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/trait_method_parameter_count_changed.ron

Failed in:
  DataSource::create_sibling_state now takes 1 instead of 0 parameters, in file /home/runner/work/datafusion/datafusion/datafusion/datasource/src/source.rs:253

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  74.851s] datafusion-datasource
    Building datafusion-sqllogictest v54.0.0 (current)
       Built [ 174.130s] (current)
     Parsing datafusion-sqllogictest v54.0.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-sqllogictest v54.0.0 (baseline)
       Built [ 174.522s] (baseline)
     Parsing datafusion-sqllogictest v54.0.0 (baseline)
      Parsed [   0.023s] (baseline)
    Checking datafusion-sqllogictest v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.089s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 351.883s] datafusion-sqllogictest

@github-actions github-actions Bot added the auto detected api change Auto detected API change label Jul 2, 2026
@Dandandan

Copy link
Copy Markdown
Contributor

Hmm I thought there was a API-level setting as well that can be used 🤔

@Dandandan Dandandan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Separately I wonder if Ballista couldn't just send a single partition instead of keeping all the partitions in the plan :)

@alamb

alamb commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hmm I thought there was a API-level setting as well that can be used 🤔

You might be able set this flag: FileScanConfig::partitioned_by_file_group which will disable the behavior too

@andygrove

Copy link
Copy Markdown
Member Author

Separately I wonder if Ballista couldn't just send a single partition instead of keeping all the partitions in the plan :)

There is a PR open for that work - apache/datafusion-ballista#1911

- Reword the config docstring in user-visible terms (dynamic runtime
  rebalancing) rather than internal work-queue mechanics.
- Note on FileScanConfig::file_groups that files may be reassigned across
  partitions at runtime when work stealing is enabled unless preserve_order or
  partitioned_by_file_group is set.
- Replace the bespoke isolated-partition test with the file's standard morsel
  snapshot harness: FileStreamMorselTest gains with_enable_file_stream_work_stealing,
  and the test reuses two_partition_morsel_test to show that disabling the flag
  keeps each partition's files local.

Regenerate configs.md and information_schema.slt for the reworded docstring.
@andygrove

Copy link
Copy Markdown
Member Author

Thanks for the reviews! Addressed the feedback in 608915e:

  • Config docstring — reworded in user-visible terms (dynamic runtime rebalancing of files across partitions), roughly following @alamb's suggested wording, instead of the internal work-queue mechanics.
  • FileScanConfig::file_groups docs — added a note that files may be reassigned to a different partition at runtime when work stealing is enabled, unless preserve_order or partitioned_by_file_group is set.
  • Test — replaced the bespoke isolated-partition test with the file's standard morsel snapshot harness: FileStreamMorselTest gains with_enable_file_stream_work_stealing(false), and the new test reuses two_partition_morsel_test() to snapshot that disabling the flag keeps each partition's files local (mirrors morsel_partitioned_by_file_group_keeps_files_local).

Regenerated configs.md and information_schema.slt for the reworded docstring.

On the semver-check failure: that's the usual result of adding a pub field to a config_namespace! struct — happens for any new config option. Happy to also open a 54.1.0 backport (#22547) once this lands.

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks great to me

@andygrove andygrove added this pull request to the merge queue Jul 2, 2026
Merged via the queue into apache:main with commit 457fc53 Jul 2, 2026
36 checks passed
@andygrove andygrove deleted the repro-filestream-work-stealing-23293 branch July 2, 2026 17:09
alamb pushed a commit that referenced this pull request Jul 3, 2026
… config (#23296)

## Which issue does this PR close?

- Backport of #23294 to `branch-54`.
- Closes #23293 for the 54 release line.

## Rationale for this change

`FileStream` sibling work-stealing (`WorkSource::Shared`) seeds one
shared work queue from every file group and lets whichever output
partition goes idle first steal the next unopened file (or byte-range
morsel). This assumes all output partitions of a scan are polled
concurrently in one process.

Executors that run each output partition as an isolated task in a
separate process — Ballista and datafusion-distributed — never poll the
sibling partitions. The single polled partition drains the whole shared
queue and reads files belonging to other partitions, so every isolated
task reads the entire input and the scan output is inflated by the
partition count. This is a correctness bug for those executors, not just
a performance one.

The existing escape hatches (`preserve_order`,
`partitioned_by_file_group`) are plan-level flags on `FileScanConfig`,
not something a distributed executor can set centrally through the
session config, and a plain repartitioned scan does not set
`partitioned_by_file_group`. There is no session-level off switch,
unlike `datafusion.optimizer.enable_dynamic_filter_pushdown`, which
exists precisely so consumers that cannot support runtime
cross-partition state can disable it.

## What changes are included in this PR?

This is a backport of #23294. The changes are identical in intent; two
conflicts were resolved for `branch-54`, which does not yet carry the
`output_partitioning` field on `FileScanConfig` or the
`enable_migration_aggregate` config that exist on `main`:

- Add `datafusion.execution.enable_file_stream_work_stealing` (default
`true`). When `false`, `FileScanConfig::create_sibling_state` returns
`None`, so each partition falls back to `WorkSource::Local` and reads
only its own file group.
- Thread `&ConfigOptions` into `DataSource::create_sibling_state` so the
flag is read from the session config at `execute` time. As a session
config value it round-trips through `datafusion-proto` with no proto
schema change.
- Regenerate `configs.md` and add the setting to
`information_schema.slt`.
- Add the regression test
`morsel_disabled_work_stealing_keeps_files_local` using the file's
standard morsel snapshot harness.

## Are these changes tested?

Yes. The `file_stream` test suite (including the new
`morsel_disabled_work_stealing_keeps_files_local`) passes, and `cargo
clippy` is clean. The existing sibling work-stealing tests continue to
pass with the default.

## Are there any user-facing changes?

A new session config,
`datafusion.execution.enable_file_stream_work_stealing` (default
`true`), so existing behavior is unchanged.
`DataSource::create_sibling_state` gains a `&ConfigOptions` parameter
(an API change for anyone implementing the `DataSource` trait directly).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api change Changes the API exposed to users of the crate auto detected api change Auto detected API change common Related to common crate datasource Changes to the datasource crate documentation Improvements or additions to documentation sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

FileStream sibling work-stealing has no session-config off switch and breaks isolated-per-task executors

4 participants