
feat: Add Native Support for In-Memory Cache#4591

Open
pchintar wants to merge 4 commits into apache:main from pchintar:comet-native-in-memory-cache


@pchintar pchintar commented Jun 4, 2026


Which issue does this PR close?

Closes #2391.

Rationale for this change

Comet currently has limited support for Spark's in-memory cache.

When a table is cached and later read, the cached data cannot be consumed directly by Comet operators. Instead, the execution plan falls back to Spark's cache scan path and introduces an additional CometSparkColumnarToColumnar conversion before execution can continue in Comet.

This extra conversion adds overhead to cached table scans and prevents cached data from remaining on a native Comet execution path.

This PR adds native support for in-memory cached tables so that cached data written in a Comet-compatible format can be read directly by Comet operators.

What changes are included in this PR?

This PR introduces a native cache path for in-memory cached tables behind a new configuration:

spark.comet.exec.inMemoryCache.enabled

When enabled:

  • Cached data is stored using a Comet-specific cache serializer.
  • Cached data is represented as CometCachedBatch.
  • Cached tables are scanned using CometInMemoryTableScanExec.
  • Cached data can be consumed directly by Comet operators without introducing a CometSparkColumnarToColumnar conversion.

When disabled:

  • Spark's existing cache serializer continues to be used.
  • Existing cache scan behavior is preserved.

How are these changes tested?

Added CometInMemoryCacheSuite covering:

  • Comet-native cache scan over CometCachedBatch
  • Fallback behavior when native cache support is disabled
  • Multi-partition cached tables
  • Empty cached tables
  • Projection-only cache reads
  • Shuffle execution after cached table scans
  • Stats-based batch pruning
  • Empty projection scans (SELECT count(*))
  • Floating-point pruning with NaN values
  • Fallback read path for Spark DefaultCachedBatch

Added CometInMemoryCacheBenchmark to compare the native cache path against the existing fallback path for:

  • Repeated cached table scans
  • Selective-filter scans exercising cache pruning

Benchmark results (Release build, Apple M3 Ultra, JDK 17, Spark 3.5 profile, 5M-row cached table):

Repeated full scan (SELECT sum(id), sum(k), sum(v))

| Case | Best (ms) | Avg (ms) | Relative |
|---|---|---|---|
| Comet cache disabled | 180 | 201 | 1.0× |
| Comet cache enabled | 121 | 128 | 1.5× |

Selective filter (WHERE id >= 4500000 AND id < 4750000)

| Case | Best (ms) | Avg (ms) | Relative |
|---|---|---|---|
| Comet cache disabled | 46 | 53 | 1.0× |
| Comet cache enabled | 42 | 48 | 1.1× |

Verified with:

./mvnw -pl spark -DskipTests test-compile

./mvnw test -pl spark \
  -DwildcardSuites=org.apache.comet.exec.CometInMemoryCacheSuite

SPARK_GENERATE_BENCHMARK_FILES=1 \
make benchmark-org.apache.spark.sql.benchmark.CometInMemoryCacheBenchmark

@pchintar pchintar changed the title Add native support for in-memory cache feat: Add native support for in-memory cache Jun 4, 2026
@pchintar pchintar changed the title feat: Add native support for in-memory cache feat: Add Native Support for In-Memory Cache Jun 4, 2026
@pchintar

pchintar commented Jun 4, 2026

Author

cc @andygrove @mbutrovich

@andygrove

Member

Thanks @pchintar. I will compare this to #4569 today

@mbutrovich mbutrovich self-requested a review June 4, 2026 14:51
@pchintar

Author

Thanks @pchintar. I will compare this to #4569 today

Hi @andygrove, could you kindly provide an update on this, if possible? Thanks.

@andygrove

Member

Thanks @pchintar. I will compare this to #4569 today

Hi @andygrove, could you kindly provide an update on this, if possible? Thanks.

Hi @pchintar. I haven't had time to review yet, but I will. I am working on some more urgent items for the 0.17.0 release currently.

Unfortunately we have limited review bandwidth.

@andygrove andygrove added this to the 1.0.0 milestone Jun 11, 2026
@andygrove

Member

Comparison of #4569 and #4591

These two PRs both close #2391 and take the same fundamental approach, so cross-linking a comparison here for visibility.

Shared goal and mechanics

Both solve the same problem: Comet does not treat InMemoryTableScanExec as native, so it inserts a CometSparkToColumnarExec and pays a JVM-to-Arrow conversion on every cached read. Both fix this by storing the cache as compressed Arrow IPC once at build time, so repeated scans feed native execution directly.

Both share the same core building blocks:

  • A custom CachedBatchSerializer that encodes batches with Comet's existing serializeBatches / decodeBatches (compressed Arrow IPC).
  • A CometCachedBatch payload holding the IPC bytes.
  • Installation of the serializer via CometDriverPlugin at startup.
  • A new config, off by default.
  • Decode back to CometVector-backed ColumnarBatch with column pruning, plus an InternalRow fallback for non-Comet consumers.
  • Roughly the same size and the same test layout (a serializer suite plus a plugin/exec suite).

Key differences

| Dimension | #4569 | #4591 |
|---|---|---|
| Serializer base class | SimpleMetricsCachedBatchSerializer | plain CachedBatchSerializer |
| Batch-level pruning | Yes: stores a Spark-format per-column stats row (min/max/null/count), so buildFilter prunes batches | No: stats = InternalRow.empty, buildFilter is a pass-through no-op |
| How the scan stays native | No new operator. Reuses CometSparkToColumnarExec with a passthrough fast-path: if columns are already CometVector, forward without re-copy (adds numPassthroughBatches metric) | New operator CometInMemoryTableScanExec (a CometExec / LeafExecNode) plus a CometOperatorSerde, wired into CometExecRule's nativeExecs map |
| Unsupported types | Delegates to DefaultCachedBatchSerializer (nested/complex) as an explicit drop-in | Delegates to DefaultCachedBatchSerializer by inspecting batch class in the convert methods |
| Serializer install policy | Sets spark.sql.cache.serializer only when enabled, and never overrides a user-provided non-default serializer | Always registers the serializer; the serializer decides at runtime whether to use Comet format or delegate |
| Config | spark.comet.cache.serializer.enabled | spark.comet.exec.inMemoryCache.enabled (EXEC category) |
| Plan-rule changes | Minimal (works through the existing SparkToColumnar path) | Adds an explicit InMemoryTableScanExec case with detailed fallback-reason messages (disabled / wrong-batch-class / empty-buffer) |

Architectural distinction

The main difference is the integration strategy:

  • #4569 (feat: add Comet CachedBatchSerializer for native in-memory cache) is serializer-centric. It does the minimum on the plan-rule side and leans on the existing CometSparkToColumnarExec, teaching it to skip the copy when batches are already Arrow. It also invests in stats so filter pushdown prunes cached batches, and is careful to respect a user-set serializer.
  • #4591 (feat: Add Native Support for In-Memory Cache) is operator-centric. It introduces a dedicated CometInMemoryTableScanExec node and serde, giving cached scans a first-class place in the Comet operator framework with explicit fallback reasons. It currently skips column statistics, so cached filters do not get batch pruning.

Suggested path forward

A strong combined outcome would pair #4591's dedicated scan operator and explicit fallback reasons with #4569's stats-based buildFilter, passthrough fast-path, and respect-user-serializer install logic.

@pchintar

pchintar commented Jun 21, 2026

Author

Thank you @andygrove for your very detailed review and comparison of the PRs.

What should my next step be? Should I add batch-level pruning, the stats-based buildFilter, and the respect-user-serializer install logic to this PR, as you indicated above, or do you have something else in mind?

@andygrove

Member

Thank you @andygrove for your very detailed review and comparison of the PRs.

What should my next step be? Should I add batch-level pruning, the stats-based buildFilter, and the respect-user-serializer install logic to this PR, as you indicated above, or do you have something else in mind?

I'm happy to keep reviewing if you want to keep iterating on this. My review was just using AI, so we need to decide what makes sense for the first PR. I would say the pruning/filter work is quite important. I'd also like to see microbenchmarks so that we can measure the Comet benefit.

@andygrove andygrove removed this from the 1.0.0 milestone Jun 25, 2026
@pchintar

pchintar commented Jun 26, 2026

Author

Thanks @andygrove for your suggestions.

So I've updated the PR with all the changes we discussed except the microbenchmarks:

  • The cache serializer now computes per-batch column statistics (lower bound, upper bound, null count, and row count) in the format expected by Spark's SimpleMetricsCachedBatchSerializer, so Spark's existing buildFilter can prune cached batches before they are decoded.
  • Added a regression test that verifies that the statistics are generated correctly, that buildFilter prunes non-matching batches as expected, and that filtered queries continue to use the native Comet in-memory cache scan path.
  • Updated the plugin so it only installs the Comet cache serializer when the in-memory cache feature is enabled, while leaving any user-configured cache serializer unchanged.
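
The statistics described in the first bullet can be illustrated with a minimal, self-contained sketch. It is a simplified model, not the PR's code: `LongColumnStats`, `BatchStatsSketch`, and `mayMatchRange` are hypothetical names, and only a nullable Long column is handled. It shows how a lower bound, upper bound, null count, and row count per batch let a range predicate skip a batch without decoding it.

```scala
// Hypothetical, simplified model of per-batch column statistics; not the PR's code.
final case class LongColumnStats(
    lowerBound: Option[Long], // None when every value in the batch is null
    upperBound: Option[Long],
    nullCount: Int,
    rowCount: Int)

object BatchStatsSketch {

  /** Computes bounds and counts for one nullable Long column of one cached batch. */
  def compute(values: Seq[Option[Long]]): LongColumnStats = {
    val present = values.flatten
    LongColumnStats(
      lowerBound = if (present.isEmpty) None else Some(present.min),
      upperBound = if (present.isEmpty) None else Some(present.max),
      nullCount = values.size - present.size,
      rowCount = values.size)
  }

  /** True when the batch may hold a row with lo <= value < hi and so must be decoded. */
  def mayMatchRange(stats: LongColumnStats, lo: Long, hi: Long): Boolean =
    (stats.lowerBound, stats.upperBound) match {
      case (Some(min), Some(max)) => max >= lo && min < hi
      case _ => false // all-null batch: no non-null value can satisfy the range
    }
}
```

The check is deliberately conservative: it may keep a batch that turns out to contain no matching rows, but it never drops a batch that could contain one.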

I've also rerun the full test suite for these changes, and everything is passing.

Regarding the microbenchmarks, I haven't put together Spark microbenchmarks before, so I wanted to ask a couple of questions before I start:

  • Would you prefer extending an existing benchmark in the repo, or adding a new benchmark specifically for the in-memory cache path?
  • Which workload would be the most useful to measure here? For example, repeated filtered scans over a cached table, or something else?
  • Are there any particular metrics you'd like to compare (execution time, throughput, cache scan time, etc.)?

I can work on those next once I know what would be most useful for evaluating this PR. Thanks again!

@andygrove

Member

Thanks @pchintar. I will review more today.

@pchintar

Author

Thanks @pchintar. I will review more today.

Sure, thanks for your time, @andygrove.

@andygrove andygrove left a comment

Member


Thanks for the iteration @pchintar. The stats wiring looks correct (the five per-column fields match what SimpleMetricsCachedBatchSerializer expects) and the respect-user-serializer logic reads well. I left a few inline comments. The most important is the cross-version compile break in the new suite, followed by two spots where we run a Spark job where we probably shouldn't (per-scan and at planning time). On the microbenchmark you asked about: I'd suggest a new CometInMemoryCacheBenchmark under spark/src/test/scala/org/apache/spark/sql/benchmark/, modeled on the existing ones. It should measure wall-clock time over repeated scans of a cached table feeding a native operator (Comet cache enabled vs. disabled), with a second selective-filter variant to exercise the new pruning.

}

private def cachedBatchTypes(table: String): Array[String] = {
  val ds = spark.table(table).asInstanceOf[org.apache.spark.sql.classic.Dataset[_]]

Member


This suite lives under src/test/scala (built for all Spark versions) but uses org.apache.spark.sql.classic.Dataset, which only exists in Spark 4.x. In this repo that package is only referenced from spark/src/test/spark-4.x/. The default build is now Spark 4.1 so it compiles locally, but CI also builds 3.4 and 3.5 and this suite would fail to compile there. Could the cache lookup avoid the classic.Dataset cast, or move the version-specific helper into a spark-4.x shim? Same cast appears again around line 284.

}.toArray
}

val batchTypes = input.map(_.getClass.getName).distinct().collect()

Member


collect() is an action, so this kicks off a full distinct-and-collect job over the entire cached RDD on every scan, just to detect the batch type, before the real scan RDD is even returned. For a large cache that is an extra pass (plus a shuffle for distinct) per read, which undercuts the caching benefit. Since the batch type is homogeneous per relation, could this be handled lazily inside mapPartitions (pattern-match per batch, or inspect the first batch per partition) so no driver-side job is needed? The mixed-type guard is reasonable, but it should not cost a job. Same pattern in convertCachedBatchToInternalRow around line 315.
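
The lazy alternative suggested above can be sketched in plain Scala. This is an illustration under stated assumptions, not the PR's code: `CachedBatchLike`, `CometBatch`, `DefaultBatch`, and `decodePartition` are hypothetical stand-ins, and an `Iterator` plays the role of one partition. In Spark the same function would be passed to `mapPartitions`, so the type check happens as each batch is pulled and no driver-side job is needed.

```scala
// Hypothetical stand-ins for the cached batch classes; not Spark or Comet types.
sealed trait CachedBatchLike
final case class CometBatch(rows: Vector[Int]) extends CachedBatchLike
final case class DefaultBatch(rows: Vector[Int]) extends CachedBatchLike

object LazyBatchTypeSketch {

  /**
   * Decodes one partition lazily. The batch class is checked as each batch is
   * pulled, so no separate pass over the data is needed to learn the type.
   * An unexpected batch fails only when the consumer actually reaches it.
   */
  def decodePartition(batches: Iterator[CachedBatchLike]): Iterator[Int] =
    batches.flatMap {
      case CometBatch(rows) => rows.iterator
      case other =>
        throw new IllegalStateException(
          s"Expected CometBatch, got ${other.getClass.getSimpleName}")
    }
}
```

The mixed-type guard is kept, but it costs nothing until a mismatched batch is encountered during the real scan.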


case scan: InMemoryTableScanExec =>
  val cachedBuffers = scan.relation.cacheBuilder.cachedColumnBuffers
  val firstBatchOpt = cachedBuffers.take(1).headOption

Member


cachedColumnBuffers builds the cache RDD and take(1) runs a job to fetch the first batch, during plan transformation. This rule can fire more than once under AQE re-planning, so we would run a job at plan time on each fire just to read the class of the first cached batch. Is there a way to detect the Comet cache format without materializing a batch, for example checking relation.cacheBuilder.serializer.isInstanceOf[ArrowCachedBatchSerializer] together with the enabled flag?
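
A metadata-only check of the kind proposed here might look like the sketch below. All names (`CacheSerializerLike`, `ArrowSerializerLike`, `DefaultSerializerLike`, `useNativeCacheScan`) are hypothetical stand-ins; the real rule would inspect the relation's serializer and the Comet config rather than these toy classes.

```scala
// Hypothetical stand-ins; the real check would inspect the relation's serializer.
trait CacheSerializerLike
final class ArrowSerializerLike extends CacheSerializerLike
final class DefaultSerializerLike extends CacheSerializerLike

object PlanTimeCheckSketch {

  /** Decides from metadata only: no cached batch is read and no job is run. */
  def useNativeCacheScan(
      featureEnabled: Boolean,
      serializer: CacheSerializerLike): Boolean =
    featureEnabled && serializer.isInstanceOf[ArrowSerializerLike]
}
```

Because the decision touches no RDD, it stays cheap however many times the rule fires during re-planning.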

it.flatMap {
  case cb: CometCachedBatch =>
    Utils.decodeBatches(cb.bytes, "CometCache").map { batch =>
      if (selectedIndices.length == batch.numCols()) {

Member


This treats "selected count equals cached column count" as an identity projection and returns the decoded batch unchanged. For a full-width but reordered projection (cache is [key, value], scan output is [value, key]), selectedIndices would be [1, 0], length 2 equals numCols 2, and we would return the batch in the wrong column order. Spark's DefaultCachedBatchSerializer always projects by computed indices rather than taking a length shortcut. Could this guard be selectedIndices.sameElements(batch.indices) instead? If InMemoryTableScanExec is guaranteed to always emit in relation order with a separate Project on top this is moot, but the length-only check is fragile. Same shortcut in convertCachedBatchToInternalRow around line 330.
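
The difference between the length-only shortcut and an explicit identity check can be shown with a small sketch. `IdentityProjectionSketch`, `isIdentity`, and `project` are hypothetical names, and a `Vector` stands in for the columns of a decoded batch.

```scala
object IdentityProjectionSketch {

  /** True only when the selection is exactly 0, 1, ..., numCols - 1 in that order. */
  def isIdentity(selectedIndices: Array[Int], numCols: Int): Boolean =
    selectedIndices.sameElements(0 until numCols)

  /** Projects columns by index, returning the input unchanged for an identity selection. */
  def project[A](columns: Vector[A], selectedIndices: Array[Int]): Vector[A] =
    if (isIdentity(selectedIndices, columns.length)) columns
    else selectedIndices.toVector.map(i => columns(i))
}
```

With a cache of `[key, value]` and `selectedIndices = [1, 0]`, the length check would pass (2 equals 2) while `isIdentity` returns false, so the reordered projection is still applied.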

      } else {
        val cols =
          selectedIndices.map(i => batch.column(i).asInstanceOf[ColumnVector])
        new ColumnarBatch(cols, batch.numRows())

Member


When projecting a strict subset, the new ColumnarBatch holds only the selected CometVectors, so closing it releases only those columns. Do the dropped columns get closed anywhere? If decodeBatches hands ownership of the off-heap Arrow buffers to the ColumnarBatch, this projection would drop that ownership for the unselected columns and leak their buffers until GC.

Author


For the dropped-column ownership question, I checked the decode path. Utils.decodeBatches returns an ArrowReaderIterator, and that iterator keeps the full decoded ColumnarBatch as currentBatch. Before loading the next batch, or when the iterator is closed, it calls currentBatch.close(). Since NativeUtil.rootAsBatch wraps all vectors from the Arrow VectorSchemaRoot, the dropped columns are still closed through the original full decoded batch owned by the iterator. Because of that, I did not add manual close calls in the projection code as I felt that doing so could close vectors while the Arrow reader/root still owns them.

@pchintar pchintar force-pushed the comet-native-in-memory-cache branch from 010fa63 to 87badf0 Compare July 1, 2026 06:56
@pchintar

pchintar commented Jul 1, 2026

Author

Hi @andygrove, thanks again for the detailed review! I've addressed your review comments in the latest update:

  • Reworked the planner logic to determine whether the native cache path should be used from the configured cache serializer together with the feature flag, eliminating the planning-time Spark job previously used to inspect cached batches.

  • Removed the unnecessary driver-side distinct().collect() from the serializer read path.

  • Replaced the length-based identity projection shortcut with an explicit identity-index check so only true identity projections bypass projection, while reordered projections continue to project columns correctly.

  • Updated the test suite to remove the Spark 4.x-specific classic.Dataset dependency from the shared test code, keeping the suite cross-version compatible.

  • For the ownership concern, I took another look at the current implementation but didn't make any further changes. The current implementation already decodes only the requested columns through convertCachedBatchToColumnarBatch, and I couldn't identify a way to simplify that path further without changing its behavior.

I also added the new CometInMemoryCacheBenchmark under spark/src/test/scala/org/apache/spark/sql/benchmark/, following the existing benchmark style. It compares repeated scans of a cached table with the native cache path enabled and disabled, together with a selective-filter variant to exercise the pruning path. On my local machine, I observed modest improvements for both workloads. When you have a chance, could you please take a look at the benchmark? I tried to follow the requested benchmark structure but if you'd prefer any further changes to the workload/setup, I would be happy to update it further.

@andygrove andygrove left a comment

Member


Thanks for the iteration @pchintar. The fixes from the last round all read well: the classic.Dataset cross-version break is gone, the per-scan and plan-time jobs are removed, and the identity-projection guard now handles reordered projections correctly. On the dropped-column ownership question, your explanation checks out. ArrowReaderIterator retains the full decoded batch as currentBatch and closes it, so a strict-subset projection releases the unselected columns through the original batch. A few new things I'd like to look at.

Read path throws on a non-Comet cached batch

This is the one I'd most like to resolve. The write path is defensive but the read path is not.

convertInternalRowToCachedBatch delegates to fallback (the DefaultCachedBatchSerializer) when the feature is disabled, so a relation can end up holding DefaultCachedBatch payloads. But both read methods only handle CometCachedBatch and throw for anything else:

case other =>
  throw new IllegalStateException(s"Expected CometCachedBatch, got ${other.getClass.getName}")

spark.comet.exec.inMemoryCache.enabled is a normal SQLConf, so it can be toggled at runtime, while the serializer is installed once at startup and is fixed per relation. That opens this sequence: the session starts with the feature on (serializer installed), the user disables it at runtime, then caches a table. The write goes through the fallback and stores DefaultCachedBatch. Any later read of that relation calls convertCachedBatchToColumnarBatch, hits the case other => branch, and throws. That relation becomes permanently unreadable.

Could the read methods delegate non-CometCachedBatch inputs to fallback.convertCachedBatchToColumnarBatch / convertCachedBatchToInternalRow instead of throwing, so they stay symmetric with the write path?
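
The toggle sequence described above, and the symmetric read path being requested, can be modeled in a few lines. This is a simplified sketch with hypothetical names (`Batch`, `NativeBatch`, `FallbackBatch`, `BatchSerializer`, `NativeSerializer`), not the PR's serializer. The flag is passed as a function so it can change between calls, mimicking a runtime SQLConf.

```scala
// Hypothetical, simplified model of a serializer with a fallback; not the PR's code.
sealed trait Batch
final case class NativeBatch(values: Vector[Long]) extends Batch
final case class FallbackBatch(values: Vector[Long]) extends Batch

trait BatchSerializer {
  def write(values: Vector[Long]): Batch
  def read(batch: Batch): Vector[Long]
}

object FallbackSerializer extends BatchSerializer {
  def write(values: Vector[Long]): Batch = FallbackBatch(values)
  def read(batch: Batch): Vector[Long] = batch match {
    case FallbackBatch(values) => values
    case other => throw new IllegalStateException(s"Unexpected batch: $other")
  }
}

/** `enabled` is evaluated on every call, mimicking a flag that can change at runtime. */
final class NativeSerializer(enabled: () => Boolean, fallback: BatchSerializer)
    extends BatchSerializer {

  def write(values: Vector[Long]): Batch =
    if (enabled()) NativeBatch(values) else fallback.write(values)

  def read(batch: Batch): Vector[Long] = batch match {
    case NativeBatch(values) => values
    case other => fallback.read(other) // delegate instead of throwing
  }
}
```

A batch written while the flag was off remains readable after any later toggle, because the read path dispatches on the batch's class rather than on the current flag value.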

Empty scan output (SELECT count(*))

Both convert and createExec fall back to the full cache schema when op.output is empty:

val actualOutput = if (op.output.nonEmpty) op.output else op.relation.output

So for an empty-projection scan, the native scan is built with the full schema and doExecuteColumnar emits full-width batches, while CometInMemoryTableScanExec.output stays empty. I couldn't convince myself the mismatch between the declared output and the emitted batch width is safe for the parent operator's serialization. The existing empty-cache test covers an empty relation, not an empty projection. Could you add a SELECT count(*) FROM cached_table case to confirm this path?

Minor: sizeInBytes stat is hard-coded to 0

In computeStats the fifth per-column stat is always 0L. This doesn't affect pruning correctness, but it understates the relation's per-column size in the UI and in InMemoryRelation size accounting. Worth a comment that this is intentionally skipped, or filling it in.

Minor: float/double bounds and NaN

compare uses java.lang.Float.compare / java.lang.Double.compare, which order NaN and -0.0 differently from Spark's primitive comparisons. Since pruning must never produce a false positive, a Float/Double column with NaN values under a selective filter would be reassuring. The current pruning test only exercises a Long key.
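
For reference, the JVM-level difference between `java.lang.Double.compare` and the primitive operators can be checked directly. The sketch below only demonstrates those two behaviours; it makes no claim about which one a given Spark code path uses. `NaNOrderingSketch` and its field names are illustrative.

```scala
object NaNOrderingSketch {
  // java.lang.Double.compare defines a total order: NaN sorts above every other
  // value (including positive infinity) and 0.0 sorts above -0.0.
  val nanAboveOne: Boolean = java.lang.Double.compare(Double.NaN, 1.0) > 0
  val nanAboveInfinity: Boolean =
    java.lang.Double.compare(Double.NaN, Double.PositiveInfinity) > 0
  val zeroAboveNegZero: Boolean = java.lang.Double.compare(0.0, -0.0) > 0

  // Primitive comparisons follow IEEE 754: every ordered comparison with NaN is
  // false, and 0.0 is equal to -0.0.
  val nanGreaterPrimitive: Boolean = Double.NaN > 1.0
  val nanLessPrimitive: Boolean = Double.NaN < 1.0
  val zerosEqualPrimitive: Boolean = 0.0 == -0.0
}
```

A batch whose upper bound was computed with `Double.compare` will therefore report NaN as its maximum whenever it contains one, which is the case a NaN pruning test would exercise.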

Benchmark

Nice to see CometInMemoryCacheBenchmark with both variants and the verifyPlan guard. Could you paste the actual benchmark numbers into the PR description? "Modest improvements" is hard to evaluate without the figures.

@andygrove

Member

I ran CometInMemoryCacheBenchmark from this PR locally to see the numbers. Release build, Apple M3 Ultra, JDK 17, default Spark profile (3.5), 5M-row cached table.

Repeated full scan (SELECT sum(id), sum(k), sum(v))

| Case | Best (ms) | Avg (ms) | Rate (M/s) | Relative |
|---|---|---|---|---|
| Comet cache disabled | 180 | 201 | 27.7 | 1.0X |
| Comet cache enabled | 121 | 128 | 41.3 | 1.5X |

Selective filter (WHERE id >= 4500000 AND id < 4750000)

| Case | Best (ms) | Avg (ms) | Rate (M/s) | Relative |
|---|---|---|---|---|
| Comet cache disabled | 46 | 53 | 108.2 | 1.0X |
| Comet cache enabled | 42 | 48 | 117.9 | 1.1X |

The full repeated scan is about 1.5x faster, which is the case this targets directly since it drops the CometSparkColumnarToColumnar conversion on every read.

The selective filter is only about 1.1x. That is the workload I'd expect to gain the most from the new stats-based pruning, so the small gap is a bit surprising. My guess is the filtered query spends most of its time in the aggregate rather than the scan, so the pruning win gets diluted. It might be worth a variant that isolates the scan (wider projection, more selective predicate, or larger row count) to show the pruning benefit more clearly.
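
The Relative and Rate columns can be approximately reproduced from the best times. The sketch assumes that Relative is the ratio of best times and that Rate is rows divided by best time; that is an assumption about how the harness derives those columns, and `BenchmarkMathSketch` is an illustrative name. The reported rates differ slightly from these results, presumably because the millisecond figures in the tables are rounded.

```scala
object BenchmarkMathSketch {

  /** Speedup of a case over the baseline, from best times in milliseconds. */
  def relative(baselineBestMs: Double, caseBestMs: Double): Double =
    baselineBestMs / caseBestMs

  /** Throughput in millions of rows per second, from a best time in milliseconds. */
  def rateMillionsPerSec(rows: Long, bestMs: Double): Double =
    rows / (bestMs * 1000.0)
}
```

For the full scan, 180 / 121 ≈ 1.49, shown as 1.5X. For the selective filter, 46 / 42 ≈ 1.10, shown as 1.1X.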

Could you add these numbers (or your own run) to the PR description?

@andygrove

Member

@pchintar could you add the new suite to the pr_build*.yml workflows?

Suite not found in workflow .github/workflows/pr_build_linux.yml: org.apache.comet.exec.CometInMemoryCacheSuite

@andygrove

Member

I reviewed the code from a performance angle and there are a few opportunities, but nothing that should block landing the functional version. I'd suggest we merge this first and do the optimizations as follow-ups. I filed #4781 to track them:

  • Read path: hoist UnsafeProjection.create out of the per-batch loop, look at reducing the per-scan deep copy driven by arrow_ffi_safe = false, and consider an optional uncompressed cache format.
  • Write path: specialize computeStats per column to stop boxing every value, and copy string bounds only on update rather than copying every value.

The trivial one is the UnsafeProjection hoist. The rest can come later.

@pchintar pchintar force-pushed the comet-native-in-memory-cache branch from 87badf0 to b411b7a Compare July 1, 2026 17:08
@pchintar

pchintar commented Jul 1, 2026

Author

Hi @andygrove, I've addressed all of your additional read-path and pruning suggestions. The items in #4781 are left for a follow-up PR, as you indicated above:

  • For cached batches written in Spark's default cache format, I added explicit handling in ArrowCachedBatchSerializer instead of letting the read path throw. The serializer now detects DefaultCachedBatch in both columnar and row read paths and decodes those batches locally with Spark's cache column format using the selected-column indexes and ColumnAccessor.decompress. This is only for the fallback cached-batch format and the normal CometCachedBatch path remains unchanged.

  • I also added a regression test covering this fallback path by caching data as DefaultCachedBatch and verifying that both the columnar and row read paths can read it successfully without errors.

  • Added a count(*) cached-table regression test for empty projection scans.

  • Added a NaN floating-point pruning regression test.

  • Added a comment explaining why the per-column size stat is currently left as 0.

I also updated the PR description by including the benchmark results there.

@pchintar

pchintar commented Jul 1, 2026

Author

@pchintar could you add the new suite to the pr_build*.yml workflows?

Suite not found in workflow .github/workflows/pr_build_linux.yml: org.apache.comet.exec.CometInMemoryCacheSuite

@andygrove This is done too. I've added the suite to both .github/workflows/pr_build_linux.yml and .github/workflows/pr_build_macos.yml under the exec group, next to CometExecSuite. Could you please enable the CI checks now? Thanks.

@pchintar pchintar force-pushed the comet-native-in-memory-cache branch from b411b7a to 86a97cc Compare July 1, 2026 18:42
@andygrove

Member

@pchintar could you take a look at the compilation errors?

Error: ] /__w/datafusion-comet/datafusion-comet/spark/src/main/scala/org/apache/spark/sql/comet/CometInMemoryTableScanExec.scala:111: value setArrowFfiSafe is not a member of org.apache.comet.serde.OperatorOuterClass.Scan.Builder

@andygrove

andygrove commented Jul 1, 2026

Member

@pchintar could you take a look at the compilation errors?

Error: ] /__w/datafusion-comet/datafusion-comet/spark/src/main/scala/org/apache/spark/sql/comet/CometInMemoryTableScanExec.scala:111: value setArrowFfiSafe is not a member of org.apache.comet.serde.OperatorOuterClass.Scan.Builder

The arrow_ffi_safe field was removed from Scan in #4572, so setArrowFfiSafe no longer exists on the builder. This PR will need to work around that to ensure that a deep copy is still made.

@pchintar pchintar force-pushed the comet-native-in-memory-cache branch from fe63af5 to 744e6c3 Compare July 2, 2026 11:54
@pchintar

pchintar commented Jul 2, 2026

Author

Thanks @andygrove and @mbutrovich. I tracked down the compilation issues after merging with the latest main and updated my branch accordingly.

My branch is now up to date with the current main, and I adapted the implementation to the Arrow C Stream changes from #4572. Specifically:

  • Removed the obsolete setArrowFfiSafe usage since arrow_ffi_safe was removed with the Arrow C Stream migration.
  • Updated ArrowCachedBatchSerializer to use the current APIs (toStructType instead of DataTypeUtils.fromAttributes, and CometArrowAllocator for rowToArrowBatchIter).
  • Updated the Arrow stream schema reconciliation logic to match the current implementation while preserving the existing behavior.
  • Fixed the Scala syntactic/lint issue in CometExecRule.

So could you now please re-run the CI checks?


Development

Successfully merging this pull request may close these issues.

Explore options for accelerating InMemoryTableScanExec

3 participants