JEP 505: Structured Concurrency (Fifth Preview)

Authors: Alan Bateman, Viktor Klang & Ron Pressler
Owner: Alan Bateman
Type: Feature
Scope: SE
Status: Candidate
Component: core-libs
Discussion: loom dash dev at openjdk dot org
Relates to: JEP 499: Structured Concurrency (Fourth Preview)
Reviewed by: Paul Sandoz
Created: 2024/09/18 04:58
Updated: 2025/04/15 05:55
Issue: 8340343

Summary

Simplify concurrent programming by introducing an API for structured concurrency. Structured concurrency treats groups of related tasks running in different threads as single units of work, thereby streamlining error handling and cancellation, improving reliability, and enhancing observability. This is a preview API.

History

Structured Concurrency incubated in JDK 19 via JEP 428 and JDK 20 via JEP 437. It previewed in JDK 21 via JEP 453, with the fork method changed to return a Subtask rather than a Future. It re-previewed in JDK 22 via JEP 462 and JDK 23 via JEP 480.

We propose to preview the API once more in JDK 25 with several API changes. In particular, a StructuredTaskScope is now opened via static factory methods rather than public constructors. The zero-parameter open factory method covers the common case by creating a StructuredTaskScope that waits for all subtasks to succeed or any subtask to fail. Other policies and outcomes can be implemented by providing an appropriate Joiner to one of the richer open factory methods.

Goals

  - Promote a style of concurrent programming which can eliminate common risks arising from cancellation and shutdown, such as thread leaks and cancellation delays.

  - Improve the observability of concurrent code.

Non-Goals

  - It is not a goal to replace any of the concurrency constructs in the java.util.concurrent package, such as ExecutorService and Future.

  - It is not a goal to define the definitive structured concurrency API for the Java Platform. Other structured concurrency APIs may be built by third-party libraries.

  - It is not a goal to replace the existing thread interruption mechanism with a new cancellation mechanism. We might propose to do so in the future.

Motivation

We manage complexity in programs by breaking tasks down into multiple subtasks. In ordinary single-threaded code, the subtasks execute sequentially. However, if the subtasks are sufficiently independent of each other, and if there are sufficient hardware resources, then the overall task can be made to run faster, i.e., with lower latency, by executing the subtasks concurrently. For example, a task that composes the results of multiple I/O operations will run faster if each I/O operation executes concurrently in its own thread. Virtual threads (JEP 444) make it cost-effective to dedicate a thread to every such I/O operation, but managing the huge number of threads that can result remains a challenge.

Unstructured concurrency with ExecutorService

The java.util.concurrent.ExecutorService API, introduced in Java 5, can execute subtasks concurrently.

For example, here is a method, handle(), that represents a task in a server application. It handles an incoming request by submitting two subtasks to an ExecutorService. One subtask executes the method findUser(), while the other executes the method fetchOrder(). The ExecutorService immediately returns a Future for each subtask, and executes the subtasks concurrently according to its scheduling policy. The handle() method awaits the subtasks' results via blocking calls to their futures' get() methods, so the task is said to join its subtasks.

Response handle() throws ExecutionException, InterruptedException {
    Future<String> user = executor.submit(() -> findUser());
    Future<Integer> order = executor.submit(() -> fetchOrder());
    String theUser = user.get();   // Join findUser
    int theOrder = order.get();    // Join fetchOrder
    return new Response(theUser, theOrder);
}

Because the subtasks execute concurrently, each subtask can succeed or fail independently. (Failure, in this context, means to throw an exception.) Often, a task such as handle() should fail if any of its subtasks fail. Understanding the lifetimes of the threads can be surprisingly complicated when failure occurs:

  - If findUser() throws an exception then handle() throws an exception when calling user.get(), but fetchOrder() continues to run in its own thread even though its result will never be used. This is a thread leak which, at best, wastes resources; at worst, the fetchOrder() thread interferes with other tasks.

  - If the thread executing handle() is interrupted then the interruption does not propagate to the subtasks. Both the findUser() and fetchOrder() threads leak, continuing to run even after handle() has failed.

  - If findUser() takes a long time to execute, but fetchOrder() fails in the meantime, then handle() waits unnecessarily for findUser() by blocking on user.get() rather than cancelling it. Only after findUser() completes and user.get() returns does order.get() throw an exception, causing handle() to fail.

In each case, the problem is that our program is logically structured with task-subtask relationships, but these relationships exist only in our minds.

This not only creates more room for error, but it makes diagnosing and troubleshooting such errors more difficult. Observability tools such as thread dumps, for example, will show handle(), findUser(), and fetchOrder() on the call stacks of unrelated threads, with no hint of the task-subtask relationship.

We might attempt to do better by explicitly cancelling the other subtasks when an error occurs, for example by wrapping the joins in a try-catch and, in the catch block for the failing subtask, calling the cancel(boolean) methods of the other subtasks' futures. We would also need to use the ExecutorService inside a try-with-resources statement, as shown in the examples in JEP 444, because Future does not offer a way to wait for a task that has been cancelled. But all of this bookkeeping of inter-task relationships can be tricky to get right, and it often makes the logical intent of the code harder to discern.
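
As a sketch of what that manual coordination might look like (this code is not from the JEP; findUser() and fetchOrder() are stand-ins, and the failure of fetchOrder() is simulated):

```java
import java.util.concurrent.*;

// Sketch: manually cancelling the sibling subtask when one fails.
// findUser() and fetchOrder() are illustrative placeholders.
class ManualCancellation {
    static String findUser() throws InterruptedException {
        Thread.sleep(1000);    // simulate slow I/O
        return "alice";
    }
    static Integer fetchOrder() {
        throw new RuntimeException("order service down");  // simulated failure
    }

    static String handle() throws InterruptedException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> user = executor.submit(ManualCancellation::findUser);
            Future<Integer> order = executor.submit(ManualCancellation::fetchOrder);
            try {
                Integer theOrder = order.get();   // throws because fetchOrder() failed
                String theUser = user.get();
                return theUser + ":" + theOrder;
            } catch (ExecutionException e) {
                user.cancel(true);    // manually cancel the sibling subtask
                order.cancel(true);
                return "failed: " + e.getCause().getMessage();
            }
        }   // try-with-resources waits for all tasks, including cancelled ones
    }
}
```

Even in this small example, the cancellation logic is entangled with the result-processing logic, and it scales poorly as the number of subtasks grows.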

This need to manually coordinate lifetimes is due to the fact that ExecutorService and Future allow unrestricted patterns of concurrency. There are no constraints upon, or ordering of, any of the threads involved. One thread can create an ExecutorService, a second thread can submit work to it, and the threads which execute the work have no relationship to either the first or second thread. Moreover, after a thread has submitted work, a completely different thread can await the results of execution. Any code with a reference to a Future can join it, i.e., await its result by calling get() — even code in a thread other than the one which obtained the Future. In effect, a subtask started by one task does not have to return to the task that submitted it. It could return to any of a number of tasks — or possibly none.

Because ExecutorService and Future allow for such unstructured use, they do not enforce or even track relationships among tasks and subtasks, even though such relationships are common and useful. Accordingly, even when subtasks are submitted and joined in the same task, the failure of one subtask cannot automatically cause the cancellation of another: In the above handle() method, the failure of fetchOrder() cannot automatically cause the cancellation of findUser(). The future for fetchOrder() is unrelated to the future for findUser(), and neither is related to the thread that will ultimately join it via its get() method.

Rather than manage such cancellation manually, we should aim to reliably automate it.

Task structure should reflect code structure

In contrast to the freewheeling assortment of threads under ExecutorService, the execution of single-threaded code always enforces a hierarchy of tasks and subtasks. The body block {...} of a method corresponds to a task, and the methods invoked within the block correspond to subtasks. An invoked method must either return to, or throw an exception to, the method that invoked it. An invoked method cannot outlive the method that invoked it, nor can it return to or throw an exception to a different method. Thus all subtasks finish before the task, each subtask is a child of its parent, and the lifetime of each subtask relative to the others and to the task is governed by the syntactic block structure of the code.

For example, in this single-threaded version of handle(), the task-subtask relationship is apparent from the syntactic structure:

Response handle() throws IOException {
    String theUser = findUser();
    int theOrder = fetchOrder();
    return new Response(theUser, theOrder);
}

We do not start the fetchOrder() subtask until the findUser() subtask has completed, whether successfully or unsuccessfully. If findUser() fails then we do not start fetchOrder() at all, and the handle() task fails implicitly. The fact that a subtask can return only to its parent is significant: It implies that the parent task can implicitly treat the failure of one subtask as a trigger to cancel other unfinished subtasks and then fail itself.

In single-threaded code, the task-subtask hierarchy is reified in the call stack at run time. We thus get the corresponding parent-child relationships, which govern error propagation, for free. When observing a single thread, the hierarchical relationship is obvious: findUser(), and later fetchOrder(), appear subordinate to handle(). This makes it easy to answer the question, "what is handle() working on now?"

Concurrent programming would be easier, more reliable, and more observable if the parent-child relationships between tasks and their subtasks were evident from the syntactic structure of the code and also reified at run time — just as for single-threaded code. The syntactic structure would delineate the lifetimes of subtasks and enable a runtime representation of the inter-thread hierarchy, analogous to the intra-thread call stack. That representation would enable error propagation and cancellation as well as meaningful observation of the concurrent program.

(The Java Platform already has an API for imposing structure on concurrent tasks, namely java.util.concurrent.ForkJoinPool, which is the execution engine behind parallel streams. However, that API is designed for compute-intensive tasks rather than tasks which involve I/O.)

Structured concurrency

Structured concurrency is an approach to concurrent programming that preserves the natural relationship between tasks and subtasks, which leads to more readable, maintainable, and reliable concurrent code. The term "structured concurrency" was coined by Martin Sústrik and popularized by Nathaniel J. Smith. Ideas from other languages, such as Erlang's hierarchical supervisors, inform the design of error handling in structured concurrency.

Structured concurrency derives from a simple principle: If a task splits into concurrent subtasks then they all return to the same place, namely the task's code block.

In structured concurrency, subtasks work on behalf of a task. The task awaits the subtasks' results and monitors them for failures. As with structured programming techniques for code in a single thread, the power of structured concurrency for multiple threads comes from two ideas: The flow of execution through a block of code has well-defined entry and exit points, and the lifetimes of operations are nested in a way that mirrors their syntactic nesting in the code.

Because the entry and exit points of a block of code are well defined, the lifetime of a concurrent subtask is confined to the syntactic block of its parent task. Because the lifetimes of sibling subtasks are nested within that of their parent task, they can be reasoned about and managed as a unit. Because the lifetime of the parent task is, in turn, nested within that of its parent, the runtime can reify the hierarchy of tasks into a tree that is the concurrent counterpart of the call stack of a single thread. This allows policies, such as deadlines, to be applied to entire sub-trees of tasks, and allows observability tools to present subtasks as subordinate to their parent tasks.

Structured concurrency is a great match for virtual threads, which are lightweight threads implemented by the JDK. Virtual threads share operating-system threads, allowing for enormous numbers of virtual threads. In addition to being plentiful, virtual threads are cheap enough to represent any concurrent unit of behavior, even behavior that involves I/O. This means that a server application can use structured concurrency to process thousands or millions of incoming requests at once: It can dedicate a new virtual thread to the task of handling each request, and when a task fans out by submitting subtasks for concurrent execution then it can dedicate a new virtual thread to each subtask. Behind the scenes, the task-subtask relationship is reified into a tree by arranging for each virtual thread to carry a reference to its unique parent, similar to how a frame in the call stack refers to its unique caller.

In summary, virtual threads deliver an abundance of threads. Structured concurrency can correctly and robustly coordinate them, and enables observability tools to display threads as they are understood by developers. Having an API for structured concurrency in the Java Platform would make it easier to build maintainable, reliable, and observable server applications.

Description

The principal class of the structured concurrency API is StructuredTaskScope, in the java.util.concurrent package. This class allows us to structure a task as a family of concurrent subtasks, and to coordinate them as a unit. Subtasks are executed in their own threads by forking them individually and then joining them as a unit. StructuredTaskScope confines the lifetimes of the subtasks to a clear lexical scope in which all of a task's interactions with its subtasks — forking, joining, handling errors, and composing results — take place.

Here is the handle() example from earlier, revised to use StructuredTaskScope:

Response handle() throws InterruptedException {

    try (var scope = StructuredTaskScope.open()) {

        Subtask<String> user = scope.fork(() -> findUser());
        Subtask<Integer> order = scope.fork(() -> fetchOrder());

        scope.join();   // Join subtasks, propagating exceptions

        // Both subtasks have succeeded, so compose their results
        return new Response(user.get(), order.get());

    }

}

In contrast to the original example, understanding the lifetimes of the threads involved here is easy: Under all conditions, their lifetimes are confined to a lexical scope, namely the body of the try-with-resources statement. Furthermore, the use of StructuredTaskScope ensures a number of valuable properties:

  - Error handling with short-circuiting: If either the findUser() or fetchOrder() subtask fails, the other is cancelled if it has not yet completed.

  - Cancellation propagation: If the thread running handle() is interrupted before or during the call to join(), both subtasks are cancelled automatically when the thread exits the scope.

  - Clarity: The code has a clear structure: Fork the subtasks, join them, and then process their results, with no further cleanup required.

  - Observability: A thread dump, as described below, clearly displays the task hierarchy, with the threads running findUser() and fetchOrder() shown as children of the scope.

StructuredTaskScope is a preview API, disabled by default

To use the StructuredTaskScope API you must enable preview APIs, as follows:

  - Compile the program with javac --release 25 --enable-preview Main.java and run it with java --enable-preview Main; or,

  - When using the source code launcher, run the program with java --enable-preview Main.java; or,

  - When using jshell, start it with jshell --enable-preview.

Using StructuredTaskScope

The StructuredTaskScope<T, R> API, where T is the result type of tasks forked in the scope and R is the result type of the join method, can be summarized as:

public sealed interface StructuredTaskScope<T, R> extends AutoCloseable {

    public static <T> StructuredTaskScope<T, Void> open();
    public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T,
                                                               ? extends R> joiner);

    public <U extends T> Subtask<U> fork(Callable<? extends U> task);
    public Subtask<? extends T> fork(Runnable task);
  
    public R join() throws InterruptedException;

    public void close();

}

The general workflow of code using StructuredTaskScope is:

  1. Open a new scope by calling one of the static open methods. The thread that opens the scope is the scope's owner.

  2. Use the fork methods to fork subtasks in the scope.

  3. Use the join method to join all of the scope's subtasks, as a unit. This may throw an exception.

  4. Process the outcome.

  5. Close the scope, usually implicitly via try-with-resources. This cancels the scope, if it is not already cancelled, thereby cancelling all of its remaining subtasks and waiting for them to terminate.

In the handle() example, the zero-parameter open() factory method creates and opens a StructuredTaskScope that implements the default completion policy, which is to fail if any subtask fails. Other policies can be implemented, as we shall see below, by passing a suitable Joiner to the one-parameter open method.

Each call to a fork method starts a thread to execute a subtask, which by default is a virtual thread. A subtask can create its own StructuredTaskScope to fork its own subtasks, thus creating a hierarchy of scopes. That hierarchy is reflected in the code's block structure, which confines the lifetimes of the subtasks: All of the subtasks' threads are guaranteed to have terminated once the scope is closed, and no thread is left behind when the block exits.

The join method must be called by the scope's owner thread from within the scope. If a scope's block exits before joining then the scope is cancelled and the owner will wait in the close method for all subtasks to terminate before throwing an exception.

After joining, the scope's owner can process the results of the subtasks using the Subtask objects returned from the fork methods. The Subtask::get method throws an exception if called before joining.

Cancellation

A scope's owner thread may be interrupted either before or while joining. For example, the owner could itself be a subtask of an enclosing scope that has been cancelled. If this occurs then join() will throw an exception because there is no point in continuing. The try-with-resources statement will then cancel the scope, which will cancel all the subtasks and wait for them to terminate. This has the effect of automatically propagating the cancellation of the task to its subtasks.

To allow for cancellation, subtasks must be coded so that they finish as soon as possible when interrupted. Subtasks that do not respond to interrupts because, e.g., they block on methods that are not interruptible, may delay the closing of a scope indefinitely. The close method always waits for threads executing subtasks to finish, even if the scope is cancelled. Execution cannot continue beyond the close method until the interrupted threads finish.
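
The structure of such an interrupt-responsive subtask might be sketched as follows (the class name is illustrative; the sleep stands in for an interruptible blocking operation such as I/O):

```java
import java.util.concurrent.Callable;

// Sketch: a subtask coded to finish promptly when its thread is
// interrupted, so that cancellation of the scope is not delayed.
class ResponsiveSubtask implements Callable<String> {
    @Override
    public String call() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(100);   // interruptible call standing in for blocking I/O
                // ... check for a result; return it when available ...
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // restore the interrupt status
                break;                               // finish as soon as possible
            }
        }
        return null;   // no result; the scope was cancelled
    }
}
```

A subtask that instead blocks on a non-interruptible operation would keep the close method waiting until that operation completes on its own.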

Scoped values

Subtasks forked in a scope inherit ScopedValue bindings (JEP 487). If a scope's owner reads a value from a bound ScopedValue then each subtask will read the same value.

Structured use is enforced

At run time, StructuredTaskScope enforces structure and order upon concurrent operations. For example, attempts to call a fork method from a thread that is not the scope's owner will fail with an exception. Using a scope outside of a try-with-resources block and returning without calling close(), or without maintaining the proper nesting of close() calls, may cause the scope's methods to throw a StructureViolationException.

StructuredTaskScope does not implement the ExecutorService or Executor interfaces, since instances of those interfaces are commonly used in a non-structured way (see below). However, it is straightforward to migrate code that uses ExecutorService, but would benefit from structure, to use StructuredTaskScope.

Joiners

In the handle() example, if any subtask fails then the join() method throws an exception and the scope is cancelled. If all subtasks succeed then the join() method completes normally and returns null. This is the default completion policy.

Other policies can be selected by creating a StructuredTaskScope with a suitable StructuredTaskScope.Joiner. A Joiner object handles subtask completion and produces the outcome for the join() method. Depending on the joiner, the join() method may return a result, a stream of elements, or some other object.

The Joiner interface declares factory methods to create joiners for some common cases. For example, the factory method anySuccessfulResultOrThrow() returns a new joiner that yields the result of any subtask that completes successfully:

<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(Joiner.<T>anySuccessfulResultOrThrow())) {
        tasks.forEach(scope::fork);
        return scope.join();
    }
}

As soon as one subtask succeeds then the scope is cancelled, cancelling the unfinished subtasks, and join() returns the result of the successful subtask. If all subtasks fail then join() throws a FailedException with the exception from one of the subtasks as the cause. This pattern can be useful in, e.g., server applications that require a result from any one of a collection of redundant services.

The factory method allSuccessfulOrThrow() returns a new joiner that, when all subtasks complete successfully, yields a stream of the subtasks:

<T> List<T> runConcurrently(Collection<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow())) {
        tasks.forEach(scope::fork);
        return scope.join().map(Subtask::get).toList();
    }
}

If one or more subtasks fail then join() throws a FailedException with the exception from one of the failed subtasks as the cause. The completion policy implemented by this Joiner is the same as the default policy implemented by the zero-parameter open() method. They differ in outcome: The join() method returns a stream of completed subtasks rather than null, making this joiner suited to cases where all subtasks return a result of the same type and the Subtask objects returned by the fork method are ignored. In this example, the Subtask elements in the stream are mapped to their results and accumulated into a list.

The Joiner interface declares three additional factory methods:

  - awaitAllSuccessfulOrThrow() returns a joiner that waits for all subtasks to succeed, or for any subtask to fail, and whose join() method returns null. It is useful when subtask results are consumed via the Subtask objects rather than through join().

  - awaitAll() returns a joiner that waits for all subtasks to complete, regardless of whether they succeed or fail, and whose join() method returns null.

  - allUntil(Predicate) returns a joiner that waits for all subtasks to complete, or until the given predicate, invoked as each subtask completes, returns true to cancel the scope; its join() method returns a stream of all subtasks.

When using any kind of Joiner, it is critical to create a new Joiner for each StructuredTaskScope. Joiner objects should never be used in different task scopes or re-used after a scope is closed.

Custom Joiners

The Joiner interface can be implemented directly in order to support custom completion policies. It has two type parameters: T for the result type of the subtasks executed in the scope, and R for the result type of the join() method. The interface can be summarized as:

public interface Joiner<T, R> {
    public default boolean onFork(Subtask<? extends T> subtask);
    public default boolean onComplete(Subtask<? extends T> subtask);
    public R result() throws Throwable;
}

The onFork method is invoked when forking a subtask, while the onComplete method is invoked when a subtask completes. Both methods return a boolean to indicate whether the scope should be cancelled. The result method is invoked to produce the result for the join method, or else throw an exception, when all subtasks have completed or the scope is cancelled. If the result method throws an exception then the join method will throw a FailedException with that exception as the cause.

Here is a Joiner class that collects the results of subtasks that complete successfully, ignoring subtasks that fail. The onComplete method may be invoked by several threads concurrently, and so must be thread-safe. The result method returns a stream of the task results.

class CollectingJoiner<T> implements Joiner<T, Stream<T>> {

    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    public boolean onComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
        return false;
    }

    public Stream<T> result() {
        return results.stream();
    }

}

This custom policy can be used like so:

<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = StructuredTaskScope.open(new CollectingJoiner<T>())) {
        tasks.forEach(scope::fork);
        return scope.join().toList();
    }
}

Exception handling

How exceptions are handled will depend on usage. The join() method throws a FailedException when the scope is considered to have failed. In the handle() example, if a subtask fails then a FailedException is thrown with the exception from the failed subtask as the cause. In some cases it may be useful to add a catch block to the try-with-resources statement in order to handle exceptions after the scope is closed:

try (var scope = StructuredTaskScope.open()) {
   ...
} catch (StructuredTaskScope.FailedException e) {
   Throwable cause = e.getCause();
   switch (cause) {
       case IOException ioe -> ..
       default -> ..
   }
}

Exception-handling code can use pattern matching for switch (JEP 441), as shown above, or the instanceof operator with pattern matching (JEP 394), to handle specific causes.

A specific exception in a subtask may trigger the return of a default value. In such cases it may be more appropriate to catch the exception in the subtask and have it complete with the default value as the result rather than have the scope's owner handle the exception.
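
This pattern might be factored into a small wrapper, sketched below; the withDefault helper and its parameters are illustrative, not part of the API:

```java
import java.util.concurrent.Callable;

// Sketch: wrap a subtask so that a specific, recoverable exception type
// completes the subtask normally with a default value instead of failing
// the whole scope. The withDefault helper is illustrative, not part of the API.
class Defaults {
    static <T> Callable<T> withDefault(Callable<T> task, T fallback,
                                       Class<? extends Exception> recoverable) {
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                if (recoverable.isInstance(e)) {
                    return fallback;   // complete normally with the default value
                }
                throw e;               // other failures still fail the subtask
            }
        };
    }
}
```

A task could then fork, e.g., scope.fork(Defaults.withDefault(() -> fetchOrder(), 0, IOException.class)) so that an IOException in fetchOrder() yields 0 rather than cancelling the scope.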

Configuration

The earlier summary of the StructuredTaskScope API showed two static open methods. A third open method accepts a Joiner together with a function that updates a configuration object, which can be used to set the scope's name for monitoring and management purposes, to set the scope's timeout, and to set the thread factory used by the scope's fork methods to create threads.

Here is a revised version of the runConcurrently method which sets a thread factory and a timeout:

<T> List<T> runConcurrently(Collection<Callable<T>> tasks,
                            ThreadFactory factory,
                            Duration timeout)
    throws InterruptedException
{
    try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow(),
                                              cf -> cf.withThreadFactory(factory)
                                                      .withTimeout(timeout))) {
        tasks.forEach(scope::fork);
        return scope.join().map(Subtask::get).toList();
    }
}

The fork method in this scope will invoke the given thread factory to create the thread to execute each subtask. This can be useful to, e.g., set the thread's name or other properties.
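
For example, a factory for named virtual threads can be built with the standard Thread.Builder API (the prefix here is illustrative):

```java
import java.util.concurrent.ThreadFactory;

// Sketch: a ThreadFactory producing named virtual threads, of the kind
// that could be supplied to the scope's configuration function above.
class NamedFactory {
    static ThreadFactory virtualNamed(String prefix) {
        // Names the created threads prefix0, prefix1, ... in order.
        return Thread.ofVirtual().name(prefix, 0).factory();
    }
}
```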

The timeout is specified as a java.time.Duration. If the timeout expires before or while waiting in the join() method then the scope is cancelled, which cancels all incomplete subtasks, and join() throws a TimeoutException.

Observability

We extend the JSON thread-dump format added for virtual threads to show how StructuredTaskScopes group threads into a hierarchy:

$ jcmd <pid> Thread.dump_to_file -format=json <file>

The JSON object for each scope contains an array of the threads forked in the scope, together with their stack traces. The owner thread of a scope will typically be blocked in a join method, waiting for subtasks to complete; the thread dump makes it easy to see what the subtasks' threads are doing by showing the tree hierarchy imposed by structured concurrency. The JSON object for a scope also has a reference to its parent, so that the structure of the program can be reconstituted from the dump.

The com.sun.management.HotSpotDiagnosticMXBean API can also be used to generate such thread dumps, either directly or indirectly via the platform MBeanServer and a local or remote JMX tool.

Alternatives

Enhance the ExecutorService interface

We prototyped an implementation of this interface that always enforces structure and restricts which threads can submit tasks. However, we found it to be problematic because most uses of ExecutorService, and its parent interface Executor, in the JDK and in the ecosystem are not structured. Reusing the same API for a far more restricted concept is bound to cause confusion. For example, passing a structured ExecutorService instance to existing methods that accept this type would be all but certain to throw exceptions in most situations.

Have the fork methods return a Future

When the StructuredTaskScope API was incubating, the fork methods returned a Future. This provided a sense of familiarity, by making these methods resemble the existing ExecutorService::submit method. However, the fact that StructuredTaskScope is intended to be used in a structured way, while ExecutorService is not, brought more confusion than clarity.

In the current API, Subtask::get() behaves exactly as Future::resultNow() did when the API was incubating.