feat: native collect_list / array_agg aggregate#4720
Conversation
Wires Spark's CollectList aggregate to datafusion-spark's SparkCollectList. array_agg, registered as a SQL alias of CollectList in FunctionRegistry, is also covered. Closes apache#2524.
bf6b0c1 to
c9c19e3
Compare
…gates
A distinct aggregate combined with collect_list/collect_set produces a
multi-stage plan (Partial -> PartialMerge -> Final). CollectList/CollectSet
declare a BinaryType buffer in Spark but produce a native ArrayType state, so
Comet cannot read a Spark-produced Binary buffer, nor round-trip its own
ArrayType buffer across the intermediate PartialMerge stages. Both led to
native crashes ("could not cast Binary to List" / "cast List to Binary").
Force these multi-stage aggregates to fall back to Spark consistently:
- tag the feeding Partial when a PartialMerge stage of CollectList/CollectSet
is present (CometExecRule.tagUnsafePartialAggregates), and
- fall back a PartialMerge stage whose buffer was produced by a Spark partial
(CometBaseAggregate.doConvert).
Two-stage collect_list/collect_set continue to run natively.
Patch the upstream SPARK-22223 plan-shape test to disable Comet, since native
collect_list removes the ObjectHashAggregateExec it asserts on.
Enabling fully-native multi-stage execution is tracked in apache#4724.
mbutrovich
left a comment
There was a problem hiding this comment.
First pass. Mostly want to discuss Spark 4.2 or if we should punt.
| inputs: Seq[Attribute], | ||
| binding: Boolean, | ||
| conf: SQLConf): Option[ExprOuterClass.AggExpr] = { | ||
| val child = expr.children.head |
There was a problem hiding this comment.
This does not check CollectList.ignoreNulls, and the upstream SparkCollectList hardcodes ignore_nulls = true (datafusion/spark/src/function/aggregate/collect.rs). On Spark 3.4 through 4.1 CollectList has no ignoreNulls field so this is correct. On Spark 4.2 it gains ignoreNulls: Boolean = true and RESPECT NULLS becomes reachable from SQL. FunctionResolution.applyIgnoreNulls resolves it:
case collectList: CollectList => collectList.copy(ignoreNulls = ignoreNulls)So collect_list(x) RESPECT NULLS sets ignoreNulls = false, which keeps null elements. Comet drops them and returns a different result from Spark with no fallback. 4.2 releases soon, so we might want to tackle this now. The field is absent on 3.4-4.1, so reading it needs a version shim (CometExprShim) rather than a direct expr.ignoreNulls reference. CometCollectSet has the same gap on 4.2, so consider covering both in the shim.
CometFirst (line 249) and CometLast (line 285) already thread ignoreNulls through the proto. First/Last carry the field on every supported Spark version, so no shim is needed there, which is the one extra wrinkle for CollectList/CollectSet. Same wiring shape applies once the field is shimmed.
| * are therefore only safe to run natively when every stage runs in Comet and there are at most | ||
| * two stages (Partial + Final). | ||
| */ | ||
| def hasIncompatibleBufferAgg(aggExprs: Seq[AggregateExpression]): Boolean = { |
There was a problem hiding this comment.
Nice to factor this out and share it between CometExecRule and operators.scala instead of duplicating. Minor naming thought: "incompatible buffer" is a bit generic. The method really means "produces a native ArrayType state where Spark declares BinaryType, so it cannot round-trip a Spark-produced intermediate buffer." Something like hasNativeArrayBufferAgg would carry more of that at the call sites. The doc comment is clear either way, so take it or leave it.
| -- ============================================================ | ||
|
|
||
| query | ||
| SELECT grp, sort_array(collect_list(DISTINCT i)) FROM cl_src_int GROUP BY grp ORDER BY grp |
There was a problem hiding this comment.
For the record, no change needed. A lone collect_list(DISTINCT i) does not create a PartialMerge stage on collect_list itself. Spark does the dedup with group-by-only stages and the collect still runs plain Partial then Final, so this query stays fully native and does not touch the hasIncompatibleBufferAgg fallback. The fallback path needs a distinct aggregate combined with the collect, which the new CometAggregateSuite test covers over both LocalTableScan and Parquet sources. Coverage across the two files is good.
# Conflicts: # dev/diffs/3.4.3.diff # docs/source/contributor-guide/expression-audits/agg_funcs.md # native/proto/src/proto/expr.proto # spark/src/main/scala/org/apache/comet/serde/aggregates.scala # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Spark 4.2 adds an ignoreNulls field to CollectList and CollectSet, and collect_list(x) RESPECT NULLS sets it to false, keeping null elements. The native path delegates to SparkCollectList/SparkCollectSet, which always drop nulls, so it would silently return a different result from Spark. Add a per-version CometCollectShim that reads ignoreNulls (always true on Spark 3.4 through 4.1, where the field is absent) and fall back to Spark in getSupportLevel when it is false. Also rename QueryPlanSerde.hasIncompatibleBufferAgg to hasNativeArrayBufferAgg to describe what it detects: an aggregate whose native ArrayType state cannot round-trip Spark's declared BinaryType buffer.
Which issue does this PR close?
Closes #2524.
Rationale for this change
collect_listand its aliasarray_aggare common aggregate functions that previously fell back to Spark, breaking native execution for many real workloads (notably plans that group rows into arrays before further processing). Adding native support keeps these queries on the Comet path.What changes are included in this PR?
This change was scaffolded with the
implement-comet-expressionskill.CollectListmessage toexpr.protoand a newcollectList = 18arm in theAggExproneof.CometCollectListserde inaggregates.scalaand registerclassOf[CollectList] -> CometCollectListinQueryPlanSerde.datafusion_spark::function::aggregate::collect::SparkCollectList, the upstream Spark-compatible accumulator (Spark 3.4 through 4.1 useignore_nulls = truesemantics that match it). No Comet-local Rust function is added.adjustOutputForNativeStatefix inoperators.scalato coverCollectList(same pattern already used forCollectSet: native producesArrayType(elementType)while Spark declares the buffer asBinaryType).collect_listandarray_aggas supported in the user-guide expression page.docs/source/contributor-guide/expression-audits/agg_funcs.mdcovering Spark 3.4.3, 3.5.8, 4.0.1, 4.1.1.How are these changes tested?
spark/src/test/resources/sql-tests/expressions/aggregate/collect_list.sqlexercises ~30 queries across types (boolean, byte/short/int/bigint, float/double including NaN/Inf/-0, string, binary, decimal up to (38,0), date, timestamp, struct, nested array), GROUP BY, NULLs, all-NULL groups, empty tables, single-row, mixed aggregates, multiplecollect_listcolumns, DISTINCT, HAVING, thearray_aggalias, INT/BIGINT boundary values, and an SPARK-17641 null-filter regression. The fixture runs underConfigMatrix: parquet.enable.dictionary=false,true, so each query executes twice.CometSqlFileTestSuitetests pass (./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite expressions/aggregate" -Dtest=none -Pspark-3.5), confirming no regression tocollect_setor other aggregates.cargo clippy --all-targets --workspace -- -D warningsis clean.