feat: support native aggregate function mode #4782
Draft
andygrove wants to merge 1 commit into
Add native support for the Spark `mode` aggregate, the most frequent value within a group. Spark breaks ties on the default `mode(col)` form non-deterministically (the chosen value depends on JVM hash-map iteration order), so the function is registered as Incompatible and opt-in via allowIncompatible; Comet resolves ties deterministically by returning the smallest tied value. NULLs are ignored, empty input returns NULL, and float keys are normalized to match Spark. The deterministic-flag and WITHIN GROUP forms fall back to Spark. Closes apache#3970
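The tie-break, NULL, and float rules described above can be sketched in plain Rust. This is only an illustration under stated assumptions, not the code in this PR: `mode_i64` and `float_key` are hypothetical names, and the sketch uses a std `HashMap` over `i64` rather than the real accumulator types.

```rust
use std::collections::HashMap;

/// Hypothetical helper: most frequent non-NULL value, ties broken by the
/// smallest value. Returns None for empty or all-NULL input.
fn mode_i64(values: &[Option<i64>]) -> Option<i64> {
    let mut counts: HashMap<i64, u64> = HashMap::new();
    // `flatten` skips the None entries, so NULLs are ignored.
    for v in values.iter().flatten() {
        *counts.entry(*v).or_insert(0) += 1;
    }
    counts
        .into_iter()
        // Higher count wins; on equal counts the smaller value wins.
        .max_by(|a, b| a.1.cmp(&b.1).then_with(|| b.0.cmp(&a.0)))
        .map(|(value, _)| value)
}

/// Hypothetical helper: map a float to a hashable key so that -0.0 and 0.0
/// share one bucket and every NaN bit pattern shares another.
fn float_key(x: f64) -> u64 {
    let canonical = if x.is_nan() {
        f64::NAN
    } else if x == 0.0 {
        0.0 // also true for -0.0, which is replaced by +0.0 here
    } else {
        x
    };
    canonical.to_bits()
}
```

Because the result depends only on the counts and the value ordering, it does not vary with hash-map iteration order, which is the property the deterministic tie-break relies on.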
Which issue does this PR close?
Closes #3970.
Rationale for this change

The `mode` aggregate (the most frequent value in a group) is a mainstream statistical aggregate that previously fell back to Spark, preventing native execution of any query using it. Adding native support keeps these queries in Comet's pipeline.

What changes are included in this PR?
This PR was scaffolded with the `implement-comet-expression` project skill.

- `native/spark-expr/src/agg_funcs/mode.rs`: a `Mode` aggregate UDF with both a global `Accumulator` and a vectorized `GroupsAccumulator`. State is a frequency map keyed by `ScalarValue`, serialized as a single `struct<values: array<T>, counts: array<bigint>>` buffer column so partial/final buffer schemas stay aligned with Spark's single-attribute `TypedImperativeAggregate` buffer.
- `Mode` message and `AggExpr` oneof entry, plus the planner arm in `planner.rs`.
- `CometMode` serde and registration in `QueryPlanSerde.aggrSerdeMap`.
- `Mode` branch in `adjustOutputForNativeState` (`operators.scala`) mapping the Spark binary buffer type to the native struct state type.
- `modeHasUnsupportedOrdering` shim in `CometTypeShim` (spark-3.x / spark-4.x) because `Mode.reverseOpt` only exists on Spark 4.0+.
- `mode` marked supported in the expressions guide.

Scope and compatibility:
- Only the default `mode(col)` form is supported. The `mode(col, deterministic)` and `mode() WITHIN GROUP (ORDER BY col)` forms (Spark 4.0+, which set `reverseOpt`) fall back to Spark.
- Registered as `Incompatible` (opt-in via `spark.comet.expression.Mode.allowIncompatible=true`): Spark breaks ties non-deterministically based on JVM hash-map iteration order, which a native hash map cannot reproduce bit-for-bit. Comet instead returns the smallest tied value deterministically.
- Float keys are normalized (`-0.0` to `0.0`, canonical `NaN`) to match Spark's counting. Supported input types are numeric, boolean, decimal, date, timestamp, timestamp_ntz, and default-collation string; other types fall back.

How are these changes tested?
- Tests in `mode.rs` covering most-frequent value, tie-break to smallest, NULL handling, empty input, float normalization, and partial/final merge equivalence for both the accumulator and the groups accumulator.
- `mode.sql` file test exercising global and grouped aggregation, NULLs, all-NULL groups, mixed aggregates, HAVING, and boolean/integer/double/decimal/string/date/timestamp inputs, plus an unsupported-type fallback assertion. Verified on Spark 3.5 and Spark 4.1.
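
The partial/final merge equivalence mentioned above can be illustrated with a small standalone sketch. It assumes a simplified state of parallel `values` and `counts` vectors standing in for the `struct<values, counts>` buffer; `ModeState`, `update`, `to_state`, `merge`, and `evaluate` are hypothetical names, not the functions in `mode.rs`, and a `BTreeMap` over `i64` replaces the `ScalarValue`-keyed map.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the serialized state: two parallel arrays.
#[derive(Debug, Clone, PartialEq)]
struct ModeState {
    values: Vec<i64>,
    counts: Vec<i64>,
}

/// Partial phase: count non-NULL inputs into the frequency map.
fn update(freq: &mut BTreeMap<i64, i64>, input: &[Option<i64>]) {
    for v in input.iter().flatten() {
        *freq.entry(*v).or_insert(0) += 1;
    }
}

/// Serialize the frequency map into the values/counts layout.
fn to_state(freq: &BTreeMap<i64, i64>) -> ModeState {
    ModeState {
        values: freq.keys().copied().collect(),
        counts: freq.values().copied().collect(),
    }
}

/// Final phase: fold one serialized partial state into the map by adding counts.
fn merge(freq: &mut BTreeMap<i64, i64>, state: &ModeState) {
    for (v, c) in state.values.iter().zip(&state.counts) {
        *freq.entry(*v).or_insert(0) += *c;
    }
}

/// Pick the most frequent value; None when nothing was counted.
fn evaluate(freq: &BTreeMap<i64, i64>) -> Option<i64> {
    let mut best: Option<(i64, i64)> = None;
    for (&v, &c) in freq {
        // Keys arrive in ascending order, so a strict `>` keeps the smallest tied value.
        if best.map_or(true, |(_, best_count)| c > best_count) {
            best = Some((v, c));
        }
    }
    best.map(|(v, _)| v)
}
```

In this sketch, merging the serialized states of two input chunks yields the same frequency map as counting all rows in one pass, so `evaluate` returns the same answer either way.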