back to all blogsSee all blog posts

Reactive programming in microservices with MicroProfile on Open Liberty 19.0.0.4

image of author
Laura Cowen on Apr 26, 2019
Post available in languages:

Open Liberty 19.0.0.4 introduces our implementation of the MicroProfile Reactive Streams Operators feature. The Reactive Streams Operators feature provides flow control and elegant error handling when subscribing to and processing streams of events. Open Liberty 19.0.0.4 also provides support for Java SE 12 and for Oracle UCP if you are using Oracle Real Application Clusters.

View the list of fixed bugs from 19.0.0.4

If you’re using Maven, here are the coordinates:

<dependency>
    <groupId>io.openliberty</groupId>
    <artifactId>openliberty-runtime</artifactId>
    <version>19.0.0.4</version>
    <type>zip</type>
</dependency>

Or for Gradle:

dependencies {
    libertyRuntime group: 'io.openliberty', name: 'openliberty-runtime', version: '[19.0.0.4,)'
}

Or if you’re using Docker:

docker pull open-liberty

Or take a look at our Downloads page.

Ask a question on Stack Overflow

Reactive programming in microservices (MicroProfile Reactive Streams Operators 1.0)

Reactive streams provide a method to implement publish subscribe ('pub-sub') pipelines that handle flow-control and error handling in an elegant manner. MicroProfile Reactive Streams Operators bundles a reactive streams API together with a set of standard operators that can be applied to streams. For example, you could write a reactive streams publisher that sends events that capture sensor data from an 'internet of things' (IOT) device, such as temperature or pressure, and then subscribes to these events in a way that enables the client to control how frequently the events are sent without have to 'block' the event processing to slow things down. Additionally, it would be easy to apply reactive streams operators, such as filters or mappings, to the events that are received by the subscribing client.

Reactive streams provide for a powerful abstraction that can handle, perhaps infinite, streams of events in a manner that prevents downstream clients being overwhelmed. Provided operators can filter, combine or map events, handle errors, and so on, all without breaking the stream processing pipeline. Additionally, streams are generic over application domain types and allow developers to compose multiple technologies using a common succinct approach enabling them to concentrate more on application domain problems.

Non-reactive approaches often control the flow of data inadvertantly by simply getting slower at processing it, thus blocking the thread of control from being used to send further work while doing so. If we attempt to improve on this approach, by using asynchronous techniques to free up the thread, it can often lead to a problem where the rate of data arriving at a component exceeds the rate that it can be processed. Buffering can help to smooth arrival rates but, long term, it does not help throttle the incoming work to an rate that is sustainable. The reactive approach uses a system of ticketing whereby the sender reacts to requests for data. This ticketing approach creates 'back pressure' between the publisher and subscriber which limits the amount of data that can be sent to levels that can be handled successfully and contributes towards protecting the system from overload.

As well as the benefits of back pressure, reactive streams provides an abstraction that can be used to glue together different technologies in an efficient manner. For example, a publisher of CustomerOrder can be directly connected to a subscriber of CustomerOrder using very concise Java code that is the same regardless of which underlying technology the CustomerOrder is being sourced from or sent to. It is possible to chain together publishers, subscribers and intermediate processor classes, as well as operators from a standard library, to compose applications without detailed use of a particular technology’s API. Reactive streams-based drivers for different enterprise Java technologies can be constructed and subsequently shared by applications using the fluent builder APIs of this feature.

To enable the MicroProfile Reactive Streams Operators feature in your server.xml:

<featureManager>
        <feature>mpReactiveStreams-1.0</feature>
</featureManager>

You can then use code in your application like this:

import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
...
assertEquals(
                ReactiveStreams.of("1", "2", "3")
                        .collect(Collectors.joining(", ")).run().toCompletableFuture().get(),
                "1, 2, 3");

Support for Java SE 12

Any official Java SE 12 release from AdoptOpenJDK, OpenJDK, or Oracle works with OpenLiberty. Java SE 12 is not a long-term supported release, with standard support scheduled to end in September 2019.

The primary features added in this release include garbage collection improvements and startup time improvements for the Hotspot VM. For more details, check the Java 12 project page.

Support for Oracle Universal Connection Pool (UCP)

Oracle Universal Connection Pool (UCP) is a standalone JDBC connection pool. When using Oracle UCP with Open Liberty you are using the Oracle UCP connection pool instead of using Open Liberty’s built-in connection pooling functionality. This allows users of Oracle Real Application Clusters (RAC) to take advantage of Oracle’s high availability and performance functionality such as Fast Connection Failover (FCF), Fast Application Notification (FAN), and Oracle Notification Services (ONS). Support for Oracle UCP was added in Open Liberty version 19.0.0.4.

To enable Oracle UCP in your server configuration:

