Skip to content

Add structured concurrency wrapper for CompletableFuture#2939

Open
cconstable wants to merge 3 commits into
mainfrom
structured-concurrency
Open

Add structured concurrency wrapper for CompletableFuture#2939
cconstable wants to merge 3 commits into
mainfrom
structured-concurrency

Conversation

@cconstable

@cconstable cconstable commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

What was changed

Add a small, nonblocking structured concurrency-like wrapper for CompletableFuture. Loosely inspired by Java 25 StructuredTaskScope and Swift TaskGroup.

Allows for us to write this:

CompletableFuture<List<String>> result =
    TaskScope.withScope( // 1. create a scope
        scope -> {
            scope.attach(startWork()); // 2. add tasks to the scope
            scope.attach(startMoreWork())
                .thenApply(value -> doDownstreamWork(value));

            return scope.awaitAll(); // 3. collect results
        });

result.get();        // 4. get results a usual
// or
result.cancel(true); // 5. cancel (propagates to all child tasks)

See the README for more details.

Why?

I'm doing extstore work and fan-in / fan-out + proper cancellation + error handling boilerplate with raw CompletableFuture is tedious and error prone. Scoped task creation gives us correctness guarantees and better ergonomics. I noticed other disparate abstractions solving similar problems and provided a general solution.

Checklist

I wrote and generated a ton of tests for this and also ran some "stress" tests locally. Since this is just a wrapper on top of CompletableFuture and we aren't creating/using our own executors or threading abstractions it's not as scary of a thing as it looks.

@cconstable cconstable marked this pull request as ready for review July 2, 2026 22:14
@cconstable cconstable requested a review from a team as a code owner July 2, 2026 22:14
Comment on lines +86 to +90
Throwable unwrapped = unwrap(error);
if (unwrapped instanceof CancellationException) {
next.completeExceptionally(unwrapped);
return;
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancellation is an error but it's not something we can "recover from" so we should just immediately surface cancellations (skip the user supplied transform)

@cconstable cconstable force-pushed the structured-concurrency branch from b0455e7 to 6371f6d Compare July 3, 2026 01:49

/** Token that allows asynchronous code to observe cancellation requests. */
@Experimental
public interface CancellationToken {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • the only "public" part of this PR
  • meant to serve as a general purpose cancellation token
  • can replace existing bespoke tokens e.g. ActivityCancellationToken

* Attaches an existing asynchronous task to this scope. Scope cancellation requests cancellation
* on the attached future.
*/
<U> TaskChain<U> attach(CompletableFuture<U> future);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main usage point:

scope.attach(someAsyncWork());

It lifts a CompletableFuture into the structured scope. It's the bridge from the unstructured CF world into the structured one. We return a TaskChain rather than a CompletableFuture so callers can chain downstream work (via map, recover, etc) without leaving the scope. It also does not expose blocking operations and foot-guns like exceptionally that break cancellation.

private ListUtils() {}

/** Concatenates a list of lists into a single list, preserving order. */
public static <T> List<T> flatten(List<? extends List<? extends T>> lists) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used in this PR but it's a natural part of the using this abstraction:

scope.awaitAll(ListUtils::flatten);

It's also being used in the branch I have this relies on this branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant