
[fix][broker] Don't let a closing topic-policies reader abort a concurrent cache-init reload#26132

Merged
lhotari merged 8 commits into
apache:master from
lhotari:lh-fix-topic-policies-reader-close-race
Jul 2, 2026

@lhotari lhotari commented Jul 1, 2026

Member

Motivation

A namespace unload immediately followed by a load of a topic in the same namespace (for example
admin.namespaces().unload(ns) and then a lookup / getTopic) can fail the topic load in production
with a spurious error:

org.apache.pulsar.broker.service.BrokerServiceException:
Topic policies cache initialization for namespace <ns> was aborted because the cached state was cleared

SystemTopicBasedTopicPoliciesService.readMorePoliciesAsync tears down a namespace's cached policy
state by namespace key (cleanPoliciesCacheInitMap) when its __change_events reader observes an
AlreadyClosedException. That callback runs on the pulsar-client executor, not the per-namespace
ordered executor, so it races a reload:

  1. A namespace unload closes reader R1 asynchronously (via cleanPoliciesCacheInitMap, which calls
    Reader#closeAsync) and drops its init future.
  2. A reload of a topic in the same namespace (e.g. getTopic right after unload) installs a fresh
    reader R2 and a fresh, still-pending init future F2, and the topic load awaits F2.
  3. R1's asynchronous close finally completes; its pending readNextAsync fails with
    AlreadyClosedException, and readMorePoliciesAsync's AlreadyClosed branch runs the
    namespace-keyed cleanup, which removes R2 and completes F2 exceptionally with "…aborted because
    the cached state was cleared". The awaiting topic load fails.
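
The three steps above can be replayed in a small, single-threaded model. This is only a sketch under stated assumptions: the class, the `initFutures` map and `cleanupByNamespace` are hypothetical stand-ins for the broker's per-namespace state and its namespace-keyed cleanup, not the actual Pulsar code, and the real interleaving happens across executors rather than sequentially.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of the race; all names here are illustrative assumptions. */
final class NamespaceKeyedCleanupRace {
    // Hypothetical stand-in for the per-namespace init-future map.
    static final ConcurrentHashMap<String, CompletableFuture<Void>> initFutures =
            new ConcurrentHashMap<>();

    /** Namespace-keyed cleanup: removes whatever is installed now and fails it if still pending. */
    static void cleanupByNamespace(String ns) {
        CompletableFuture<Void> removed = initFutures.remove(ns);
        if (removed != null) {
            // No effect if the future has already completed.
            removed.completeExceptionally(
                    new IllegalStateException("aborted because the cached state was cleared"));
        }
    }

    /** Replays steps 1 to 3 in order and returns F2 so the caller can inspect it. */
    static CompletableFuture<Void> replay(String ns) {
        initFutures.put(ns, new CompletableFuture<>()); // generation 1 (F1)
        cleanupByNamespace(ns);                         // step 1: unload drops F1; R1's close is in flight

        CompletableFuture<Void> f2 = new CompletableFuture<>();
        initFutures.put(ns, f2);                        // step 2: reload installs F2

        cleanupByNamespace(ns);                         // step 3: R1's late close runs the same cleanup
        return f2;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> f2 = replay("tenant/ns");
        System.out.println("F2 aborted: " + f2.isCompletedExceptionally()); // prints "F2 aborted: true"
    }
}
```

Because the cleanup is keyed only by namespace, the late call in step 3 cannot tell generation 1 from generation 2 and fails whichever future is installed at that moment.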

This race is not new — the reader-close branch has torn down state by namespace key since long before
this became visible — but it was previously benign. cleanPoliciesCacheInitMap simply removed the
init future from the map without completing it, so a clobbered reload still resolved via the reader's
own init chain or the getTopicPoliciesAsync retry path. #26025 changed that: to stop a stuck init
from pinning a namespace's topic loads until the broker restarts (#25294), the cleanup now completes a
dropped-but-pending init future exceptionally (failPendingPolicyCacheInit, the "…was aborted
because the cached state was cleared" message). That fail-fast is correct and intentional, but it
turns the pre-existing clobber into a hard topic-load failure.

#26025 also introduced the identity-guarded cleanupFailedPolicyCacheInit and applied it to the
init-failure path — but left the reader-close path in readMorePoliciesAsync on the old
namespace-keyed cleanup. This PR completes that hardening.

The bug surfaces in CI as the flaky AdminApi2Test.testGetInternalStatsWithProperties (which unloads a
namespace and then reloads the topic), but the failure is a genuine production race, not a test artifact.

Modifications

Core fix:

  • Thread the initialization future through readMorePoliciesAsync(reader, initFuture).
  • Route its AlreadyClosed cleanup (and the service-closed cleanup) through the existing identity-guarded
    cleanupFailedPolicyCacheInit(namespace, initFuture, true) instead of the namespace-keyed
    cleanPoliciesCacheInitMap. It only tears down state that still belongs to this initialization, so a
    superseded reader's late close becomes a no-op, while a still-current reader tears down its own
    generation exactly as before. This is sound because a newer reader/init generation can only be
    installed after the previous one has been removed (readers and init futures are installed and removed
    as a pair), so guarding on the init future's identity correctly distinguishes the two.
  • Extend the cleanupFailedPolicyCacheInit javadoc to cover the reader-closed-after-init case.
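
A minimal sketch of the identity-guard idea, assuming a plain `ConcurrentHashMap` of init futures. `cleanupIfOwner` and `onReadDone` are hypothetical names that only approximate the roles of cleanupFailedPolicyCacheInit and readMorePoliciesAsync; the real methods also handle readers, trackers and the service-closed case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model; class, field and method names are assumptions, not Pulsar's API. */
final class IdentityGuardedCleanup {
    static final ConcurrentHashMap<String, CompletableFuture<Void>> initFutures =
            new ConcurrentHashMap<>();

    /** Tears down the namespace's init state only if initFuture is still the installed one. */
    static boolean cleanupIfOwner(String ns, CompletableFuture<Void> initFuture) {
        // remove(key, value) is atomic and compares with equals(); CompletableFuture
        // does not override equals(), so this is effectively an identity check.
        if (!initFutures.remove(ns, initFuture)) {
            return false; // a newer generation owns the namespace: no-op
        }
        initFuture.completeExceptionally(new IllegalStateException("init aborted for " + ns));
        return true;
    }

    /** The init future travels with the pending read, so a failed read cleans up only its own generation. */
    static void onReadDone(CompletableFuture<String> pendingRead, String ns,
                           CompletableFuture<Void> initFuture) {
        pendingRead.whenComplete((msg, error) -> {
            if (error != null) {
                cleanupIfOwner(ns, initFuture);
            }
        });
    }

    /** Replays unload, reload, then the old reader's late close; returns F2. */
    static CompletableFuture<Void> replay(String ns) {
        CompletableFuture<Void> f1 = new CompletableFuture<>();
        CompletableFuture<String> r1Read = new CompletableFuture<>(); // R1's parked read
        initFutures.put(ns, f1);
        onReadDone(r1Read, ns, f1);

        initFutures.remove(ns);                 // unload: full namespace teardown drops F1

        CompletableFuture<Void> f2 = new CompletableFuture<>();
        initFutures.put(ns, f2);                // reload installs F2

        // R1's close completes late; the callback runs here, on the completing thread.
        r1Read.completeExceptionally(new IllegalStateException("reader already closed"));
        return f2;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> f2 = replay("tenant/ns");
        System.out.println("F2 still pending: " + !f2.isDone());                          // true
        System.out.println("F2 still installed: " + (initFutures.get("tenant/ns") == f2)); // true
    }
}
```

The guard relies on the pairing invariant stated above: since a new generation is installed only after the previous one has been removed, a failed `remove(ns, initFuture)` means the caller has been superseded.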

Harden cleanupFailedPolicyCacheInit against the same class of generation clobbering:

  • Do not clear policiesCache/globalPoliciesCache here. A late cleanup for a superseded init could
    otherwise wipe policy entries that a fresh reader for a concurrent re-initialization has already
    repopulated, dropping topic policies until the next __change_events replay — a regression. The cached
    policies are now cleared only when the whole namespace is unloaded (all bundles gone), via
    cleanPoliciesCacheInitMap in removeOwnedNamespaceBundleAsync.
  • Remove the TopicPolicyMessageHandlerTracker by identity (remove(namespace, tracker)) so a stale
    cleanup can't close a tracker that a concurrent re-initialization installed for the namespace.
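
The tracker change can be illustrated the same way. In this sketch `Tracker` is a hypothetical stand-in for TopicPolicyMessageHandlerTracker, and the captured instance is passed in as a parameter to make the interleaving explicit; it is an assumption-laden model, not the broker's code.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of removing a closable resource by identity. */
final class TrackerRemovalByIdentity {
    /** Hypothetical stand-in for TopicPolicyMessageHandlerTracker. */
    static final class Tracker implements AutoCloseable {
        volatile boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    static final ConcurrentHashMap<String, Tracker> trackers = new ConcurrentHashMap<>();

    /** Closes the captured tracker only if it is still the one registered for the namespace. */
    static void cleanup(String ns, Tracker captured) {
        // An unconditional trackers.remove(ns) would return, and then close, whichever
        // tracker is registered now, including one installed by a newer initialization.
        if (captured != null && trackers.remove(ns, captured)) {
            captured.close();
        }
    }

    /** Returns whether a stale cleanup ended up closing the newer tracker. */
    static boolean staleCleanupClosesNewTracker(String ns) {
        Tracker old = new Tracker();
        trackers.put(ns, old);
        Tracker capturedByOldInit = trackers.get(ns); // what the old generation saw

        Tracker fresh = new Tracker();
        trackers.put(ns, fresh);                      // concurrent re-initialization

        cleanup(ns, capturedByOldInit);               // late cleanup from the old generation
        return fresh.closed;
    }

    public static void main(String[] args) {
        System.out.println("fresh tracker closed: " + staleCleanupClosesNewTracker("tenant/ns")); // false
    }
}
```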

Small cleanups in the same file:

  • Remove the always-true closeReader parameter (and its dead false branch) from
    cleanPoliciesCacheInitMap; it is only ever called for the full namespace-unload teardown.
  • Remove the unused cleanCacheAndCloseReader method, simplify newReader with computeIfAbsent, and
    route writer-cache invalidation through the existing cleanWriterCache helper.
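
For the newReader simplification, the equivalence can be sketched on a generic cache. `ReaderCacheLookup` and both method names are hypothetical; only `ConcurrentHashMap.compute` and `computeIfAbsent` are real JDK calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Two ways to "return the cached value, or create and cache one"; names are illustrative. */
final class ReaderCacheLookup {
    /** Older shape: compute(...) that hands back the existing value unchanged when present. */
    static <V> V viaCompute(ConcurrentHashMap<String, V> cache, String ns, Supplier<V> factory) {
        return cache.compute(ns, (key, existing) -> existing != null ? existing : factory.get());
    }

    /** Equivalent and more direct: the factory runs only when no value is mapped. */
    static <V> V viaComputeIfAbsent(ConcurrentHashMap<String, V> cache, String ns, Supplier<V> factory) {
        return cache.computeIfAbsent(ns, key -> factory.get());
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        System.out.println(viaComputeIfAbsent(cache, "tenant/ns", () -> "reader-1")); // reader-1
        System.out.println(viaCompute(cache, "tenant/ns", () -> "reader-2"));         // reader-1
    }
}
```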

Verifying this change

This change added tests and can be verified as follows:

  • Added SystemTopicBasedTopicPoliciesServiceTest.testClosedSupersededReaderDoesNotAbortReloadedInit,
    a deterministic regression test: it parks the old reader's read loop on a controllable future, installs
    a newer reader + still-pending init future for the namespace, then completes the old reader's read with
    AlreadyClosedException and asserts the newer generation's init future is not aborted and its reader
    stays cached. It fails on the previous code (the reload's init future is aborted) and passes with the
    fix.
  • Updated testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader: the reader-close path now
    routes through the identity-guarded cleanup, so its Mockito verify counts change
    (cleanPoliciesCacheInitMap 1→0, cleanupFailedPolicyCacheInit 1→2).
  • The full SystemTopicBasedTopicPoliciesServiceTest class and
    AdminApi2Test.testGetInternalStatsWithProperties pass; checkstyle and spotless are clean.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

…rrent cache-init reload

SystemTopicBasedTopicPoliciesService.readMorePoliciesAsync tore down the
namespace's cached policy state by namespace key (cleanPoliciesCacheInitMap)
when its __change_events reader observed an AlreadyClosedException. That callback
runs on the pulsar-client executor, not the per-namespace ordered executor, so
it races a reload: a namespace unload closes reader R1 asynchronously while a
reload (e.g. getTopic right after unload) installs a fresh reader R2 and init
future F2; R1's late close then clears R2/F2 and aborts F2 with "Topic policies
cache initialization ... was aborted because the cached state was cleared",
failing the reloading topic load. This surfaces as the flaky
AdminApi2Test.testGetInternalStatsWithProperties.

Route the reader-close (and the closed-service) cleanup through the existing
identity-guarded cleanupFailedPolicyCacheInit, threading the init future through
readMorePoliciesAsync. It only tears down state that still belongs to this
initialization, so a superseded reader's late close is a no-op while a
still-current reader tears down its own generation exactly as before.

Assisted-by: Claude Code (Opus 4.8)
lhotari added 7 commits July 2, 2026 00:49
It has no callers. Reader, init-future, tracker and policy-cache teardown is
handled by cleanPoliciesCacheInitMap and cleanupFailedPolicyCacheInit.

On a writer-creation failure, call cleanWriterCache(namespace) instead of
inlining writerCaches.synchronous().invalidate(...), so all writer-cache
invalidation goes through a single method.

…CacheInit

Capture the TopicPolicyMessageHandlerTracker before the identity guard and
remove it only when it is still the same instance
(topicPolicyMessageHandlerTrackers.remove(namespace, tracker)) instead of an
unconditional remove(namespace). This prevents a stale cleanup from closing a
tracker that a concurrent re-initialization has already installed for the
namespace.

readerCaches.compute(...) that returns the existing value unchanged when present
is exactly computeIfAbsent(); use it directly.

Explain that the method only proceeds while this initialization still owns the
namespace's init future, and that the reader is removed by identity so a reader
created by a later initialization is never closed by a stale cleanup.

Clearing policiesCache/globalPoliciesCache here can race with a new concurrent
initialization for the namespace: a late cleanup for a superseded init could
wipe entries that a fresh reader has already repopulated. Keep the cached
policies in place; they are cleared only when the whole namespace is unloaded
(all bundles gone) via cleanPoliciesCacheInitMap in
removeOwnedNamespaceBundleAsync.

Update the closeReader javadoc to match and reword the tracker-removal comment.

…nitMap

cleanPoliciesCacheInitMap is only ever called with closeReader=true in
production (the namespace-unload teardown and the closed-service check). Drop
the parameter and the dead closeReader=false branch, and update the callers and
tests accordingly.
@lhotari lhotari marked this pull request as ready for review July 1, 2026 23:11
@lhotari lhotari merged commit decc80f into apache:master Jul 2, 2026
45 checks passed
@lhotari lhotari added this to the 5.0.0-M2 milestone Jul 2, 2026