Concurrency with MicroProfile Context Propagation

Concurrency, or the ability to coordinate, run, and track multiple tasks at once, is a key capability for microservice-based applications. MicroProfile Context Propagation enhances Java SE and Java EE concurrency support by providing context awareness between concurrent tasks, which improves consistency and visibility across an application. With Java’s CompletionStage interface and CompletableFuture class, you can chain together pipelines of dependent actions. Under this model, first introduced in Java SE 8, the completion of each stage triggers the execution of specified dependent actions. MicroProfile Context Propagation expands this capability by providing consistent and reliable thread contexts for dependent stage actions and by defaulting asynchronous dependent stage actions to run on the Liberty global thread pool. To enable these functions, MicroProfile Context Propagation introduces the ManagedExecutor and ThreadContext APIs.

By using managed executors in MicroProfile Context Propagation, you can use completion stages that run with predictable thread context, regardless of which thread the action runs on. Without this feature, the thread context of a completion stage action can depend on a range of possibilities for where the action might run, including:

  • the requesting thread.

  • the thread of a stage upon which the action depends.

  • a thread that is requesting the result.

  • a thread that forcibly completes the prior stage.

Without MicroProfile Context Propagation, there is no guarantee about which thread context the action runs under. With MicroProfile Context Propagation, the thread context is consistent: it is always captured from the thread that creates the completion stage and then applied when the action runs.
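
To make the list of possibilities above concrete, the following plain Java sketch (no MicroProfile APIs) observes which thread runs a non-async dependent action in two of those cases. The class and method names are hypothetical, and the behavior shown is what OpenJDK's CompletableFuture does in practice; the Javadoc only promises that the action runs on the completing thread or on a caller of a completion method.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: shows that the thread running a dependent
// action depends on when and where the prior stage completes.
final class DependentActionThreads {

    // The prior stage is already complete, so the dependent action
    // runs immediately on the thread that calls thenApply.
    static String threadWhenAlreadyComplete() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("x");
        return done.thenApply(v -> Thread.currentThread().getName()).join();
    }

    // The prior stage is still pending when the dependent action is added,
    // so the action runs on the thread that later completes the stage.
    static String threadWhenCompletedBy(String completerName) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> dependent =
            pending.thenApply(v -> Thread.currentThread().getName());

        Thread completer = new Thread(() -> pending.complete("x"), completerName);
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return dependent.join();
    }

    public static void main(String[] args) {
        System.out.println("already complete: " + threadWhenAlreadyComplete());
        System.out.println("completed later:  " + threadWhenCompletedBy("completer-1"));
    }
}
```

Because any thread-bound context (security identity, naming context, and so on) follows the thread, the same action can see different context in each case. This is the inconsistency that capturing context at stage creation is meant to remove.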

With MicroProfile Context Propagation, you have control over how and when context awareness is enabled in your application. You can configure certain context types to clear rather than capture context. For example, if you clear the security context, no particular user is associated with the thread while the completion stage action runs. When a managed executor creates a completion stage, the managed executor remains associated with that completion stage. That managed executor determines thread context propagation for any dependent stages that are requested to run asynchronously without designating a specific executor. The managed executor remains associated with each dependent stage that is created, and each dependent stage that is created from those stages, and so forth. This association allows for predictable thread context propagation at every stage in the pipeline.
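
For contrast, the following plain Java sketch shows what happens without a managed executor: a stage that is started on an application-supplied executor does not stay associated with it, and an async dependent stage that names no executor falls back to the JDK default (normally the common ForkJoinPool). The class name, thread name, and helper method are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: plain CompletableFuture does not remember
// the executor that ran the first stage.
final class DefaultAsyncExecutorDemo {

    static final String APP_THREAD = "app-executor-thread";

    // Returns { thread that ran the first stage, thread that ran the async dependent stage }.
    static String[] observeThreads() {
        ExecutorService appExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, APP_THREAD));
        try {
            // The first stage is explicitly submitted to the application's executor.
            CompletableFuture<String> first = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName(), appExecutor);

            // No executor is designated here, so the JDK default async
            // executor is used instead of appExecutor.
            CompletableFuture<String> second = first.thenApplyAsync(
                ignored -> Thread.currentThread().getName());

            return new String[] { first.join(), second.join() };
        } finally {
            appExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = observeThreads();
        System.out.println("first stage ran on:     " + threads[0]);
        System.out.println("dependent stage ran on: " + threads[1]);
    }
}
```

With a managed executor, the equivalent dependent stage would instead be governed by the managed executor that created the pipeline, as described above.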

The following example shows a managed executor that is used to propagate the application’s namespace to an asynchronous action.

CompletableFuture<Integer> stage = executor.supplyAsync(supplier1)
    .thenApply(function1)
    .thenApply(function2)
    .thenApply(i -> {
        try {
            DataSource ds = InitialContext.doLookup("java:module/env/jdbc/ds1");
            ...
            return result;
        } catch (Exception x) {
            throw new CompletionException(x);
        }
    });

Managed executors in MicroProfile Context Propagation are fully compatible with ManagedExecutorService in Jakarta and Java EE Concurrency. Notably, the org.eclipse.microprofile.context.ManagedExecutor interface inherits from the java.util.concurrent.ExecutorService interface, and thus allows the same execute, submit, and invoke operations as ManagedExecutorService.
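
One practical consequence is that code written against java.util.concurrent.ExecutorService does not need to change to accept a managed executor. The following sketch assumes a hypothetical helper, sumOfSquares, and uses a fixed thread pool from the JDK as a stand-in so that it runs outside an application server; in an application, a ManagedExecutor could be passed in its place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class: depends only on the ExecutorService contract.
final class ExecutorServiceOperations {

    // Squares each value as a separate task through invokeAll and sums the results.
    static int sumOfSquares(ExecutorService executor, List<Integer> values) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (final int value : values) {
            tasks.add(() -> value * value);
        }
        try {
            int total = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> result : executor.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Runs the helper on a JDK thread pool that stands in for a managed executor.
    static int demo() {
        ExecutorService standIn = Executors.newFixedThreadPool(2);
        try {
            return sumOfSquares(standIn, Arrays.asList(1, 2, 3));
        } finally {
            standIn.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```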

MicroProfile Context Propagation provides the org.eclipse.microprofile.context.ThreadContext interface to pre-contextualize completion stage actions. This is useful when you have a completion stage that was not created by a managed executor and is therefore not thread-context aware. Such a stage can still run with predictable thread context if you pre-contextualize its action with the corresponding ThreadContext method, as in the following example:

CompletableFuture<Long> stage = CompletableFuture.supplyAsync(supplier1)
    .thenApplyAsync(function1)
    .thenApply(threadContext.contextualFunction(function2));

In the preceding example, supplier1 and function1 run with non-deterministic thread context. However, the pre-contextualized action, function2, always runs with the context of the thread that invoked the contextualFunction operation.
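
The capture-then-apply idea behind pre-contextualization can be sketched in plain Java. The following is only a conceptual illustration under stated assumptions, not how MicroProfile Context Propagation is implemented: a ThreadLocal named CURRENT_USER stands in for a real thread context type, and withCapturedUser is a hypothetical helper that plays the role of contextualFunction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch of capture-then-apply; not the MicroProfile implementation.
final class PreContextualizeSketch {

    // Stand-in for a thread context type such as the security identity.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Captures the caller's context now and applies it whenever the action runs.
    static <T, R> Function<T, R> withCapturedUser(Function<T, R> action) {
        final String captured = CURRENT_USER.get(); // captured on the wrapping thread
        return input -> {
            String previous = CURRENT_USER.get();   // context of the executing thread
            CURRENT_USER.set(captured);
            try {
                return action.apply(input);
            } finally {
                // Restore the executing thread's own context afterward.
                if (previous == null) {
                    CURRENT_USER.remove();
                } else {
                    CURRENT_USER.set(previous);
                }
            }
        };
    }

    // Returns { user seen by the plain action, user seen by the pre-contextualized action }.
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CURRENT_USER.set("alice");
        try {
            Function<String, String> whoAmI = ignored -> {
                String user = CURRENT_USER.get();
                return user == null ? "<none>" : user;
            };
            CompletableFuture<String> start = CompletableFuture.supplyAsync(() -> "go", pool);
            String plain = start.thenApplyAsync(whoAmI, pool).join();
            String contextual = start.thenApplyAsync(withCapturedUser(whoAmI), pool).join();
            return new String[] { plain, contextual };
        } finally {
            CURRENT_USER.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("plain action saw:      " + seen[0]);
        System.out.println("contextual action saw: " + seen[1]);
    }
}
```

In this sketch the plain action runs on a pool thread that has no user set, while the wrapped action sees the user that was current when it was wrapped. Restoring the previous value in the finally block keeps the captured context from leaking into later tasks on the same pool thread.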

How to obtain ManagedExecutor and ThreadContext instances

There are several ways to obtain instances of ManagedExecutor and ThreadContext:

  • MicroProfile Context Propagation offers a fluent builder pattern for programmatic usage.

    ManagedExecutor executor = ManagedExecutor.builder()
        .maxAsync(10)
        .propagated(ThreadContext.APPLICATION, ThreadContext.SECURITY)
        .cleared(ThreadContext.ALL_REMAINING)
        .build();

    Make sure to shut down any executor instances that the application builds when they are no longer needed.

  • If you are using Java EE Concurrency, you can cast your existing ManagedExecutorService to ManagedExecutor.

  • With MicroProfile Context Propagation, you can also configure and inject instances by way of CDI (requires the cdi-2.0 feature or higher).

    Example usage in a CDI bean:

    // CDI qualifier which is used to identify the executor instance
    @Qualifier
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER })
    public @interface AppContext {}
    
    // Example producer field, defined in a CDI bean
    @Produces @ApplicationScoped @AppContext
    ManagedExecutor appContextExecutor = ManagedExecutor.builder()
        .propagated(ThreadContext.APPLICATION)
        .build();
    
    // Example disposer method, also defined in the CDI bean
    void disposeExecutor(@Disposes @AppContext ManagedExecutor exec) {
        exec.shutdownNow();
    }
    
    // Example injection point, defined in a CDI bean
    @Inject @AppContext
    ManagedExecutor executor;
    
    ...
    
    CompletableFuture<Integer> stage = executor
        .supplyAsync(supplier1)
        .thenApply(function1)
        .thenApplyAsync(value -> {
            try {
                // Access a resource reference in the application's java:comp namespace
                DataSource ds = InitialContext.doLookup("java:comp/env/jdbc/ds1");
                ...
                return result;
            } catch (Exception x) {
                throw new CompletionException(x);
            }
        });

MicroProfile Context Propagation enhances Java’s concurrency support and builds out the infrastructure around it, which enables reliable, consistent context awareness among microservices. With MicroProfile Context Propagation, your applications react to events as they happen, under a dependable thread context, backed by the performance of Liberty threading.