[fix][broker] Don't let a stuck or aborted topic policies cache init make a namespace's topics unloadable #26025
…make a namespace's topics unloadable

Topic loading waits for the namespace's topic policies cache to be initialized by reading the __change_events system topic to the end (SystemTopicBasedTopicPoliciesService#initPolicesCache), which completes a shared per-namespace future in policyCacheInitMap that every topic load awaits.

That future could be left pending forever, leaving every topic in the namespace stuck and unloadable until the broker was restarted (issue apache#25294), in two ways:

1. The read loop had no timeout: a system-topic reader that reconnected but stopped making progress pinned the future indefinitely.
2. Several cleanup paths removed the future from policyCacheInitMap without ever completing it (most importantly the namespace-bundle unload path, removeOwnedNamespaceBundleAsync), relying on the reader being closed and the init chain failing to complete it indirectly.

Modifications:

- Add topicPoliciesCacheInitTimeoutSeconds (default 60s, dynamic). prepareInitPoliciesCacheAsync now schedules a timeout that fails the init future, and via an identity-guarded cleanup (cleanupAfterPolicyCacheInitTimeout) clears the cached state and closes the stuck reader only when the timed-out future is still the current one, so a concurrent retry/unload is never clobbered. A new metric pulsar.broker.topic.policies.cache.init.timeout.count counts these events.
- cleanPoliciesCacheInitMap and close() now complete any pending init future they drop (exceptionally), so awaiting topic loads fail fast and retry instead of hanging. Completion happens outside the ConcurrentHashMap compute() remapping to avoid a recursive update / deadlock (apache#24977).

Assisted-by: Claude Code (Opus 4.8)
void-ptr974
left a comment
I found a possible race here. For example, init A times out and cleanupAfterPolicyCacheInitTimeout removes A by identity. Then a later topic load starts init B for the same namespace and installs a new future/reader. If reader A later fails or is closed, this old callback calls cleanPoliciesCacheInitMap(namespace, ...) by namespace, so it may remove/close init B's newer future/reader.
Maybe we can add an identity check before cleanup here to avoid cleaning up state from a newer init attempt.
…er failure can't clobber a retry

Addresses review feedback on apache#26025.

The timeout cleanup was identity-guarded, but the reader-chain exceptionally cleanup in prepareInitPoliciesCacheAsync still removed/closed state by namespace key. If init A timed out (its future already dropped by the identity-guarded timeout cleanup and its reader closed) and a retry installed a fresh future/reader B for the same namespace, A's late reader-failure callback called cleanPoliciesCacheInitMap(namespace, ...) by key and dropped B's future and closed B's reader. When B had already finished initializing and was tail-following __change_events, that left the namespace with no reader and no init entry, so topic policy updates stopped until init was re-triggered.

Generalize the timeout's identity-guarded cleanup into cleanupFailedPolicyCacheInit(namespace, initFuture, closeReader) and route both the timeout path and both reader-chain exceptionally branches through it, so a callback that no longer owns the namespace is a no-op. closeReader=false preserves the previous transient-read-error semantics (drop the init future only, keep the reader cached for the retry to reuse). cleanPoliciesCacheInitMap (by namespace) is still used by the unload and closed paths. The dropped future is completed outside any ConcurrentHashMap remapping function, preserving the apache#24977 deadlock guard.

Add testCleanupFailedPolicyCacheInitIsIdentityGuarded covering both the stale-future no-op and the owner-future teardown.

Assisted-by: Claude Code (Opus 4.8)
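The identity guard described in this commit can be illustrated with a minimal sketch. It is not the Pulsar implementation: the class, the `Reader` stand-in, and the method name `cleanupFailedInit` are hypothetical, and only the idea (remove by key and value, so a stale callback cannot touch a newer attempt) is taken from the commit message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch with simplified stand-ins; not the Pulsar implementation. */
final class IdentityGuardedCleanupSketch {

    /** Hypothetical stand-in for the __change_events reader. */
    static final class Reader {
        volatile boolean closed;
        void close() { closed = true; }
    }

    final ConcurrentHashMap<String, CompletableFuture<Void>> initMap = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Reader> readers = new ConcurrentHashMap<>();

    /** Returns true only if initFuture still owned the namespace and was torn down. */
    boolean cleanupFailedInit(String ns, CompletableFuture<Void> initFuture, boolean closeReader) {
        // Capture the reader before the ownership check so only the reader seen here can be closed.
        Reader reader = readers.get(ns);
        // remove(key, value) removes the entry only if it still maps to this exact future
        // (CompletableFuture does not override equals, so the comparison is by identity).
        if (!initMap.remove(ns, initFuture)) {
            return false; // a newer init attempt owns the namespace: do nothing
        }
        if (closeReader && reader != null && readers.remove(ns, reader)) {
            reader.close();
        }
        // Completed after the map operations have returned, not inside a remapping function.
        initFuture.completeExceptionally(new IllegalStateException("policies cache init failed: " + ns));
        return true;
    }

    public static void main(String[] args) {
        IdentityGuardedCleanupSketch s = new IdentityGuardedCleanupSketch();
        CompletableFuture<Void> a = new CompletableFuture<>();
        s.initMap.put("ns", a);
        s.readers.put("ns", new Reader());
        System.out.println("A cleans up its own state: " + s.cleanupFailedInit("ns", a, true));

        CompletableFuture<Void> b = new CompletableFuture<>();
        Reader readerB = new Reader();
        s.initMap.put("ns", b);
        s.readers.put("ns", readerB);
        System.out.println("stale callback from A acts: " + s.cleanupFailedInit("ns", a, true));
        System.out.println("B's reader closed: " + readerB.closed);
    }
}
```

In this sketch the late callback from attempt A finds that the map no longer holds A's future, so it returns without touching B's future or reader. Passing `closeReader=false` drops only the init future and leaves the cached reader in place, mirroring the transient-read-error semantics the commit describes.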
thanks @void-ptr974, pushed another commit to fix that
… the identity-guarded cleanup

The two existing exception-path tests in SystemTopicBasedTopicPoliciesServiceTest counted cleanPoliciesCacheInitMap invocations. Routing the reader-chain exceptionally cleanup through the identity-guarded cleanupFailedPolicyCacheInit moved those calls off cleanPoliciesCacheInitMap, so the counts changed even though the cleanup still runs exactly once per failure trigger:

- testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader: cleanPoliciesCacheInitMap runs once (readMorePoliciesAsync on the closed reader) and cleanupFailedPolicyCacheInit once (the second prepareInitPoliciesCacheAsync failing in initPolicesCache); was 2x cleanPoliciesCacheInitMap.
- testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader: reader creation fails, so cleanup runs once via cleanupFailedPolicyCacheInit and never via cleanPoliciesCacheInitMap; was 1x cleanPoliciesCacheInitMap.

Assert the moved counts so the tests still pin "cleanup runs exactly once per trigger, no recursion".

Assisted-by: Claude Code (Opus 4.8)
void-ptr974
left a comment
LGTM. I rechecked the cleanup path and the related tests pass locally. Thanks for the update.
Follow-up PR #26132 addresses a remaining gap that surfaced after this PR.
Fixes #25294
Motivation
When a topic is loaded, the broker first waits for the namespace's topic policies cache to be
initialized. Initialization (SystemTopicBasedTopicPoliciesService#initPolicesCache) reads the
namespace's __change_events system topic to the end and completes a shared, per-namespace
future (policyCacheInitMap) that every topic load in the namespace awaits.

That shared future could be left pending forever, leaving every topic in the namespace stuck and
unloadable until the broker was restarted (issue #25294), in two distinct ways:

1. The read loop had no timeout. If the __change_events reader reconnects but then stops making
   progress, e.g. after __change_events is unloaded/moved and the reconnected reader gets stuck
   (see the compacted-read stuck-reader bug fixed in #25998, "[fix][broker] Fix compacted read
   could be stuck forever or message loss due to cursor mark delete"), the read loop never
   finishes and the future stays pending. The 60s topicLoadTimeoutSeconds only fails the
   individual topic-load future; it does not clear the poisoned policyCacheInitMap entry or close
   the stuck reader, so the namespace stays poisoned and every later load times out the same way.
2. Several cleanup paths remove the entry from policyCacheInitMap but never complete the future,
   most importantly the namespace-bundle unload path (removeOwnedNamespaceBundleAsync). They
   relied on the reader being closed and the init chain failing to complete the future
   indirectly; if that didn't happen, awaiting topic loads hung. This matches the "futures
   accumulate until the broker is restarted" symptom in the report.
This is defense-in-depth that is complementary to #25998 (which fixes one concrete stuck-reader
trigger on the broker/cursor side): it guarantees the per-namespace init future is always completed,
so a single stuck/aborted initialization can no longer take a whole namespace's topics down until
restart.
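Why a per-load timeout does not heal the namespace can be shown with plain `CompletableFuture`s. The sketch below is only an illustration: the class and method names are hypothetical and the 50 ms timeout stands in for topicLoadTimeoutSeconds. It assumes, as described above, that each topic load awaits a future derived from the shared init future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative only; names are hypothetical, not Pulsar code. */
final class SharedInitFutureDemo {

    /**
     * Simulates one topic load awaiting the shared init future with its own timeout.
     * Returns true if that load timed out.
     */
    static boolean loadTimesOut(CompletableFuture<Void> sharedInit, long timeoutMillis) {
        // thenApply creates a dependent future; orTimeout fails only that dependent,
        // never the shared future it was derived from.
        CompletableFuture<Void> topicLoad = sharedInit
                .thenApply(v -> v)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            topicLoad.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        // Never completed: stands in for a stuck policies cache initialization.
        CompletableFuture<Void> sharedInit = new CompletableFuture<>();
        System.out.println("first load timed out:  " + loadTimesOut(sharedInit, 50));
        System.out.println("second load timed out: " + loadTimesOut(sharedInit, 50));
        System.out.println("shared future done:    " + sharedInit.isDone());
    }
}
```

Every load fails with its own timeout while the shared future stays pending, which is the "poisoned" state the motivation describes: nothing in the per-load path completes or replaces the shared entry.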
Modifications
- Add topicPoliciesCacheInitTimeoutSeconds (default 60, dynamic). It bounds topic policies cache
  initialization for a namespace. Set to 0/negative to disable (previous unbounded behavior).
  prepareInitPoliciesCacheAsync schedules a timeout that fails the init future. On timeout, an
  identity-guarded cleanup (cleanupAfterPolicyCacheInitTimeout) clears the cached state and
  closes the stuck reader only when the timed-out future is still the current one: it captures
  the reader before the gate and uses policyCacheInitMap.remove(ns, future) /
  readerCaches.remove(ns, reader), so a concurrent retry (or an unload that already replaced the
  entry) is never clobbered. A new pulsar.broker.topic.policies.cache.init.timeout.count
  OpenTelemetry counter records these events. The timeout task is cancelled as soon as
  initialization completes, so it adds no overhead on the normal path.
- cleanPoliciesCacheInitMap and close() now complete any pending init future they drop
  (exceptionally), so awaiting topic loads fail fast and retry with a fresh reader instead of
  hanging. The completion is done outside the ConcurrentHashMap#compute remapping function,
  because completing the future can run the awaiting topic-load callbacks synchronously and doing
  that while holding the bin lock risks a recursive map update / deadlock (the hazard addressed
  in #24977, "[Bug] [broker] Concurrent error in
  SystemTopicBasedTopicPoliciesService#prepareInitPoliciesCacheAsync").
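The two mechanisms above, a bounded init with a cancellable timeout and completing a dropped future only after `compute` has returned, might look roughly like the following. This is a sketch under stated assumptions, not the PR's code: `BoundedInitSketch`, `startInit`, and `dropAndFail` are hypothetical names, the reader handling is omitted, and milliseconds are used instead of seconds to keep the example quick to run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch; hypothetical names, not the Pulsar implementation. */
final class BoundedInitSketch {
    final ConcurrentHashMap<String, CompletableFuture<Void>> initMap = new ConcurrentHashMap<>();

    /** Registers an init future for ns that fails after timeoutMillis unless completed first. */
    CompletableFuture<Void> startInit(String ns, ScheduledExecutorService timer, long timeoutMillis) {
        CompletableFuture<Void> initFuture = new CompletableFuture<>();
        initMap.put(ns, initFuture);
        if (timeoutMillis > 0) { // 0 or negative disables the bound
            ScheduledFuture<?> timeout = timer.schedule(() -> {
                // Identity-guarded: a no-op if a newer attempt already replaced this entry.
                initMap.remove(ns, initFuture);
                // A no-op if the future has already completed.
                initFuture.completeExceptionally(new TimeoutException("init timed out: " + ns));
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // Cancel the timer as soon as initialization finishes, successfully or not.
            initFuture.whenComplete((v, e) -> timeout.cancel(false));
        }
        return initFuture;
    }

    /** Drops the entry for ns and fails the dropped future after compute() has returned. */
    CompletableFuture<Void> dropAndFail(String ns) {
        AtomicReference<CompletableFuture<Void>> dropped = new AtomicReference<>();
        initMap.compute(ns, (key, current) -> {
            dropped.set(current); // only record the future inside the remapping function
            return null;          // returning null removes the mapping
        });
        CompletableFuture<Void> future = dropped.get();
        if (future != null) {
            // Awaiting callbacks run here, outside compute(), so they may safely touch the map.
            // An already-completed future is left untouched.
            future.completeExceptionally(new IllegalStateException("init aborted: " + ns));
        }
        return future;
    }
}
```

The design point is the ordering in `dropAndFail`: a callback that reacts to the failure by re-registering the same namespace (for example a retry) would re-enter the map. Run from inside the remapping function, that re-entry can fail with a recursive-update error or block; run after `compute` returns, it is an ordinary map operation.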
Verifying this change
This change added tests and can be verified as follows:
- ...TopicPoliciesServiceTest#testPrepareInitPoliciesCacheAsyncTimesOutWhenReaderStuck: spies the
  __change_events reader so it reports more events but never delivers one (a stuck reader), then
  asserts prepareInitPoliciesCacheAsync fails with a TimeoutException instead of hanging, that the
  poisoned policyCacheInitMap entry is cleared, and that the stuck reader is closed. Verified red
  without the fix and green with it.
- ...TopicPoliciesServiceTest#testCleanPoliciesCacheInitMapCompletesPendingInitFuture: asserts that
  dropping a pending init future (both the reader-close and non-reader-close branches) completes it
  exceptionally and removes it from the map, and that an already-completed future is left untouched.
- The SystemTopicBasedTopicPoliciesServiceTest suite passes, including the existing
  init/cleanup tests (cleanup call counts and behavior unchanged on the normal path).
Does this pull request potentially affect one of the following parts:
- New configuration: topicPoliciesCacheInitTimeoutSeconds, default 60s; topic policies cache initialization is now bounded by default instead of unbounded.
- New metric: pulsar.broker.topic.policies.cache.init.timeout.count