Ensure that one of the JDBC features is enabled in server configuration. Oracle UCP can be used with any version of JDBC supported by Open Liberty, but support for any specific specification functionality is limited to what is supported by Oracle UCP.

<featureManager>
    <feature>jdbc-4.2</feature>
    <feature>jndi-1.0</feature> <!-- Required only if JNDI is desired to look up resources -->
</featureManager>

Configure the datasource and library in the server config. The UCP driver and Oracle JDBC driver can be obtained from Oracle.

<dataSource id="oracleUCPDS" jndiName="jdbc/oracleUCPDS" >
    <jdbcDriver libraryRef="OracleUCPLib" />
    <properties.oracle.ucp URL="jdbc:oracle:thin:@//localhost:1521/SAMPLEDB" />
</dataSource>

<library id="OracleUCPLib">
    <fileset dir="C:/Oracle/Drivers" includes="ojdbcx.jar ucp.jar"/>
</library>

Oracle UCP might require some properties to be set in the properties.oracle.ucp element, such as the user and password, depending on the version used.

Because the Liberty connection pool is disabled, some of the Open Liberty datasource and connection manager configuration values are ignored and overridden. The following connection manager properties cannot be used with Oracle UCP: agedTimeout, connectionTimeout, maxIdleTime, maxPoolSize, minPoolSize, purgePolicy, reapTime, maxConnectionsPerThread, maxConnectionsPerThreadLocal. Additionally, the datasource properties statementCacheSize and validationTimeout cannot be used when using UCP with Open Liberty, as Open Liberty’s statement caching and connection validation are disabled.

For most of those datasource and connection manager properties, Oracle UCP provides equivalent functionality. See Oracle’s documentation for more details; some default property values differ between Open Liberty’s connection manager and Oracle UCP. The Oracle UCP properties can be applied to the properties.oracle.ucp properties element. The dataSource types of ConnectionPoolDataSource and Driver are not supported with Oracle UCP since the UCP driver does not provide an implementation of those interfaces.

The data source can be accessed and used by the application using the standard JDBC APIs.

Ready to give it a try?

Get the Maven or Gradle coordinates (and other download options) from the top of this post.

Ask a question on Stack Overflow

Previews of early implementations available in the latest development builds

You can now also try out early implementations of some new capabilities in the latest Open Liberty development builds:

This early implementation is not available in 19.0.0.4 but you can try it out by downloading the latest Open Liberty development build. Let us know what you think!

MicroProfile Context Propagation 1.0 (formerly MicroProfile Concurrency 1.0)

MicroProfile Context Propagation (formerly MicroProfile Concurrency) allows you to create completion stages that run with predictable thread context regardless of which thread the completion stage action ends up running on.

MicroProfile Context Propagation provides completion stages that run with predictable thread context that also benefit from being backed by the automatically-tuned Liberty global thread pool. Configuration of concurrency constraints and context propagation is possible programmatically with fluent builder API where defaults can be established using MicroProfile Config.

To enable the MicroProfile Context Propagation 1.0 feature in your server.xml:

<featureManager>
    <feature>mpContextPropagation-1.0</feature>
    <feature>cdi-2.0</feature> <!-- used in example -->
    <feature>jndi-1.0</feature> <!-- used in example -->
    ... other features
</featureManager>

Example usage of programmatic builders:

ManagedExecutor executor = ManagedExecutor.builder()
    .maxAsync(5)
    .propagated(ThreadContext.APPLICATION, ThreadContext.SECURITY)
    .build();

CompletableFuture<Integer> stage1 = executor.newIncompleteFuture();
stage1.thenApply(function1).thenAccept(value -> {
    try {
        // access resource reference in application's java:comp namespace,
        DataSource ds = InitialContext.doLookup("java:comp/env/jdbc/ds1");
        ...
    } catch (Exception x) {
        throw new CompletionException(x);
    }
};
...
stage1.complete(result);

Example usage in a CDI bean:

// CDI qualifier which is used to identify the executor instance
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER })
public @interface AppContext {}

// Example producer field, defined in a CDI bean,
@Produces @ApplicationScoped @AppContext
ManagedExecutor appContextExecutor = ManagedExecutor.builder()
    .propagated(ThreadContext.APPLICATION)
    .build();

// Example disposer method, also defined in the CDI bean,
void disposeExecutor(@Disposes @AppContext exec) {
    exec.shutdownNow();
}

// Example injection point, defined in a CDI bean,
@Inject @AppContext
ManagedExecutor executor;

...

CompletableFuture<Integer> stage = executor
    .supplyAsync(supplier1)
    .thenApply(function1)
    .thenApplyAsync(value -> {
        try {
            // access resource reference in application's java:comp namespace,
            DataSource ds = InitialContext.doLookup("java:comp/env/jdbc/ds1");
            ...
            return result;
        } catch (Exception x) {
            throw new CompletionException(x);
        }
    });

For more information: