Skip to content

feat: native collect_list / array_agg aggregate#4720

Open
andygrove wants to merge 4 commits into
apache:mainfrom
andygrove:feat-collect-list
Open

feat: native collect_list / array_agg aggregate#4720
andygrove wants to merge 4 commits into
apache:mainfrom
andygrove:feat-collect-list

Conversation

@andygrove

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #2524.

Rationale for this change

collect_list and its alias array_agg are common aggregate functions that previously fell back to Spark, breaking native execution for many real workloads (notably plans that group rows into arrays before further processing). Adding native support keeps these queries on the Comet path.

What changes are included in this PR?

This change was scaffolded with the implement-comet-expression skill.

  • Add CollectList message to expr.proto and a new collectList = 18 arm in the AggExpr oneof.
  • Add CometCollectList serde in aggregates.scala and register classOf[CollectList] -> CometCollectList in QueryPlanSerde.
  • Wire the native side to datafusion_spark::function::aggregate::collect::SparkCollectList, the upstream Spark-compatible accumulator (Spark 3.4 through 4.1 use ignore_nulls = true semantics that match it). No Comet-local Rust function is added.
  • Extend the Partial-mode adjustOutputForNativeState fix in operators.scala to cover CollectList (same pattern already used for CollectSet: native produces ArrayType(elementType) while Spark declares the buffer as BinaryType).
  • Mark collect_list and array_agg as supported in the user-guide expression page.
  • Add an audit entry under docs/source/contributor-guide/expression-audits/agg_funcs.md covering Spark 3.4.3, 3.5.8, 4.0.1, 4.1.1.

