[fix][broker] Don't let a closing topic-policies reader abort a concurrent cache-init reload #26132
Merged
lhotari merged 8 commits on Jul 2, 2026
…rrent cache-init reload

SystemTopicBasedTopicPoliciesService.readMorePoliciesAsync tore down the namespace's cached policy state by namespace key (cleanPoliciesCacheInitMap) when its __change_events reader observed an AlreadyClosedException. That callback runs on the pulsar-client executor, not the per-namespace ordered executor, so it races a reload: a namespace unload closes reader R1 asynchronously while a reload (e.g. getTopic right after unload) installs a fresh reader R2 and init future F2; R1's late close then clears R2/F2 and aborts F2 with "Topic policies cache initialization ... was aborted because the cached state was cleared", failing the reloading topic load. This surfaces as the flaky AdminApi2Test.testGetInternalStatsWithProperties.

Route the reader-close (and the closed-service) cleanup through the existing identity-guarded cleanupFailedPolicyCacheInit, threading the init future through readMorePoliciesAsync. It only tears down state that still belongs to this initialization, so a superseded reader's late close is a no-op while a still-current reader tears down its own generation exactly as before.

Assisted-by: Claude Code (Opus 4.8)
merlimat approved these changes on Jul 1, 2026
It has no callers. Reader, init-future, tracker and policy-cache teardown is handled by cleanPoliciesCacheInitMap and cleanupFailedPolicyCacheInit.
On a writer-creation failure, call cleanWriterCache(namespace) instead of inlining writerCaches.synchronous().invalidate(...), so all writer-cache invalidation goes through a single method.
…CacheInit Capture the TopicPolicyMessageHandlerTracker before the identity guard and remove it only when it is still the same instance (topicPolicyMessageHandlerTrackers.remove(namespace, tracker)) instead of an unconditional remove(namespace). This prevents a stale cleanup from closing a tracker that a concurrent re-initialization has already installed for the namespace.
readerCaches.compute(...) that returns the existing value unchanged when present is exactly computeIfAbsent(); use it directly.
Explain that the method only proceeds while this initialization still owns the namespace's init future, and that the reader is removed by identity so a reader created by a later initialization is never closed by a stale cleanup.
Clearing policiesCache/globalPoliciesCache here can race with a new concurrent initialization for the namespace: a late cleanup for a superseded init could wipe entries that a fresh reader has already repopulated. Keep the cached policies in place; they are cleared only when the whole namespace is unloaded (all bundles gone) via cleanPoliciesCacheInitMap in removeOwnedNamespaceBundleAsync. Update the closeReader javadoc to match and reword the tracker-removal comment.
…nitMap cleanPoliciesCacheInitMap is only ever called with closeReader=true in production (the namespace-unload teardown and the closed-service check). Drop the parameter and the dead closeReader=false branch, and update the callers and tests accordingly.
Motivation
A namespace unload immediately followed by a load of a topic in the same namespace (for example admin.namespaces().unload(ns) and then a lookup / getTopic) can fail the topic load in production with a spurious error of the form "Topic policies cache initialization ... was aborted because the cached state was cleared".
SystemTopicBasedTopicPoliciesService.readMorePoliciesAsync tears down a namespace's cached policy state by namespace key (cleanPoliciesCacheInitMap) when its __change_events reader observes an AlreadyClosedException. That callback runs on the pulsar-client executor, not the per-namespace ordered executor, so it races a reload:
1. A namespace unload closes reader R1 asynchronously (via cleanPoliciesCacheInitMap → Reader#closeAsync) and drops its init future.
2. A reload (e.g. getTopic right after unload) installs a fresh reader R2 and a fresh, still-pending init future F2, and the topic load awaits F2.
3. R1's asynchronous close finally completes; its pending readNextAsync fails with AlreadyClosedException, and readMorePoliciesAsync's AlreadyClosed branch runs the namespace-keyed cleanup, which removes R2 and completes F2 exceptionally with "…aborted because the cached state was cleared". The awaiting topic load fails.
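The interleaving above can be replayed deterministically in a few lines. The following is a minimal sketch, not the broker's code: the class, the two maps, and the methods install and cleanupByNamespace are hypothetical stand-ins for the service's reader cache, init-future map, and namespace-keyed cleanup, and the three steps run sequentially on one thread rather than on separate executors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class NamespaceKeyedCleanupRace {
    // Hypothetical stand-in for a __change_events reader; only its identity matters here.
    static final class Reader {
        final String name;
        Reader(String name) { this.name = name; }
    }

    // Hypothetical per-namespace state, keyed by namespace name.
    final ConcurrentHashMap<String, Reader> readers = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, CompletableFuture<Void>> initFutures = new ConcurrentHashMap<>();

    // Install a reader and its still-pending init future as a pair.
    CompletableFuture<Void> install(String ns, Reader reader) {
        CompletableFuture<Void> initFuture = new CompletableFuture<>();
        readers.put(ns, reader);
        initFutures.put(ns, initFuture);
        return initFuture;
    }

    // Namespace-keyed cleanup: removes whatever is cached for ns, no matter
    // which generation installed it, and fails a pending init future.
    void cleanupByNamespace(String ns) {
        readers.remove(ns);
        CompletableFuture<Void> dropped = initFutures.remove(ns);
        if (dropped != null) {
            dropped.completeExceptionally(
                    new IllegalStateException("aborted because the cached state was cleared"));
        }
    }

    // Replays steps 1-3 in order; returns true when the reload's state was clobbered.
    static boolean replay() {
        NamespaceKeyedCleanupRace svc = new NamespaceKeyedCleanupRace();
        String ns = "tenant/ns";

        // Step 1: the unload drops R1 and its init future; R1's close is still in flight.
        svc.install(ns, new Reader("R1"));
        svc.readers.remove(ns);
        svc.initFutures.remove(ns);

        // Step 2: the reload installs R2 and a pending F2.
        CompletableFuture<Void> f2 = svc.install(ns, new Reader("R2"));

        // Step 3: R1's late AlreadyClosedException callback cleans up by namespace key.
        svc.cleanupByNamespace(ns);

        return f2.isCompletedExceptionally() && !svc.readers.containsKey(ns);
    }

    public static void main(String[] args) {
        System.out.println("reload aborted by stale cleanup: " + replay());
    }
}
```

Because the cleanup in step 3 knows only the namespace name, it cannot tell that the entries it removes belong to a newer generation than the reader whose close triggered it.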
This race is not new (the reader-close branch has torn down state by namespace key since long before this became visible), but it was previously benign. cleanPoliciesCacheInitMap simply removed the init future from the map without completing it, so a clobbered reload still resolved via the reader's own init chain or the getTopicPoliciesAsync retry path. #26025 changed that: to stop a stuck init from pinning a namespace's topic loads until the broker restarts (#25294), the cleanup now completes a dropped-but-pending init future exceptionally (failPendingPolicyCacheInit, the "…was aborted because the cached state was cleared" message). That fail-fast is correct and intentional, but it turns the pre-existing clobber into a hard topic-load failure.
#26025 also introduced the identity-guarded cleanupFailedPolicyCacheInit and applied it to the init-failure path, but left the reader-close path in readMorePoliciesAsync on the old namespace-keyed cleanup. This PR completes that hardening.
The bug surfaces in CI as the flaky AdminApi2Test.testGetInternalStatsWithProperties (which unloads a namespace and then reloads the topic), but the failure is a genuine production race, not a test artifact.
Modifications
Core fix:
- Thread the init future through readMorePoliciesAsync(reader, initFuture).
- Route the reader-close (and closed-service) cleanup through cleanupFailedPolicyCacheInit(namespace, initFuture, true) instead of the namespace-keyed cleanPoliciesCacheInitMap. It only tears down state that still belongs to this initialization, so a superseded reader's late close becomes a no-op, while a still-current reader tears down its own generation exactly as before. This is sound because a newer reader/init generation can only be installed after the previous one has been removed (readers and init futures are installed and removed as a pair), so guarding on the init future's identity correctly distinguishes the two.
- Update the cleanupFailedPolicyCacheInit javadoc to cover the reader-closed-after-init case.
Harden cleanupFailedPolicyCacheInit against the same class of generation clobbering:
- Stop clearing policiesCache/globalPoliciesCache here. A late cleanup for a superseded init could otherwise wipe policy entries that a fresh reader for a concurrent re-initialization has already repopulated, dropping topic policies until the next __change_events replay, which would be a regression. The cached policies are now cleared only when the whole namespace is unloaded (all bundles gone), via cleanPoliciesCacheInitMap in removeOwnedNamespaceBundleAsync.
- Remove the TopicPolicyMessageHandlerTracker by identity (remove(namespace, tracker)) so a stale cleanup can't close a tracker that a concurrent re-initialization installed for the namespace.
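The identity guard described above can be sketched with ConcurrentHashMap.remove(key, value), which removes an entry only if the key is currently mapped to the given value. This is an illustrative sketch under assumptions, not the actual cleanupFailedPolicyCacheInit: the class, field, and method names are hypothetical, the real method's third argument is not modelled, and the policy caches are deliberately left out because the cleanup no longer touches them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class IdentityGuardedCleanup {
    // Hypothetical stand-ins; neither overrides equals(), so
    // ConcurrentHashMap.remove(key, value) compares them by identity.
    static final class Reader {}
    static final class Tracker {}

    final ConcurrentHashMap<String, CompletableFuture<Void>> initFutures = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Reader> readers = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Tracker> trackers = new ConcurrentHashMap<>();

    void install(String ns, CompletableFuture<Void> initFuture, Reader reader, Tracker tracker) {
        initFutures.put(ns, initFuture);
        readers.put(ns, reader);
        trackers.put(ns, tracker);
    }

    // Tears down only the generation that the caller belongs to.
    // Returns false (a no-op) when a newer generation owns the namespace.
    boolean cleanupFailedInit(String ns, CompletableFuture<Void> initFuture,
                              Reader reader, Tracker tracker) {
        // Guard: proceed only while this initialization still owns the init future.
        if (!initFutures.remove(ns, initFuture)) {
            return false;
        }
        // Remove by identity so a reader or tracker installed later is never touched.
        readers.remove(ns, reader);
        trackers.remove(ns, tracker);
        initFuture.completeExceptionally(
                new IllegalStateException("aborted because the cached state was cleared"));
        return true;
    }

    // Returns true when a stale cleanup is a no-op and a current cleanup tears down its own state.
    static boolean demo() {
        IdentityGuardedCleanup svc = new IdentityGuardedCleanup();
        String ns = "tenant/ns";

        // Generation 1 was already removed by the unload; only its objects survive.
        CompletableFuture<Void> f1 = new CompletableFuture<>();
        Reader r1 = new Reader();
        Tracker t1 = new Tracker();

        // Generation 2 is installed by the reload.
        CompletableFuture<Void> f2 = new CompletableFuture<>();
        Reader r2 = new Reader();
        Tracker t2 = new Tracker();
        svc.install(ns, f2, r2, t2);

        boolean staleRan = svc.cleanupFailedInit(ns, f1, r1, t1);
        boolean gen2Intact = !f2.isDone()
                && svc.readers.get(ns) == r2 && svc.trackers.get(ns) == t2;

        boolean currentRan = svc.cleanupFailedInit(ns, f2, r2, t2);
        boolean gen2TornDown = f2.isCompletedExceptionally()
                && svc.readers.isEmpty() && svc.trackers.isEmpty();

        return !staleRan && gen2Intact && currentRan && gen2TornDown;
    }

    public static void main(String[] args) {
        System.out.println("identity guard behaves as described: " + demo());
    }
}
```

The guard works only because each generation is installed and removed as a unit: once the init future no longer matches, nothing else cached for the namespace can belong to the caller either.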
Small cleanups in the same file:
- Drop the closeReader parameter (and its dead false branch) from cleanPoliciesCacheInitMap; it is only ever called for the full namespace-unload teardown.
- Remove the unused cleanCacheAndCloseReader method, simplify newReader with computeIfAbsent, and route writer-cache invalidation through the existing cleanWriterCache helper.
Verifying this change
This change added tests and can be verified as follows:
- Added SystemTopicBasedTopicPoliciesServiceTest.testClosedSupersededReaderDoesNotAbortReloadedInit, a deterministic regression test: it parks the old reader's read loop on a controllable future, installs a newer reader + still-pending init future for the namespace, then completes the old reader's read with AlreadyClosedException and asserts the newer generation's init future is not aborted and its reader stays cached. It fails on the previous code (the reload's init future is aborted) and passes with the fix.
- Updated testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader: the reader-close path now routes through the identity-guarded cleanup, so its Mockito verify counts change (cleanPoliciesCacheInitMap 1→0, cleanupFailedPolicyCacheInit 1→2).
- The SystemTopicBasedTopicPoliciesServiceTest class and AdminApi2Test.testGetInternalStatsWithProperties pass; checkstyle and spotless are clean.
Does this pull request potentially affect one of the following parts: