Skip to content

feat: support native aggregate function mode#4782

Draft
andygrove wants to merge 1 commit into
apache:mainfrom
andygrove:feature/native-mode-agg
Draft

feat: support native aggregate function mode#4782
andygrove wants to merge 1 commit into
apache:mainfrom
andygrove:feature/native-mode-agg

Conversation

@andygrove

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #3970.

Rationale for this change

The mode aggregate (the most frequent value in a group) is a mainstream statistical aggregate that previously fell back to Spark, preventing native execution of any query using it. Adding native support keeps these queries in Comet's pipeline.

What changes are included in this PR?

This PR was scaffolded with the implement-comet-expression project skill.

  • Native Rust implementation in native/spark-expr/src/agg_funcs/mode.rs: a Mode aggregate UDF with both a global Accumulator and a vectorized GroupsAccumulator. State is a frequency map keyed by ScalarValue, serialized as a single struct<values: array<T>, counts: array<bigint>> buffer column so partial/final buffer schemas stay aligned with Spark's single-attribute TypedImperativeAggregate buffer.
  • Protobuf Mode message and AggExpr oneof entry, plus the planner arm in planner.rs.
  • CometMode serde and registration in QueryPlanSerde.aggrSerdeMap.
  • A Mode branch in adjustOutputForNativeState (operators.scala) mapping the Spark binary buffer type to the native struct state type.
  • A modeHasUnsupportedOrdering shim in CometTypeShim (spark-3.x / spark-4.x) because Mode.reverseOpt only exists on Spark 4.0+.
  • Docs: mode marked supported in the expressions guide.

Scope and compatibility:

  • Only the plain mode(col) form is supported. The mode(col, deterministic) and mode() WITHIN GROUP (ORDER BY col) forms (Spark 4.0+, which set reverseOpt) fall back to Spark.
  • Registered as Incompatible (opt-in via spark.comet.expression.Mode.allowIncompatible=true): Spark breaks ties non-deterministically based on JVM hash-map iteration order, which a native hash map cannot reproduce bit-for-bit. Comet instead returns the smallest tied value deterministically.
  • NULLs are ignored, empty input returns NULL, and float keys are normalized (-0.0 to 0.0, canonical NaN) to match Spark's counting. Supported input types are numeric, boolean, decimal, date, timestamp, timestamp_ntz, and default-collation string; other types fall back.

How are these changes tested?

  • Rust unit tests in mode.rs covering most-frequent value, tie-break to smallest, NULL handling, empty input, float normalization, and partial/final merge equivalence for both the accumulator and the groups accumulator.
  • A mode.sql file test exercising global and grouped aggregation, NULLs, all-NULL groups, mixed aggregates, HAVING, and boolean/integer/double/decimal/string/date/timestamp inputs, plus an unsupported-type fallback assertion. Verified on Spark 3.5 and Spark 4.1.

@andygrove andygrove marked this pull request as draft July 1, 2026 15:44
Add native support for the Spark `mode` aggregate, the most frequent
value within a group.

Spark breaks ties on the default `mode(col)` form non-deterministically
(the chosen value depends on JVM hash-map iteration order), so the
function is registered as Incompatible and opt-in via allowIncompatible;
Comet resolves ties deterministically by returning the smallest tied
value. NULLs are ignored, empty input returns NULL, and float keys are
normalized to match Spark. The deterministic-flag and WITHIN GROUP forms
fall back to Spark.

Closes apache#3970
@andygrove andygrove force-pushed the feature/native-mode-agg branch from b6cf253 to fb2f52b Compare July 1, 2026 15:54
@andygrove andygrove changed the title feat: support native mode aggregate function feat: support native aggregate function mode Jul 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add native support for MODE aggregate function

1 participant