[branch-54] Add datafusion.execution.enable_file_stream_work_stealing config #23296
Merged
Add an ignored regression test for apache#23293. FileStream sibling work-stealing seeds one shared work queue from every file group and relies on all output partitions being polled concurrently in one process. An executor that runs each partition as an isolated task polls only one partition, which then drains the whole queue and reads files belonging to other partitions, inflating scan output by the partition count. The test builds and drives only partition 0 and asserts it reads solely its own file. It fails on main by design, so it is #[ignore]d with its assertion intact as a caught regression to triage. (cherry picked from commit 1b2760b)
FileStream sibling work-stealing (WorkSource::Shared) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next file or byte-range morsel. This assumes all output partitions of a scan are polled concurrently in one process.
Executors that run each output partition as an isolated task in a separate process (Ballista, datafusion-distributed) never poll the sibling partitions, so the single polled partition drains the whole queue and reads files belonging to other partitions, inflating the scan output by the partition count. That is a correctness bug for those executors, and the existing escape hatches (preserve_order, partitioned_by_file_group) are plan-level flags, not a session config they can set centrally.
Add datafusion.execution.enable_file_stream_work_stealing (default true), checked in FileScanConfig::create_sibling_state: when false it returns None so each partition falls back to reading only its own file group. This mirrors the enable_dynamic_filter_pushdown precedent and round-trips through datafusion-proto as a config value. Thread ConfigOptions into DataSource::create_sibling_state so the flag is read from the session config at execute time.
Turn the #[ignore]'d reproduction test into a passing regression test that drives only partition 0 and checks both behaviors: with the default (stealing on) partition 0 also reads partition 1's file, and with the flag off it reads only its own.
Closes apache#23293. (cherry picked from commit 482b04b)
…param Drop the redundant bool from the test's drive_partition0 helper (assert the shared-queue state directly via create_sibling_state) and document the config parameter on DataSource::create_sibling_state. (cherry picked from commit b836a6d)
- Reword the config docstring in user-visible terms (dynamic runtime rebalancing) rather than internal work-queue mechanics.
- Note on FileScanConfig::file_groups that files may be reassigned across partitions at runtime when work stealing is enabled unless preserve_order or partitioned_by_file_group is set.
- Replace the bespoke isolated-partition test with the file's standard morsel snapshot harness: FileStreamMorselTest gains with_enable_file_stream_work_stealing, and the test reuses two_partition_morsel_test to show that disabling the flag keeps each partition's files local.
Regenerate configs.md and information_schema.slt for the reworded docstring. (cherry picked from commit 608915e)
xudong963 approved these changes Jul 3, 2026
Contributor
Thanks @andygrove and @xudong963
Which issue does this PR close?
branch-54.
Rationale for this change
FileStream sibling work-stealing (WorkSource::Shared) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next unopened file (or byte-range morsel). This assumes all output partitions of a scan are polled concurrently in one process.
Executors that run each output partition as an isolated task in a separate process (Ballista and datafusion-distributed) never poll the sibling partitions. The single polled partition drains the whole shared queue and reads files belonging to other partitions, so every isolated task reads the entire input and the scan output is inflated by the partition count. This is a correctness bug for those executors, not just a performance one.
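The inflation described above can be reproduced with a toy model. The sketch below is not DataFusion code: `seed_shared_queue`, `drain_partition`, and `files_read_by_isolated_tasks` are hypothetical names, and it assumes that each isolated task builds its own copy of the plan and therefore its own freshly seeded queue.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the shared work queue: it is seeded with the
/// files of every file group, not just the group of one partition.
fn seed_shared_queue(
    file_groups: &[Vec<&'static str>],
) -> Arc<Mutex<VecDeque<&'static str>>> {
    let all: VecDeque<_> = file_groups.iter().flatten().copied().collect();
    Arc::new(Mutex::new(all))
}

/// Polls a single partition to completion. With no siblings competing for
/// work, this one partition takes every file in the queue.
fn drain_partition(queue: &Arc<Mutex<VecDeque<&'static str>>>) -> Vec<&'static str> {
    let mut read = Vec::new();
    loop {
        // The lock guard is dropped at the end of this statement.
        let next = queue.lock().unwrap().pop_front();
        match next {
            Some(file) => read.push(file),
            None => break,
        }
    }
    read
}

/// Models an executor that runs each output partition as an isolated task.
/// Assumption: every task seeds its own queue and polls only its own partition.
fn files_read_by_isolated_tasks(
    file_groups: &[Vec<&'static str>],
) -> Vec<Vec<&'static str>> {
    file_groups
        .iter()
        .map(|_own_group| {
            let queue = seed_shared_queue(file_groups);
            drain_partition(&queue)
        })
        .collect()
}
```

With two file groups of one file each, both simulated tasks read both files, so four file reads happen where two were expected. That is the "inflated by the partition count" effect in miniature.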
The existing escape hatches (preserve_order, partitioned_by_file_group) are plan-level flags on FileScanConfig, not something a distributed executor can set centrally through the session config, and a plain repartitioned scan does not set partitioned_by_file_group. There is no session-level off switch, unlike datafusion.optimizer.enable_dynamic_filter_pushdown, which exists precisely so consumers that cannot support runtime cross-partition state can disable it.
What changes are included in this PR?
This is a backport of #23294. The changes are identical in intent; two conflicts were resolved for branch-54, which does not yet carry the output_partitioning field on FileScanConfig or the enable_migration_aggregate config that exist on main:
- Add datafusion.execution.enable_file_stream_work_stealing (default true). When false, FileScanConfig::create_sibling_state returns None, so each partition falls back to WorkSource::Local and reads only its own file group.
- Thread &ConfigOptions into DataSource::create_sibling_state so the flag is read from the session config at execute time. As a session config value it round-trips through datafusion-proto with no proto schema change.
- Regenerate configs.md and add the setting to information_schema.slt.
- Add morsel_disabled_work_stealing_keeps_files_local using the file's standard morsel snapshot harness.
Are these changes tested?
Yes. The file_stream test suite (including the new morsel_disabled_work_stealing_keeps_files_local) passes, and cargo clippy is clean. The existing sibling work-stealing tests continue to pass with the default.
Are there any user-facing changes?
A new session config, datafusion.execution.enable_file_stream_work_stealing, is added. It defaults to true, so existing behavior is unchanged.
DataSource::create_sibling_state gains a &ConfigOptions parameter (an API change for anyone implementing the DataSource trait directly).
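For implementers of the trait, the shape of the change can be sketched roughly as follows. This is a simplified model, not the DataFusion source: the `ConfigOptions`, `FileScanConfig`, `WorkSource`, and `SiblingState` types here are hypothetical stand-ins that reuse the PR's names, and the other conditions that disable sharing (preserve_order, partitioned_by_file_group) are omitted.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the session config; the real ConfigOptions
/// nests this flag under its execution options.
struct ConfigOptions {
    enable_file_stream_work_stealing: bool,
}

/// Hypothetical stand-in for the state shared between sibling partitions.
type SiblingState = Arc<Mutex<VecDeque<String>>>;

/// Simplified model of the two work sources named in the PR.
enum WorkSource {
    /// One queue seeded from every file group; idle partitions steal from it.
    Shared(SiblingState),
    /// Only the files of this partition's own file group.
    Local(VecDeque<String>),
}

struct FileScanConfig {
    file_groups: Vec<Vec<String>>,
}

impl FileScanConfig {
    /// The new `config` parameter lets the session-level flag veto sharing.
    fn create_sibling_state(&self, config: &ConfigOptions) -> Option<SiblingState> {
        if !config.enable_file_stream_work_stealing {
            return None; // flag off: no shared queue is created
        }
        let all: VecDeque<String> =
            self.file_groups.iter().flatten().cloned().collect();
        Some(Arc::new(Mutex::new(all)))
    }

    /// Picks the work source for one partition. In this sketch the shared
    /// state is rebuilt on every call; a real executor would create it once
    /// per execution and hand the same state to every sibling partition.
    fn work_source(&self, partition: usize, config: &ConfigOptions) -> WorkSource {
        match self.create_sibling_state(config) {
            Some(shared) => WorkSource::Shared(shared),
            None => WorkSource::Local(
                self.file_groups[partition].iter().cloned().collect(),
            ),
        }
    }
}
```

Reading the flag from a config argument rather than from the plan is what lets a distributed executor turn stealing off centrally, without rewriting each scan node.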