Add structured concurrency wrapper for CompletableFuture#2939
Add structured concurrency wrapper for CompletableFuture#2939cconstable wants to merge 3 commits into
Conversation
| Throwable unwrapped = unwrap(error); | ||
| if (unwrapped instanceof CancellationException) { | ||
| next.completeExceptionally(unwrapped); | ||
| return; | ||
| } |
There was a problem hiding this comment.
cancellation is an error but it's not something we can "recover from" so we should just immediately surface cancellations (skip the user supplied transform)
b0455e7 to
6371f6d
Compare
|
|
||
| /** Token that allows asynchronous code to observe cancellation requests. */ | ||
| @Experimental | ||
| public interface CancellationToken { |
There was a problem hiding this comment.
- the only "public" part of this PR
- meant to serve as a general purpose cancellation token
- can replace existing bespoke tokens e.g.
ActivityCancellationToken
| * Attaches an existing asynchronous task to this scope. Scope cancellation requests cancellation | ||
| * on the attached future. | ||
| */ | ||
| <U> TaskChain<U> attach(CompletableFuture<U> future); |
There was a problem hiding this comment.
This is the main usage point:
scope.attach(someAsyncWork());It lifts a CompletableFuture into the structured scope. It's the bridge from the unstructured CF world into the structured one. We return a TaskChain rather than a CompletableFuture so callers can chain downstream work (via map, recover, etc) without leaving the scope. It also does not expose blocking operations and foot-guns like exceptionally that break cancellation.
| private ListUtils() {} | ||
|
|
||
| /** Concatenates a list of lists into a single list, preserving order. */ | ||
| public static <T> List<T> flatten(List<? extends List<? extends T>> lists) { |
There was a problem hiding this comment.
This isn't used in this PR but it's a natural part of the using this abstraction:
scope.awaitAll(ListUtils::flatten);It's also being used in the branch I have this relies on this branch.
What was changed
Add a small, nonblocking structured concurrency-like wrapper for
CompletableFuture. Loosely inspired by Java 25StructuredTaskScopeand SwiftTaskGroup.Allows for us to write this:
See the README for more details.
Why?
I'm doing extstore work and fan-in / fan-out + proper cancellation + error handling boilerplate with raw
CompletableFutureis tedious and error prone. Scoped task creation gives us correctness guarantees and better ergonomics. I noticed other disparate abstractions solving similar problems and provided a general solution.Checklist
I wrote and generated a ton of tests for this and also ran some "stress" tests locally. Since this is just a wrapper on top of
CompletableFutureand we aren't creating/using our own executors or threading abstractions it's not as scary of a thing as it looks.