feat: Add Native Support for In-Memory Cache#4591
Conversation
|
Hi @andygrove Could you kindly, if possible, provide any update on this? thnx. |
Hi @pchintar. I haven't had time to review yet, but I will. I am working on some more urgent items for the 0.17.0 release currently. Unfortunately we have limited review bandwidth. |
Comparison of #4569 and #4591These two PRs both close #2391 and take the same fundamental approach, so cross-linking a comparison here for visibility. Shared goal and mechanicsBoth solve the same problem: Comet does not treat Both share the same core building blocks:
Key differences
Architectural distinctionThe main difference is the integration strategy:
Suggested path forwardA strong combined outcome would pair #4591's dedicated scan operator and explicit fallback reasons with #4569's stats-based |
|
thank you @andygrove for your very detailed review and comparison of the PRs. So, what should I take up as my next course of action exactly now? Should I now include Batch-level pruning, stats-based |
I'm happy to keep reviewing if you want to keep iterating on this. My review was just using AI, so we need to decide what makes sense for the first PR. I would say the pruning/filter work is quite important. I'd also like to see microbenchmarks so that we can measure the Comet benefit. |
|
Thanks @andygrove for your suggestions. So I've updated the PR with all the changes we discussed except the microbenchmarks:
I've also rerun the full test suite for these changes, and everything is passing. But w.r.t the microbenchmarks, I haven't put together Spark microbenchmarks before, so I wanted to ask a couple of questions before I start:
I can work on those next once I know what would be most useful for evaluating this PR. Thnx again! |
|
Thanks @pchintar. I will review more today. |
Sure, thnx for your time @andygrove |
andygrove
left a comment
There was a problem hiding this comment.
Thanks for the iteration @pchintar. The stats wiring looks correct (the five per-column fields match what SimpleMetricsCachedBatchSerializer expects) and the respect-user-serializer logic reads well. I left a few inline comments. The most important is the cross-version compile break in the new suite, followed by two spots where we run a Spark job where we probably shouldn't (per-scan and at planning time). On the microbenchmark you asked about: a new CometInMemoryCacheBenchmark under spark/src/test/scala/org/apache/spark/sql/benchmark/, modeled on the existing ones, measuring wall-clock over repeated scans of a cached table feeding a native operator (Comet cache enabled vs disabled), with a second selective-filter variant to exercise the new pruning.
| } | ||
|
|
||
| private def cachedBatchTypes(table: String): Array[String] = { | ||
| val ds = spark.table(table).asInstanceOf[org.apache.spark.sql.classic.Dataset[_]] |
There was a problem hiding this comment.
This suite lives under src/test/scala (built for all Spark versions) but uses org.apache.spark.sql.classic.Dataset, which only exists in Spark 4.x. In this repo that package is only referenced from spark/src/test/spark-4.x/. The default build is now Spark 4.1 so it compiles locally, but CI also builds 3.4 and 3.5 and this suite would fail to compile there. Could the cache lookup avoid the classic.Dataset cast, or move the version-specific helper into a spark-4.x shim? Same cast appears again around line 284.
| }.toArray | ||
| } | ||
|
|
||
| val batchTypes = input.map(_.getClass.getName).distinct().collect() |
There was a problem hiding this comment.
collect() is an action, so this kicks off a full distinct-and-collect job over the entire cached RDD on every scan, just to detect the batch type, before the real scan RDD is even returned. For a large cache that is an extra pass (plus a shuffle for distinct) per read, which undercuts the caching benefit. Since the batch type is homogeneous per relation, could this be handled lazily inside mapPartitions (pattern-match per batch, or inspect the first batch per partition) so no driver-side job is needed? The mixed-type guard is reasonable, but it should not cost a job. Same pattern in convertCachedBatchToInternalRow around line 315.
|
|
||
| case scan: InMemoryTableScanExec => | ||
| val cachedBuffers = scan.relation.cacheBuilder.cachedColumnBuffers | ||
| val firstBatchOpt = cachedBuffers.take(1).headOption |
There was a problem hiding this comment.
cachedColumnBuffers builds the cache RDD and take(1) runs a job to fetch the first batch, during plan transformation. This rule can fire more than once under AQE re-planning, so we would run a job at plan time on each fire just to read the class of the first cached batch. Is there a way to detect the Comet cache format without materializing a batch, for example checking relation.cacheBuilder.serializer.isInstanceOf[ArrowCachedBatchSerializer] together with the enabled flag?
| it.flatMap { | ||
| case cb: CometCachedBatch => | ||
| Utils.decodeBatches(cb.bytes, "CometCache").map { batch => | ||
| if (selectedIndices.length == batch.numCols()) { |
There was a problem hiding this comment.
This treats "selected count equals cached column count" as an identity projection and returns the decoded batch unchanged. For a full-width but reordered projection (cache is [key, value], scan output is [value, key]), selectedIndices would be [1, 0], length 2 equals numCols 2, and we would return the batch in the wrong column order. Spark's DefaultCachedBatchSerializer always projects by computed indices rather than taking a length shortcut. Could this guard be selectedIndices.sameElements(batch.indices) instead? If InMemoryTableScanExec is guaranteed to always emit in relation order with a separate Project on top this is moot, but the length-only check is fragile. Same shortcut in convertCachedBatchToInternalRow around line 330.
| } else { | ||
| val cols = | ||
| selectedIndices.map(i => batch.column(i).asInstanceOf[ColumnVector]) | ||
| new ColumnarBatch(cols, batch.numRows()) |
There was a problem hiding this comment.
When projecting a strict subset, the new ColumnarBatch holds only the selected CometVectors, so closing it releases only those columns. Do the dropped columns get closed anywhere? If decodeBatches hands ownership of the off-heap Arrow buffers to the ColumnarBatch, this projection would drop that ownership for the unselected columns and leak their buffers until GC.
There was a problem hiding this comment.
For the dropped-column ownership question, I checked the decode path. Utils.decodeBatches returns an ArrowReaderIterator, and that iterator keeps the full decoded ColumnarBatch as currentBatch. Before loading the next batch, or when the iterator is closed, it calls currentBatch.close(). Since NativeUtil.rootAsBatch wraps all vectors from the Arrow VectorSchemaRoot, the dropped columns are still closed through the original full decoded batch owned by the iterator. Because of that, I did not add manual close calls in the projection code as I felt that doing so could close vectors while the Arrow reader/root still owns them.
010fa63 to
87badf0
Compare
|
Hi @andygrove , thanks again for the detailed review! I've addressed your review comments in the latest update:
I also added the new |
andygrove
left a comment
There was a problem hiding this comment.
Thanks for the iteration @pchintar. The fixes from the last round all read well: the classic.Dataset cross-version break is gone, the per-scan and plan-time jobs are removed, and the identity-projection guard now handles reordered projections correctly. On the dropped-column ownership question, your explanation checks out. ArrowReaderIterator retains the full decoded batch as currentBatch and closes it, so a strict-subset projection releases the unselected columns through the original batch. A few new things I'd like to look at.
Read path throws on a non-Comet cached batch
This is the one I'd most like to resolve. The write path is defensive but the read path is not.
convertInternalRowToCachedBatch delegates to fallback (the DefaultCachedBatchSerializer) when the feature is disabled, so a relation can end up holding DefaultCachedBatch payloads. But both read methods only handle CometCachedBatch and throw for anything else:
case other =>
throw new IllegalStateException(s"Expected CometCachedBatch, got ${other.getClass.getName}")spark.comet.exec.inMemoryCache.enabled is a normal SQLConf, so it can be toggled at runtime, while the serializer is installed once at startup and is fixed per relation. That opens this sequence: the session starts with the feature on (serializer installed), the user disables it at runtime, then caches a table. The write goes through the fallback and stores DefaultCachedBatch. Any later read of that relation calls convertCachedBatchToColumnarBatch, hits the case other => branch, and throws. That relation becomes permanently unreadable.
Could the read methods delegate non-CometCachedBatch inputs to fallback.convertCachedBatchToColumnarBatch / convertCachedBatchToInternalRow instead of throwing, so they stay symmetric with the write path?
Empty scan output (SELECT count(*))
Both convert and createExec fall back to the full cache schema when op.output is empty:
val actualOutput = if (op.output.nonEmpty) op.output else op.relation.outputSo for an empty-projection scan, the native scan is built with the full schema and doExecuteColumnar emits full-width batches, while CometInMemoryTableScanExec.output stays empty. I couldn't convince myself the mismatch between the declared output and the emitted batch width is safe for the parent operator's serialization. The existing empty-cache test covers an empty relation, not an empty projection. Could you add a SELECT count(*) FROM cached_table case to confirm this path?
Minor: sizeInBytes stat is hard-coded to 0
In computeStats the fifth per-column stat is always 0L. This doesn't affect pruning correctness, but it understates the relation's per-column size in the UI and in InMemoryRelation size accounting. Worth a comment that this is intentionally skipped, or filling it in.
Minor: float/double bounds and NaN
compare uses java.lang.Float.compare / java.lang.Double.compare, which order NaN and -0.0 differently from Spark's primitive comparisons. Since pruning must never produce a false positive, a Float/Double column with NaN values under a selective filter would be reassuring. The current pruning test only exercises a Long key.
Benchmark
Nice to see CometInMemoryCacheBenchmark with both variants and the verifyPlan guard. Could you paste the actual benchmark numbers into the PR description? "Modest improvements" is hard to evaluate without the figures.
|
I ran Repeated full scan (
Selective filter (
The full repeated scan is about 1.5x faster, which is the case this targets directly since it drops the The selective filter is only about 1.1x. That is the workload I'd expect to gain the most from the new stats-based pruning, so the small gap is a bit surprising. My guess is the filtered query spends most of its time in the aggregate rather than the scan, so the pruning win gets diluted. It might be worth a variant that isolates the scan (wider projection, more selective predicate, or larger row count) to show the pruning benefit more clearly. Could you add these numbers (or your own run) to the PR description? |
|
@pchintar could you add the new suite to the |
|
I reviewed the code from a performance angle and there are a few opportunities, but nothing that should block landing the functional version. I'd suggest we merge this first and do the optimizations as follow-ups. I filed #4781 to track them:
The trivial one is the |
87badf0 to
b411b7a
Compare
|
Hi @andygrove I've addressed all your additional read-path and pruning suggestions except the ones in #4781 as that would be a follow-up PR as you indicated above:
I also updated the PR description by including the benchmark results there. |
@andygrove This is done too, I've added it inside both the |
b411b7a to
86a97cc
Compare
|
@pchintar could you take a look at the compilation errors? |
The |
fe63af5 to
744e6c3
Compare
|
Thanks @andygrove and @mbutrovich I tracked down the compilation issues after merging with the latest My branch is now up to date with the current
So could you now please re-run the CI checks? |
Which issue does this PR close?
Closes #2391 .
Rationale for this change
Comet currently has limited support for Spark's in-memory cache.
When a table is cached and later read, the cached data cannot be consumed directly by Comet operators. Instead, the execution plan falls back to Spark's cache scan path and introduces an additional
CometSparkColumnarToColumnarconversion before execution can continue in Comet.This extra conversion adds overhead to cached table scans and prevents cached data from remaining on a native Comet execution path.
This PR adds native support for in-memory cached tables so that cached data written in a Comet-compatible format can be read directly by Comet operators.
What changes are included in this PR?
This PR introduces a native cache path for in-memory cached tables behind a new configuration:
spark.comet.exec.inMemoryCache.enabledWhen enabled:
CometCachedBatch.CometInMemoryTableScanExec.CometSparkColumnarToColumnarconversion.When disabled:
How are these changes tested?
Added
CometInMemoryCacheSuitecovering:CometCachedBatchSELECT count(*))DefaultCachedBatchAdded
CometInMemoryCacheBenchmarkto compare the native cache path against the existing fallback path for:Benchmark results (Release build, Apple M3 Ultra, JDK 17, Spark 3.5 profile, 5M-row cached table):
Repeated full scan (
SELECT sum(id), sum(k), sum(v))Selective filter (
WHERE id >= 4500000 AND id < 4750000)Verified with:
./mvnw -pl spark -DskipTests test-compile ./mvnw test -pl spark \ -DwildcardSuites=org.apache.comet.exec.CometInMemoryCacheSuite SPARK_GENERATE_BENCHMARK_FILES=1 \ make benchmark-org.apache.spark.sql.benchmark.CometInMemoryCacheBenchmark