How are these changes tested?

  • New SQL fixture spark/src/test/resources/sql-tests/expressions/aggregate/collect_list.sql exercises ~30 queries across types (boolean, byte/short/int/bigint, float/double including NaN/Inf/-0, string, binary, decimal up to (38,0), date, timestamp, struct, nested array), GROUP BY, NULLs, all-NULL groups, empty tables, single-row, mixed aggregates, multiple collect_list columns, DISTINCT, HAVING, the array_agg alias, INT/BIGINT boundary values, and an SPARK-17641 null-filter regression. The fixture runs under ConfigMatrix: parquet.enable.dictionary=false,true, so each query executes twice.
  • All 21 aggregate CometSqlFileTestSuite tests pass (./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite expressions/aggregate" -Dtest=none -Pspark-3.5), confirming no regression to collect_set or other aggregates.
  • cargo clippy --all-targets --workspace -- -D warnings is clean.

Wires Spark's CollectList aggregate to datafusion-spark's
SparkCollectList. array_agg, registered as a SQL alias of
CollectList in FunctionRegistry, is also covered.

Closes apache#2524.
@andygrove andygrove force-pushed the feat-collect-list branch from bf6b0c1 to c9c19e3 Compare June 24, 2026 16:31
@andygrove andygrove marked this pull request as ready for review June 24, 2026 17:01
@andygrove andygrove marked this pull request as draft June 24, 2026 19:26
…gates

A distinct aggregate combined with collect_list/collect_set produces a
multi-stage plan (Partial -> PartialMerge -> Final). CollectList/CollectSet
declare a BinaryType buffer in Spark but produce a native ArrayType state, so
Comet cannot read a Spark-produced Binary buffer, nor round-trip its own
ArrayType buffer across the intermediate PartialMerge stages. Both led to
native crashes ("could not cast Binary to List" / "cast List to Binary").

Force these multi-stage aggregates to fall back to Spark consistently:
- tag the feeding Partial when a PartialMerge stage of CollectList/CollectSet
  is present (CometExecRule.tagUnsafePartialAggregates), and
- fall back a PartialMerge stage whose buffer was produced by a Spark partial
  (CometBaseAggregate.doConvert).

Two-stage collect_list/collect_set continue to run natively.

Patch the upstream SPARK-22223 plan-shape test to disable Comet, since native
collect_list removes the ObjectHashAggregateExec it asserts on.

Enabling fully-native multi-stage execution is tracked in apache#4724.
@andygrove andygrove marked this pull request as ready for review June 25, 2026 13:54
@mbutrovich mbutrovich self-requested a review July 1, 2026 14:36

@mbutrovich mbutrovich left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass. Mostly want to discuss Spark 4.2 or if we should punt.

inputs: Seq[Attribute],
binding: Boolean,
conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
val child = expr.children.head

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not check CollectList.ignoreNulls, and the upstream SparkCollectList hardcodes ignore_nulls = true (datafusion/spark/src/function/aggregate/collect.rs). On Spark 3.4 through 4.1 CollectList has no ignoreNulls field so this is correct. On Spark 4.2 it gains ignoreNulls: Boolean = true and RESPECT NULLS becomes reachable from SQL. FunctionResolution.applyIgnoreNulls resolves it:

case collectList: CollectList => collectList.copy(ignoreNulls = ignoreNulls)

So collect_list(x) RESPECT NULLS sets ignoreNulls = false, which keeps null elements. Comet drops them and returns a different result from Spark with no fallback. 4.2 releases soon, so we might want to tackle this now. The field is absent on 3.4-4.1, so reading it needs a version shim (CometExprShim) rather than a direct expr.ignoreNulls reference. CometCollectSet has the same gap on 4.2, so consider covering both in the shim.

CometFirst (line 249) and CometLast (line 285) already thread ignoreNulls through the proto. First/Last carry the field on every supported Spark version, so no shim is needed there, which is the one extra wrinkle for CollectList/CollectSet. Same wiring shape applies once the field is shimmed.

* are therefore only safe to run natively when every stage runs in Comet and there are at most
* two stages (Partial + Final).
*/
def hasIncompatibleBufferAgg(aggExprs: Seq[AggregateExpression]): Boolean = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to factor this out and share it between CometExecRule and operators.scala instead of duplicating. Minor naming thought: "incompatible buffer" is a bit generic. The method really means "produces a native ArrayType state where Spark declares BinaryType, so it cannot round-trip a Spark-produced intermediate buffer." Something like hasNativeArrayBufferAgg would carry more of that at the call sites. The doc comment is clear either way, so take it or leave it.

-- ============================================================

query
SELECT grp, sort_array(collect_list(DISTINCT i)) FROM cl_src_int GROUP BY grp ORDER BY grp

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, no change needed. A lone collect_list(DISTINCT i) does not create a PartialMerge stage on collect_list itself. Spark does the dedup with group-by-only stages and the collect still runs plain Partial then Final, so this query stays fully native and does not touch the hasIncompatibleBufferAgg fallback. The fallback path needs a distinct aggregate combined with the collect, which the new CometAggregateSuite test covers over both LocalTableScan and Parquet sources. Coverage across the two files is good.

andygrove added 2 commits July 1, 2026 16:52
# Conflicts:
#	dev/diffs/3.4.3.diff
#	docs/source/contributor-guide/expression-audits/agg_funcs.md
#	native/proto/src/proto/expr.proto
#	spark/src/main/scala/org/apache/comet/serde/aggregates.scala
#	spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Spark 4.2 adds an ignoreNulls field to CollectList and CollectSet, and
collect_list(x) RESPECT NULLS sets it to false, keeping null elements. The
native path delegates to SparkCollectList/SparkCollectSet, which always drop
nulls, so it would silently return a different result from Spark. Add a
per-version CometCollectShim that reads ignoreNulls (always true on Spark 3.4
through 4.1, where the field is absent) and fall back to Spark in
getSupportLevel when it is false.

Also rename QueryPlanSerde.hasIncompatibleBufferAgg to hasNativeArrayBufferAgg
to describe what it detects: an aggregate whose native ArrayType state cannot
round-trip Spark's declared BinaryType buffer.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Integrate collect_list/array_agg to Comet

2 participants