perf: split row groups by file range (morsel splitting) #23285
Dandandan wants to merge 3 commits into
…ans (morsel splitting)

When a parquet scan is restricted to a byte range that only partially overlaps a row group, read the proportional slice of the row group's rows (via a RowSelection) instead of assigning the whole row group to the one range containing its first data page. Since FileGroupPartitioner already tiles files into byte ranges, this lets all partitions decode disjoint slices of the same row group in parallel, parallelizing scans of files with fewer row groups than partitions (e.g. a single large row group).

Row boundaries are computed with identical integer arithmetic on both sides of each range boundary, so every row is read exactly once. The offset index is now loaded whenever the access plan contains row selections so each partition fetches and decodes only the pages covering its slice.

Controlled by `datafusion.execution.parquet.split_row_groups_by_range` (default: true).

TPC-DS SF=1 with one row group per file: 2.2x faster overall (16.8s -> 7.5s), 82/99 queries faster, up to 5.3x. TPC-H SF=1 (multi-row-group files): 1.16x faster overall, no significant regressions.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
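The "identical integer arithmetic on both sides of each range boundary" idea can be illustrated with a minimal sketch. The helper names `row_at` and `row_slice` are hypothetical and are not taken from the PR; the sketch also assumes a non-empty row group (`rg_len > 0`) and a linear mapping from bytes to rows, which is only an approximation of what a real implementation would do.

```rust
use std::ops::Range;

/// Map a file byte offset to a row index inside one row group.
/// Hypothetical helper: name and signature are illustrative only.
fn row_at(offset: u64, rg_start: u64, rg_len: u64, num_rows: u64) -> u64 {
    // Clamp the offset into the row group's byte span.
    let clamped = offset.clamp(rg_start, rg_start + rg_len);
    // Widen to u128 so the product cannot overflow.
    ((clamped - rg_start) as u128 * num_rows as u128 / rg_len as u128) as u64
}

/// Rows of the row group that a scan restricted to `range` would read.
/// Both ends go through the same `row_at`, so two adjacent byte ranges
/// agree on the row index at their shared boundary.
fn row_slice(range: &Range<u64>, rg_start: u64, rg_len: u64, num_rows: u64) -> Range<u64> {
    row_at(range.start, rg_start, rg_len, num_rows)..row_at(range.end, rg_start, rg_len, num_rows)
}
```

For a row group occupying bytes 100..1100 with 10 rows and a file tiled into 0..400, 400..800, and 800..1200, the three slices are rows 0..3, 3..7, and 7..10: contiguous, disjoint, and covering every row once.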
run benchmarks

🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using tpch, clickbench_partitioned, and tpcds.
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported that the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
… morsels

When a pop would leave the shared work queue empty, split the final byte range into small morsels (halving down to a floor) and push the excess back, so sibling streams that finish their pieces early steal a share of the last piece instead of idling behind one straggler. Work items are returned exactly as the planner sized them while the queue is deep.

The morsel floor is ~1MiB of data the scan actually reads: the file-range floor is scaled by the fraction of file columns referenced by the projection and filter, so narrow projections produce proportionally larger byte ranges and the fixed per-piece open cost stays amortized.

TPC-DS SF=1 (single-row-group files): 4.4% faster overall (7.53s -> 7.21s), 28/99 queries faster, up to 1.14x, no meaningful regressions. TPC-H SF=1: 6.1% faster overall (1084ms -> 1022ms), 9/22 faster, 0 slower.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
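A rough sketch of the tail-splitting rule and the scaled floor follows. All names here (`BASE_FLOOR`, `file_range_floor`, `pop_with_tail_split`) are hypothetical, and the exact halving order and column accounting in the PR may differ; this only models the behavior the commit message describes.

```rust
use std::collections::VecDeque;
use std::ops::Range;

/// Assumed target: about 1 MiB of data actually read per morsel.
const BASE_FLOOR: u64 = 1 << 20;

/// File-range floor, scaled so a morsel still reads roughly BASE_FLOOR bytes.
/// A narrow projection reads a small fraction of each byte range, so the
/// range itself must be proportionally larger.
fn file_range_floor(referenced_cols: u64, total_cols: u64) -> u64 {
    BASE_FLOOR * total_cols / referenced_cols.max(1)
}

/// Pop one work item. Only when the pop would leave the queue empty is the
/// item halved down to `floor`, with the upper halves pushed back so idle
/// siblings can steal them. Deep queues return items unchanged.
fn pop_with_tail_split(queue: &mut VecDeque<Range<u64>>, floor: u64) -> Option<Range<u64>> {
    let floor = floor.max(1); // guard against a zero floor
    let mut item = queue.pop_front()?;
    if queue.is_empty() {
        while item.end - item.start >= 2 * floor {
            let mid = item.start + (item.end - item.start) / 2;
            queue.push_back(mid..item.end); // excess for sibling streams
            item = item.start..mid;
        }
    }
    Some(item)
}
```

With two 8 MiB items queued and a 1 MiB floor, the first pop returns its item whole; the second pop returns a 1 MiB morsel and leaves three smaller pieces (4, 2, and 1 MiB) in the queue.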
run benchmarks

🤖 Benchmark running (GKE): comparing split-row-groups-by-range (95185fa) to 3bb9314 (merge-base) using clickbench_partitioned, tpch, and tpcds.
…lits

Pieces of a byte-range-split file now share a per-file SharedFileState (attached when the shared work queue is built): the first piece to finish opening publishes its parsed metadata, specialized predicate/projection, and the statistics/bloom/page-index pruned access plan; pieces that start later skip all of that work and its I/O, going straight from file-level pruning to building their decoder. Sharing is optimistic (no waiting): pieces that start before anything is published open the file themselves, exactly as before.

To make the published plan reusable, the byte-range restriction moves from row-group pruning time to stream-build time for shared files. That also happens after the page index is loaded, so split boundaries now snap to the page boundaries of each row group's largest column chunk: adjacent pieces no longer both decode the page straddling their boundary.

Tail morsel splitting is unchanged: unconditional ~1MiB-projected morsels were re-benchmarked with the shared open state and are still slower (TPC-H SF=1 -11%: residual per-piece decoder/filter setup outweighs the balance gain), so morsels remain a tail-only mechanism.

TPC-DS SF=1 single-row-group: 42/99 queries 5-13% faster, total neutral (dominated by one high-variance join query). TPC-H SF=1: 1.3% faster.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
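The optimistic "first finisher publishes, nobody waits" pattern could look roughly like the sketch below. It uses `std::sync::OnceLock` as a stand-in; the PR's actual `SharedFileState` fields and synchronization are not shown in this conversation, so `OpenedFile` and `open_piece` are purely illustrative.

```rust
use std::sync::{Arc, OnceLock};

/// Stand-in for what the first piece publishes (parsed metadata, pruned plan).
#[derive(Debug, PartialEq)]
struct OpenedFile {
    row_groups_kept: Vec<usize>,
}

/// Hypothetical per-file state shared by all byte-range pieces of one file.
#[derive(Default)]
struct SharedFileState {
    opened: OnceLock<Arc<OpenedFile>>,
}

impl SharedFileState {
    /// Returns the opened state and whether this piece did the open itself.
    fn open_piece(&self, expensive_open: impl FnOnce() -> OpenedFile) -> (Arc<OpenedFile>, bool) {
        // Optimistic: never block. Reuse a published result if one exists.
        if let Some(published) = self.opened.get() {
            return (Arc::clone(published), false);
        }
        // Nothing published yet: open the file ourselves, as before.
        let mine = Arc::new(expensive_open());
        // The first finisher wins; a piece that loses the race simply keeps
        // using its own result, so the failed `set` is ignored.
        let _ = self.opened.set(Arc::clone(&mine));
        (mine, true)
    }
}
```

A piece that starts after publication gets the same `Arc` back and never runs its `expensive_open` closure, which is where the saved metadata parsing and pruning I/O would come from.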
run benchmarks

🤖 Benchmark running (GKE): comparing split-row-groups-by-range (46e73b2) to 3bb9314 (merge-base) using tpcds, tpch, and clickbench_partitioned.
andygrove left a comment
This is a nice speedup for files with a single large row group, and gating it behind split_row_groups_by_range is a clean way to make it opt-out.
One question about the sibling work-stealing path this builds on (SharedWorkSource / WorkSource::Shared). It matters for executors that run each output partition as an isolated task in a separate process, like Ballista and datafusion-distributed. Those consumers cannot poll all sibling partitions together, so a shared queue gets fully drained by whichever single task runs, and every task then scans the whole input. Today the only way to opt out of sibling sharing looks like the preserve_order and partitioned_by_file_group plan flags in create_sibling_state. Would you be open to also gating dynamic sibling work-stealing behind a session config, the way enable_dynamic_filter_pushdown gates that feature? That would give distributed executors a clean off switch.
It is worth noting that split_row_groups_by_range=false does not cover this on its own, since the shared queue still hands out whole row groups dynamically. So this is really a question about the underlying WorkSource::Shared mechanism rather than this PR specifically, which just extends it.
Happy to open a separate issue for the broader dynamic-scheduling question so this PR stays focused on the perf win. Does that split sound right to you?
This review was prepared with the help of an LLM.
Thanks for the review. Yeah, it would be good to gate it. I am also still iterating on this PR (e.g. removing the overhead). See also my other comment in your PR about Ballista.
apache#23294)

## Which issue does this PR close?

- Closes apache#23293.

## Rationale for this change

`FileStream` sibling work-stealing (`WorkSource::Shared`, added in apache#21351 and extended by apache#23285) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next unopened file (or byte-range morsel). This assumes all output partitions of a scan are polled concurrently in one process.

Executors that run each output partition as an isolated task in a separate process (Ballista and datafusion-distributed) never poll the sibling partitions. The single polled partition drains the whole shared queue and reads files belonging to other partitions, so every isolated task reads the entire input and the scan output is inflated by the partition count. This is a correctness bug for those executors, not just a performance one.

The existing escape hatches (`preserve_order`, `partitioned_by_file_group`) are plan-level flags on `FileScanConfig`, not something a distributed executor can set centrally through the session config, and a plain repartitioned scan does not set `partitioned_by_file_group`. There is no session-level off switch, unlike `datafusion.optimizer.enable_dynamic_filter_pushdown`, which exists precisely so consumers that cannot support runtime cross-partition state can disable it.

## What changes are included in this PR?

- Add `datafusion.execution.enable_file_stream_work_stealing` (default `true`). When `false`, `FileScanConfig::create_sibling_state` returns `None`, so each partition falls back to `WorkSource::Local` and reads only its own file group.
- Thread `&ConfigOptions` into `DataSource::create_sibling_state` so the flag is read from the session config at `execute` time. As a session config value it round-trips through `datafusion-proto` with no proto schema change.
- Regenerate `configs.md` and add the setting to `information_schema.slt`.
- Turn the previously `#[ignore]`d reproduction test into a passing regression test that drives only partition 0 (as an isolated task does) and asserts both behaviors: with the default (stealing on) partition 0 also reads partition 1's file, and with the flag off it reads only its own.

## Are these changes tested?

Yes. `isolated_partition_respects_work_stealing_config` in `datafusion/datasource/src/file_stream/mod.rs` covers both the default (shared-queue) behavior and the flag-off behavior. The existing sibling work-stealing tests continue to pass with the default. `information_schema` sqllogictests pass with the new setting listed.

## Are there any user-facing changes?

A new session config, `datafusion.execution.enable_file_stream_work_stealing` (default `true`), so existing behavior is unchanged. `DataSource::create_sibling_state` gains a `&ConfigOptions` parameter (an API change for anyone implementing the `DataSource` trait directly).
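The isolated-task failure mode and the effect of the flag can be modeled with a toy work source. The enum below only borrows the `Local`/`Shared` naming from the description above; it is a simplified stand-in, not the DataFusion types, and `build_sources` and `drain` are hypothetical helpers.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for a scan partition's work source.
enum WorkSource {
    Local(VecDeque<&'static str>),
    Shared(Arc<Mutex<VecDeque<&'static str>>>),
}

impl WorkSource {
    fn next_file(&mut self) -> Option<&'static str> {
        match self {
            WorkSource::Local(q) => q.pop_front(),
            WorkSource::Shared(q) => q.lock().unwrap().pop_front(),
        }
    }
}

/// Build one source per partition, honoring a work-stealing flag.
fn build_sources(file_groups: Vec<Vec<&'static str>>, work_stealing: bool) -> Vec<WorkSource> {
    if work_stealing {
        // One queue seeded from every file group, shared by all partitions.
        let all: VecDeque<_> = file_groups.iter().flatten().copied().collect();
        let shared = Arc::new(Mutex::new(all));
        file_groups.iter().map(|_| WorkSource::Shared(Arc::clone(&shared))).collect()
    } else {
        // Each partition keeps only its own file group.
        file_groups.into_iter().map(|g| WorkSource::Local(g.into())).collect()
    }
}

/// Drive a single partition to completion, as an isolated task would.
fn drain(source: &mut WorkSource) -> Vec<&'static str> {
    std::iter::from_fn(|| source.next_file()).collect()
}
```

With two file groups `["a.parquet"]` and `["b.parquet"]`, draining only partition 0 yields both files when stealing is on and only `a.parquet` when it is off, which mirrors the two behaviors the regression test is described as asserting.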
Which issue does this PR close?
Rationale for this change
Speed up parquet scans of files that have fewer row groups than scan partitions (e.g. a single large row group).
Are these changes tested?
Yes
Are there any user-facing changes?
New config option `datafusion.execution.parquet.split_row_groups_by_range` (default `true`). With the default, partitions of a ranged parquet scan that previously returned no rows (their range contained no row group start) now return the rows proportional to their byte range; total scan output is unchanged.

🤖 Generated with Claude Code